Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1fb7b3a0a -> 1de3446d9


[SPARK-15512][CORE] repartition(0) should raise IllegalArgumentException

## What changes were proposed in this pull request?

Previously, SPARK-8893 added constraints requiring a positive number of partitions 
for repartition/coalesce operations in general. This PR adds the missing validation 
and two explicit test cases for it.

**Before**
```scala
scala> sc.parallelize(1 to 5).coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> sc.parallelize(1 to 5).repartition(0).collect()
res1: Array[Int] = Array()   // empty
scala> spark.sql("select 1").coalesce(0)
res2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [1: int]
scala> spark.sql("select 1").coalesce(0).collect()
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
scala> spark.sql("select 1").repartition(0)
res3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [1: int]
scala> spark.sql("select 1").repartition(0).collect()
res4: Array[org.apache.spark.sql.Row] = Array()  // empty
```

**After**
```scala
scala> sc.parallelize(1 to 5).coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> sc.parallelize(1 to 5).repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> spark.sql("select 1").coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> spark.sql("select 1").repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
```
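
For reference, the errors above come from Scala's `require` (in `scala.Predef`), which throws `java.lang.IllegalArgumentException` with a `requirement failed: ` prefix when its predicate is false. A minimal standalone sketch of the guard this patch adds (the method name is hypothetical, for illustration only):

```scala
// Sketch of the validation pattern added by this patch.
// scala.Predef.require throws IllegalArgumentException when the check fails.
def validateNumPartitions(numPartitions: Int): Unit = {
  require(numPartitions > 0,
    s"Number of partitions ($numPartitions) must be positive.")
}

validateNumPartitions(2)  // ok
validateNumPartitions(0)
// java.lang.IllegalArgumentException: requirement failed:
//   Number of partitions (0) must be positive.
```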

## How was this patch tested?

Passed the Jenkins tests with the new test cases.
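
The new tests rely on ScalaTest's `intercept`, which fails the test unless the block throws the expected exception type and returns the caught exception. A minimal sketch of the pattern (assuming an in-scope `SparkContext` named `sc`, as in the suites; the message assertion is an illustrative extra, not part of this patch):

```scala
import org.scalatest.Assertions._

val e = intercept[IllegalArgumentException] {
  sc.parallelize(1 to 5).repartition(0)
}
// intercept returns the thrown exception, so the message can be checked too.
assert(e.getMessage.contains("must be positive"))
```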

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13282 from dongjoon-hyun/SPARK-15512.

(cherry picked from commit f08bf587b1913c6cc8ecb34c45331cf4750961c9)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1de3446d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1de3446d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1de3446d

Branch: refs/heads/branch-2.0
Commit: 1de3446d9240c4ee7513d3b2f3be2a77344c2e70
Parents: 1fb7b3a
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Tue May 24 18:55:23 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue May 24 18:55:31 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala           | 1 +
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala      | 8 ++++++++
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala   | 1 +
 .../spark/sql/catalyst/plans/logical/partitioning.scala      | 2 +-
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala   | 8 ++++++++
 6 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e6db9b3..e251421 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -437,6 +437,7 @@ abstract class RDD[T: ClassTag](
                partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
               (implicit ord: Ordering[T] = null)
       : RDD[T] = withScope {
+    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
       val distributePartition = (index: Int, items: Iterator[T]) => {

http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index a4992fe..ad56715 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -276,6 +276,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
   test("repartitioned RDDs") {
     val data = sc.parallelize(1 to 1000, 10)
 
+    intercept[IllegalArgumentException] {
+      data.repartition(0)
+    }
+
     // Coalesce partitions
     val repartitioned1 = data.repartition(2)
     assert(repartitioned1.partitions.size == 2)
@@ -329,6 +333,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
   test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
 
+    intercept[IllegalArgumentException] {
+      data.coalesce(0)
+    }
+
     val coalesced1 = data.coalesce(2)
     assert(coalesced1.collect().toList === (1 to 10).toList)
     assert(coalesced1.glom().collect().map(_.toList).toList ===

http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ca0096e..0a9250b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -734,6 +734,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
  */
 case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
   extends UnaryNode {
+  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
   override def output: Seq[Attribute] = child.output
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
index a5bdee1..28cbce8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
@@ -43,7 +43,7 @@ case class RepartitionByExpression(
     child: LogicalPlan,
     numPartitions: Option[Int] = None) extends RedistributeData {
   numPartitions match {
-    case Some(n) => require(n > 0, "numPartitions must be greater than 0.")
+    case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
     case None => // Ok
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f573abf..0614747 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -259,12 +259,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("repartition") {
+    intercept[IllegalArgumentException] {
+      testData.select('key).repartition(0)
+    }
+
     checkAnswer(
       testData.select('key).repartition(10).select('key),
       testData.select('key).collect().toSeq)
   }
 
   test("coalesce") {
+    intercept[IllegalArgumentException] {
+      testData.select('key).coalesce(0)
+    }
+
     assert(testData.select('key).coalesce(1).rdd.partitions.size === 1)
 
     checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/1de3446d/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 0ffbd6d..05de79e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -81,6 +81,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val data = (1 to 100).map(i => ClassData(i.toString, i))
     val ds = data.toDS()
 
+    intercept[IllegalArgumentException] {
+      ds.coalesce(0)
+    }
+
+    intercept[IllegalArgumentException] {
+      ds.repartition(0)
+    }
+
     assert(ds.repartition(10).rdd.partitions.length == 10)
     checkDataset(
       ds.repartition(10),

