Repository: spark
Updated Branches:
  refs/heads/master 7696b9de0 -> 807ba44cb


[SPARK-24159][SS] Enable no-data micro batches for streaming mapGroupsWithState

## What changes were proposed in this pull request?

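Enabled no-data batches in flatMapGroupsWithState in the following two cases.
- When a ProcessingTime timeout is used, a batch is always run every trigger
interval.
- When an event-time watermark is defined, the user may be doing arbitrary
logic against the watermark value even if timeouts are not set. In such cases,
it's best to run batches whenever the watermark has changed, irrespective of
whether timeouts (i.e. event-time timeout) have been explicitly enabled.
  Note: as implemented in `shouldRunAnotherBatch` in the diff below, the
  watermark-change check is only performed when `EventTimeTimeout` is
  configured. All other configurations except `ProcessingTimeTimeout`
  (e.g. `NoTimeout`) fall through to `case _ => false`, so no extra batch is
  run for them.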

## How was this patch tested?
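Updated existing tests: replaced `CheckLastBatch` with `CheckNewAnswer`, and
reworked the processing-time and event-time timeout tests. The processing-time
test now expects state to time out in a trigger with no new input data.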

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #21345 from tdas/SPARK-24159.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/807ba44c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/807ba44c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/807ba44c

Branch: refs/heads/master
Commit: 807ba44cb742c5f7c22bdf6bfe2cf814be85398e
Parents: 7696b9d
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri May 18 10:35:43 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri May 18 10:35:43 2018 -0700

----------------------------------------------------------------------
 .../streaming/FlatMapGroupsWithStateExec.scala  |  17 ++-
 .../streaming/FlatMapGroupsWithStateSuite.scala | 120 ++++++++++---------
 2 files changed, 80 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/807ba44c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 80769d7..8e82ccc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -97,6 +97,18 @@ case class FlatMapGroupsWithStateExec(
 
   override def keyExpressions: Seq[Attribute] = groupingAttributes
 
+  override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean 
= {
+    timeoutConf match {
+      case ProcessingTimeTimeout =>
+        true  // Always run batches to process timeouts
+      case EventTimeTimeout =>
+        // Process another non-data batch only if the watermark has changed in 
this executed plan
+        eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > 
eventTimeWatermark.get
+      case _ =>
+        false
+    }
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
@@ -126,7 +138,6 @@ case class FlatMapGroupsWithStateExec(
           case _ =>
             iter
         }
-
         // Generate a iterator that returns the rows grouped by the grouping 
function
         // Note that this code ensures that the filtering for timeout occurs 
only after
         // all the data has been processed. This is to ensure that the timeout 
information of all
@@ -194,11 +205,11 @@ case class FlatMapGroupsWithStateExec(
             throw new IllegalStateException(
               s"Cannot filter timed out keys for $timeoutConf")
         }
-        val timingOutKeys = store.getRange(None, None).filter { rowPair =>
+        val timingOutPairs = store.getRange(None, None).filter { rowPair =>
           val timeoutTimestamp = getTimeoutTimestamp(rowPair.value)
           timeoutTimestamp != NO_TIMESTAMP && timeoutTimestamp < 
timeoutThreshold
         }
-        timingOutKeys.flatMap { rowPair =>
+        timingOutPairs.flatMap { rowPair =>
           callFunctionAndUpdateState(rowPair.key, Iterator.empty, 
rowPair.value, hasTimedOut = true)
         }
       } else Iterator.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/807ba44c/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index b1416bf..988c8e6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -615,20 +615,20 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
 
     testStream(result, Update)(
       AddData(inputData, "a"),
-      CheckLastBatch(("a", "1")),
+      CheckNewAnswer(("a", "1")),
       assertNumStateRows(total = 1, updated = 1),
       AddData(inputData, "a", "b"),
-      CheckLastBatch(("a", "2"), ("b", "1")),
+      CheckNewAnswer(("a", "2"), ("b", "1")),
       assertNumStateRows(total = 2, updated = 2),
       StopStream,
       StartStream(),
       AddData(inputData, "a", "b"), // should remove state for "a" and not 
return anything for a
-      CheckLastBatch(("b", "2")),
+      CheckNewAnswer(("b", "2")),
       assertNumStateRows(total = 1, updated = 2),
       StopStream,
       StartStream(),
       AddData(inputData, "a", "c"), // should recreate state for "a" and 
return count as 1 and
-      CheckLastBatch(("a", "1"), ("c", "1")),
+      CheckNewAnswer(("a", "1"), ("c", "1")),
       assertNumStateRows(total = 3, updated = 2)
     )
   }
@@ -657,15 +657,15 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
         .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout)(stateFunc)
     testStream(result, Update)(
       AddData(inputData, "a", "a", "b"),
-      CheckLastBatch(("a", "1"), ("a", "2"), ("b", "1")),
+      CheckNewAnswer(("a", "1"), ("a", "2"), ("b", "1")),
       StopStream,
       StartStream(),
       AddData(inputData, "a", "b"), // should remove state for "a" and not 
return anything for a
-      CheckLastBatch(("b", "2")),
+      CheckNewAnswer(("b", "2")),
       StopStream,
       StartStream(),
       AddData(inputData, "a", "c"), // should recreate state for "a" and 
return count as 1 and
-      CheckLastBatch(("a", "1"), ("c", "1"))
+      CheckNewAnswer(("a", "1"), ("c", "1"))
     )
   }
 
@@ -694,22 +694,22 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
 
     testStream(result, Complete)(
       AddData(inputData, "a"),
-      CheckLastBatch(("a", 1)),
+      CheckNewAnswer(("a", 1)),
       AddData(inputData, "a", "b"),
       // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a 
and b by 1
-      CheckLastBatch(("a", 2), ("b", 1)),
+      CheckNewAnswer(("a", 2), ("b", 1)),
       StopStream,
       StartStream(),
       AddData(inputData, "a", "b"),
       // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", 
"2") ;
       // so increment a and b by 1
-      CheckLastBatch(("a", 3), ("b", 2)),
+      CheckNewAnswer(("a", 3), ("b", 2)),
       StopStream,
       StartStream(),
       AddData(inputData, "a", "c"),
       // mapGroups should recreate state for "a" and generate ("a", "1"), 
("c", "1") ;
       // so increment a and c by 1
-      CheckLastBatch(("a", 4), ("b", 2), ("c", 1))
+      CheckNewAnswer(("a", 4), ("b", 2), ("c", 1))
     )
   }
 
@@ -729,8 +729,8 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
   }
 
   test("flatMapGroupsWithState - streaming with processing time timeout") {
-    // Function to maintain running count up to 2, and then remove the count
-    // Returns the data and the count (-1 if count reached beyond 2 and state 
was just removed)
+    // Function to maintain the count as state and set the proc. time timeout 
delay of 10 seconds.
+    // It returns the count if changed, or -1 if the state was removed by 
timeout.
     val stateFunc = (key: String, values: Iterator[String], state: 
GroupState[RunningCount]) => {
       assertCanGetProcessingTime { state.getCurrentProcessingTimeMs() >= 0 }
       assertCannotGetWatermark { state.getCurrentWatermarkMs() }
@@ -757,17 +757,17 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
       StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
       AddData(inputData, "a"),
       AdvanceManualClock(1 * 1000),
-      CheckLastBatch(("a", "1")),
+      CheckNewAnswer(("a", "1")),
       assertNumStateRows(total = 1, updated = 1),
 
       AddData(inputData, "b"),
       AdvanceManualClock(1 * 1000),
-      CheckLastBatch(("b", "1")),
+      CheckNewAnswer(("b", "1")),
       assertNumStateRows(total = 2, updated = 1),
 
       AddData(inputData, "b"),
       AdvanceManualClock(10 * 1000),
-      CheckLastBatch(("a", "-1"), ("b", "2")),
+      CheckNewAnswer(("a", "-1"), ("b", "2")),
       assertNumStateRows(total = 1, updated = 2),
 
       StopStream,
@@ -775,38 +775,42 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
 
       AddData(inputData, "c"),
       AdvanceManualClock(11 * 1000),
-      CheckLastBatch(("b", "-1"), ("c", "1")),
+      CheckNewAnswer(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),
 
-      AddData(inputData, "c"),
-      AdvanceManualClock(20 * 1000),
-      CheckLastBatch(("c", "2")),
-      assertNumStateRows(total = 1, updated = 1)
+      AdvanceManualClock(12 * 1000),
+      AssertOnQuery { _ => clock.getTimeMillis() == 35000 },
+      Execute { q =>
+        failAfter(streamingTimeout) {
+          while (q.lastProgress.timestamp != "1970-01-01T00:00:35.000Z") {
+            Thread.sleep(1)
+          }
+        }
+      },
+      CheckNewAnswer(("c", "-1")),
+      assertNumStateRows(total = 0, updated = 0)
     )
   }
 
   test("flatMapGroupsWithState - streaming with event time timeout + 
watermark") {
-    // Function to maintain the max event time
-    // Returns the max event time in the state, or -1 if the state was removed 
by timeout
+    // Function to maintain the max event time as state and set the timeout 
timestamp based on the
+    // current max event time seen. It returns the max event time in the 
state, or -1 if the state
+    // was removed by timeout.
     val stateFunc = (key: String, values: Iterator[(String, Long)], state: 
GroupState[Long]) => {
       assertCanGetProcessingTime { state.getCurrentProcessingTimeMs() >= 0 }
       assertCanGetWatermark { state.getCurrentWatermarkMs() >= -1 }
 
-      val timeoutDelay = 5
-      if (key != "a") {
-        Iterator.empty
+      val timeoutDelaySec = 5
+      if (state.hasTimedOut) {
+        state.remove()
+        Iterator((key, -1))
       } else {
-        if (state.hasTimedOut) {
-          state.remove()
-          Iterator((key, -1))
-        } else {
-          val valuesSeq = values.toSeq
-          val maxEventTime = math.max(valuesSeq.map(_._2).max, 
state.getOption.getOrElse(0L))
-          val timeoutTimestampMs = maxEventTime + timeoutDelay
-          state.update(maxEventTime)
-          state.setTimeoutTimestamp(timeoutTimestampMs * 1000)
-          Iterator((key, maxEventTime.toInt))
-        }
+        val valuesSeq = values.toSeq
+        val maxEventTimeSec = math.max(valuesSeq.map(_._2).max, 
state.getOption.getOrElse(0L))
+        val timeoutTimestampSec = maxEventTimeSec + timeoutDelaySec
+        state.update(maxEventTimeSec)
+        state.setTimeoutTimestamp(timeoutTimestampSec * 1000)
+        Iterator((key, maxEventTimeSec.toInt))
       }
     }
     val inputData = MemoryStream[(String, Int)]
@@ -819,15 +823,23 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
         .flatMapGroupsWithState(Update, EventTimeTimeout)(stateFunc)
 
     testStream(result, Update)(
-      StartStream(Trigger.ProcessingTime("1 second")),
-      AddData(inputData, ("a", 11), ("a", 13), ("a", 15)), // Set timeout 
timestamp of ...
-      CheckLastBatch(("a", 15)),                           // "a" to 15 + 5 = 
20s, watermark to 5s
+      StartStream(),
+
+      AddData(inputData, ("a", 11), ("a", 13), ("a", 15)),
+      // Max event time = 15. Timeout timestamp for "a" = 15 + 5 = 20. 
Watermark = 15 - 10 = 5.
+      CheckNewAnswer(("a", 15)),  // Output = max event time of a
+
       AddData(inputData, ("a", 4)),       // Add data older than watermark for 
"a"
-      CheckLastBatch(),                   // No output as data should get 
filtered by watermark
-      AddData(inputData, ("dummy", 35)),  // Set watermark = 35 - 10 = 25s
-      CheckLastBatch(),                   // No output as no data for "a"
-      AddData(inputData, ("a", 24)),      // Add data older than watermark, 
should be ignored
-      CheckLastBatch(("a", -1))           // State for "a" should timeout and 
emit -1
+      CheckNewAnswer(),                   // No output as data should get 
filtered by watermark
+
+      AddData(inputData, ("a", 10)),      // Add data newer than watermark for 
"a"
+      CheckNewAnswer(("a", 15)),          // Max event time is still the same
+      // Timeout timestamp for "a" is still 20 as max event time for "a" is 
still 15.
+      // Watermark is still 5 as max event time for all data is still 15.
+
+      AddData(inputData, ("b", 31)),      // Add data newer than watermark for 
"b", not "a"
+      // Watermark = 31 - 10 = 21, so "a" should be timed out as timeout 
timestamp for "a" is 20.
+      CheckNewAnswer(("a", -1), ("b", 31))           // State for "a" should 
timeout and emit -1
     )
   }
 
@@ -856,20 +868,20 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
 
     testStream(result, Update)(
       AddData(inputData, "a"),
-      CheckLastBatch(("a", "1")),
+      CheckNewAnswer(("a", "1")),
       assertNumStateRows(total = 1, updated = 1),
       AddData(inputData, "a", "b"),
-      CheckLastBatch(("a", "2"), ("b", "1")),
+      CheckNewAnswer(("a", "2"), ("b", "1")),
       assertNumStateRows(total = 2, updated = 2),
       StopStream,
       StartStream(),
       AddData(inputData, "a", "b"), // should remove state for "a" and return 
count as -1
-      CheckLastBatch(("a", "-1"), ("b", "2")),
+      CheckNewAnswer(("a", "-1"), ("b", "2")),
       assertNumStateRows(total = 1, updated = 2),
       StopStream,
       StartStream(),
       AddData(inputData, "a", "c"), // should recreate state for "a" and 
return count as 1
-      CheckLastBatch(("a", "1"), ("c", "1")),
+      CheckNewAnswer(("a", "1"), ("c", "1")),
       assertNumStateRows(total = 3, updated = 2)
     )
   }
@@ -920,15 +932,15 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
     testStream(result, Update)(
       setFailInTask(false),
       AddData(inputData, "a"),
-      CheckLastBatch(("a", 1L)),
+      CheckNewAnswer(("a", 1L)),
       AddData(inputData, "a"),
-      CheckLastBatch(("a", 2L)),
+      CheckNewAnswer(("a", 2L)),
       setFailInTask(true),
       AddData(inputData, "a"),
       ExpectFailure[SparkException](),   // task should fail but should not 
increment count
       setFailInTask(false),
       StartStream(),
-      CheckLastBatch(("a", 3L))     // task should not fail, and should show 
correct count
+      CheckNewAnswer(("a", 3L))     // task should not fail, and should show 
correct count
     )
   }
 
@@ -938,7 +950,7 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
     val result = inputData.toDS.groupByKey(x => 
x).mapGroupsWithState(stateFunc)
     testStream(result, Update)(
       AddData(inputData, "a"),
-      CheckLastBatch("a"),
+      CheckNewAnswer("a"),
       AssertOnQuery(_.lastExecution.executedPlan.outputPartitioning === 
UnknownPartitioning(0))
     )
   }
@@ -1000,7 +1012,7 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest
         StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
         AddData(inputData, ("a", 1L)),
         AdvanceManualClock(1 * 1000),
-        CheckLastBatch(("a", "1"))
+        CheckNewAnswer(("a", "1"))
       )
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to