Spark 计算过程解读

引言

Spark 是一个分布式的内存计算框架,其特点是能处理大规模数据,计算速度快,Spark 延续了 Hadoop 的 MapReduce 计算模型,相比之下 Spark 的计算过程保持在内存中,减少了硬盘读写,能够将多个操作进行合并计算,因此提升了计算速度,同时 Spark 也提供了更丰富的计算 API。

MapReduce 是 Hadoop 和 Spark 的计算模型,其特点是 Map 和 Reduce 过程的高度可并行化;过程间耦合度低,单个过程的失败后可以重新计算,而不会导致整体失败;最重要的是数据处理中的计算逻辑可以很好的转换为 Map 和 Reduce 操作。对于一个数据集来说,Map 对每条数据做相同的转换操作,Reduce 可以按条件对数据分组,然后在分组上做操作。除了 Map 和 Reduce 操作之外,Spark 还延伸出了如 filter,flatMap,count,distinct 等更丰富的操作。

RDD 是 Spark 中最基本也是最重要的数据结构,可以直观的认为 RDD 就是要处理的数据集。RDD 是分布式的数据集,每个 RDD 都支持 MapReduce 类操作,经过 MapReduce 操作后会产生新的 RDD,而不会修改原有 RDD。RDD 的数据集是分区的,因此可以把每个数据分区放到不同的分区上进行计算,而实际上大多数 MapReduce 操作都是在分区上进行计算的。Spark 不会把每一个 MapReduce 操作都发起计算,而是尽量的把操作累计起来一起计算。Spark 把操作划分为转换和动作,对 RDD 进行的转换操作会叠加起来,直到对 RDD 进行动作操作时才会发起计算。这种特性使得 Spark 可以减少中间结果的吞吐,可以快速的进行多次迭代计算。

和 MapReduce 相比

MapReduce 比较坑的地方:

  1. 仅支持 Map 和 Reduce 操作。
  2. 处理效率低,Map 中间结果要写磁盘,Reduce 写 HDFS,多个 MR 之间通过 HDFS 交互数据,任务调度和启动的开销大,无法充分利用内存,Map 端和 Reduce 端均需要排序。
  3. 不适合迭代计算(如机器学习,图计算等),交互式处理(数据挖掘)和流式处理(点击日志分析)。

Spark 的特点:

  1. 高效(比 MapReduce 快 10 ~ 100 倍)。
  2. 内存计算引擎,提供 Cache 机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的 IO 开销。
  3. DAG 引擎,减少多次计算之间中间结果写到 HDFS 上的开销。
  4. 使用多线程池模型来减少 task 启动开销,shuffle 过程中避免不必要的 sort 操作以及减少磁盘 IO 操作。

系统结构

Spark 自身只对计算负责,其计算资源的管理和调度由第三方框架来实现。常用的有 Yarn 和 Mesos。Spark on Yarn 的系统结构图如下:

图中共分为三大部分:Spark Driver,Worker,Cluster manager。其中 Driver program 负责将 RDD 转换为任务,并进行任务调度。Worker 负责任务的执行。Yarn 负责计算资源的维护和分配。Driver 可以运行在用户程序中,或者运行在其中一个 Worker 上。Spark 中的每一个应用对应着一个 Driver。这个 Driver 可以接收 RDD 上的计算请求,每个动作(action)类型的操作被作为一个 Job 进行计算。Spark 会根据 RDD 的依赖关系构建计算阶段(Stage)的有向无环图,每个阶段有与分区数相同的任务(Task)。这些任务将在每个分区(Partition)上进行计算,任务划分完成后 Driver 将任务提交到运行于 Worker 上的 Executor 中进行计算,并对任务的成功、失败进行记录和重启等处理。

Worker 一般对应一台物理机,每个 Worker 上可以运行多个 Executor,每个 Executor 都是独立的 JVM 进程,Driver 提交的任务就是以线程的形式运行在 Executor 中的。如果使用 YARN 作为资源调度框架的话,其中一个 Worker 上还会有 Executor launcher 作为 Yarn 的 Application Master,用于向 Yarn 申请计算资源,并启动、监测、重启 Executor。

计算过程

以 RDD 到输出结果的整个计算过程为主线,探究 Spark 的计算过程。这个计算过程可以分为:

  1. RDD 构建:构建 RDD 之间的依赖关系,将 RDD 转换为阶段的有向无环图。
  2. 任务调度:根据空闲计算资源情况进行任务提交,并对任务的运行状态进行监测和处理。
  3. 任务计算:搭建任务运行环境,执行任务并返回任务结果。
  4. Shuffle 过程:两个阶段之间有宽依赖时,需要进行 Shuffle 操作。
  5. 计算结果收集:从每个任务收集并汇总结果。

RDD 构建和转换

RDD 按照其作用可以分为两种类型,一种是对数据源的封装,可以把数据源转换为 RDD,这种类型的 RDD 包括 NewHadoopRDD,ParallelCollectionRDD,JdbcRDD 等。另一种是对 RDD 的转换,从而实现一种计算方法,这种类型的 RDD 包括 MapperRDD,ShuffledRDD,FilteredRDD等。数据源类型的 RDD 不依赖于其他 RDD,计算类的 RDD 拥有自己 RDD 依赖。

RDD 有三个要素:分区,依赖关系,计算逻辑。分区是保证 RDD 分布式的特性,分区可以对 RDD 的数据进行划分,划分后的分区可以分布到不同的 Executor 中,大部分对 RDD 的计算都是在分区上进行的。依赖关系维护着 RDD 的计算过程,每个计算类型的 RDD 在计算时,会将所依赖的 RDD 作为数据源进行计算。根据一个分区的输出是否被多分区使用,Spark 将依赖分为窄依赖和宽依赖。RDD 的计算逻辑是其功能的体现,其计算过程是所依赖的 RDD 为数据源进行的。

