Repository: spark Updated Branches: refs/heads/master 838effb98 -> e68aed70f
[SPARK-21216][SS] Hive strategies missed in Structured Streaming IncrementalExecution ## What changes were proposed in this pull request? If someone creates a HiveSession, the planner in `IncrementalExecution` doesn't take into account the Hive scan strategies. This causes joins of Streaming DataFrames with Hive tables to fail. ## How was this patch tested? Regression test Author: Burak Yavuz <brk...@gmail.com> Closes #18426 from brkyvz/hive-join. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e68aed70 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e68aed70 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e68aed70 Branch: refs/heads/master Commit: e68aed70fbf1cfa59ba51df70287d718d737a193 Parents: 838effb Author: Burak Yavuz <brk...@gmail.com> Authored: Wed Jun 28 10:45:45 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Jun 28 10:45:45 2017 -0700 ---------------------------------------------------------------------- .../streaming/IncrementalExecution.scala | 4 ++ .../spark/sql/hive/execution/HiveDDLSuite.scala | 41 +++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e68aed70/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index ab89dc6..dbe652b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -47,6 +47,10 @@ class IncrementalExecution( 
sparkSession.sparkContext, sparkSession.sessionState.conf, sparkSession.sessionState.experimentalMethods) { + override def strategies: Seq[Strategy] = + extraPlanningStrategies ++ + sparkSession.sessionState.planner.strategies + override def extraPlanningStrategies: Seq[Strategy] = StatefulAggregationStrategy :: FlatMapGroupsWithStateStrategy :: http://git-wip-us.apache.org/repos/asf/spark/blob/e68aed70/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index aca9649..31fa3d2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -160,7 +160,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA test("drop table") { testDropTable(isDatasourceTable = false) } - } class HiveDDLSuite @@ -1956,4 +1955,44 @@ class HiveDDLSuite } } } + + test("SPARK-21216: join with a streaming DataFrame") { + import org.apache.spark.sql.execution.streaming.MemoryStream + import testImplicits._ + + implicit val _sqlContext = spark.sqlContext + + Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word").createOrReplaceTempView("t1") + // Make a table and ensure it will be broadcast. 
+ sql("""CREATE TABLE smallTable(word string, number int) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |STORED AS TEXTFILE + """.stripMargin) + + sql( + """INSERT INTO smallTable + |SELECT word, number from t1 + """.stripMargin) + + val inputData = MemoryStream[Int] + val joined = inputData.toDS().toDF() + .join(spark.table("smallTable"), $"value" === $"number") + + val sq = joined.writeStream + .format("memory") + .queryName("t2") + .start() + try { + inputData.addData(1, 2) + + sq.processAllAvailable() + + checkAnswer( + spark.table("t2"), + Seq(Row(1, "one", 1), Row(2, "two", 2)) + ) + } finally { + sq.stop() + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org