Post

Spark入门教程

1.简介

Apache Spark是一个开源的分布式计算框架,旨在提供快速、通用、易用的数据处理和分析技术。它可以在集群中处理大规模数据,支持多种数据处理模式,如批处理、交互式查询、流处理等。Spark还提供了丰富的API,包括Scala、Java、Python和R等语言的API,同时支持SQL查询和机器学习算法。Spark使用内存计算技术,在处理大规模数据时比Hadoop MapReduce更快,可以提高数据处理的效率。Spark还有一个名为Spark Streaming的库,可以用于实时数据处理和流处理。

2.下载

下载页面:https://spark.apache.org/downloads.html

Spark版本支持的Scala版本见页面上的说明,应该与使用的Scala版本保持一致。例如,Spark 3.4.0对应Scala 2.12。

示例代码:https://github.com/apache/spark/tree/master/examples

3.快速入门

https://spark.apache.org/docs/latest/quick-start.html

3.1 Spark Shell

本节使用交互式Spark Shell介绍基本API。

首先启动Shark Shell,在Spark解压目录下运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ ./bin/spark-shell 
Spark context Web UI available at http://10.2.42.35:4040
Spark context available as 'sc' (master = local[*], app id = local-1686277810757).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_332_fiber)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

实际上这是一个Scala Shell,并自动创建了一个SparkSession对象spark和一个SparkContext对象sc

Spark的主要抽象是一个分布式集合,称为Dataset。可以使用spark.read.textFile()从本地或HDFS读取文本文件来创建一个Dataset[String],每行对应一个元素。

下面从Spark目录下的README文件创建一个Dataset:

1
2
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

对于Dataset可以执行查询、过滤、转换、聚合等多种操作,详见Dataset API文档。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Number of items in this Dataset
scala> textFile.count()
res0: Long = 125

// First item in this Dataset
scala> textFile.first()
res1: String = # Apache Spark

