This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 30637a8  [SPARK-31593][SS] Remove unnecessary streaming query progress 
update
30637a8 is described below

commit 30637a81fb50492e3c54759bb3fd7aac1cd0e326
Author: uncleGen <husty...@gmail.com>
AuthorDate: Sun Jun 14 14:49:01 2020 +0900

    [SPARK-31593][SS] Remove unnecessary streaming query progress update
    
    ### What changes were proposed in this pull request?
    
    Structured Streaming progress reporter will always report an `empty` 
progress when there is no new data. As design, we should provide progress 
updates every 10s (default) when there is no new data.
    
    Before PR:
    
    
![20200428175008](https://user-images.githubusercontent.com/7402327/80474832-88a8ca00-897a-11ea-820b-d4be6127d2fe.jpg)
    
![20200428175037](https://user-images.githubusercontent.com/7402327/80474844-8ba3ba80-897a-11ea-873c-b7137bd4a804.jpg)
    
![20200428175102](https://user-images.githubusercontent.com/7402327/80474848-8e061480-897a-11ea-806e-28c6bbf1fe03.jpg)
    
    After PR:
    
    
![image](https://user-images.githubusercontent.com/7402327/80475099-f35a0580-897a-11ea-8fb3-53f343df2c3f.png)
    
    ### Why are the changes needed?
    
    Fixes a bug around incorrect progress report
    
    ### Does this PR introduce any user-facing change?
    
    Fixes a bug around incorrect progress report
    
    ### How was this patch tested?
    
    existing ut and manual test
    
    Closes #28391 from uncleGen/SPARK-31593.
    
    Authored-by: uncleGen <husty...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 1e40bccf447dccad9d31bccc75d21b8fca77ba52)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../sql/execution/streaming/ProgressReporter.scala |  2 +-
 .../streaming/StreamingDeduplicationSuite.scala    |  7 ++-
 .../streaming/StreamingQueryListenerSuite.scala    | 56 ++++++++++++++++++++--
 3 files changed, 58 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 0dff1c2..ea1f2ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -201,7 +201,7 @@ trait ProgressReporter extends Logging {
 
     if (hasExecuted) {
       // Reset noDataEventTimestamp if we processed any data
-      lastNoExecutionProgressEventTime = Long.MinValue
+      lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
       updateProgress(newProgress)
     } else {
       val now = triggerClock.getTimeMillis()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index f63778a..51ddc7b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -281,7 +281,12 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
           if (flag) assertNumStateRows(total = 1, updated = 1)
           else assertNumStateRows(total = 7, updated = 1)
         },
-        AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
+        AssertOnQuery { q =>
+          eventually(timeout(streamingTimeout)) {
+            q.lastProgress.sink.numOutputRows == 0L
+            true
+          }
+        }
       )
     }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index e585b8a..6e08b88 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -389,7 +389,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     // Structured Streaming in Spark 2.0.0. Because we renamed the classes,
     // SparkListenerApplicationEnd is the only valid event and it's the last 
event. We use it
     // to verify that we can skip broken jsons generated by Structured 
Streaming.
-    
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 
1)
+    
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt", 
1)
   }
 
   testQuietly("ReplayListenerBus should ignore broken event jsons generated in 
2_0_1") {
@@ -397,14 +397,14 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     // Structured Streaming in Spark 2.0.1. Because we renamed the classes,
     // SparkListenerApplicationEnd is the only valid event and it's the last 
event. We use it
     // to verify that we can skip broken jsons generated by Structured 
Streaming.
-    
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 
1)
+    
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt", 
1)
   }
 
   testQuietly("ReplayListenerBus should ignore broken event jsons generated in 
2_0_2") {
     // query-event-logs-version-2.0.2.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured 
Streaming query events
     // in 2.1.0. This test is to verify we are able to load events generated 
by Spark 2.0.2.
-    
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 
5)
+    
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.2.txt", 
5)
   }
 
   test("listener propagates observable metrics") {
@@ -433,9 +433,13 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     }
 
     try {
+      val noDataProgressIntervalKey = 
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key
       spark.streams.addListener(listener)
       testStream(df, OutputMode.Append)(
-        StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
+        StartStream(
+          Trigger.ProcessingTime(100),
+          triggerClock = clock,
+          Map(noDataProgressIntervalKey -> "100")),
         // Batch 1
         AddData(inputData, 1, 2),
         AdvanceManualClock(100),
@@ -464,7 +468,49 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     }
   }
 
-  private def testReplayListenerBusWithBorkenEventJsons(
+  test("SPARK-31593: remove unnecessary streaming query progress update") {
+    withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> 
"100") {
+      @volatile var numProgressEvent = 0
+      val listener = new StreamingQueryListener {
+        override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+        override def onQueryProgress(event: QueryProgressEvent): Unit = {
+          numProgressEvent += 1
+        }
+        override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+      }
+      spark.streams.addListener(listener)
+
+      def checkProgressEvent(count: Int): StreamAction = {
+        AssertOnQuery { _ =>
+          eventually(Timeout(streamingTimeout)) {
+            assert(numProgressEvent == count)
+          }
+          true
+        }
+      }
+
+      try {
+        val input = new MemoryStream[Int](0, sqlContext)
+        val clock = new StreamManualClock()
+        val result = input.toDF().select("value")
+        testStream(result)(
+          StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = 
clock),
+          AddData(input, 10),
+          checkProgressEvent(1),
+          AdvanceManualClock(10),
+          checkProgressEvent(2),
+          AdvanceManualClock(90),
+          checkProgressEvent(2),
+          AdvanceManualClock(10),
+          checkProgressEvent(3)
+        )
+      } finally {
+        spark.streams.removeListener(listener)
+      }
+    }
+  }
+
+  private def testReplayListenerBusWithBrokenEventJsons(
       fileName: String,
       expectedEventSize: Int): Unit = {
     val input = 
getClass.getResourceAsStream(s"/structured-streaming/$fileName")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to