Repository: spark Updated Branches: refs/heads/master 1270e7175 -> f10cbf17d
[SPARK-21977][HOTFIX] Adjust EnsureStatefulOpPartitioningSuite to use scalatest lifecycle normally instead of constructor

## What changes were proposed in this pull request?

Adjust EnsureStatefulOpPartitioningSuite to use scalatest lifecycle normally instead of constructor; fixes:

```
*** RUN ABORTED ***
  org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.streaming.EnsureStatefulOpPartitioningSuite.<init>(EnsureStatefulOpPartitioningSuite.scala:35)
```

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #19306 from srowen/SPARK-21977.2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f10cbf17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f10cbf17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f10cbf17

Branch: refs/heads/master
Commit: f10cbf17dc7ceb96982fcdc964c849336fb50deb
Parents: 1270e71
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Sep 21 18:00:19 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Sep 21 18:00:19 2017 +0100

----------------------------------------------------------------------
 .../EnsureStatefulOpPartitioningSuite.scala | 92 +++++++++++---------
 1 file changed, 49 insertions(+), 43 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/f10cbf17/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
index 66c0263..044bb03
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming import java.util.UUID import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.Attribute @@ -32,39 +33,47 @@ import org.apache.spark.sql.test.SharedSQLContext class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLContext { import testImplicits._ - super.beforeAll() - private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char") + private var baseDf: DataFrame = null - testEnsureStatefulOpPartitioning( - "ClusteredDistribution generates Exchange with HashPartitioning", - baseDf.queryExecution.sparkPlan, - requiredDistribution = keys => ClusteredDistribution(keys), - expectedPartitioning = - keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions), - expectShuffle = true) + override def beforeAll(): Unit = { + super.beforeAll() + baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char") + } + + test("ClusteredDistribution generates Exchange with HashPartitioning") { + testEnsureStatefulOpPartitioning( + baseDf.queryExecution.sparkPlan, + requiredDistribution = keys => ClusteredDistribution(keys), + expectedPartitioning = + keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions), + expectShuffle = true) + } - testEnsureStatefulOpPartitioning( - "ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning", - baseDf.coalesce(1).queryExecution.sparkPlan, - requiredDistribution = keys => ClusteredDistribution(keys), - expectedPartitioning = - keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions), - expectShuffle = true) + 
test("ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning") { + testEnsureStatefulOpPartitioning( + baseDf.coalesce(1).queryExecution.sparkPlan, + requiredDistribution = keys => ClusteredDistribution(keys), + expectedPartitioning = + keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions), + expectShuffle = true) + } - testEnsureStatefulOpPartitioning( - "AllTuples generates Exchange with SinglePartition", - baseDf.queryExecution.sparkPlan, - requiredDistribution = _ => AllTuples, - expectedPartitioning = _ => SinglePartition, - expectShuffle = true) + test("AllTuples generates Exchange with SinglePartition") { + testEnsureStatefulOpPartitioning( + baseDf.queryExecution.sparkPlan, + requiredDistribution = _ => AllTuples, + expectedPartitioning = _ => SinglePartition, + expectShuffle = true) + } - testEnsureStatefulOpPartitioning( - "AllTuples with coalesce(1) doesn't need Exchange", - baseDf.coalesce(1).queryExecution.sparkPlan, - requiredDistribution = _ => AllTuples, - expectedPartitioning = _ => SinglePartition, - expectShuffle = false) + test("AllTuples with coalesce(1) doesn't need Exchange") { + testEnsureStatefulOpPartitioning( + baseDf.coalesce(1).queryExecution.sparkPlan, + requiredDistribution = _ => AllTuples, + expectedPartitioning = _ => SinglePartition, + expectShuffle = false) + } /** * For `StatefulOperator` with the given `requiredChildDistribution`, and child SparkPlan @@ -72,26 +81,23 @@ class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLCont * ensure the expected partitioning. 
*/ private def testEnsureStatefulOpPartitioning( - testName: String, inputPlan: SparkPlan, requiredDistribution: Seq[Attribute] => Distribution, expectedPartitioning: Seq[Attribute] => Partitioning, expectShuffle: Boolean): Unit = { - test(testName) { - val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1))) - val executed = executePlan(operator, OutputMode.Complete()) - if (expectShuffle) { - val exchange = executed.children.find(_.isInstanceOf[Exchange]) - if (exchange.isEmpty) { - fail(s"Was expecting an exchange but didn't get one in:\n$executed") - } - assert(exchange.get === - ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan), - s"Exchange didn't have expected properties:\n${exchange.get}") - } else { - assert(!executed.children.exists(_.isInstanceOf[Exchange]), - s"Unexpected exchange found in:\n$executed") + val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1))) + val executed = executePlan(operator, OutputMode.Complete()) + if (expectShuffle) { + val exchange = executed.children.find(_.isInstanceOf[Exchange]) + if (exchange.isEmpty) { + fail(s"Was expecting an exchange but didn't get one in:\n$executed") } + assert(exchange.get === + ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan), + s"Exchange didn't have expected properties:\n${exchange.get}") + } else { + assert(!executed.children.exists(_.isInstanceOf[Exchange]), + s"Unexpected exchange found in:\n$executed") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org