scala> val linesWithSpark = textFile.filter(_.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

// How many lines contain "Spark"?
scala> textFile.filter(_.contains("Spark")).count()
res2: Long = 20

// find the line with the most words
scala> textFile.map(_.split(" ").size).reduce(Math.max(_, _))
res3: Int = 16

// implement MapReduce
scala> val wordCounts = textFile.flatMap(_.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [key: string, count(1): bigint]

scala> wordCounts.collect()
res4: Array[(String, Long)] = Array(([![PySpark,1), (online,1), (graphs,1), (API,1), (["Building,1), (documentation,3), (command,,2), (abbreviated,1), ...)

Spark还支持将一个Dataset放在内存缓存中,这在数据被重复访问时是非常有用的。例如:

1
2
3
4
5
6
7
8
scala> linesWithSpark.cache()
res5: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res6: Long = 20

scala> linesWithSpark.count()
res7: Long = 20

3.2 自包含应用

下面使用Spark API编写一个自包含的应用。

SimpleApp.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

这个程序分别统计指定文件中包含 “a” 和 “b” 的行数,注意需要将YOUR_SPARK_HOME替换为Spark安装目录。和Spark Shell不同的是,在程序中需要使用SparkSession.builder初始化SparkSession。

可以使用sbt (Scala)、Maven (Java)或pip (Python)引入Spark依赖。以Maven为例:

1
2
3
4
5
6
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.4.0</version>
    <scope>provided</scope>
</dependency>

之后使用以下命令打包并执行:

1
2
3
4
5
6
7
$ mvn package
...
[INFO] Building jar: .../target/spark-demo-1.0.jar

$ YOUR_SPARK_HOME/bin/spark-submit --class SimpleApp --master "local[4]" target/spark-demo-1.0.jar
...
Lines with a: 72, Lines with b: 39

4.RDD

https://spark.apache.org/docs/latest/rdd-programming-guide.html

每个Spark程序都由一个驱动程序(driver program)组成。驱动程序运行用户的main方法,并在集群上执行不同的并行操作

早期的Spark提供的主要抽象是弹性分布式数据集(resilient distributed dataset, RDD)。RDD是跨集群节点分区的元素集合,可以并行操作。

4.1 初始化Spark

Spark程序必须做的第一件事是创建一个SparkContext对象,用于告诉Spark如何访问集群:

1
2
val spark = SparkSession.builder.appName(name).master(master).getOrCreate()
val sc = spark.sparkContext

其中master是Spark集群的Master URL,如果运行在本地模式则为 “local” 。

4.1.1 使用Spark Shell

在Spark Shell中,已经自动创建了一个SparkContext,变量名为sc。可以使用--master选项指定Master URL,例如:

1
$ ./bin/spark-shell --master local[4]

完整选项列表见spark-shell --help

4.2 弹性分布式数据集(RDD)

RDD是Spark的核心概念,是一个容错的、可以并行操作的元素集合。有两种方式创建RDD:并行化一个现有的集合,或者读取一个外部存储系统(例如HDFS)上的数据集。

4.2.1 并行化集合

可以使用sc.parallelize()将Scala的Seq[T]转换为RDD[T]。例如:

1
2
3
4
5
scala> val data = Array.range(1, 6)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

RDD的一个重要参数是并行度(parallelism),即分区(partition)/分片(slice)个数。Spark会为集群的每个分区运行一个任务。通常Spark会根据集群自动设置分区数,也可以手动指定:

1
2
3
4
5
scala> val distData = sc.parallelize(data, 10)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> distData.getNumPartitions
res0: Int = 10

4.2.2 外部数据集

Spark可以从任何Hadoop支持的存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase等,支持文本文件以及任何Hadoop InputFormat

可以使用sc.textFile()读取文本文件创建RDD[String],每行对应一个元素。该方法的参数可以是本地路径或文件URI(例如hdfs://)。例如:

1
2
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[3] at textFile at <console>:23

注:

  • Spark所有基于文件的输入方法都支持单个文件、目录、压缩文件和通配符。
  • 对于其他的Hadoop文件格式,可以使用sc.newAPIHadoopFile()从HDFS读取一个文件并返回RDD[(K, V)],InputFormat指定读取文件时每个元素的key和value。例如,文本格式(TextInputFormat)的key是行号(LongWritable),value是一行(Text):
1
2
3
4
5
6
7
8
scala> import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.io.{LongWritable, Text}

scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

scala> val distFile = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs://...")
distFile: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = README.txt NewHadoopRDD[4] at newAPIHadoopFile at <console>:27

注意:新接口newAPIHadoopFile()使用的InputFormat是org.apache.hadoop.mapreduce.InputFormat,文本格式是org.apache.hadoop.mapreduce.lib.input.TextInputFormat;旧接口hadoopFile()使用的InputFormat是org.apache.hadoop.mapred.InputFormat,文本格式是org.apache.hadoop.mapred.TextInputFormat

4.2.3 RDD操作

RDD支持两种类型的操作:

  • 转换(transformation):从现有数据集创建新数据集。例如mapfilterflatMapmapPartitionssampleunion等。
  • 动作(action):在对数据集进行计算后返回一个值。例如reducecollectcountfirsttakeforeachsaveAsTextFile等。

在Spark中所有的transformation操作都是懒惰执行的,即不会立即计算结果,而是只记住转换操作,只有当action操作需要返回结果时才计算。

默认情况下,每次对RDD执行action操作都会重新计算。可以使用persist()cache()方法将其放在内存中,从而加快查询。

基本操作

下面的代码展示了RDD的基本操作(文本文件来自To be, or not to be):

1
2
3
4
5
6
7
8
scala> val lines = sc.textFile("to-be-or-not-to-be.txt")
lines: org.apache.spark.rdd.RDD[String] = to-be-or-not-to-be.txt MapPartitionsRDD[1] at textFile at <console>:23

scala> val lineLengths = lines.map(_.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:23

scala> val totalLength = lineLengths.reduce(_ + _)
totalLength: Int = 1453

第一行从外部文件创建了一个RDD lines,该数据集并没有立刻被加载到内存中,lines只是一个指向文件的指针。第二行定义了lineLengths作为map操作的结果。同样,lineLengths也不会被立即计算。最后,对lineLengths执行reduce操作,这是一个action。此时,Spark将会把计算分解成多个任务,运行在不同的机器上。每个机器执行自己部分的map和局部reduce,最后将结果返回driver程序。

常用操作:

完整列表参考RDD API文档

将函数传递给Spark

Spark的API很大程度上依赖于在驱动程序中将函数传递到集群上运行。有两种推荐的方法:

  • 匿名函数语法。例如map(line => line.length())reduce(_ + _)
  • 单例object中的静态方法。例如:
1
2
3
4
5
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意,虽然也可以传递实例方法的引用,但这需要包含该方法的对象一起发送到集群上。例如:

1
2
3
4
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

这等价于rdd.map(x => this.func1(x))。如果创建一个MyClass实例并对其调用doStuff(),其中的map()会引用该MyClass实例的func1()方法,因此需要将整个对象发送到集群。

注:这要求MyClass必须是可序列化的,否则将报错 “SparkException: Task not serializable” 。

类似地,访问外部对象的字段也会引用整个对象:

1
2
3
4
class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

这等价于rdd.map(x => this.field + x)。为了避免这一问题,最简单的方法是将field拷贝到局部变量中:

1
2
3
4
def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
理解闭包

Spark的难点之一是在集群上执行代码时理解变量的作用域和生命周期。修改作用域之外的变量的RDD操作经常会造成混淆。

例如,考虑下面的RDD元素求和。在本地模式下(--master local[n])与部署到集群上(例如通过spark-submit提交到YARN)运行,其行为可能会有所不同。

1
2
3
4
5
6
7
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

本地vs集群模式

上述代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark会将RDD操作分解为任务(task),每个任务由一个执行器(executor)执行。在执行之前,Spark会计算任务的闭包(closure)。闭包是执行器在RDD上执行计算(在本例中是foreach())时需要的变量和方法。这个闭包被序列化并发送给每个执行器。

发送给每个执行器的闭包内的变量是副本。因此,当counterforeach()函数中被引用时,它不再是驱动节点内存中的counter!执行器只能看到闭包中的副本。因此,驱动节点上counter的最终值仍然为0。

而在本地模式下,foreach()函数可能会在与驱动相同的JVM中执行,因此会引用原始的counter并更新它。

为了确保在这些场景中有明确定义的行为,应该使用累加器(详见4.3.2节)。

打印RDD的元素

另一个常见的习惯用法是尝试使用rdd.foreach(println)打印RDD的元素。在单台机器上,这将生成预期的输出。然而,在集群模式下,输出将被写入执行器的stdout,因此驱动节点的stdout不会显示输出!要在驱动节点上打印所有元素,可以使用collect()方法先将RDD元素传回驱动节点再打印:rdd.collect().foreach(println)。如果只需要打印几个元素,可以使用take()rdd.take(100).foreach(println)

使用键值对

在Scala中,使用RDD[Tuple2]即可实现键值对操作。例如,下面的代码使用reduceByKey操作统计单词出现次数:

1
2
3
4
val lines = sc.textFile("to-be-or-not-to-be.txt")
val words = lines.flatMap(_.split(" "))
val wordCount = words.map(w => (w, 1)).reduceByKey(_ + _)
wordCount.sortBy(_._2, false).take(10)

键值对RDD独有的操作定义在PairRDDFunctions,例如groupByKeyreduceByKeyjoinsaveAsNewAPIHadoopFile等。

4.3 共享变量

通常,当传递给Spark操作(例如map()reduce())的函数在远程集群节点上执行时,函数中使用的变量会被拷贝到每台机器,并且远程机器对变量的更新不会传回驱动节点。Spark提供了两种类型的共享变量:广播变量和累加器。

4.3.1 广播变量

广播变量允许在每台机器上缓存一个只读变量,而不是随任务一起发送副本。例如,它们可以以高效的方式为每个节点提供一份大型输入数据集的副本。

广播变量通过调用SparkContext.broadcast(v)创建,它是v的包装器,可以通过调用value()方法访问其值。例如:

1
2
3
4
5
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

创建广播变量后,集群上运行的函数应该使用它而不是v。此外,v在广播后不应该被修改,以确保所有节点都获得相同的广播变量值。

4.3.2 累加器

累加器是只能(通过可结合、可交换运算)“加”的变量,可以高效地并行操作。可用于实现计数器或求和。Spark提供了数值类型的累加器,也可以添加对新类型的支持。

可以通过调用SparkContext.longAccumulator()SparkContext.doubleAccumulator()来创建数值累加器。之后,集群上运行的任务可以使用add()方法添加值,但不能读取它的值。只有驱动程序可以使用value()方法读取累加器的值。

下面的代码使用累加器将数组元素相加:

1
2
3
4
5
6
7
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

scala> accum.value
res1: Long = 10

可以通过继承AccumulatorV2支持自己的类型。

累加器不会改变Spark的惰性求值。如果在RDD操作中更新累加器,只有action操作才会更新,转换操作不会更新。例如:

1
2
3
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

5.Spark SQL

https://spark.apache.org/docs/latest/sql-programming-guide.html

Spark SQL是用于处理结构化数据的Spark模块。可以使用SQL或Dataset API(见3.1节)与Spark SQL交互。执行计算时,底层使用的执行引擎是相同的,与使用的API或语言无关。

Spark SQL的核心数据结构是DataFrame,即包含数据结构信息(列名、类型等)的Dataset(相当于关系型数据库表或pandas的DataFrame)。在Scala中,DataFrameDataset[Row]的别名。

RDD、Dataset和DataFrame的区别:

  • RDD是分布式数据集合,是早期Spark使用的数据结构。
  • Dataset也是分布式数据集合,是Spark 1.6新增的,在RDD的基础上增加了Spark SQL的优化。
  • DataFrame是结构化数据集合,相当于Dataset[Row]

下面使用Spark Shell介绍DataFrame的基本操作。

完整示例代码:SparkSQLExample.scala

SQL参考:https://spark.apache.org/docs/latest/sql-ref.html

5.1 创建DataFrame

使用SparkSession,可以从RDD、Hive表或Spark数据源创建DataFrame。

例如,people.json包含一些人员信息,每行是一条记录,有name和age两个属性:

1
2
3
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

注:这种文件格式实际上叫做JSON Lines

下面的代码基于以上JSON文件的内容创建了一个DataFrame:

1
2
3
4
5
6
7
8
9
10
11
12
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

// Displays the content of the DataFrame to stdout
scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

注:也可以使用toDF()方法从Scala集合创建DataFrame,必须先导入spark.implicits._。可以通过toDF()的参数指定列名,Spark将自动推断类型。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
// for toDF()
scala> import spark.implicits._

scala> val df = Seq((30, "Andy"), (19, "Justin")).toDF("age", "name")
df: org.apache.spark.sql.DataFrame = [age: int, name: string]

scala> df.show
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+

如果要包含null,则必须使用NoneSome

1
2
3
4
5
6
7
8
9
10
11
scala> val df = Seq((None, "Michael"), (Some(30), "Andy"), (Some(19), "Justin")).toDF("age", "name")
df: org.apache.spark.sql.DataFrame = [age: int, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

5.2 DataFrame操作

DataFrame提供了结构化数据操作。

下面是一些基本示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// Print the schema in a tree format
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

// Select only the "name" column
scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

// This import is needed to use the $-notation
scala> import spark.implicits._

// Select everybody, but increment the age by 1
scala> df.select($"name", $"age" + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

// Select people older than 21
scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

// Count people by age
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

注:

  • $"name"等价于col("name")
  • df.select($"name", $"age" + 1)等价于df.select(col("name"), expr("age + 1"))df.selectExpr("name", "age + 1")
  • df.filter($"age" > 21)等价于df.filter("age > 21")

其中col()expr()等函数定义在 org.apache.spark.sql.functions

下面是一些常用的DataFrame操作:

SQL子句DataFrame操作
SELECTselect()selectExpr()
增加列withColumn()
ASwithColumnRenamed()
JOINjoin()
WHEREfilter()where()
GROUP BYgroupBy().agg()
ORDER BYsort()orderBy()

完整列表见Dataset API文档

除了引用列和表达式外,Spark SQL还提供了丰富的函数库,包括字符串操作、日期运算、常见数学函数等,另外还支持用户自定义函数(UDF)。详见DataFrame Function Reference

5.3 SQL查询

SparkSessionsql()方法可以运行SQL查询,并将结果作为DataFrame返回。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Register the DataFrame as a SQL temporary view
scala> df.createOrReplaceTempView("people")

scala> spark.sql("SELECT * FROM people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.sql("SELECT name FROM people WHERE age > 21").show
+----+
|name|
+----+
|Andy|
+----+

5.4 创建Dataset

可以使用toDS()方法从Scala集合创建Dataset,必须先导入spark.implicits._。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
scala> case class Person(name: String, age: Long)
defined class Person

// for toDS()
scala> import spark.implicits._

// Encoders are created for case classes
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
scala> val primitiveDS = Seq(1, 2, 3).toDS
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> primitiveDS.map(_ + 1).collect
res21: Array[Int] = Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
scala> val peopleDS = spark.read.json("examples/src/main/resources/people.json").as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleDS.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

注:如果元素类型不是case类,则默认只有一个名为 “value” 的列。例如:

1
2
3
4
5
6
7
8
9
10
11
scala> val ds = Seq(1, 2, 3).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

5.5 与RDD互操作

RDD与Dataset/DataFrame可以相互转换:

  • RDD转Dataset:rdd.toDS()spark.createDataset()
  • RDD转DataFrame:rdd.toDF()spark.createDataFrame()
  • Dataset/DataFrame转RDD:df.rdd

其中,使用toDS()toDF()之前需要import spark.implicits._

Spark SQL支持两种不同的方式将RDD转换为DataFrame。第一种方法是使用反射自动推断数据模式(schema)(即列名和类型),第二种方法是手动构造数据模式。

5.5.1 使用反射推断schema

Spark SQL的Scala接口支持将包含case类的RDD自动转换为DataFrame,case类定义了DataFrame的schema。

例如,下面的代码将文本文件people.txt读取为一个RDD[Person],并转换为DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// For implicit conversions from RDDs to DataFrames
scala> import spark.implicits._
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(a => Person(a(0), a(1).trim.toInt))
peopleRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[3] at map at <console>:28

scala> val peopleDF = peopleRDD.toDF
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

scala> peopleDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

scala> peopleDF.filter("age BETWEEN 13 AND 19").show
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+

5.5.2 手动指定schema

当无法提前定义case类时,可以通过以下三步手动指定schema:

  1. 从原始RDD创建一个RDD[Row]
  2. 根据schema创建一个StructType
  3. 通过SparkSession.createDataFrame()方法将schema应用到RDD

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

// Create an RDD
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[11] at textFile at <console>:30

// Generate the schema
scala> val schema = StructType(Array(
     |   StructField("name", StringType, nullable = true),
     |   StructField("age", IntegerType, nullable = true)
     | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true),StructField(age,IntegerType,true))

// Convert records of the RDD (people) to Rows
scala> val rowRDD = peopleRDD.map(_.split(",")).map(a => Row(a(0), a(1).trim.toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at map at <console>:30

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> peopleDF.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

5.6 标量函数

标量函数是对于每行返回单个值的函数,包括数学函数、字符串函数、日期时间函数等。完整列表见Built-in Scalar Functions

5.7 聚合函数

聚合函数是对于每个分组返回一个值的函数,例如count(), count_distinct(), avg(), max(), min()等。完整列表见Built-in Aggregation Functions

5.8 数据源

Spark SQL支持从多种不同的数据源加载数据,包括文本文件、CSV、JSON、Hive表等等。详见Data Sources

6.Spark Streaming

https://spark.apache.org/docs/latest/streaming-programming-guide.html

注:Spark Streaming是上一代的Spark流引擎,已经不再更新。流式引用应该使用新的流引擎Structured Streaming

Spark Streaming用于处理流式数据,可以从多种数据源(例如Kafka、TCP套接字)消费数据,使用高层次函数(例如mapreducejoinwindow)处理数据,并将处理后的数据输出到文件系统、数据库等。

Spark Streaming architecture

Spark Streaming内部将输入数据流划分成批次(batch),之后使用Spark引擎处理。

Spark Streaming data flow

Spark Streaming提供的高层次抽象称为离散流(discretized stream, DStream),内部表示为RDD序列。

下面介绍如何编写Spark Streaming程序。

6.1 简单示例

假设我们希望统计从TCP套接字接收到的文本数据中的单词数,步骤如下。

首先,创建流式应用的主入口StreamingContext,具有2个执行线程、分批间隔为1秒:

1
2
3
4
5
6
7
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a local StreamingContext with two working threads and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

之后创建一个DStream,表示来自TCP套接字的流式数据,指定主机名和端口号:

1
2
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

DStream中的每条记录是一行文本。接下来,使用空格将文本行分割为单词:

1
2
// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap是一个一对多操作,创建了一个新的DStream。下面统计单词数:

1
2
3
4
5
6
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

注:在流式应用中,数据是源源不断到达的,通常是无尽的,因此需要对数据流分批处理。该示例是将每秒内到达的数据作为一个批次,对每个批次分别统计单词数。

注意,(类似于RDD操作)Spark Streaming操作也是懒惰执行的,上面几行代码只记录了要执行的计算,而不会立即执行。要开始处理,需要调用

1
2
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完整示例代码:NetworkWordCount

要运行该示例,首先运行Netcat:

1
$ nc -lk 9999

在另一个终端运行:

1
$ ./bin/run-example streaming.NetworkWordCount localhost 9999

之后在Netcat终端输入任意文本,Spark终端将会每秒统计并打印单词数:

1
2
3
4
$ nc -lk 9999
hello world
hello world hello world
^C
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1697455022000 ms
-------------------------------------------
(world,1)
(hello,1)

-------------------------------------------
Time: 1697455023000 ms
-------------------------------------------

-------------------------------------------
Time: 1697455024000 ms
-------------------------------------------
(world,2)
(hello,2)
...

注:为了便于观察输出,可以将日志级别改为warn。将conf目录下的log4j2.properties.template拷贝到log4j2.properties,并将其中的rootLogger.level = info改为rootLogger.level = warn。详见Configuring Logging

6.2 依赖

Spark Streaming应用需要添加以下依赖:

1
2
3
4
5
6
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.4.0</version>
    <scope>provided</scope>
</dependency>

如果要从Kafka等数据源消费数据,还需要添加对应的依赖,例如spark-streaming-kafka-0-10_2.12。

6.3 初始化StreamingContext

Spark Streaming程序的主入口是StreamingContext,可以从SparkConf对象创建,并指定分批间隔:

1
2
val conf = new SparkConf().setMaster(master).setAppName(appName)
val ssc = new StreamingContext(conf, Seconds(1))

其中master是集群的Master URL,如果运行在本地模式则为 “local[*]” 。通过spark-submit启动应用时,可以使用--master选项指定Master URL。分批间隔应当根据应用的延迟要求和可用集群资源来设置。

StreamingContext将会自动创建SparkContext,可以通过ssc.sparkContext访问。

初始化StreamingContext之后:

  1. 通过输入DStream定义输入源。
  2. 通过DStream转换和输出操作定义流式计算。
  3. 使用ssc.start()开始接收和处理数据。
  4. 使用ssc.awaitTermination()等待处理结束(手动停止或遇到错误)。
  5. 或者使用ssc.stop()手动停止处理。

6.4 离散流(DStream)

离散流(discretized stream, DStream)是Spark Streaming提供的基本抽象,表示连续的数据流。DStream在内部表示为RDD序列,每个RDD包含来自一定时间间隔的数据,如下图所示。

DStream

作用于DStream上的任何操作都会转换为底层RDD上的操作。例如,在6.1节的例子中,lines.flatMap()生成words,如下图所示。

DStream ops

6.5 输入DStream和接收器

输入DStream表示输入数据流。在6.1节的例子中,lines是表示从netcat接收到的数据流的输入DStream。每个输入DStream(除文件流外)都关联了一个接收器(Receiver)对象。

Spark Streaming提供了两种类型的内置数据源:

(1)基本数据源:StreamingContext API直接提供的数据源,例如:

  • socketTextStream():来自TCP套接字的文本数据
  • textFileStream():读取HDFS目录下的文本文件

(2)高级数据源:需要添加依赖的数据源,例如KafkaKinesis等。

另外,也可以自定义接收器

6.6 DStream操作

完整列表见API文档DStreamPairDStreamFunctions

6.6.1 转换操作

与RDD类似,DStream允许通过转换(transformation)操作修改其中的数据。常用的转换操作:mapflatMapfilterrepartitionunioncountreducecountByValuereduceByKeyjoincogrouptransformupdateStateByKey等。

6.6.2 窗口操作

Spark Streaming还提供了窗口操作,可以在数据的滑动窗口上执行转换操作,如下图所示。

DStream window

落在窗口中的RDD将被组合并执行转换操作,以生成输出DStream中的RDD。

窗口操作需要指定两个参数:窗口长度和滑动距离。在这个例子中,窗口长度为3个时间单位,滑动距离为2个时间单位。这两个参数必须是输入DStream分批间隔的整数倍。

例如,在6.1节的例子中,如果希望每10秒钟计算过去30秒的单词数,可以对pairs DStream执行reduceByKeyAndWindow()操作:

1
2
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(10))

常用的窗口操作:windowcountByWindowreduceByWindowreduceByKeyAndWindowreduceByKeyAndWindowcountByValueAndWindow等。

6.6.3 join操作

在Spark Streaming中可以很容易地执行join操作。

(1) DStream-DStream join

1
2
3
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

在每个批次中,stream1的RDD与stream2的RDD进行join。另外,也可以执行outer join:leftOuterJoinrightOuterJoinfullOuterJoin

(2) DStream-DataSet join

1
2
3
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

6.6.4 输出操作

DStream的输出操作将数据输出到外部系统,例如数据库或文件系统。输出操作会触发DStream转换操作的执行(类似于RDD的action)。目前支持的输出操作:printsaveAsTextFilessaveAsObjectFilessaveAsHadoopFilesforeachRDD

6.7 DataFrame和SQL操作

foreachRDD中,可以对RDD使用DataFrame和SQL操作。

例如:SqlNetworkWordCount.scala

6.8 缓存/持久化

与RDD类似,DStream可以通过persist()方法将数据保存在内存中,这将自动保存DStream中的每个RDD。如果DStream中的数据将被多次计算(例如窗口操作),这会非常有用。窗口操作生成的DStream会自动保存在内存中,不需要手动调用persist()

6.9 检查点

https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

7.Structured Streaming

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

This post is licensed under CC BY 4.0 by the author.