Repository: spark
Updated Branches:
  refs/heads/master 1270e7175 -> f10cbf17d


[SPARK-21977][HOTFIX] Adjust EnsureStatefulOpPartitioningSuite to use scalatest lifecycle normally instead of constructor

## What changes were proposed in this pull request?

Adjust EnsureStatefulOpPartitioningSuite to set up its test state through the standard ScalaTest lifecycle (`beforeAll`) rather than in the suite constructor; this fixes:

```
*** RUN ABORTED ***
  org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.streaming.EnsureStatefulOpPartitioningSuite.<init>(EnsureStatefulOpPartitioningSuite.scala:35)
```
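
For context: any code in a ScalaTest suite's constructor body runs the moment the suite class is instantiated, before `beforeAll()`, so state that depends on a shared `SparkContext` gets created outside the managed lifecycle (and a second instantiation trips the one-SparkContext-per-JVM check above). Below is a minimal, self-contained sketch of the pattern the fix applies; it is not part of this commit, uses the current ScalaTest `AnyFunSuite` API, and the `resource` field is a hypothetical stand-in for the suite's `baseDf`:

```scala
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

class LifecycleSketchSuite extends AnyFunSuite with BeforeAndAfterAll {

  // Anti-pattern (what the suite did before): a val here is initialized in the
  // constructor, i.e. as soon as the suite class is instantiated, before any
  // fixture set up by beforeAll() exists.
  // private val resource = createExpensiveResource()  // hypothetical helper

  // Preferred: declare mutable state and populate it in beforeAll(), so the
  // initialization happens inside the normal ScalaTest lifecycle.
  private var resource: String = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    resource = "initialized" // stand-in for e.g. Seq(...).toDF(...)
  }

  test("state set up in beforeAll is visible to tests") {
    assert(resource == "initialized")
  }
}
```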

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #19306 from srowen/SPARK-21977.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f10cbf17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f10cbf17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f10cbf17

Branch: refs/heads/master
Commit: f10cbf17dc7ceb96982fcdc964c849336fb50deb
Parents: 1270e71
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Sep 21 18:00:19 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Sep 21 18:00:19 2017 +0100

----------------------------------------------------------------------
 .../EnsureStatefulOpPartitioningSuite.scala     | 92 +++++++++++---------
 1 file changed, 49 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f10cbf17/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
index 66c0263..044bb03 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
 import java.util.UUID
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -32,39 +33,47 @@ import org.apache.spark.sql.test.SharedSQLContext
 class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLContext {
 
   import testImplicits._
-  super.beforeAll()
 
-  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+  private var baseDf: DataFrame = null
 
-  testEnsureStatefulOpPartitioning(
-    "ClusteredDistribution generates Exchange with HashPartitioning",
-    baseDf.queryExecution.sparkPlan,
-    requiredDistribution = keys => ClusteredDistribution(keys),
-    expectedPartitioning =
-      keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
-    expectShuffle = true)
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+  }
+
+  test("ClusteredDistribution generates Exchange with HashPartitioning") {
+    testEnsureStatefulOpPartitioning(
+      baseDf.queryExecution.sparkPlan,
+      requiredDistribution = keys => ClusteredDistribution(keys),
+      expectedPartitioning =
+        keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+      expectShuffle = true)
+  }
 
-  testEnsureStatefulOpPartitioning(
-    "ClusteredDistribution with coalesce(1) generates Exchange with 
HashPartitioning",
-    baseDf.coalesce(1).queryExecution.sparkPlan,
-    requiredDistribution = keys => ClusteredDistribution(keys),
-    expectedPartitioning =
-      keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
-    expectShuffle = true)
+  test("ClusteredDistribution with coalesce(1) generates Exchange with 
HashPartitioning") {
+    testEnsureStatefulOpPartitioning(
+      baseDf.coalesce(1).queryExecution.sparkPlan,
+      requiredDistribution = keys => ClusteredDistribution(keys),
+      expectedPartitioning =
+        keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+      expectShuffle = true)
+  }
 
-  testEnsureStatefulOpPartitioning(
-    "AllTuples generates Exchange with SinglePartition",
-    baseDf.queryExecution.sparkPlan,
-    requiredDistribution = _ => AllTuples,
-    expectedPartitioning = _ => SinglePartition,
-    expectShuffle = true)
+  test("AllTuples generates Exchange with SinglePartition") {
+    testEnsureStatefulOpPartitioning(
+      baseDf.queryExecution.sparkPlan,
+      requiredDistribution = _ => AllTuples,
+      expectedPartitioning = _ => SinglePartition,
+      expectShuffle = true)
+  }
 
-  testEnsureStatefulOpPartitioning(
-    "AllTuples with coalesce(1) doesn't need Exchange",
-    baseDf.coalesce(1).queryExecution.sparkPlan,
-    requiredDistribution = _ => AllTuples,
-    expectedPartitioning = _ => SinglePartition,
-    expectShuffle = false)
+  test("AllTuples with coalesce(1) doesn't need Exchange") {
+    testEnsureStatefulOpPartitioning(
+      baseDf.coalesce(1).queryExecution.sparkPlan,
+      requiredDistribution = _ => AllTuples,
+      expectedPartitioning = _ => SinglePartition,
+      expectShuffle = false)
+  }
 
   /**
   * For `StatefulOperator` with the given `requiredChildDistribution`, and child SparkPlan
@@ -72,26 +81,23 @@ class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLCont
    * ensure the expected partitioning.
    */
   private def testEnsureStatefulOpPartitioning(
-      testName: String,
       inputPlan: SparkPlan,
       requiredDistribution: Seq[Attribute] => Distribution,
       expectedPartitioning: Seq[Attribute] => Partitioning,
       expectShuffle: Boolean): Unit = {
-    test(testName) {
-      val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1)))
-      val executed = executePlan(operator, OutputMode.Complete())
-      if (expectShuffle) {
-        val exchange = executed.children.find(_.isInstanceOf[Exchange])
-        if (exchange.isEmpty) {
-          fail(s"Was expecting an exchange but didn't get one in:\n$executed")
-        }
-        assert(exchange.get ===
-        ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
-          s"Exchange didn't have expected properties:\n${exchange.get}")
-      } else {
-        assert(!executed.children.exists(_.isInstanceOf[Exchange]),
-          s"Unexpected exchange found in:\n$executed")
+    val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1)))
+    val executed = executePlan(operator, OutputMode.Complete())
+    if (expectShuffle) {
+      val exchange = executed.children.find(_.isInstanceOf[Exchange])
+      if (exchange.isEmpty) {
+        fail(s"Was expecting an exchange but didn't get one in:\n$executed")
       }
+      assert(exchange.get ===
+        ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
+        s"Exchange didn't have expected properties:\n${exchange.get}")
+    } else {
+      assert(!executed.children.exists(_.isInstanceOf[Exchange]),
+        s"Unexpected exchange found in:\n$executed")
     }
   }
 

