HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1517217945


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+    Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,
+    conf: Configuration = new Configuration,
+    useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+    val provider = new RocksDBStateStoreProvider()
+    provider.init(
+      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey,
+      useColumnFamilies,
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+    provider
+  }
+
+  private def tryWithProviderResource[T](
+    provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
+    try {
+      f(provider)
+    } finally {
+      provider.close()
+    }
+  }
+
+  private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
+    timeoutMode match {
+      case "NoTimeouts" => TimeoutMode.NoTimeouts()
+      case "ProcessingTime" => TimeoutMode.ProcessingTime()
+      case "EventTime" => TimeoutMode.EventTime()
+      case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode")
+    }
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode should succeed") 
{
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+        assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
+        handle.getValueState[Long]("testState")
+      }
+    }
+  }
+
+  private def verifyInvalidOperation(
+      handle: StatefulProcessorHandleImpl,
+      handleState: StatefulProcessorHandleState.Value,
+      errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = {
+    handle.setHandleState(handleState)
+    assert(handle.getHandleState === handleState)
+    val ex = intercept[Exception] {
+      fn(handle)
+    }
+    assert(ex.getMessage.contains(errorMsg))
+  }
+
+  private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = {
+    handle.getValueState[Long]("testState")
+  }
+
+  private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = {
+    handle.registerTimer(1000L)
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode " +
+      "and invalid state should fail") {
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED,

Review Comment:
   Maybe worth refactoring with Seq & foreach, as only the phase (handle state) is different.
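   
   For illustration, a minimal sketch of that refactor (the four handle states come from the checks below; only the state differs between them):
   
   ```scala
   Seq(
     StatefulProcessorHandleState.INITIALIZED,
     StatefulProcessorHandleState.DATA_PROCESSED,
     StatefulProcessorHandleState.TIMER_PROCESSED,
     StatefulProcessorHandleState.CLOSED).foreach { state =>
     // Each iteration changes only the handle state; the operation and the
     // expected error message stay the same.
     verifyInvalidOperation(handle, state, "Cannot create state variable") { handle =>
       createValueStateInstance(handle)
     }
   }
   ```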



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+    Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,
+    conf: Configuration = new Configuration,
+    useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+    val provider = new RocksDBStateStoreProvider()
+    provider.init(
+      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey,
+      useColumnFamilies,
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+    provider
+  }
+
+  private def tryWithProviderResource[T](
+    provider: StateStoreProvider)(f: StateStoreProvider => T): T = {

Review Comment:
   nit: indent
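   
   For reference, a sketch of the expected shape, assuming the usual Spark convention of double-indenting wrapped method parameters:
   
   ```scala
   private def tryWithProviderResource[T](
       provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
     try {
       f(provider)
     } finally {
       provider.close()
     }
   }
   ```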



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+    Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+    storeId: StateStoreId,

Review Comment:
   nit: indentation



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+    Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,
+    conf: Configuration = new Configuration,
+    useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+    val provider = new RocksDBStateStoreProvider()
+    provider.init(
+      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey,
+      useColumnFamilies,
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+    provider
+  }
+
+  private def tryWithProviderResource[T](
+    provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
+    try {
+      f(provider)
+    } finally {
+      provider.close()
+    }
+  }
+
+  private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
+    timeoutMode match {
+      case "NoTimeouts" => TimeoutMode.NoTimeouts()
+      case "ProcessingTime" => TimeoutMode.ProcessingTime()
+      case "EventTime" => TimeoutMode.EventTime()
+      case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode")
+    }
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode should succeed") 
{
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+        assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
+        handle.getValueState[Long]("testState")
+      }
+    }
+  }
+
+  private def verifyInvalidOperation(
+      handle: StatefulProcessorHandleImpl,
+      handleState: StatefulProcessorHandleState.Value,
+      errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = {
+    handle.setHandleState(handleState)
+    assert(handle.getHandleState === handleState)
+    val ex = intercept[Exception] {

Review Comment:
   FYI, if you are using the error class framework, it's recommended to verify the error with `checkError`.
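   
   A minimal sketch of what that could look like, assuming the handle throws a `SparkThrowable`; the error class name and parameters below are hypothetical placeholders, not the actual ones:
   
   ```scala
   import org.apache.spark.SparkThrowable
   
   val ex = intercept[Exception] {
     fn(handle)
   }
   // checkError matches the error class and message parameters instead of
   // asserting on raw message text.
   checkError(
     ex.asInstanceOf[SparkThrowable],
     errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE",
     parameters = Map(
       "operationType" -> "create_value_state",
       "handleState" -> handleState.toString))
   ```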



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
       Iterator((key, count.toString))
     }
   }
+}
+
+// Class to verify stateful processor usage with adding processing time timers
+class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor {
+  private def handleProcessingTimeBasedTimers(
+      key: String,
+      expiryTimestampMs: Long): Iterator[(String, String)] = {
+    _countState.clear()
+    Iterator((key, "-1"))
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
+
+    if (expiredTimerInfo.isValid()) {
+      handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs())
+    } else {
+      val currCount = _countState.getOption().getOrElse(0L)
+      if (currCount == 0 && (key == "a" || key == "c")) {
+        _processorHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs()
+          + 5000)
+      }
+
+      val count = currCount + 1
+      if (count == 3) {
+        _countState.clear()
+        Iterator.empty
+      } else {
+        _countState.update(count)
+        Iterator((key, count.toString))
+      }
+    }
+  }
+}
 
-  override def close(): Unit = {}
+// Class to verify stateful processor usage with adding/deleting processing time timers
+class RunningCountStatefulProcessorWithAddRemoveProcTimeTimer
+  extends RunningCountStatefulProcessor {
+  @transient private var _timerState: ValueState[Long] = _
+
+  override def init(
+      handle: StatefulProcessorHandle,
+      outputMode: OutputMode,
+      timeoutMode: TimeoutMode) : Unit = {
+    super.init(handle, outputMode, timeoutMode)
+    _timerState = _processorHandle.getValueState[Long]("timerState")
+  }
+
+  private def handleProcessingTimeBasedTimers(
+      key: String,
+      expiryTimestampMs: Long): Iterator[(String, String)] = {
+    _timerState.clear()
+    Iterator((key, "-1"))
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
+    if (expiredTimerInfo.isValid()) {
+      handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs())
+    } else {
+      val currCount = _countState.getOption().getOrElse(0L)
+      val count = currCount + inputRows.size
+      _countState.update(count)
+      if (key == "a") {
+        var nextTimerTs: Long = 0L
+        if (currCount == 0) {
+          nextTimerTs = timerValues.getCurrentProcessingTimeInMs() + 5000
+          _processorHandle.registerTimer(nextTimerTs)
+          _timerState.update(nextTimerTs)
+        } else if (currCount == 1) {
+          _processorHandle.deleteTimer(_timerState.get())

Review Comment:
   Just to double check: users can call `listTimers` to avoid keeping another value state, right? There might be a performance implication, but I'm asking purely from a functionality perspective.
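   
   For illustration, a sketch of how the processor could drop `_timerState`, assuming `listTimers` is exposed on the user-facing handle and returns only the current key's timers (since the implicit grouping key is set):
   
   ```scala
   } else if (currCount == 1) {
     // Instead of reading the timestamp back from a dedicated ValueState,
     // enumerate the registered timers for this key and delete them.
     _processorHandle.listTimers().foreach { expiryTs =>
       _processorHandle.deleteTimer(expiryTs)
     }
   }
   ```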



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -195,6 +327,115 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
+  test("transformWithState - streaming with rocksdb and processing time timer 
" +
+   "should succeed") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val clock = new StreamManualClock
+
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(),
+          TimeoutMode.ProcessingTime(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "a"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("a", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("b", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("a", "-1"), ("b", "2")),
+
+        StopStream,
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "b"),
+        AddData(inputData, "c"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("c", "1")),
+        AddData(inputData, "d"),

Review Comment:
   nit: add a blank line above to separate the batches.
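   
   i.e. something like:
   
   ```scala
   AdvanceManualClock(1 * 1000),
   CheckNewAnswer(("c", "1")),
   
   // blank line above visually separates the next microbatch
   AddData(inputData, "d"),
   AdvanceManualClock(10 * 1000),
   CheckNewAnswer(("c", "-1"), ("d", "1")),
   ```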



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+    Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,
+    conf: Configuration = new Configuration,
+    useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+    val provider = new RocksDBStateStoreProvider()
+    provider.init(
+      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey,
+      useColumnFamilies,
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+    provider
+  }
+
+  private def tryWithProviderResource[T](
+    provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
+    try {
+      f(provider)
+    } finally {
+      provider.close()
+    }
+  }
+
+  private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
+    timeoutMode match {
+      case "NoTimeouts" => TimeoutMode.NoTimeouts()
+      case "ProcessingTime" => TimeoutMode.ProcessingTime()
+      case "EventTime" => TimeoutMode.EventTime()
+      case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode")
+    }
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode should succeed") 
{
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+        assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
+        handle.getValueState[Long]("testState")
+      }
+    }
+  }
+
+  private def verifyInvalidOperation(
+      handle: StatefulProcessorHandleImpl,
+      handleState: StatefulProcessorHandleState.Value,
+      errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = {
+    handle.setHandleState(handleState)
+    assert(handle.getHandleState === handleState)
+    val ex = intercept[Exception] {
+      fn(handle)
+    }
+    assert(ex.getMessage.contains(errorMsg))
+  }
+
+  private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = {
+    handle.getValueState[Long]("testState")
+  }
+
+  private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = {
+    handle.registerTimer(1000L)
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode " +
+      "and invalid state should fail") {
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.DATA_PROCESSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.TIMER_PROCESSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.CLOSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+      }
+    }
+  }
+
+  test("registering processing/event time timeouts with NoTimeout mode should 
fail") {
+    tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+      val store = provider.getStore(0)
+      val handle = new StatefulProcessorHandleImpl(store,
+        UUID.randomUUID(), keyExprEncoder, TimeoutMode.NoTimeouts())
+      val ex = intercept[Exception] {

Review Comment:
   same, checkError if applicable



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -199,6 +199,21 @@ class ValueStateSuite extends SharedSparkSession
     }
   }
 
+  test("Value state operations for unsupported type name should fail") {
+    tryWithProviderResource(newStoreProviderWithValueState(true)) { provider =>
+      val store = provider.getStore(0)
+      val handle = new StatefulProcessorHandleImpl(store,
+        UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
+        TimeoutMode.NoTimeouts())
+
+      val ex = intercept[Exception] {

Review Comment:
   same, checkError if applicable



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
       Iterator((key, count.toString))
     }
   }
+}
+
+// Class to verify stateful processor usage with adding processing time timers
+class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor {
+  private def handleProcessingTimeBasedTimers(
+      key: String,
+      expiryTimestampMs: Long): Iterator[(String, String)] = {
+    _countState.clear()
+    Iterator((key, "-1"))
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
+
+    if (expiredTimerInfo.isValid()) {
+      handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs())
+    } else {
+      val currCount = _countState.getOption().getOrElse(0L)
+      if (currCount == 0 && (key == "a" || key == "c")) {
+        _processorHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs()
+          + 5000)
+      }
+
+      val count = currCount + 1
+      if (count == 3) {
+        _countState.clear()
+        Iterator.empty
+      } else {
+        _countState.update(count)
+        Iterator((key, count.toString))
+      }
+    }
+  }
+}
 
-  override def close(): Unit = {}
+// Class to verify stateful processor usage with adding/deleting processing time timers

Review Comment:
   nit: "replacing" seems clearer and more straightforward than "adding/deleting"; same for the class name. "updating" is also fine.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+    Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,
+    conf: Configuration = new Configuration,
+    useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+    val provider = new RocksDBStateStoreProvider()
+    provider.init(
+      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey,
+      useColumnFamilies,
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+    provider
+  }
+
+  private def tryWithProviderResource[T](
+    provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
+    try {
+      f(provider)
+    } finally {
+      provider.close()
+    }
+  }
+
+  private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
+    timeoutMode match {
+      case "NoTimeouts" => TimeoutMode.NoTimeouts()
+      case "ProcessingTime" => TimeoutMode.ProcessingTime()
+      case "EventTime" => TimeoutMode.EventTime()
+      case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode")
+    }
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode should succeed") 
{
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+        assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
+        handle.getValueState[Long]("testState")
+      }
+    }
+  }
+
+  private def verifyInvalidOperation(
+      handle: StatefulProcessorHandleImpl,
+      handleState: StatefulProcessorHandleState.Value,
+      errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = {
+    handle.setHandleState(handleState)
+    assert(handle.getHandleState === handleState)
+    val ex = intercept[Exception] {
+      fn(handle)
+    }
+    assert(ex.getMessage.contains(errorMsg))
+  }
+
+  private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = {
+    handle.getValueState[Long]("testState")
+  }
+
+  private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = {
+    handle.registerTimer(1000L)
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode " +
+      "and invalid state should fail") {
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.DATA_PROCESSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.TIMER_PROCESSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.CLOSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+      }
+    }
+  }
+
+  test("registering processing/event time timeouts with NoTimeout mode should 
fail") {
+    tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+      val store = provider.getStore(0)
+      val handle = new StatefulProcessorHandleImpl(store,
+        UUID.randomUUID(), keyExprEncoder, TimeoutMode.NoTimeouts())
+      val ex = intercept[Exception] {
+        handle.registerTimer(10000L)
+      }
+      assert(ex.getMessage.contains("Cannot use timers"))
+
+      val ex2 = intercept[Exception] {
+        handle.deleteTimer(10000L)
+      }
+      assert(ex2.getMessage.contains("Cannot use timers"))
+    }
+  }
+
+  Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"registering timeouts with timeoutMode=$timeoutMode should succeed") 
{
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+        handle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
+        assert(handle.getHandleState === StatefulProcessorHandleState.INITIALIZED)
+
+        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+        assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined)
+
+        handle.registerTimer(10000L)
+        handle.deleteTimer(10000L)
+
+        ImplicitGroupingKeyTracker.removeImplicitKey()
+        assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty)
+      }
+    }
+  }
+
+  Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"verify listing of registered timers with timeoutMode=$timeoutMode") 
{
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+        handle.setHandleState(StatefulProcessorHandleState.DATA_PROCESSED)
+        assert(handle.getHandleState === StatefulProcessorHandleState.DATA_PROCESSED)
+
+        ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
+        assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined)
+
+        // Generate some random timer timestamps in arbitrary sorted order
+        val timerTimestamps1 = Seq(931L, 8000L, 452300L, 4200L, 90L,
+          1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+        timerTimestamps1.foreach { timestamp =>
+          handle.registerTimer(timestamp)
+        }
+
+        val timers1 = handle.listTimers()
+        assert(timers1.toSeq.sorted === timerTimestamps1.sorted)
+        ImplicitGroupingKeyTracker.removeImplicitKey()
+
+        ImplicitGroupingKeyTracker.setImplicitKey("test_key2")
+        assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined)
+
+        // Generate some random timer timestamps in arbitrary sorted order
+        val timerTimestamps2 = Seq(12000L, 14500L, 16000L)
+        timerTimestamps2.foreach { timestamp =>
+          handle.registerTimer(timestamp)
+        }
+
+        val timers2 = handle.listTimers()
+        assert(timers2.toSeq.sorted === timerTimestamps2.sorted)
+        ImplicitGroupingKeyTracker.removeImplicitKey()
+        assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty)
+      }
+    }
+  }
+
+  Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"verify that expired timers are returned in sorted order with 
timeoutMode=$timeoutMode") {
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+        handle.setHandleState(StatefulProcessorHandleState.DATA_PROCESSED)
+        assert(handle.getHandleState === StatefulProcessorHandleState.DATA_PROCESSED)
+
+        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+        assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined)
+
+        // Generate some random timer timestamps in arbitrary sorted order
+        val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+        timerTimestamps.foreach { timestamp =>
+          handle.registerTimer(timestamp)
+        }
+
+        // Ensure that the expired timers are returned in sorted order
+        var expiredTimers = handle.getExpiredTimers(1000000L).map(_._2).toSeq
+        assert(expiredTimers === timerTimestamps.sorted)
+
+        expiredTimers = handle.getExpiredTimers(5L).map(_._2).toSeq
+        assert(expiredTimers === Seq(1L, 2L, 3L, 5L))
+
+        expiredTimers = handle.getExpiredTimers(10L).map(_._2).toSeq
+        assert(expiredTimers === Seq(1L, 2L, 3L, 5L, 6L, 8L, 9L))
+
+        ImplicitGroupingKeyTracker.removeImplicitKey()
+        assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty)
+      }
+    }
+  }
+
+  Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"registering timeouts with timeoutMode=$timeoutMode and invalid 
state should fail") {
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.CREATED,

Review Comment:
   same, Seq and foreach



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+    Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,
+    conf: Configuration = new Configuration,
+    useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+    val provider = new RocksDBStateStoreProvider()
+    provider.init(
+      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey,
+      useColumnFamilies,
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+    provider
+  }
+
+  private def tryWithProviderResource[T](
+    provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
+    try {
+      f(provider)
+    } finally {
+      provider.close()
+    }
+  }
+
+  private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
+    timeoutMode match {
+      case "NoTimeouts" => TimeoutMode.NoTimeouts()
+      case "ProcessingTime" => TimeoutMode.ProcessingTime()
+      case "EventTime" => TimeoutMode.EventTime()
+      case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode")
+    }
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode should succeed") 
{
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+        assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
+        handle.getValueState[Long]("testState")
+      }
+    }
+  }
+
+  private def verifyInvalidOperation(
+      handle: StatefulProcessorHandleImpl,
+      handleState: StatefulProcessorHandleState.Value,
+      errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = {
+    handle.setHandleState(handleState)
+    assert(handle.getHandleState === handleState)
+    val ex = intercept[Exception] {
+      fn(handle)
+    }
+    assert(ex.getMessage.contains(errorMsg))
+  }
+
+  private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = {
+    handle.getValueState[Long]("testState")
+  }
+
+  private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = {
+    handle.registerTimer(1000L)
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+    test(s"value state creation with timeoutMode=$timeoutMode " +
+      "and invalid state should fail") {
+      tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+        val store = provider.getStore(0)
+        val handle = new StatefulProcessorHandleImpl(store,
+          UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.DATA_PROCESSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.TIMER_PROCESSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+
+        verifyInvalidOperation(handle, StatefulProcessorHandleState.CLOSED,
+          "Cannot create state variable") { handle =>
+          createValueStateInstance(handle)
+        }
+      }
+    }
+  }
+
+  test("registering processing/event time timeouts with NoTimeout mode should 
fail") {
+    tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+      val store = provider.getStore(0)
+      val handle = new StatefulProcessorHandleImpl(store,
+        UUID.randomUUID(), keyExprEncoder, TimeoutMode.NoTimeouts())
+      val ex = intercept[Exception] {
+        handle.registerTimer(10000L)
+      }
+      assert(ex.getMessage.contains("Cannot use timers"))
+
+      val ex2 = intercept[Exception] {

Review Comment:
   same, checkError if applicable



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -195,6 +327,115 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
+  test("transformWithState - streaming with rocksdb and processing time timer 
" +
+   "should succeed") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val clock = new StreamManualClock
+
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(),
+          TimeoutMode.ProcessingTime(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "a"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("a", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("b", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("a", "-1"), ("b", "2")),
+
+        StopStream,
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "b"),
+        AddData(inputData, "c"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("c", "1")),
+        AddData(inputData, "d"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("c", "-1"), ("d", "1")),
+        StopStream
+      )
+    }
+  }
+
+  test("transformWithState - streaming with rocksdb and processing time timer 
" +
+   "and add/remove timers should succeed") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val clock = new StreamManualClock
+
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(
+          new RunningCountStatefulProcessorWithAddRemoveProcTimeTimer(),
+          TimeoutMode.ProcessingTime(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "a"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("a", "1")),
+
+        AddData(inputData, "a"),
+        AdvanceManualClock(2 * 1000),
+        CheckNewAnswer(("a", "2")),
+        StopStream,
+
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "d"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("a", "-1"), ("d", "1")),

Review Comment:
   Shall we add a code comment elaborating the expectation here, or add comments step by step, per microbatch? It's hard to track what is expected because we don't check the timer values. It's totally OK not to check the timer values directly, but I wanted to see the reasoning.
   
   My reasoning:
   at batch 0, ts = 1, timer = "a" -> [6] (= 1 + 5)
   at batch 1, ts = 2, timer = "a" -> [9.5] (= 2 + 7.5)
   at batch 2, ts = 12, the timer for "a" has expired. If the timer for "a" were not replaced (pure addition), it would have fired twice here and produced ("a", "-1") two times.
   
   Do I understand correctly?
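   
   For illustration, the kind of per-microbatch annotation I have in mind (the comments follow my reasoning above; the exact replacement offset is an assumption about the processor implementation):
   
   ```scala
   StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
   AddData(inputData, "a"),
   AdvanceManualClock(1 * 1000),
   // batch 0: count("a") = 1; a timer is registered for "a"
   CheckNewAnswer(("a", "1")),
   
   AddData(inputData, "a"),
   AdvanceManualClock(2 * 1000),
   // batch 1: count("a") = 2; the first timer is deleted and a later one
   // registered, so exactly one timer remains for "a"
   CheckNewAnswer(("a", "2")),
   StopStream,
   
   StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
   AddData(inputData, "d"),
   AdvanceManualClock(10 * 1000),
   // batch 2: the single remaining timer for "a" fires once -> ("a", "-1");
   // with pure addition (no replacement) it would have fired twice
   CheckNewAnswer(("a", "-1"), ("d", "1")),
   ```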



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
       Iterator((key, count.toString))
     }
   }
+}
+
+// Class to verify stateful processor usage with adding processing time timers

Review Comment:
   I understand it's not easy to come up with a "reasonable" scenario where we maintain multiple timers for a single grouping key, but we don't seem to have e2e test coverage for it. Do you think the unit test is sufficient, or would you add a new test demonstrating multiple timers?
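   
   In case it helps, a rough sketch of such an e2e processor; the class name and timer offsets are hypothetical:
   
   ```scala
   // Registers two timers per key on the first sighting, so a single grouping
   // key holds multiple timers at once.
   class MultipleTimersStatefulProcessor extends RunningCountStatefulProcessor {
     override def handleInputRows(
         key: String,
         inputRows: Iterator[String],
         timerValues: TimerValues,
         expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
       if (expiredTimerInfo.isValid()) {
         Iterator((key, s"expired-${expiredTimerInfo.getExpiryTimeInMs()}"))
       } else {
         val currCount = _countState.getOption().getOrElse(0L)
         if (currCount == 0) {
           val now = timerValues.getCurrentProcessingTimeInMs()
           _processorHandle.registerTimer(now + 5000)
           _processorHandle.registerTimer(now + 10000)
         }
         _countState.update(currCount + 1)
         Iterator((key, (currCount + 1).toString))
       }
     }
   }
   ```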



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -195,6 +327,115 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
+  test("transformWithState - streaming with rocksdb and processing time timer 
" +
+   "should succeed") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val clock = new StreamManualClock
+
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(),
+          TimeoutMode.ProcessingTime(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "a"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("a", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("b", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("a", "-1"), ("b", "2")),
+
+        StopStream,
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "b"),
+        AddData(inputData, "c"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("c", "1")),

Review Comment:
   nit: I'd leave a comment to explicitly say that "b" is removed as count == 3.
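   
   Something like:
   
   ```scala
   AddData(inputData, "b"),
   AddData(inputData, "c"),
   AdvanceManualClock(1 * 1000),
   // "b" reached count == 3, so its state was cleared and no row was emitted
   // for it; only "c" shows up in this batch.
   CheckNewAnswer(("c", "1")),
   ```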



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

