Repository: spark
Updated Branches:
  refs/heads/master 1cb347fbc -> 4b8806741


[SPARK-15483][SQL] IncrementalExecution should use extra strategies.

## What changes were proposed in this pull request?

Extra strategies does not work for streams because `IncrementalExecution` uses 
modified planner with stateful operations but it does not include extra 
strategies.

This pr fixes `IncrementalExecution` to include extra strategies to use them.

## How was this patch tested?

I added a test to check if extra strategies work for streams.

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #13261 from ueshin/issues/SPARK-15483.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b880674
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b880674
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b880674

Branch: refs/heads/master
Commit: 4b88067416ce922ae15a1445cf953fb9b5c43427
Parents: 1cb347f
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Wed May 25 12:02:07 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed May 25 12:02:07 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/IncrementalExecution.scala   |  3 ++-
 .../org/apache/spark/sql/streaming/StreamSuite.scala | 15 +++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b880674/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 8b96f65..fe5f36e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -36,7 +36,8 @@ class IncrementalExecution private[sql](
   extends QueryExecution(sparkSession, logicalPlan) {
 
   // TODO: make this always part of planning.
-  val stateStrategy = 
sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil
+  val stateStrategy = 
sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+    sparkSession.sessionState.experimentalMethods.extraStrategies
 
   // Modified planner with stateful operations.
   override def planner: SparkPlanner =

http://git-wip-us.apache.org/repos/asf/spark/blob/4b880674/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b742206..ae89a68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -220,6 +220,21 @@ class StreamSuite extends StreamTest with SharedSQLContext 
{
       CheckOffsetLogLatestBatchId(2),
       CheckSinkLatestBatchId(2))
   }
+
+  test("insert an extraStrategy") {
+    try {
+      spark.experimental.extraStrategies = TestStrategy :: Nil
+
+      val inputData = MemoryStream[(String, Int)]
+      val df = inputData.toDS().map(_._1).toDF("a")
+
+      testStream(df)(
+        AddData(inputData, ("so slow", 1)),
+        CheckAnswer("so fast"))
+    } finally {
+      spark.experimental.extraStrategies = Nil
+    }
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to