RDD 的依赖关系

Spark 在遇到动作类操作时,就会发起计算 Job,把 RDD 转换为任务,并发送任务到 Executor 上执行。从 RDD 到任务的转换过程是在 DAGScheduler 中进行的。其总体思路是根据 RDD 的依赖关系,把窄依赖合并到一个阶段中,遇到宽依赖则划分出新的阶段,最终形成一个阶段的有向无环图,并根据图的依赖关系先后提交阶段。每个阶段按照分区数量划分为多个任务,最终任务被序列化并提交到 Executor 上执行。

RDD 到 Task 的构建过程

当 RDD 的动作类操作被调用时,RDD 将调用 SparkContext 开始提交 Job,SparkContext 将调用 DAGScheduler 将 RDD 转换为阶段的有向无环图,然后首先将有向无环图中没有未完成的依赖的阶段进行提交。在阶段被提交时,每个阶段将产生与分区数量相同的任务,这些任务称之为 TaskSet。任务的类型分为 ShuffleMapTask 和 ResultTask,如果阶段的输出将用于下个阶段的输入,也就是需要进行 Shuffle 操作,则任务类型为 ShuffleMapTask。如果阶段的输入即为 Job 结果,则任务类型为 ResultTask。任务创建完成后会交给 TaskSchedulerImpl 进行 TaskSet 级别的调度执行。

Spark的任务调度

Spark 中的任务调度实际上分了三个层次。第一层次是基于阶段的有向无环图进行 Stage 的调度,第二层次是根据调度策略(FIFO,FAIR)进行 TaskSet 调度,第三层次是根据数据本地性(Process,Node,Rack)在 TaskSet
内进行调度。

任务计算

任务的计算过程是在 Executor 上完成的,Executor 监听来自SchedulerBackend 的指令,接收到任务时会启动 TaskRunner 线程进行任务执行。在 TaskRunner 中首先将任务和相关信息反序列化,然后根据相关信息获取任务所依赖的 Jar 包和所需文件,完成准备工作后执行任务的 run 方法,实际上就是执行 ShuffleMapTask 或 ResultTask 的 run 方法。任务执行完毕后将结果发送给 Driver 进行处理。

在 Task.run 方法中可以看到 ShuffleMapTask 和 ResultTask 有着不同的计算逻辑。ShuffleMapTask 是将所依赖 RDD 的输出写入到 ShuffleWriter 中,为后面的 Shuffle 过程做准备。ResultTask 是在所依赖 RDD 上应用一个函数,并返回函数的计算结果。在这两个 Task 中只能看到数据的输出方式,而看不到应有的计算逻辑。实际上计算过程是包含在 RDD 中的,调用 RDD.Iterator 方法获取 RDD 的数据将触发这个 RDD 的计算动作(RDD.Iterator),由于此 RDD 的计算过程中也会使用所依赖 RDD 的数据。从而 RDD 的计算过程将递归向上直到一个数据源类型的 RDD,再递归向下计算每个 RDD 的值。需要注意的是,以上的计算过程都是在分区上进行的,而不是整个数据集,计算完成得到的是此分区上的结果,而不是最终结果。

从 RDD 的计算过程可以看出,RDD 的计算过程是包含在 RDD 的依赖关系中的,只要 RDD 之间是连续窄依赖,那么多个计算过程就可以在同一个 Task 中进行计算,中间结果可以立即被下个操作使用,而无需在进程间、节点间、磁盘上进行交换。

Shuffle 过程

Shuffle 是一个对数据进行分组聚合的操作过程,原数据将按照规则进行分组,然后使用一个聚合函数应用于分组上,从而产生新数据。Shuffle 操作的目的是把同组数据分配到相同分区上,从而能够在分区上进行聚合计算。为了提高 Shuffle 性能,还可以先在原分区对数据进行聚合(mapSideCombine),然后再分配部分聚合的数据到新分区,第三步在新分区上再次进行聚合。

在划分阶段时,只有遇到宽依赖才会产生新阶段,才需要 Shuffle 操作。宽依赖与窄依赖取决于原分区被新分区的使用关系,只要一个原分区会被多个新分区使用,则为宽依赖,需要 Shuffle。否则为窄依赖,不需要 Shuffle。

以上也就是说只有阶段与阶段之间需要 Shuffle,最后一个阶段会输出结果,因此不需要 Shuffle。Shuffle 是通过 Map 阶段的 ShuffleMapTask 与 Reduce 阶段的 ShuffledRDD 配合完成的。其中 ShuffleMapTask 会把任务的计算结果写入 ShuffleWriter,ShuffledRDD 从 ShuffleReader 中读取数据,Shuffle 过程会在写入和读取过程中完成。以 HashShuffle 为例,HashShuffleWriter 在写入数据时,会决定是否在原分区做聚合,然后根据数据的Hash 值写入相应新分区。HashShuffleReader 再根据分区号取出相应数据,然后对数据进行聚合。

计算结果收集

ResultTask 任务计算完成后可以得到每个分区的计算结果,此时需要在 Driver上对结果进行汇总从而得到最终结果。

RDD 在执行 collect,count 等动作时,会给出两个函数,一个函数在分区上执行,一个函数在分区结果集上执行。例如 collect 动作在分区上(Executor中)执行将 Iterator 转换为 Array 的函数,并将此函数结果返回到 Driver。Driver 从多个分区上得到 Array 类型的分区结果集,然后在结果集上(Driver中)执行合并 Array 的操作,从而得到最终结果。