Spark实践 (2): Spark 内核

Spark 核心数据结构 RDD

RDD 全称是“弹性分布式数据集”。首先,它是一个数据集;其次,RDD 是分布式存储的。里面的成员被水平切割成小的的数据块,分散在集群的多个节点上,便于对 RDD 里面的数据进行并行计算。最后,RDD 的分布式弹性的,不是固定不变的。RDD 的一些操作可以被拆分成对各数据块直接计算,不涉及其他节点,比如 map。这样的操作一般在数据块所在的节点上直接进行,不影响 RDD 的分布,除非某个节点故障需要转换到其他节点上。但是在有些操作中,例如 groupBy,必须要访问 RDD 的所有数据块。

RDD 还具有的特点是:

  1. RDD 是只读的,一旦生成,内容就不能修改了。这样的好处是让整个系统的设计相对简单,比如并行计算时不用考虑数据互斥的问题。
  2. RDD 可指定缓存在内存中。一般计算都是流水式生成、使用 RDD,新的 RDD 生成之后,旧的不再使用,并被 Java 虚拟机回收掉。但如果后续有许多计算依赖某个 RDD,我们可以让这个 RDD 缓存在内存中,避免重复计算(尤其适用于机器学习)。
  3. RDD 可以通过重新计算得到。RDD 的高可靠性不是通过复制来实现的,而是通过记录足够的计算过程。

RDD 的定义

一个 RDD 对象,包含如下的 5 个核心属性。

  • 一个分区列表,每个分区里是 RDD 的部分数据(或者称数据块)。
  • 一个依赖列表,存储依赖的其他 RDD。
  • 一个名为 compute 的计算函数,用于计算各 RDD 各分区的值。
  • 分区器(可选),用于键/值类型的 RDD,比如某个 RDD 是按散列来分区。
  • 计算各分区时优先的位置列表(可选),比如从 HDFS 上的文件生成 RDD 时,RDD 分区的位置优先选择数据所在的节点,这样可以避免数据移动带来的开销。

RDD 的 Transformation

RDD 的 Transformation 是指由一个 RDD 生成新 RDD 的过程,比如 flapMap。filter 操作都会返回一个新的 RDD 对象,类型是 MapPartitionsRDD,它是 RDD 子类。

在 Spark 中,RDD 是有依赖关系的,这种依赖关系有两种类型。

  • 窄依赖。依赖上级 RDD 的部分分区。
  • Shuffle 依赖上级 RDD 的所有分区。

使用窄依赖时,可以精确知道依赖的上级 RDD 的分区。一般情况下,会选择与自己在同一节点的上级 RDD 分区,这样计算过程都在同一节点进行,没有网络 IO 开销,非常高效,常见的 map、flatMap、filter操作都是这一类。而 Shuffle 依赖则无法精确定位依赖的上级 RDD 的分区,相当于依赖索引分区,计算时涉及所有节点之间的数据传输,开销巨大。所以,以 Shuffle 依赖为分隔,Task 被分成 Stage,方便计算时的管理。

RDD 的 Action

一次 Action 调用之后,不在生成新的 RDD,结果返回到 Driver 程序。

Shuffle

Shuffle 概念来源于 Hadoop MapReduce,当对一个 RDD 的某个结果分区进行操作而无法精确知道依赖前一个 RDD 的哪个分区时,依赖关系变成了依赖前一个 RDD 的所有分区。Shuffle 本身是一个非常耗资源的操作,它的结果是一次调度的 Stage 的结果,而一次 Stage 包含许多 Task,缓存下来比较划算。Shuffle 使用的本地磁盘目录由 spark.local.dir 属性项指定。

SparkContext

SparkContext 是 Spark 程序最主要的入口,用于和 Spark 集群连接。所有的 Spark 程序都必须创建 SparkContext。进行流式计算时使用 StreamingContext,进行 SQL 计算时使用 SQLContext,都会创建一个 SparkContext。每个 JVM 只允许启动一个 SparkContext。

SparkConf 配置

SparkContext 可以无参数配置,也可以自定义配置。SparkContext 在构造的过程中,已经完成了各项服务的启动。最重要的初始化操作之一是启动 Task 调度器和 DAG 调度器。

DAG 调度与 Task 调度的区别是,DAG 是高层级的调度,为每个 Job 绘制一个有向无环图,跟踪各 Stage 的输出。计算完成 Job 的最短路径,并将 Task 提交给 Task 调度器执行,而 Task 调度器只负责接收 DAG 调度器的请求,负责 Task 的实际调度执行,所以 DAGScheduler 的初始化必须在 Task 调度器之后。

DAG 与 Task 这种分离设计的好处是,Spark 可以灵活设计自己的 DAG 调度,同时还能与其他资源调度系统结合,比如 YARN、Mesos。

DAG 调度

SparkContext 在初始化时,创建了 DAG 调度与 Task 调度来负责 RDD Action 操作的调度执行。

DAGScheduler

DAGScheduler 负责 Spark 的最高级别的任务调度,调度的粒度是 Stage,它为每个 Job 的所有 Stage 计算一个有向无环图,控制它们的并发,并找到一个最佳路径来执行它们。具体的执行过程是将 Stage 下的 Task 集提交给 TaskScheduler 对象,由它来提交到集群上去申请资源并最终完成执行。

TaskScheduler

相比 DAGScheduler 而言,TaskScheduler 是低级别的调度接口,允许实现不同的 Task 调度器,除了自带的之外,还可以使用 Yarn 和 Mesos 调度器。每个 TaskScheduler 对象只服务于一个 SparkContext 的 Task 调度。TaskScheduler 从 DAGScheduler 的每个 Stage 接收一组 Task,并负责将它们发送到集群上,运行它们,如果出错还会重试,最后返回消息给 DAGScheduler。