Repository: spark Updated Branches: refs/heads/branch-2.0 1fb7b3a0a -> 1de3446d9
[SPARK-15512][CORE] repartition(0) should raise IllegalArgumentException ## What changes were proposed in this pull request? Previously, SPARK-8893 added the constraints on positive number of partitions for repartition/coalesce operations in general. This PR adds one missing part for that and adds explicit two testcases. **Before** ```scala scala> sc.parallelize(1 to 5).coalesce(0) java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive. ... scala> sc.parallelize(1 to 5).repartition(0).collect() res1: Array[Int] = Array() // empty scala> spark.sql("select 1").coalesce(0) res2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [1: int] scala> spark.sql("select 1").coalesce(0).collect() java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive. scala> spark.sql("select 1").repartition(0) res3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [1: int] scala> spark.sql("select 1").repartition(0).collect() res4: Array[org.apache.spark.sql.Row] = Array() // empty ``` **After** ```scala scala> sc.parallelize(1 to 5).coalesce(0) java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive. ... scala> sc.parallelize(1 to 5).repartition(0) java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive. ... scala> spark.sql("select 1").coalesce(0) java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive. ... scala> spark.sql("select 1").repartition(0) java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive. ... ``` ## How was this patch tested? Pass the Jenkins tests with new testcases. Author: Dongjoon Hyun <dongj...@apache.org> Closes #13282 from dongjoon-hyun/SPARK-15512. (cherry picked from commit f08bf587b1913c6cc8ecb34c45331cf4750961c9) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1de3446d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1de3446d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1de3446d Branch: refs/heads/branch-2.0 Commit: 1de3446d9240c4ee7513d3b2f3be2a77344c2e70 Parents: 1fb7b3a Author: Dongjoon Hyun <dongj...@apache.org> Authored: Tue May 24 18:55:23 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Tue May 24 18:55:31 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 1 + core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 8 ++++++++ .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 1 + .../spark/sql/catalyst/plans/logical/partitioning.scala | 2 +- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++ .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 8 ++++++++ 6 files changed, 27 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e6db9b3..e251421 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -437,6 +437,7 @@ abstract class RDD[T: ClassTag]( partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] = withScope { + require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index a4992fe..ad56715 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -276,6 +276,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { test("repartitioned RDDs") { val data = sc.parallelize(1 to 1000, 10) + intercept[IllegalArgumentException] { + data.repartition(0) + } + // Coalesce partitions val repartitioned1 = data.repartition(2) assert(repartitioned1.partitions.size == 2) @@ -329,6 +333,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { test("coalesced RDDs") { val data = sc.parallelize(1 to 10, 10) + intercept[IllegalArgumentException] { + data.coalesce(0) + } + val coalesced1 = data.coalesce(2) assert(coalesced1.collect().toList === (1 to 10).toList) assert(coalesced1.glom().collect().map(_.toList).toList === http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index ca0096e..0a9250b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -734,6 +734,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode { */ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) extends UnaryNode { + require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") override def output: Seq[Attribute] = child.output } http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala index a5bdee1..28cbce8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala @@ -43,7 +43,7 @@ case class RepartitionByExpression( child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData { numPartitions match { - case Some(n) => require(n > 0, "numPartitions must be greater than 0.") + case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.") case None => // Ok } } http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f573abf..0614747 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -259,12 +259,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("repartition") { + intercept[IllegalArgumentException] { + testData.select('key).repartition(0) + } + checkAnswer( testData.select('key).repartition(10).select('key), testData.select('key).collect().toSeq) } test("coalesce") { + intercept[IllegalArgumentException] { + testData.select('key).coalesce(0) + } + assert(testData.select('key).coalesce(1).rdd.partitions.size === 1) checkAnswer( http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 0ffbd6d..05de79e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -81,6 +81,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val data = (1 to 100).map(i => ClassData(i.toString, i)) val ds = data.toDS() + intercept[IllegalArgumentException] { + ds.coalesce(0) + } + + intercept[IllegalArgumentException] { + ds.repartition(0) + } + assert(ds.repartition(10).rdd.partitions.length == 10) checkDataset( ds.repartition(10), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org