Spark RDD 介绍
FJHHH Lv3

[TOC]

Spark RDD 介绍

记录一下最近看的 Spark 的东西, 大部分东西都从 RDD Programming Guide 里整理摘抄. 对 Scala 还不熟悉, 示例以 Java 为主.

RDD 的东西其实还有很多, 远超这篇笔记的内容. 刚接触 Spark , 理解得不深也讲不好, 还是以文档和书籍为主.

Spark 围绕着 RDD (Resilient Distributed Dataset) 展开, RDD 是一个容错的可并行的集合.

创建操作

RDD 可以通过并行化驱动程序中的已有集合或者读取外部存储系统的数据集来创建.

在创建 RDD 之前, 需要先初始化 SparkContext:

1
2
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

在 shell 中可以省略这一步.

并行化集合

1
2
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

外部数据集

Spark 能够用任何 Hadoop 支持的存储元来创建 RDD, 包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等等. Spark 支持文本文件、SequenceFiles、和其他的任何 Hadoop 输入格式. External Datasets

1
2
3
4
// 文件系统
JavaRDD<String> distFile = sc.textFile("README.md");
// 从HDFS中读取
JavaRDD<String> hdfsFile = sc.textFile("hdfs://host:port/path/README.md");

闭包

下面这段代码在本地模式运行和在集群中运行的表现将会不一样 (实际上与是否在同一个JVM中运行相关).

1
2
3
4
5
6
7
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

在执行 job 之前, Spark 会将 RDD 的操作切分成 task, 每个 task 都将在一个 executor 中执行. 在执行之前, Spark 会计算 task 的闭包(executor 执行计算所需要的变量、方法). 这个闭包会被序列化, 并被发送给每个 executor. 被发送给 executor 的闭包都是一个 copy, 因此闭包的中变量都不再是驱动节点中的变量了. 原来驱动节点中的变量对 executor 不再可见. 在上面的例子中, counter 的最终值将会是0.

在本地模式中, 一些情况下, foreach 方法会与驱动程序在同一个 JVM 中执行, 这时将会引用同一个 counter.

为了确保在这些场景中的良好行为, 应该使用 Accumulator.

闭包中不应改变某些全局状态.

转换操作和行动操作

RDD 是不可变的, 转换操作将会在原 RDD 的基础上生成一个新的 RDD. 所有的转换操作都是 lazy 的, 在行动操作之前不会进行计算. 默认情况下, 每次行动操作, 转换 RDD 会被重新计算, 可以使用 persist 方法来把一个 RDD 放到内存、磁盘里, 这样就只需要计算一次.

Spark 支持的转换操作请看 Transformations

行动操作会触发计算, 并将结果返回给驱动程序. Spark 支持的行动操作请看 Actions

分区

分区是 RDD 并行计算的单元. 一个 RDD 会被分为若干个分区, 这些分区甚至可以不在同一台机器上. 每个分区的技术计算都在一个 task 中进行, task 的个数也由分区数决定. 分区的最小数量可以在创建 RDD 时指定:

1
JavaRDD<String> distFile = sc.textFile("README.md", 4);

默认的分区数量可以通过修改 spark.default.parallelism 来配置. 如果没有这个配置, 则与不同的集群环境相关.

  • 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N
  • Apache Mesos:默认的分区数为8
  • Standalone或YARN:默认取集群中所有核心数目的总和,或者2,取二者的较大值。对于parallelize来说,没有在方法中的指定分区数,则默认为spark.default.parallelism,对于textFile来说,没有在方法中的指定分区数,则默认为min(defaultParallelism,2),而defaultParallelism对应的就是spark.default.parallelism。如果是从hdfs上面读取文件,其分区数为文件分片数(128MB/片)

Shuffle 操作

shuffle 是 Spark 重新分配数据的一个机制. 一些跨分区的操作会触发 shuffle. shuffle 通常涉及跨 executor 和机器的数据拷贝(会涉及磁盘I/O、数据序列化和网络I/O), 因此非常昂贵和复杂. Shuffle operations

持久性

Spark 的一项重要功能是跨操作持久化或缓存 RDD, 并且可以使用不同的存储级别.RDD Persistence

变量共享

之前提到过, 在集群中运行时, 变量会被拷贝到每台机器上, 并且对变量的改动不会被传播回驱动程序. Spark 提供了两种类型的共享变量 broadcast variable 和 accumulator.

累加器

Spark 支持数字型的累加器, 用户也能自定义一些累加器,

broadcast variable

Broadcast variable 允许你在每台机器上创建一个只读的变量缓存, 而不是和任务一起传输. Spark 同时尝试使用更高效的广播算法来减少网络开销. 使用 SparkContext.broadcast(v) 来创建一个广播变量.

1
2
3
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]

如果 v 是一个对象, 在创建完广播变量后不要修改 v, 防止将不同的值传给不同的机器. 我们可以创建命名的和匿名(named or unnamed)的累加器, 命名的累加器将会被展示在 webUI中.

1
2
3
4
5
6
LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));

accum.value();
// returns 10
 Comments