This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 30637a8 [SPARK-31593][SS] Remove unnecessary streaming query progress update 30637a8 is described below commit 30637a81fb50492e3c54759bb3fd7aac1cd0e326 Author: uncleGen <husty...@gmail.com> AuthorDate: Sun Jun 14 14:49:01 2020 +0900 [SPARK-31593][SS] Remove unnecessary streaming query progress update ### What changes were proposed in this pull request? Structured Streaming progress reporter will always report an `empty` progress when there is no new data. By design, we should provide progress updates every 10s (default) when there is no new data. Before PR: ![20200428175008](https://user-images.githubusercontent.com/7402327/80474832-88a8ca00-897a-11ea-820b-d4be6127d2fe.jpg) ![20200428175037](https://user-images.githubusercontent.com/7402327/80474844-8ba3ba80-897a-11ea-873c-b7137bd4a804.jpg) ![20200428175102](https://user-images.githubusercontent.com/7402327/80474848-8e061480-897a-11ea-806e-28c6bbf1fe03.jpg) After PR: ![image](https://user-images.githubusercontent.com/7402327/80475099-f35a0580-897a-11ea-8fb3-53f343df2c3f.png) ### Why are the changes needed? Fixes a bug around incorrect progress report ### Does this PR introduce any user-facing change? Fixes a bug around incorrect progress report ### How was this patch tested? Existing unit tests and a manual test. Closes #28391 from uncleGen/SPARK-31593. 
Authored-by: uncleGen <husty...@gmail.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit 1e40bccf447dccad9d31bccc75d21b8fca77ba52) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../sql/execution/streaming/ProgressReporter.scala | 2 +- .../streaming/StreamingDeduplicationSuite.scala | 7 ++- .../streaming/StreamingQueryListenerSuite.scala | 56 ++++++++++++++++++++-- 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 0dff1c2..ea1f2ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -201,7 +201,7 @@ trait ProgressReporter extends Logging { if (hasExecuted) { // Reset noDataEventTimestamp if we processed any data - lastNoExecutionProgressEventTime = Long.MinValue + lastNoExecutionProgressEventTime = triggerClock.getTimeMillis() updateProgress(newProgress) } else { val now = triggerClock.getTimeMillis() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index f63778a..51ddc7b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -281,7 +281,12 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { if (flag) assertNumStateRows(total = 1, updated = 1) else assertNumStateRows(total = 7, updated = 1) }, - AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L) + AssertOnQuery { q => + eventually(timeout(streamingTimeout)) { + q.lastProgress.sink.numOutputRows == 0L + true + } + } ) } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index e585b8a..6e08b88 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -389,7 +389,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Structured Streaming in Spark 2.0.0. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1) + testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt", 1) } testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") { @@ -397,14 +397,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Structured Streaming in Spark 2.0.1. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1) + testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt", 1) } testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") { // query-event-logs-version-2.0.2.txt has all types of events generated by // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events // in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2. 
- testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5) + testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.2.txt", 5) } test("listener propagates observable metrics") { @@ -433,9 +433,13 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } try { + val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key spark.streams.addListener(listener) testStream(df, OutputMode.Append)( - StartStream(Trigger.ProcessingTime(100), triggerClock = clock), + StartStream( + Trigger.ProcessingTime(100), + triggerClock = clock, + Map(noDataProgressIntervalKey -> "100")), // Batch 1 AddData(inputData, 1, 2), AdvanceManualClock(100), @@ -464,7 +468,49 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - private def testReplayListenerBusWithBorkenEventJsons( + test("SPARK-31593: remove unnecessary streaming query progress update") { + withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100") { + @volatile var numProgressEvent = 0 + val listener = new StreamingQueryListener { + override def onQueryStarted(event: QueryStartedEvent): Unit = {} + override def onQueryProgress(event: QueryProgressEvent): Unit = { + numProgressEvent += 1 + } + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {} + } + spark.streams.addListener(listener) + + def checkProgressEvent(count: Int): StreamAction = { + AssertOnQuery { _ => + eventually(Timeout(streamingTimeout)) { + assert(numProgressEvent == count) + } + true + } + } + + try { + val input = new MemoryStream[Int](0, sqlContext) + val clock = new StreamManualClock() + val result = input.toDF().select("value") + testStream(result)( + StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock), + AddData(input, 10), + checkProgressEvent(1), + AdvanceManualClock(10), + checkProgressEvent(2), + AdvanceManualClock(90), + checkProgressEvent(2), + AdvanceManualClock(10), + 
checkProgressEvent(3) + ) + } finally { + spark.streams.removeListener(listener) + } + } + } + + private def testReplayListenerBusWithBrokenEventJsons( fileName: String, expectedEventSize: Int): Unit = { val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org