Spark 算子

spark 的算子操作全都建立在 rdd (弹性分布式数据集) 的基础上,rdd 是一组操作的集合,其内并不包含任何数据,这很好地体现了大数据处理计算移动而数据不动的原则,提供了更高的执行速度。每个 rdd 都包含了以下五个内容

    • A list of partitions
    • A function for computing each split
    • A list of dependencies on other RDDs
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
  • an HDFS file)

此外, spark 的算子包含两类:转换(transformation)从现有的数据集创建一个新的数据集;而动作(actions)在数据集上运行计算后,返回一个值给驱动程序。例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。另一方面,reduce是一种动作,通过一些函数将所有的元素叠加起来,并将最终结果返回给Driver程序。(不过还有一个并行的reduceByKey,能返回一个分布式数据集)

Spark中的所有转换都是惰性的,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这个设计让Spark更加有效率的运行。例如,我们可以实现:通过map创建的一个新数据集,并在reduce中使用,最终只返回reduce的结果给driver,而不是整个大的新数据集。

默认情况下,每一个被转换过的 rdd 都会在遇到 action 算子时被重新计算,不过也可以使用 persist 或 cache 将 rdd 持久化,持久化之后 spark 会将此 rdd 的相关数据进行保存以方便下次访问。

就实际上说 rdd 中并不包含数据,但为了叙述方便,数据存储在 rdd 中

转换 (transformation) 算子

map

map 算子用于对 rdd 中的元素进行遍历操作,代码示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
object demo1 {
def main(args: Array[String]): Unit = {
//创建spark配置对象

val conf=new SparkConf()
.setMaster("local")
.setAppName("Demo1")
//创建Spark上下文对象

val sc = new SparkContext(conf)
val list = List(1,2,3,4,5)
//将 scala list 序列化成 rdd

val list_rdd = sc.parallelize(list)
val rdd2 = list_rdd.map(x=>x*2)
rdd2.foreach(println)

}

}

flatMap 就是在 map 的基础上再进行数据扁平化。

filter

filter 算子用来对 rdd 中的数据进行过滤操作,当 filter 内的表达式为 true 时保留元素,为 false 时过滤元素,处理完成后返回一个新的 rdd,具体操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
object demo2Filter {
def main(args: Array[String]): Unit = {

val conf = new SparkConf()
.setMaster("local")
.setAppName("Filter")

val sc = new SparkContext(conf)
//读取文件

val data = sc.textFile("data/students.txt")
//过滤数据

val data1 = data.filter(line => line.split(",")(3).equals("男"))
data1.foreach(println)

}
}

sample

sample ,顾名思义,就是对数据抽样的意思,sample算子总共包含 3 个参数,分别是

withReplacement,fraction,seed,其中 withReplacement 的作用是抽样时是否放回,当此参数为 true 时抽样会放回,也就是说会取到相同的元素,为 false 时抽样不放回,也就是说不会取到同一个元素。fraction 是抽取比例,要求的数据是小数。seed 是生成随机数的种子,抽取元素时需要利用随机数来决定抽取哪个元素,当随机数种子不一样时生成的随机数也会不一样从而取到的元素也就会不一样。此外,经本人的不完全测试,当取样模式为放回取样时,取得的数据量并不是严格符合抽取比例的,不放回抽样则符合抽取比例。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("demo3")

val sc = new SparkContext(conf)

val list = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))

// val data = sc.textFile("data/students.txt")

val sample_data = list.sample(false,0.4,100)

sample_data.foreach(println)
}

groupByKey

groupByKey 用来进行聚合操作,具体操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("demo4GroupByKey")

val sc = new SparkContext(conf)
//读取的数据已被序列化,不需要再次序列化
val data = sc.textFile("data/students.txt")
//val kvRDD = sc.parallelize(data)
val kvRDD = data.map(line=>{
var split = line.split(",")
(split(3),1)
})
val groupByKeyRDD = kvRDD.groupByKey()

groupByKeyRDD.map(kv=>{
val gender = kv._1
//用以对集合中的元素求和

val num = kv._2.sum
(gender,num)
}).foreach(println)
}

reduceByKey

reduceByKey 和 groupByKey 相似,都是对 RDD 中的元素进行聚合,所不同的是, groupByKey 需要先对 RDD 进行转化将其变成 groupByKey 的形式然后再进行聚合操作,而 reduceByKey 则是在转化的同时进行聚合。具体示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object demo5reduceByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("demo5reduceByKey")

val sc = new SparkContext(conf)

val data = sc.textFile("data/students.txt")

val kvRDD = data.map(line=>{
var gender = line.split(",")(3)
(gender,1)
})

kvRDD.reduceByKey((x,y)=>x+y).foreach(println)

}
}

union

union 主要用来将两个 RDD 连接起来生成一个新的 RDD,新生成的 RDD 的分区数将会等于之前两个 RDD分区数之和。具体用法示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def main(args: Array[String]): Unit = {


val conf = new SparkConf()
.setMaster("local")
.setAppName("demo6")

val sc = new SparkContext(conf)

val list = List(1, 2, 3, 4, 5)
val list1 = List(6, 7, 8, 9, 10)
// 将列表序列化,并设置生成的 RDD的分区数

val list_rdd = sc.parallelize(list, 2)
val list1_rdd = sc.parallelize(list1)

val rdd = list_rdd.union(list1_rdd)

println("list_RDD:" + list_rdd.getNumPartitions)
println("list1_RDD:" + list1_rdd.getNumPartitions)
println("RDD:" + rdd.getNumPartitions)

}

