Spark shuffle

shuffle 指的是数据从 map task 输出到 reduce task 输入的这段过程,shuffle 在 Spark 中主要发生在两个 stage 之间的阶段。 shuffle 是连接 Map 和 Reduce 之间的桥梁,Map 的输出要用到 Reduce 必须要经过 shuffle 这个阶段,shuffle 的性能高低直接影响了整个程序的性能和吞吐量。

在 Spark 中负责 shuffle 过程的执行、计算和处理的组建主要就是 ShuffleManager,也即 shuffle 管理器。ShuffleManager 随着 Spark 的发展有两种实现的方式,分别是 HashShuffleManager 和 SortShuffleManager 。因此, Spark 的 shuffle 方式也就有 HashShuffle 和 SortShuffle 两种方式。

SparkShuffle 的方式变迁如下所示:

  • Spark 0.8及以前 Hash Based Shuffle

  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

  • Spark 0.9 引入ExternalAppendOnlyMap

  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

  • Spark 1.4 引入Tungsten-Sort Based Shuffle

  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle

  • Spark 2.0 Hash Based Shuffle退出历史舞台

HashShuffle

HashShuffleManager 的运行机制主要分为两种,一种是普通运行机制,另一种是合并运行机制。

合并运行机制会将 shuffle 过程中产生的小文件进行合并来减少小文件的数量,需要注意的是 HashShuffle 不具有排序功能。

普通机制的 HashShuffle

5d2c4c6966f3066952

1.shuffle write阶段

主要就是在一个 stage 结束之后,为了下一个 stage 可以执行 shuffle 类的算子,而将每个 task 处理的数据按 key 进行分区,所谓分区就是按照分区数对 key 进行 hash 从而将相同的 key 写入到同一个磁盘文件中,而每一个磁盘文件都将对应 reduce 端的一个 task ,这里的 task 和分区数是等效的,因为 Spark 在进行任务调度时会为每个分区分发一个 task。在写入磁盘之前会先写入缓冲区,等到缓冲区写满之后再溢写到磁盘中去。

在 shuffle write 阶段,map 端的每个 task 都要创建 reduce 端 task 数量的磁盘文件,也就是说 map 端总计要创建 m*n 个磁盘文件,m 是 map 端 task 数量,n 是 reduce 端 task 数量。由此可见,未经优化的 shuffle write 所产生的磁盘文件数量是惊人的。

2.shuffle read 阶段

shuffle read,通常就是一个 stage 刚开始要做的事情。在上一个 stage 中shuffle write 为下一个 stage 中的每一个 task 都创建了一个磁盘文件,此时该 stage 的每个 task 都需要将上一个 stage 的计算结果中属于自己的那一个磁盘文件拉取过来,并进行聚合。

shuffle read 是一边拉取一边聚合的,拉取时是分批拉取的,先拉取到 buffer 缓冲区,缓冲区满之后进行聚合然后在拉取下一批数据。buffer 默认的大小是 32k ,每个 task 创建一个 buffer。

hash shuffle 普通机制存在以下问题:

1)会产生大量小文件,拉取数据时会产生大量 Io,影响性能。

2)拉取数据时大量的 IO 操作可能会导致 OOM

总的来说都是因为产生的小文件太多

合并机制的 hash shuffle

合并机制就是复用文件对象,开启合并机制的配置是 spark.shuffle.consolidateFiles。 该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用 HashShuffle ,那么建议打开这个配置。

5d2c78aa9eafe52715

开启 consolidate 机制之后,在shuffle write 过程中,task 就不是为下游每个 task 创建一个文件了。此时会出现 shuffleFileGroup 的概念,每个 shuffleFileGroup 对应一批磁盘文件,磁盘文件的数量和下游 stage 的 task 数量是相等的。每个 Executor 中同时只能并行执行 Core 数量个 task ,假设 Executor 的 Core 数量为 1,则每次只能并行执行 1 个 task,启用 consolidate 后,第一批执行的 task 会创建一个 shuffleFileGroup ,也就是对应着下一个 stage 的 task 数量的的文件,当第一批 task 执行完成后,下一批 task 仍会沿用之前的 shuffleFileGroup,并不会产生新的文件。也就是说每个 executor 从始至终产生的文件数是不变的,等于 C*T,C 为 executor 中 Core 的数量,T 为下一阶段 task 的数量。

相对来说,合并机制下的 HashShuffle 产生的文件数量会比普通机制下的 HashShuffle 少得多,但合并机制也会有产生大量文件的情况,比如说单个 executor 中的 core 数量太多,或者数据分区太多(task 太多),都会使 C*T 的值增大。

SortShuffle 的运行机制主要分为两种,一种是普通运行机制,一种是 bypass 运行机制。 当 shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值时(默认为200),就会启动 bypass 机制。

SortShuffle 普通机制

5d2c87de9902775960

上图说明了普通 Sortshuffle 的原理。在该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的 shuffle 算子,可能会选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存,接着,每写一条数据后就会判断是否已经到了某个阈值,如果到了就尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

注意:

shuffle 中的定时器:定时器会定时检查内存数据结构的大小,如果内存数据结构空间不够,就会申请额外的内存。申请的大小满足如下公式:

申请的内存 = 当前使用内存*2 - 当前内存结构内存

也就是说数据结构的内存是动态变化的,当内存数据结构的内存不够时会申请内存数据结构存储的数据大小 2 - 内存数据结构大小* 的内存,申请到了内存数据结构就扩容,申请不到就溢写到磁盘

在溢写到磁盘之前,会先根据 key 对内存数据结构中的数据进行排序

排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会议每批 10000 条数据的形式分批写入磁盘文件。写入时会利用 buffer 缓冲流,减少 IO 次数,提高性能。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后,会将之前的所有临时磁盘文件进行合并,这就是 merge 的过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件中,也就是说这个文件内存放了下一个 stage 所有 task 所需的数据,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据文件中的 start offset 和 end offset。

SortShuffle 由于最后有一个 merge 的过程,因此小文件的数量大大减少,相较于之前一个 task 产生多个文件,SortShuffle 只会产生两个文件(一个数据文件,一个索引文件)。

SortShuffle 的 byPass 机制

5d2c9b269eec598477

bypass 运行机制的触发条件如下:

1)shuffle read task 数量小于 spark.shuffle.sort.bypassMergeThreshold参数的值。

2)不是聚合类的 shuffle 算子(比如 reduceByKey)

此时 task 会为每个 reduce 端的 task 都创建一个 临时磁盘文件,,并将数据按 key 进行 hash 写入相应的文件中。当然,写入磁盘时也是使用的缓冲流,缓冲区满了之后再溢写到磁盘中,最后将小文件合并成一个文件,并创建一个索引文件。

该过程的磁盘写机制其实跟未经优化的 HashShuffle 是一摸一样的,只是会在最后进行文件合并并创建一个索引文件。两者的区别也就在于磁盘文件的多少,但也正是这点差别让 SortShuffle 的 read 性能更好。

该机制和普通 Sort Shuffle 的区别在于:

1)磁盘写机制不同

普通机制下是将数据在内存中排序后分批(10000条)写到磁盘,而 bypass 会根据 hash 将数据写入到不同文件

2)排序

bypass 机制下不会进行排序,在 shuffle write 过程中,不需要进行数据的排序操作,也就省掉了这部分的性能开销。

总结:

Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。

shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle,

HashShuffle又分为普通机制和合并机制,普通机制因为其会产生MR个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到CoreR个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。

SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager,因为HashShuffleManager会产生大量的磁盘小文件而性能低下,在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

-----------本文结束感谢您的阅读-----------
0%