Spark实践 (3): Spark SQL 与数据仓库

Spark SQL 基础

使用 Spark SQL 有 2 种方式,一种是通过写 SQL 来进行计算,另外一种是在 Spark 程序中,通过领域 API 的形式来操作数据(被抽象为 DataFrame)。

分布式 SQL 引擎

作为分布式引擎,有两种运行方式,一种是 JDBC/ODBC Server,另一种是使用 Spark SQL 命令行。在正式环境下,使用前者比较好。

DataFrame

通过写 SQL 来使用 Spark SQL 和 hive 区别不大,这里不再详细介绍,稍微提一下的是,有一些 hive 的特性是 Spark SQL 不支持的,最主要的是 Hive 的 bucker 表,使用散列的方式对 hive 表进行分区。

DataFrame 具有和 RDD 类似的概念,但还增加了列的概念。

在 Spark 中使用 DataFrame 的过程,可以分为 4 步,

  1. 初始化环境,一般是创建一个 SQLContext 对象。
  2. 创建一个 DataFrame,可以来源于 RDD 或其他数据源。
  3. 调用 DataFrame 操作,是一种领域特定的 API,可以实现所有的 SQL 功能。
  4. 可以直接通过函数执行 SQL 语句。
  • 创建 SQLContext。

    val sc: SparkContext
    val sqlContext = new SQLContext(sc)
    
  • 创建 DataFrame

环境初始化之后,就可以创建 DataFrame 了,主要有两种创建方式。

  1. 从 RDD 创建,又分 2种:

    • 使用 Scala 反射。

    • 程序指定,略繁杂,但是可以运行时指定。

  2. 从其他数据源创建。

使用反射的方法从 RDD 创建 DataFrame

这个方法是先定义一个 case class,参数名即为列名,然后将 RDD 的成员转换成 case class 类型,包含 case class 的 RDD 可以通过反射方式被隐式转换成 DataFrame,case class 的参数名会成为表的列名,然后就可以注册成一张表。

这种方法前提是在写程序之前就已经知道了数据格式,可以预先设定表的模式。

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSQLSimpleExample")
    val sc = new SparkContext()

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    case class Person(name: String, age: Int)
    val rdd = sc.textFile("path/to/file").map(_.split(","))
    // 包含了 case class 的 RDD
    val rddContainingCaseClass = rdd.map(p => Person(p(0), p(1).trim.toInt))
    // 被隐式转换成 DataFrame
    val people = rddContainingCaseClass.toDF()
    // 将 DataFrame 的内容打印到标准输出
    people.show()
  }

使用程序动态从 RDD 创建 DataFrame

当 case class 无法提前知道数据格式时,可以在运行时动态指定表模式来从 RDD 创建 DataFrame。具体步骤如下:

  1. 从原来的 RDD 创建一个新的 RDD,成员是 Row 类型,包含所有列。
  2. 创建一个 StructType 类型的表模式,其结构与步骤 1 中创建的 RDD 的 Row 结构相匹配。
  3. 使用 SQLContext.createDataFrame 方法将表模式应用到步骤 1 创建的 RDD 上。

    val sqLContext = new SQLContext(sc)
    // 普通的 RDD
    val people = sc.textFile("path/to/file")
    // 字符串格式的表模式
    val schemaString = "name age"
    // 根据字符串格式的表模式创建结构化的表模式,用 StructType 保存
    val schema =
          StructType(
            schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))
          )
    // 将普通 RDD 的成员转换成 Row 对象
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    // 将模式作用到 RDD 上,生成 DataFrame
    val peopleDataFrame = sqLContext.createDataFrame(rowRDD, schema)
    peopleDataFrame.show()
    

从其他数据源生成 DataFrame

Spark 提供了统一的接口,可以很方便地从其他数据源创建 DataFrame,例如:

val df = sqlContext.read.json("path/to/file.json")
df.show()

DataFrame 基本操作

// select * from 
df.show()  

// select name from 
df.select("name").show()

// select name, age + 1 from
df.select(df("name"), df("age") + 1).show()

// select * from  xxx where age > 21
df.filter(df("age") > 21).show()

// select age, count(*) from xxx group by age
df.groupBy("age").count().show()

// 使用 registerTempTable 方法将 Dataframe 注册成一张表:
df.registerTempTable("people")

// 之后可以使用纯 SQL 来访问
val result = sqlContext.sql("SELECT * FROM people")

DataFrame 数据源

DataFrame 支持非常多类型的数据源,包括 Hive、Avro、Parquet、ORC、JSON、JDBC。而 Spark 提供了统一的读写接口。

通过数据源加载数据时,默认的类型是 Parquet,这是一种大数据计算中最常用的列式存储格式:

val df = sqlContext.read.load("path/to/file.parquet")

对于其他类型,可以使用 format 指定:

val df = 
    sqlContext.read.format("json").load("path/to/file.json")

保存数据时和加加载数据方法类似。

Spark SQL 原理和运行机制

Catalyst 执行优化器

Catalyst 是 Spark SQL 执行优化器的代号,所有 Spark SQL 语句最终都能通过它来解析、优化,最终生成可以执行的 Java 字节码。

Catalyst 最主要的数据结构是树,所有 SQL 语句都会用树结构来存储,树中的每个节点有一个类(class),以及 0 或多个子节点。Scala 中定义的新的节点类型都是 TreeNode 这个类的子类。

Catalyst 另外一个重要的概念是规则。基本上,所有优化都是基于规则的。可以用规则对树进行操作,树中的节点是只读的,所以树也是只读的。规则中定义的函数可能实现从一棵树转换成一颗新树。

整个 Catalyst 的执行过程可以分为以下 4 个阶段:

  • 分析阶段,分析逻辑树,解决引用
  • 逻辑优化阶段
  • 物理计划阶段,Catalyst 会生成多个计划,并基于成本进行对比
  • 代码生成阶段