Spark实践 (1): Spark 工作机制

Spark 工作机制主要包括调度管理、内存管理、容错机制。

调度管理

Spark 调度管理按照场景可以分为2类,一类是Spark程序之间的调度,这是最主要的调度场景;另外一类是Spark程序内部的调度。

Driver 程序

在集群模式下,用户编写的 Spark 程序称为 Driver 程序。每个 Driver 程序包含一个代表集群环境的 SparkContenxt 对象并与之连接,程序的执行从 Driver 程序开始,中间过程会调用 RDD 操作,这些操作通过集群资源管理器来调度执行,一般在 Worker 节点上执行,所有操作执行结束后回到 Driver 程序中,在 Driver 程序中结束。

SparkContext 对象

每个驱动程序里都有一个 SparkContext 对象,担负着与集群沟通的职责,其工作过程如下:

  1. SaprkContext 对象联系集群管理器、分配CPU、内存等资源。
  2. 集群管理器在工作节点上启动一个执行器。
  3. 程序代码会被分发到相应的工作节点上。
  4. SparkContext 分发任务(Task)至各执行器执行。

集群管理器

集群管理器负责集群的资源调度。Spark 支持 3 种集群部署方式,每种部署对应一种资源管理器。

  1. Standalone 模式(资源管理器是Master结点)。最简单的一种集群模式,不依赖于其他系统,调度策略相对单一,只支持先出先进。
  2. Hadoop Yarn。
  3. Apache Mesos。

其他相关名称

  • Job: 一次 RDD Action 对应一次 Job,会提交至资源管理器调度执行。
  • Stage: Job 在执行过程中被分为多个阶段。介于 Job 和 Task 之间,是按 Shuffle 分隔的 Task 集合。
  • 执行器: 每个 Spark 程序在每个节点上启动一个进程,专属于一个 Spark 程序,与 Spark 程序有相同的生命周期,负责 Spark 在节点上启动的 Task,管理内存和磁盘。如果一个节点上有多个 Spark 程序在执行,那么相应的就会启动多个执行器。
  • Task: 在执行器上执行的最小单元。比如 RDD Transformation 操作时对 RDD 内每个分区计算都会对应一个 Task。

Spark 程序之间的调度

主要分为两种,

  1. 静态资源分配
  2. 动态资源分配

Spark 程序内部的调度

当 Spark 为多个用户同时提供服务时,我们可以考虑配置 Spark 程序内部的调度。

在 Spark 程序内部,不同线程提交的 Job 可以并行执行。Spark 的调度器是线程安全的,因此可以支持这种需要同时处理多个请求的服务型应用。

默认情况下,Spark的调度器以 FIFO 的方式运行 Job,前面运行的 Job 优先获得所有资源。从 Spark 0.8 开始,可以开始采用“循环”(round robin)的方式为不同 Job 之间的 Task 分配资源,这样所有的 Job 可以获取差不多相同的资源。这种模式特别适用于多用户的场景。

如果想要开启程序的公平调度,只需要在 SparkContext 中设置 Spark.scheduler.mode 的值为 FAIR:

var conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
var sc = new SparkContext(conf);

公平调度池

公平调度支持对多个 Job 进行分组,这个分组称为调度池,每个调度池可以设置不同的调度选项,当我们想要为一些更重要的 Job 设置更高的优先级时,这个功能就非常有用了。我们可以为不同的用户设置不同的调度池。然后让各个调度池平等地共享资源,而不是按 Job 来共享资源。

指定让 Job 进入那个调度池的具体方法是提交任务的线程在 SparkContext 中设置 spark.scheduler.pool

sc.setLocalProperty("spark.scheduler.pool", "pool1")

这样设置之后,这个线程提交的所有 Job 会使用这个调度池。设置按线程来进行,这样可以很方便地让一个线程下的所有 Job 都在同一个用户下。如果要清空当前线程的调度池设置,可以这样设置

sc.setLocalProperty("spark.scheduler.pool",null)

调度池的默认行为

默认情况下,所有调度池平均共享集群的资源,默认调度池也是。但在每个调度池内部,各个 Job 是按 FIFO 的顺序来执行的。

调度池的配置

  • schedulingMode。(FIFO 或者 FAIR)
  • weight。(用于控制调度池相对于其他调度池的权重)
  • minShare。(最小资源值( core 的数量))

内存管理

相比 Hadoop MapReduce,Spark 计算具有巨大的性能优势,其中很大一部分是因为 Spark 对于内存的充分利用,以及提供的缓存机制。

RDD 持久化

如果一个 RDD 不止一次被用到,那么就可以持久化它,以大幅提升程序的性能。持久化的方法是调用 persist() 函数,除了持久化至内存中,还可以在 persist() 中指定 storage level 参数使用其他的类型。

共享变量

Spark 大部分操作都是 RDD 操作,通过传入函数给 RDD 操作函数来计算。这些函数在不同的节点上并发执行,内部的变量有不同的作用域,不能互相访问。Spark 提供 2 种共享变量–广播变量和计数器。

  1. 广播变量

一个只读对象,在所有节点上都有一份缓存,创建方法如下:

val broadcastVar = sc.broadcast(Array(1, 2, 3))
  1. 计数器

计数器只能增加,可以用于计算或者求和。计数器变量的创建方法是:

SparkContext.accumulator(v, name) 

v 是初始值,name 是名称。注意,只有 Driver 程序可以读这个计算器变量,RDD 操作中读取计数器变量是无意义的。

容错机制

Spark 以前的集群容错处理模型,像 MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行 DAG 里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程中需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。

RDD 也是一个 DAG,每一个 RDD 都会记住创建该数据集需要哪些操作,跟踪记录 RDD 的继承关系,这个关系在 Spark 里面叫 lineage。由于创建 RDD 的操作是相对粗粒度的变换,即单一的操作应用于许多数据元素,而不需存储真正的数据。当一个 RDD 的某个分区丢失时, RDD 有足够的信息记录其如何通过其他 RDD 进行计算,且只需重新计算该分区。

RDD 之间的依赖分为两种。

  • 窄依赖。父分区对应一个子分区。
  • 宽依赖。父分区对应多个子分区。

对应窄依赖,只需要通过重新计算丢失的那一块数据来恢复,容错成本较小。但如果是宽依赖,则当容错重算分区时,因为父分区数据只有一部分是需要重算子分区的,其余数据重算则成了冗余计算。