本文主要介绍两个内容,一是为什么要使用 Spark 广播变量,二是广播变量的用法。
为什么要使用广播变量
Spark 算子在执行时会将整个算子打包成 task 任务发送到指定 Executor 中执行,这里 task 任务的数量是由 RDD 的分区数决定的,1 个 RDD 对应 1 个 task。这种执行方法一般来说是没有问题的,但如果存在本地变量时,该变量也会被封装发送出去,特别是当 RDD 分区过多时,1 个 executor 内可能会存在多个 task ,也就是说同一个 executor 内会存在多个相同变量,这样会造成 shuffle 时间过长以及严的资源浪费。而使用广播变量时,代码在执行时只会封装一个变量的引用而不是变量本身,当 task 需要使用该变量时会向 executor 请求该变量,此时 executor 再向 driver 端请求然后拉取变量。
由此可以看出在使用大变量时广播变量的优越性。
广播变量的用法
广播变量的使用十分简单,首先在算子代码块外部对代码进行广播,然后在算子内进行使用即可。需要注意的是,网络传输的数据必须要实现 Serializable 接口,为实现该接口的数据是无法进行网络传输的,比如说网络连接对象就无法进行网络传输。
广播变量的具体示例如下:
1 | def main(args: Array[String]): Unit = { |
此外,还可以使用广播变量将小表广播出去,进行 mapjoin 从而避免 shuffle,代码示例如下:
1 | def main(args: Array[String]): Unit = { |