Repository: spark Updated Branches: refs/heads/master 5c47db065 -> 08db49126
[SPARK-9926] Parallelize partition logic in UnionRDD. This patch has the new logic from #8512 that uses a parallel collection to compute partitions in UnionRDD. The rest of #8512 added an alternative code path for calculating splits in S3, but that isn't necessary to get the same speedup. The underlying problem wasn't that bulk listing wasn't used, it was that an extra FileStatus was retrieved for each file. The fix was just committed as [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810). (I think the original commit also used a single prefix to enumerate all paths, but that isn't always helpful and it was removed in later versions so there is no need for SparkS3Utils.) I tested this using the same table that piapiaozhexiu was using. Calculating splits for a 10-day period took 25 seconds with this change and HADOOP-12810, which is on par with the results from #8512. Author: Ryan Blue <b...@apache.org> Author: Cheolsoo Park <cheols...@netflix.com> Closes #11242 from rdblue/SPARK-9926-parallelize-union-rdd. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08db4912 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08db4912 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08db4912 Branch: refs/heads/master Commit: 08db491265a3b50e31993ac6aa07c3f0dd08cdbb Parents: 5c47db0 Author: Ryan Blue <b...@apache.org> Authored: Thu May 5 14:40:37 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Thu May 5 14:40:37 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/rdd/UnionRDD.scala | 18 +++++++++++++++++- .../scala/org/apache/spark/rdd/RDDSuite.scala | 17 +++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/08db4912/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 66cf436..8171dcc 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -20,6 +20,8 @@ package org.apache.spark.rdd import java.io.{IOException, ObjectOutputStream} import scala.collection.mutable.ArrayBuffer +import scala.collection.parallel.ForkJoinTaskSupport +import scala.concurrent.forkjoin.ForkJoinPool import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} @@ -62,8 +64,22 @@ class UnionRDD[T: ClassTag]( var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) { // Nil since we implement getDependencies + // visible for testing + private[spark] val isPartitionListingParallel: Boolean = + rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10) + + @transient private lazy val partitionEvalTaskSupport = + new ForkJoinTaskSupport(new ForkJoinPool(8)) + override def getPartitions: Array[Partition] = { - val array = new Array[Partition](rdds.map(_.partitions.length).sum) + val parRDDs = if (isPartitionListingParallel) { + val parArray = rdds.par + parArray.tasksupport = partitionEvalTaskSupport + parArray + } else { + rdds + } + val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum) var pos = 0 for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) { array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index) http://git-wip-us.apache.org/repos/asf/spark/blob/08db4912/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index a663dab..979fb42 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -116,6 +116,23 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4)) } + test("SparkContext.union parallel partition listing") { + val nums1 = sc.makeRDD(Array(1, 2, 3, 4), 2) + val nums2 = sc.makeRDD(Array(5, 6, 7, 8), 2) + val serialUnion = sc.union(nums1, nums2) + val expected = serialUnion.collect().toList + + assert(serialUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === false) + + sc.conf.set("spark.rdd.parallelListingThreshold", "1") + val parallelUnion = sc.union(nums1, nums2) + val actual = parallelUnion.collect().toList + sc.conf.remove("spark.rdd.parallelListingThreshold") + + assert(parallelUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === true) + assert(expected === actual) + } + test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") { val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1)) val rddWithNoPartitioner = sc.parallelize(Seq(2 -> true)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org