join

join 用来将两个 kv 格式的 RDD 进行笛卡尔积,这和 sql 中的 join 作用是类似的,进行 join 后两个表中具有相同 key 的项将会被一一配对。具体使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def main(args: Array[String]): Unit = {

val conf = new SparkConf()
.setMaster("local")
.setAppName("demo7")

val sc = new SparkContext(conf)

val studentRDD = sc.textFile("data/students.txt")
val scoreRDD = sc.textFile("data/score.txt")

// 将两个 RDD 转换成 kv 格式

val studentKvRDD = studentRDD.map(line=>{
var id = line.split(",")(0)
(id,line)
})

val scoreKvRDD = scoreRDD.map(line=>{
var id = line.split(",")(0)
(id,line)
})
//将两个 RDD 进行聚合

val RDD = studentKvRDD.join(scoreKvRDD).map(line=>{
var id = line._1
var stu = line._2._1
var score = line._2._2

var stuSplit = stu.split(",")
var name = stuSplit(1)
var scoreSplit = score.split(",")
val s = scoreSplit(2)
(name,s.toInt)


})
//统计同一 key 下的分数总和

RDD.reduceByKey(_+_).foreach(println)

// val group = RDD.groupByKey()
// group.map(line=>{
// var name = line._1
// var s = line._2.sum
// (name,s)
// }).foreach(println(_))
// group.foreach(println(_))
}

mapValue

mapValue 用来对多次传入的 kv 格式数据进行处理,传入一行返回一行,用法示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def main(args: Array[String]): Unit = {

//1、创建spark上下文对象
val conf = new SparkConf()
.setMaster("local")
.setAppName("Demo4Map")

val sc = new SparkContext(conf)

val list = List(1, 2, 3, 4, 5, 6, 7, 8)

val rdd = sc.parallelize(list)

rdd
.map((_, 1))
.mapValues(v => v + 1) //对value进行操作 传入一行返回一行
.foreach(println)


}

sort

顾名思义,sort 就是用来对 RDD 的数据进行排序的,sort 有两个参数,第一个是要进行排序的 RDD ,第二个是进行排序的方式,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def main(args: Array[String]): Unit = {

//1、创建spark上下文对象
val conf = new SparkConf()
.setMaster("local")
.setAppName("Demo4Map")

val sc = new SparkContext(conf)

val list = List(1, 2, 3, 23, 4, 5, 5, 6, 5, 7, 8)

val rdd = sc.parallelize(list)

/**
* sortBy
* 第一个参数指定排序字段
* 第二个参数指定排序规则
*
*
*/
rdd
.map((_, 1))
.sortBy(t => t._1, ascending = false)
.foreach(println)

rdd
.map((_, 1))
.sortByKey(ascending = true) //通过key进行排序
.foreach(println)
}

persist

spark 中的 RDD 是一组计算关系,并不会存储数据,对于一些需要经常使用的 RDD 如果将 RDD 的计算结果持久化将会极大地提高效率。persist 就是这样的一种方式,persist 中的持久化有以下几种方式

1.MEMORY_ONLY

使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。

2.MEMORY_AND_DISK

使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。

3.MEMORY_ONLY_SER

基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。

4.MEMORY_AND_DISK_SER

基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。

5.DISK_ONLY

使用未序列化的Java对象格式,将数据全部写入磁盘文件中。

6.MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等

对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

对于以上持久化策略,优先推荐 MEMORY_ONLY,MEMORY_AND_DISK_SER 两种方式

具体示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var rdd = sc.textFile("data/words.txt")
var wordsRDD = rdd.flatMap(_.split(","))

/**
* rdd 默认不保存数
* 对多次使用的rdd 进行缓存
*
*/

//cahce 将数据缓存到内存 相当于 persist(StorageLevel.MEMORY_ONLY)
wordsRDD = wordsRDD.cache()

/**
* StorageLevel 指定持久化方式
*
* DISK_ONLY rdd 数据写入磁盘(Excutor所借节点的磁盘)
*
*
*/
wordsRDD = wordsRDD.persist(StorageLevel.DISK_ONLY)

checkPoint

对于数据持久化,不论是保存到内存还是磁盘均有数据丢失的风险,checkPoint 则通过将数据持久化到 hdfs 很好地解决了这个问题。在对当前 RDD 进行 checkPoint 时会导致该 RDD 的父类 RDD 被清空,官方推荐在进行 checkPoint 时先对当前 RDD 进行持久化。具体示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def main(args: Array[String]): Unit = {

//1、创建spark上下文对象
val conf = new SparkConf()
.setMaster("local")
.setAppName("Demo4Map")

val sc = new SparkContext(conf)

//设置checkpoint数据存储地址
sc.setCheckpointDir("data/checkpoint")

/**
* 转换算子 默认懒执行
*
*/

var rdd = sc.textFile("data/words.txt")


/**
* 为了避免重复计算,在checkpoint之前先对rdd进行cache
*
*/
rdd = rdd.cache()
/**
* 切断rdd之间的依赖关系
* 将数据写d到hdfs 数据不会丢失
*
* checkpoint 在spark streaming 用得较多
*/
rdd.checkpoint()

val tRDD = rdd.map((_, 1))

tRDD.reduceByKey(_ + _).foreach(println)


}
-----------本文结束感谢您的阅读-----------
0%