Repository: spark Updated Branches: refs/heads/branch-1.3 81de30ae5 -> d13080aa2
[SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner Added a check to the SparkContext.union method to check that a partitioner is defined on all RDDs when instantiating a PartitionerAwareUnionRDD. Author: Steven She <ste...@canopylabs.com> Closes #5679 from stevencanopy/SPARK-7103 and squashes the following commits: 5a3d846 [Steven She] SPARK-7103: Fix crash with SparkContext.union when at least one RDD has no partitioner (cherry picked from commit b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d13080aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d13080aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d13080aa Branch: refs/heads/branch-1.3 Commit: d13080aa24106a348d3d1e2b8a788292d5915f21 Parents: 81de30a Author: Steven She <ste...@canopylabs.com> Authored: Mon Apr 27 18:55:02 2015 -0400 Committer: Sean Owen <so...@cloudera.com> Committed: Mon Apr 27 18:55:15 2015 -0400 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/rdd/PartitionerAwareUnionRDD.scala | 1 + .../scala/org/apache/spark/rdd/RDDSuite.scala | 21 ++++++++++++++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 495227b..66426f3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -972,7 +972,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** Build the union of a list of RDDs. */ def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = { val partitioners = rdds.flatMap(_.partitioner).toSet - if (partitioners.size == 1) { + if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) { new PartitionerAwareUnionRDD(this, rdds) } else { new UnionRDD(this, rdds) http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index 92b0641..7598ff6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag]( var rdds: Seq[RDD[T]] ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) { require(rdds.length > 0) + require(rdds.forall(_.partitioner.isDefined)) require(rdds.flatMap(_.partitioner).toSet.size == 1, "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner)) http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index bede1ff..b5f4a19 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4)) } + test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") { + val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1)) + val rddWithNoPartitioner = sc.parallelize(Seq(2->true)) + val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner) + assert(unionRdd.isInstanceOf[UnionRDD[_]]) + } + + test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") { + val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1)) + val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner) + assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]]) + } + + test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") { + val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1)) + val rddWithNoPartitioner = sc.parallelize(Seq(2->true)) + intercept[IllegalArgumentException] { + new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner, rddWithPartitioner)) + } + } + test("partitioner aware union") { def makeRDDWithPartitioner(seq: Seq[Int]) = { sc.makeRDD(seq, 1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org