Repository: spark
Updated Branches:
  refs/heads/branch-2.2 82ae1f0ac -> a79a120a8


[SPARK-20717][SS] Minor tweaks to the MapGroupsWithState behavior

## What changes were proposed in this pull request?

Timeout and state data are two independent entities and should be settable 
independently. Therefore, in the same call of the user-defined function, one 
should be able to set the timeout before initializing the state and also after 
removing the state. Whether timeouts can be set or not, should not depend on 
the current state, and vice versa.

However, a limitation of the current implementation is that state cannot be 
null while timeout is set. This is checked lazily after the function call has 
completed.

## How was this patch tested?
- Updated existing unit tests that test the behavior of 
GroupState.setTimeout*** wrt to the current state
- Added new tests that verify the disallowed cases where state is undefined but 
timeout is set.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #17957 from tdas/SPARK-20717.

(cherry picked from commit 499ba2cb47efd6a860e74e6995412408efc5238d)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a79a120a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a79a120a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a79a120a

Branch: refs/heads/branch-2.2
Commit: a79a120a8fc595045b32f16663286b32dadc53ed
Parents: 82ae1f0
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon May 15 10:48:10 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon May 15 10:48:17 2017 -0700

----------------------------------------------------------------------
 .../streaming/FlatMapGroupsWithStateExec.scala  |  15 +-
 .../execution/streaming/GroupStateImpl.scala    |  16 +-
 .../apache/spark/sql/streaming/GroupState.scala |   2 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala | 161 ++++++++++++++-----
 4 files changed, 139 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a79a120a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 5e79232..bd8d5d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -230,6 +230,20 @@ case class FlatMapGroupsWithStateExec(
 
       // When the iterator is consumed, then write changes to state
       def onIteratorCompletion: Unit = {
+
+        val currentTimeoutTimestamp = keyedState.getTimeoutTimestamp
+        // If the state has not yet been set but timeout has been set, then
+        // we have to generate a row to save the timeout. However, attempting 
serialize
+        // null using case class encoder throws -
+        //    java.lang.NullPointerException: Null value appeared in 
non-nullable field:
+        //    If the schema is inferred from a Scala tuple / case class, or a 
Java bean, please
+        //    try to use scala.Option[_] or other nullable types.
+        if (!keyedState.exists && currentTimeoutTimestamp != NO_TIMESTAMP) {
+          throw new IllegalStateException(
+            "Cannot set timeout when state is not defined, that is, state has 
not been" +
+              "initialized or has been removed")
+        }
+
         if (keyedState.hasRemoved) {
           store.remove(keyRow)
           numUpdatedStateRows += 1
@@ -239,7 +253,6 @@ case class FlatMapGroupsWithStateExec(
             case Some(row) => getTimeoutTimestamp(row)
             case None => NO_TIMESTAMP
           }
-          val currentTimeoutTimestamp = keyedState.getTimeoutTimestamp
           val stateRowToWrite = if (keyedState.hasUpdated) {
             getStateRow(keyedState.get)
           } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/a79a120a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
index 148d922..d4606fd5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -91,7 +91,6 @@ private[sql] class GroupStateImpl[S](
     defined = false
     updated = false
     removed = true
-    timeoutTimestamp = NO_TIMESTAMP
   }
 
   override def setTimeoutDuration(durationMs: Long): Unit = {
@@ -100,16 +99,10 @@ private[sql] class GroupStateImpl[S](
         "Cannot set timeout duration without enabling processing time timeout 
in " +
           "map/flatMapGroupsWithState")
     }
-    if (!defined) {
-      throw new IllegalStateException(
-        "Cannot set timeout information without any state value, " +
-          "state has either not been initialized, or has already been removed")
-    }
-
     if (durationMs <= 0) {
       throw new IllegalArgumentException("Timeout duration must be positive")
     }
-    if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
+    if (batchProcessingTimeMs != NO_TIMESTAMP) {
       timeoutTimestamp = durationMs + batchProcessingTimeMs
     } else {
       // This is being called in a batch query, hence no processing timestamp.
@@ -135,7 +128,7 @@ private[sql] class GroupStateImpl[S](
         s"Timeout timestamp ($timestampMs) cannot be earlier than the " +
           s"current watermark ($eventTimeWatermarkMs)")
     }
-    if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
+    if (batchProcessingTimeMs != NO_TIMESTAMP) {
       timeoutTimestamp = timestampMs
     } else {
       // This is being called in a batch query, hence no processing timestamp.
@@ -213,11 +206,6 @@ private[sql] class GroupStateImpl[S](
         "Cannot set timeout timestamp without enabling event time timeout in " 
+
           "map/flatMapGroupsWithState")
     }
-    if (!defined) {
-      throw new IllegalStateException(
-        "Cannot set timeout timestamp without any state value, " +
-          "state has either not been initialized, or has already been removed")
-    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a79a120a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
index c659ac7..04a956b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
@@ -212,7 +212,7 @@ trait GroupState[S] extends LogicalGroupState[S] {
   @throws[IllegalArgumentException]("when updating with null")
   def update(newState: S): Unit
 
-  /** Remove this state. Note that this resets any timeout configuration as 
well. */
+  /** Remove this state. */
   def remove(): Unit
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a79a120a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 89cfba6..10e9174 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -112,11 +112,12 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
 
     state = new GroupStateImpl[Int](None, 1000, 1000, ProcessingTimeTimeout, 
hasTimedOut = false)
     assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
-    testTimeoutDurationNotAllowed[IllegalStateException](state)
+    state.setTimeoutDuration(500)
+    assert(state.getTimeoutTimestamp === 1500)    // can be set without 
initializing state
     testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
 
     state.update(5)
-    assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
+    assert(state.getTimeoutTimestamp === 1500)    // does not change
     state.setTimeoutDuration(1000)
     assert(state.getTimeoutTimestamp === 2000)
     state.setTimeoutDuration("2 second")
@@ -124,8 +125,9 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
     testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
 
     state.remove()
-    assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
-    testTimeoutDurationNotAllowed[IllegalStateException](state)
+    assert(state.getTimeoutTimestamp === 3000)    // does not change
+    state.setTimeoutDuration(500)                 // can still be set
+    assert(state.getTimeoutTimestamp === 1500)
     testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
   }
 
@@ -134,9 +136,11 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
       None, 1000, 1000, EventTimeTimeout, hasTimedOut = false)
     assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
     testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
-    testTimeoutTimestampNotAllowed[IllegalStateException](state)
+    state.setTimeoutTimestamp(5000)
+    assert(state.getTimeoutTimestamp === 5000)    // can be set without 
initializing state
 
     state.update(5)
+    assert(state.getTimeoutTimestamp === 5000)    // does not change
     state.setTimeoutTimestamp(10000)
     assert(state.getTimeoutTimestamp === 10000)
     state.setTimeoutTimestamp(new Date(20000))
@@ -144,9 +148,10 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
     testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
 
     state.remove()
-    assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
+    assert(state.getTimeoutTimestamp === 20000)
+    state.setTimeoutTimestamp(5000)
+    assert(state.getTimeoutTimestamp === 5000)    // can be set after removing 
state
     testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
-    testTimeoutTimestampNotAllowed[IllegalStateException](state)
   }
 
   test("GroupState - illegal params to setTimeout****") {
@@ -154,26 +159,54 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
 
     // Test setTimeout****() with illegal values
     def testIllegalTimeout(body: => Unit): Unit = {
-      intercept[IllegalArgumentException] { body }
+      intercept[IllegalArgumentException] {
+        body
+      }
       assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
     }
 
     state = new GroupStateImpl(Some(5), 1000, 1000, ProcessingTimeTimeout, 
hasTimedOut = false)
-    testIllegalTimeout { state.setTimeoutDuration(-1000) }
-    testIllegalTimeout { state.setTimeoutDuration(0) }
-    testIllegalTimeout { state.setTimeoutDuration("-2 second") }
-    testIllegalTimeout { state.setTimeoutDuration("-1 month") }
-    testIllegalTimeout { state.setTimeoutDuration("1 month -1 day") }
+    testIllegalTimeout {
+      state.setTimeoutDuration(-1000)
+    }
+    testIllegalTimeout {
+      state.setTimeoutDuration(0)
+    }
+    testIllegalTimeout {
+      state.setTimeoutDuration("-2 second")
+    }
+    testIllegalTimeout {
+      state.setTimeoutDuration("-1 month")
+    }
+    testIllegalTimeout {
+      state.setTimeoutDuration("1 month -1 day")
+    }
 
     state = new GroupStateImpl(Some(5), 1000, 1000, EventTimeTimeout, 
hasTimedOut = false)
-    testIllegalTimeout { state.setTimeoutTimestamp(-10000) }
-    testIllegalTimeout { state.setTimeoutTimestamp(10000, "-3 second") }
-    testIllegalTimeout { state.setTimeoutTimestamp(10000, "-1 month") }
-    testIllegalTimeout { state.setTimeoutTimestamp(10000, "1 month -1 day") }
-    testIllegalTimeout { state.setTimeoutTimestamp(new Date(-10000)) }
-    testIllegalTimeout { state.setTimeoutTimestamp(new Date(-10000), "-3 
second") }
-    testIllegalTimeout { state.setTimeoutTimestamp(new Date(-10000), "-1 
month") }
-    testIllegalTimeout { state.setTimeoutTimestamp(new Date(-10000), "1 month 
-1 day") }
+    testIllegalTimeout {
+      state.setTimeoutTimestamp(-10000)
+    }
+    testIllegalTimeout {
+      state.setTimeoutTimestamp(10000, "-3 second")
+    }
+    testIllegalTimeout {
+      state.setTimeoutTimestamp(10000, "-1 month")
+    }
+    testIllegalTimeout {
+      state.setTimeoutTimestamp(10000, "1 month -1 day")
+    }
+    testIllegalTimeout {
+      state.setTimeoutTimestamp(new Date(-10000))
+    }
+    testIllegalTimeout {
+      state.setTimeoutTimestamp(new Date(-10000), "-3 second")
+    }
+    testIllegalTimeout {
+      state.setTimeoutTimestamp(new Date(-10000), "-1 month")
+    }
+    testIllegalTimeout {
+      state.setTimeoutTimestamp(new Date(-10000), "1 month -1 day")
+    }
   }
 
   test("GroupState - hasTimedOut") {
@@ -318,6 +351,44 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
     }
   }
 
+  // Currently disallowed cases for 
StateStoreUpdater.updateStateForKeysWithData(),
+  // Try to remove these cases in the future
+  for (priorTimeoutTimestamp <- Seq(NO_TIMESTAMP, 1000)) {
+    val testName =
+      if (priorTimeoutTimestamp != NO_TIMESTAMP) "prior timeout set" else "no 
prior timeout"
+    testStateUpdateWithData(
+      s"ProcessingTimeTimeout - $testName - setting timeout without init state 
not allowed",
+      stateUpdates = state => { state.setTimeoutDuration(5000) },
+      timeoutConf = ProcessingTimeTimeout,
+      priorState = None,
+      priorTimeoutTimestamp = priorTimeoutTimestamp,
+      expectedException = classOf[IllegalStateException])
+
+    testStateUpdateWithData(
+      s"ProcessingTimeTimeout - $testName - setting timeout with state removal 
not allowed",
+      stateUpdates = state => { state.remove(); state.setTimeoutDuration(5000) 
},
+      timeoutConf = ProcessingTimeTimeout,
+      priorState = Some(5),
+      priorTimeoutTimestamp = priorTimeoutTimestamp,
+      expectedException = classOf[IllegalStateException])
+
+    testStateUpdateWithData(
+      s"EventTimeTimeout - $testName - setting timeout without init state not 
allowed",
+      stateUpdates = state => { state.setTimeoutTimestamp(10000) },
+      timeoutConf = EventTimeTimeout,
+      priorState = None,
+      priorTimeoutTimestamp = priorTimeoutTimestamp,
+      expectedException = classOf[IllegalStateException])
+
+    testStateUpdateWithData(
+      s"EventTimeTimeout - $testName - setting timeout with state removal not 
allowed",
+      stateUpdates = state => { state.remove(); 
state.setTimeoutTimestamp(10000) },
+      timeoutConf = EventTimeTimeout,
+      priorState = Some(5),
+      priorTimeoutTimestamp = priorTimeoutTimestamp,
+      expectedException = classOf[IllegalStateException])
+  }
+
   // Tests for StateStoreUpdater.updateStateForTimedOutKeys()
   val preTimeoutState = Some(5)
   for (timeoutConf <- Seq(ProcessingTimeTimeout, EventTimeTimeout)) {
@@ -806,7 +877,8 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
       priorState: Option[Int],
       priorTimeoutTimestamp: Long = NO_TIMESTAMP,
       expectedState: Option[Int] = None,
-      expectedTimeoutTimestamp: Long = NO_TIMESTAMP): Unit = {
+      expectedTimeoutTimestamp: Long = NO_TIMESTAMP,
+      expectedException: Class[_ <: Exception] = null): Unit = {
 
     if (priorState.isEmpty && priorTimeoutTimestamp != NO_TIMESTAMP) {
       return // there can be no prior timestamp, when there is no prior state
@@ -820,7 +892,8 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
       }
       testStateUpdate(
         testTimeoutUpdates = false, mapGroupsFunc, timeoutConf,
-        priorState, priorTimeoutTimestamp, expectedState, 
expectedTimeoutTimestamp)
+        priorState, priorTimeoutTimestamp,
+        expectedState, expectedTimeoutTimestamp, expectedException)
     }
   }
 
@@ -839,9 +912,10 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
         stateUpdates(state)
         Iterator.empty
       }
+
       testStateUpdate(
         testTimeoutUpdates = true, mapGroupsFunc, timeoutConf = timeoutConf,
-        preTimeoutState, priorTimeoutTimestamp, expectedState, 
expectedTimeoutTimestamp)
+        preTimeoutState, priorTimeoutTimestamp, expectedState, 
expectedTimeoutTimestamp, null)
     }
   }
 
@@ -852,7 +926,8 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
       priorState: Option[Int],
       priorTimeoutTimestamp: Long,
       expectedState: Option[Int],
-      expectedTimeoutTimestamp: Long): Unit = {
+      expectedTimeoutTimestamp: Long,
+      expectedException: Class[_ <: Exception]): Unit = {
 
     val store = newStateStore()
     val mapGroupsSparkPlan = newFlatMapGroupsWithStateExec(
@@ -867,22 +942,30 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
     }
 
     // Call updating function to update state store
-    val returnedIter = if (testTimeoutUpdates) {
-      updater.updateStateForTimedOutKeys()
-    } else {
-      updater.updateStateForKeysWithData(Iterator(key))
+    def callFunction() = {
+      val returnedIter = if (testTimeoutUpdates) {
+        updater.updateStateForTimedOutKeys()
+      } else {
+        updater.updateStateForKeysWithData(Iterator(key))
+      }
+      returnedIter.size // consume the iterator to force state updates
     }
-    returnedIter.size // consumer the iterator to force state updates
-
-    // Verify updated state in store
-    val updatedStateRow = store.get(key)
-    assert(
-      updater.getStateObj(updatedStateRow).map(_.toString.toInt) === 
expectedState,
-      "final state not as expected")
-    if (updatedStateRow.nonEmpty) {
+    if (expectedException != null) {
+      // Call function and verify the exception type
+      val e = intercept[Exception] { callFunction() }
+      assert(e.getClass === expectedException, "Exception thrown but of the 
wrong type")
+    } else {
+      // Call function to update and verify updated state in store
+      callFunction()
+      val updatedStateRow = store.get(key)
       assert(
-        updater.getTimeoutTimestamp(updatedStateRow.get) === 
expectedTimeoutTimestamp,
-        "final timeout timestamp not as expected")
+        updater.getStateObj(updatedStateRow).map(_.toString.toInt) === 
expectedState,
+        "final state not as expected")
+      if (updatedStateRow.nonEmpty) {
+        assert(
+          updater.getTimeoutTimestamp(updatedStateRow.get) === 
expectedTimeoutTimestamp,
+          "final timeout timestamp not as expected")
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to