Repository: spark Updated Branches: refs/heads/master 1cb347fbc -> 4b8806741
[SPARK-15483][SQL] IncrementalExecution should use extra strategies. ## What changes were proposed in this pull request? Extra strategies do not work for streams because `IncrementalExecution` uses a modified planner with stateful operations, and that planner does not include the extra strategies. This PR fixes `IncrementalExecution` so that its planner also includes the extra strategies. ## How was this patch tested? I added a test to check whether extra strategies work for streams. Author: Takuya UESHIN <ues...@happy-camper.st> Closes #13261 from ueshin/issues/SPARK-15483. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b880674 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b880674 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b880674 Branch: refs/heads/master Commit: 4b88067416ce922ae15a1445cf953fb9b5c43427 Parents: 1cb347f Author: Takuya UESHIN <ues...@happy-camper.st> Authored: Wed May 25 12:02:07 2016 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Wed May 25 12:02:07 2016 -0700 ---------------------------------------------------------------------- .../execution/streaming/IncrementalExecution.scala | 3 ++- .../org/apache/spark/sql/streaming/StreamSuite.scala | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4b880674/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 8b96f65..fe5f36e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -36,7 +36,8 @@ class IncrementalExecution private[sql]( extends QueryExecution(sparkSession, logicalPlan) { // TODO: make this always part of planning. - val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil + val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +: + sparkSession.sessionState.experimentalMethods.extraStrategies // Modified planner with stateful operations. override def planner: SparkPlanner = http://git-wip-us.apache.org/repos/asf/spark/blob/4b880674/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index b742206..ae89a68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -220,6 +220,21 @@ class StreamSuite extends StreamTest with SharedSQLContext { CheckOffsetLogLatestBatchId(2), CheckSinkLatestBatchId(2)) } + + test("insert an extraStrategy") { + try { + spark.experimental.extraStrategies = TestStrategy :: Nil + + val inputData = MemoryStream[(String, Int)] + val df = inputData.toDS().map(_._1).toDF("a") + + testStream(df)( + AddData(inputData, ("so slow", 1)), + CheckAnswer("so fast")) + } finally { + spark.experimental.extraStrategies = Nil + } + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org