Spark 广播变量

本文主要介绍两个内容,一是为什么要使用 Spark 广播变量,二是广播变量的用法。

为什么要使用广播变量

Spark 算子在执行时会将整个算子打包成 task 任务发送到指定 Executor 中执行,这里 task 任务的数量是由 RDD 的分区数决定的,1 个 RDD 对应 1 个 task。这种执行方法一般来说是没有问题的,但如果存在本地变量时,该变量也会被封装发送出去,特别是当 RDD 分区过多时,1 个 executor 内可能会存在多个 task ,也就是说同一个 executor 内会存在多个相同变量,这样会造成 shuffle 时间过长以及严的资源浪费。而使用广播变量时,代码在执行时只会封装一个变量的引用而不是变量本身,当 task 需要使用该变量时会向 executor 请求该变量,此时 executor 再向 driver 端请求然后拉取变量。

由此可以看出在使用大变量时广播变量的优越性。

广播变量的用法

广播变量的使用十分简单,首先在算子代码块外部对代码进行广播,然后在算子内进行使用即可。需要注意的是,网络传输的数据必须要实现 Serializable 接口,为实现该接口的数据是无法进行网络传输的,比如说网络连接对象就无法进行网络传输。

广播变量的具体示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def main(args: Array[String]): Unit = {

val conf= new SparkConf()
.setMaster("local")

val sc = new SparkContext(conf)

val studentRDD = sc.textFile("data/student.txt")

val list = List(1,2,3,4,5,6,7)
//将变量进行广播

val bro = sc.broadcast(list)

studentRDD.map(line=>{
//使用变量,.value 用于将广播变量的值取出

val list = bro.value
})

}

此外,还可以使用广播变量将小表广播出去,进行 mapjoin 从而避免 shuffle,代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def main(args: Array[String]): Unit = {

val conf= new SparkConf()
.setMaster("local")

val sc = new SparkContext(conf)

val studentRDD = sc.textFile("data/student.txt")

val courseRDD = sc.textFile("data/course.txt")

val broCourse = courseRDD
.map(line=>{
val id = line.split(",")(0)
(id,line)
})
//注意 driver 内存,防止 oom

.collectAsMap()

//广播小表
val bro = sc.broadcast(broCourse)
//对大表进行map
studentRDD.map(line=>{
val course = bro.value
val id = line.split(",")(0)
line+course(id)


})

}
-----------本文结束感谢您的阅读-----------
0%