Repository: spark Updated Branches: refs/heads/master 601a237b3 -> 2d968a07d
[SPARK-21421][SS] Add the query id as a local property to allow sources and sinks to use it ## What changes were proposed in this pull request? Add the query id as a local property to allow sources and sinks to use it. ## How was this patch tested? The new unit test. Author: Shixiong Zhu <shixi...@databricks.com> Closes #18638 from zsxwing/SPARK-21421. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d968a07 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d968a07 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d968a07 Branch: refs/heads/master Commit: 2d968a07d211688a9c588deb859667dd8b653b27 Parents: 601a237 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Fri Jul 14 14:37:27 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Fri Jul 14 14:37:27 2017 -0700 ---------------------------------------------------------------------- .../execution/streaming/StreamExecution.scala | 4 +++ .../sql/streaming/StreamingQuerySuite.scala | 27 ++++++++++++++++++++ 2 files changed, 31 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2d968a07/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 10c42a7..5ee596e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -263,6 +263,7 @@ class StreamExecution( try { sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString, interruptOnCancel = true) + 
sparkSession.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, id.toString) if (sparkSession.sessionState.conf.streamingMetricsEnabled) { sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } @@ -842,6 +843,9 @@ class StreamExecution( } } +object StreamExecution { + val QUERY_ID_KEY = "sql.streaming.queryId" +} /** * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread http://git-wip-us.apache.org/repos/asf/spark/blob/2d968a07/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 0925646..41f73b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -613,6 +613,33 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + test("get the query id in source") { + @volatile var queryId: String = null + val source = new Source { + override def stop(): Unit = {} + override def getOffset: Option[Offset] = { + queryId = spark.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY) + None + } + override def getBatch(start: Option[Offset], end: Offset): DataFrame = spark.emptyDataFrame + override def schema: StructType = MockSourceProvider.fakeSchema + } + + MockSourceProvider.withMockSources(source) { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.util.MockSourceProvider") + .load() + testStream(df)( + AssertOnQuery { sq => + sq.processAllAvailable() + assert(sq.id.toString === queryId) + assert(sq.runId.toString !== queryId) + true + } + ) + } + } + /** Create a streaming DF that only execute one batch in which it returns the given static 
DF */ private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = { require(!triggerDF.isStreaming) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org