HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1517217945
########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. 
+ */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + after { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): + RocksDBStateStoreProvider = { + newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( + storeId: StateStoreId, + numColsPrefixKey: Int, + sqlConf: Option[SQLConf] = None, + conf: Configuration = new Configuration, + useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = { + val provider = new RocksDBStateStoreProvider() + provider.init( + storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + useColumnFamilies, + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) + provider + } + + private def tryWithProviderResource[T]( + provider: StateStoreProvider)(f: StateStoreProvider => T): T = { + try { + f(provider) + } finally { + provider.close() + } + } + + private def getTimeoutMode(timeoutMode: String): TimeoutMode = { + timeoutMode match { + case "NoTimeouts" => TimeoutMode.NoTimeouts() + case "ProcessingTime" => TimeoutMode.ProcessingTime() + case "EventTime" => TimeoutMode.EventTime() + case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode") + } + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode should succeed") { + 
tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + assert(handle.getHandleState === StatefulProcessorHandleState.CREATED) + handle.getValueState[Long]("testState") + } + } + } + + private def verifyInvalidOperation( + handle: StatefulProcessorHandleImpl, + handleState: StatefulProcessorHandleState.Value, + errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = { + handle.setHandleState(handleState) + assert(handle.getHandleState === handleState) + val ex = intercept[Exception] { + fn(handle) + } + assert(ex.getMessage.contains(errorMsg)) + } + + private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = { + handle.getValueState[Long]("testState") + } + + private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = { + handle.registerTimer(1000L) + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode " + + "and invalid state should fail") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + + verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED, Review Comment: Maybe worth refactoring this with Seq & foreach, since only the phase differs between these calls. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. 
+ */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + after { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): + RocksDBStateStoreProvider = { + newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( + storeId: StateStoreId, + numColsPrefixKey: Int, + sqlConf: Option[SQLConf] = None, + conf: Configuration = new Configuration, + useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = { + val provider = new RocksDBStateStoreProvider() + provider.init( + storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + useColumnFamilies, + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) + provider + } + + private def tryWithProviderResource[T]( + provider: StateStoreProvider)(f: StateStoreProvider => T): T = { Review Comment: nit: indent ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. 
+ */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + after { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): + RocksDBStateStoreProvider = { + newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( + storeId: StateStoreId, Review Comment: nit: indentation ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. + */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + after { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): + RocksDBStateStoreProvider = { + newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( + storeId: StateStoreId, + numColsPrefixKey: Int, + sqlConf: Option[SQLConf] = None, + conf: Configuration = new Configuration, + useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = { + val provider = new RocksDBStateStoreProvider() + provider.init( + storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + 
useColumnFamilies, + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) + provider + } + + private def tryWithProviderResource[T]( + provider: StateStoreProvider)(f: StateStoreProvider => T): T = { + try { + f(provider) + } finally { + provider.close() + } + } + + private def getTimeoutMode(timeoutMode: String): TimeoutMode = { + timeoutMode match { + case "NoTimeouts" => TimeoutMode.NoTimeouts() + case "ProcessingTime" => TimeoutMode.ProcessingTime() + case "EventTime" => TimeoutMode.EventTime() + case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode") + } + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode should succeed") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + assert(handle.getHandleState === StatefulProcessorHandleState.CREATED) + handle.getValueState[Long]("testState") + } + } + } + + private def verifyInvalidOperation( + handle: StatefulProcessorHandleImpl, + handleState: StatefulProcessorHandleState.Value, + errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = { + handle.setHandleState(handleState) + assert(handle.getHandleState === handleState) + val ex = intercept[Exception] { Review Comment: FYI, if you are using error class framework, it's recommended to verify the error with `checkError`. 
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ########## @@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S Iterator((key, count.toString)) } } +} + +// Class to verify stateful processor usage with adding processing time timers +class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor { + private def handleProcessingTimeBasedTimers( + key: String, + expiryTimestampMs: Long): Iterator[(String, String)] = { + _countState.clear() + Iterator((key, "-1")) + } + + override def handleInputRows( + key: String, + inputRows: Iterator[String], + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = { + + if (expiredTimerInfo.isValid()) { + handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs()) + } else { + val currCount = _countState.getOption().getOrElse(0L) + if (currCount == 0 && (key == "a" || key == "c")) { + _processorHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + + 5000) + } + + val count = currCount + 1 + if (count == 3) { + _countState.clear() + Iterator.empty + } else { + _countState.update(count) + Iterator((key, count.toString)) + } + } + } +} - override def close(): Unit = {} +// Class to verify stateful processor usage with adding/deleting processing time timers +class RunningCountStatefulProcessorWithAddRemoveProcTimeTimer + extends RunningCountStatefulProcessor { + @transient private var _timerState: ValueState[Long] = _ + + override def init( + handle: StatefulProcessorHandle, + outputMode: OutputMode, + timeoutMode: TimeoutMode) : Unit = { + super.init(handle, outputMode, timeoutMode) + _timerState = _processorHandle.getValueState[Long]("timerState") + } + + private def handleProcessingTimeBasedTimers( + key: String, + expiryTimestampMs: Long): Iterator[(String, String)] = { + _timerState.clear() + Iterator((key, "-1")) + } + + override def 
handleInputRows( + key: String, + inputRows: Iterator[String], + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = { + if (expiredTimerInfo.isValid()) { + handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs()) + } else { + val currCount = _countState.getOption().getOrElse(0L) + val count = currCount + inputRows.size + _countState.update(count) + if (key == "a") { + var nextTimerTs: Long = 0L + if (currCount == 0) { + nextTimerTs = timerValues.getCurrentProcessingTimeInMs() + 5000 + _processorHandle.registerTimer(nextTimerTs) + _timerState.update(nextTimerTs) + } else if (currCount == 1) { + _processorHandle.deleteTimer(_timerState.get()) Review Comment: Just to double check: users can call listTimers to avoid keeping another value state, right? There might be performance implications, but I'm asking purely from a functionality perspective. ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ########## @@ -195,6 +327,115 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } + test("transformWithState - streaming with rocksdb and processing time timer " + + "should succeed") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName) { + val clock = new StreamManualClock + + val inputData = MemoryStream[String] + val result = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(), + TimeoutMode.ProcessingTime(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, "a"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("a", "1")), + + AddData(inputData, "b"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("b", "1")), + + AddData(inputData, "b"), + AdvanceManualClock(10 * 1000), + CheckNewAnswer(("a", "-1"), ("b", "2")), + + StopStream, +
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, "b"), + AddData(inputData, "c"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("c", "1")), + AddData(inputData, "d"), Review Comment: nit: add a blank line above this line to separate the batches ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. + */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + after { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): + RocksDBStateStoreProvider = { + newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( + storeId: StateStoreId, + numColsPrefixKey: Int, + sqlConf: Option[SQLConf] = None, + conf: Configuration = new Configuration, + useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = { + val provider = new RocksDBStateStoreProvider() + provider.init( + storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + 
useColumnFamilies, + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) + provider + } + + private def tryWithProviderResource[T]( + provider: StateStoreProvider)(f: StateStoreProvider => T): T = { + try { + f(provider) + } finally { + provider.close() + } + } + + private def getTimeoutMode(timeoutMode: String): TimeoutMode = { + timeoutMode match { + case "NoTimeouts" => TimeoutMode.NoTimeouts() + case "ProcessingTime" => TimeoutMode.ProcessingTime() + case "EventTime" => TimeoutMode.EventTime() + case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode") + } + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode should succeed") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + assert(handle.getHandleState === StatefulProcessorHandleState.CREATED) + handle.getValueState[Long]("testState") + } + } + } + + private def verifyInvalidOperation( + handle: StatefulProcessorHandleImpl, + handleState: StatefulProcessorHandleState.Value, + errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = { + handle.setHandleState(handleState) + assert(handle.getHandleState === handleState) + val ex = intercept[Exception] { + fn(handle) + } + assert(ex.getMessage.contains(errorMsg)) + } + + private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = { + handle.getValueState[Long]("testState") + } + + private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = { + handle.registerTimer(1000L) + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode " + + "and invalid state should fail") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + 
val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + + verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.DATA_PROCESSED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.TIMER_PROCESSED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.CLOSED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + } + } + } + + test("registering processing/event time timeouts with NoTimeout mode should fail") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, TimeoutMode.NoTimeouts()) + val ex = intercept[Exception] { Review Comment: same, checkError if applicable ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala: ########## @@ -199,6 +199,21 @@ class ValueStateSuite extends SharedSparkSession } } + test("Value state operations for unsupported type name should fail") { + tryWithProviderResource(newStoreProviderWithValueState(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], + TimeoutMode.NoTimeouts()) + + val ex = intercept[Exception] { Review Comment: same, checkError if applicable ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ########## @@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends 
StatefulProcessor[String, String, (S Iterator((key, count.toString)) } } +} + +// Class to verify stateful processor usage with adding processing time timers +class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor { + private def handleProcessingTimeBasedTimers( + key: String, + expiryTimestampMs: Long): Iterator[(String, String)] = { + _countState.clear() + Iterator((key, "-1")) + } + + override def handleInputRows( + key: String, + inputRows: Iterator[String], + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = { + + if (expiredTimerInfo.isValid()) { + handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs()) + } else { + val currCount = _countState.getOption().getOrElse(0L) + if (currCount == 0 && (key == "a" || key == "c")) { + _processorHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + + 5000) + } + + val count = currCount + 1 + if (count == 3) { + _countState.clear() + Iterator.empty + } else { + _countState.update(count) + Iterator((key, count.toString)) + } + } + } +} - override def close(): Unit = {} +// Class to verify stateful processor usage with adding/deleting processing time timers Review Comment: nit: "replacing" seems clearer and more straightforward than "adding/deleting"; the same applies to the class name. "updating" would also be fine. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. 
+ */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + after { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): + RocksDBStateStoreProvider = { + newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( + storeId: StateStoreId, + numColsPrefixKey: Int, + sqlConf: Option[SQLConf] = None, + conf: Configuration = new Configuration, + useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = { + val provider = new RocksDBStateStoreProvider() + provider.init( + storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + useColumnFamilies, + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) + provider + } + + private def tryWithProviderResource[T]( + provider: StateStoreProvider)(f: StateStoreProvider => T): T = { + try { + f(provider) + } finally { + provider.close() + } + } + + private def getTimeoutMode(timeoutMode: String): TimeoutMode = { + timeoutMode match { + case "NoTimeouts" => TimeoutMode.NoTimeouts() + case "ProcessingTime" => TimeoutMode.ProcessingTime() + case "EventTime" => TimeoutMode.EventTime() + case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode") + } + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode should succeed") { + 
tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + assert(handle.getHandleState === StatefulProcessorHandleState.CREATED) + handle.getValueState[Long]("testState") + } + } + } + + private def verifyInvalidOperation( + handle: StatefulProcessorHandleImpl, + handleState: StatefulProcessorHandleState.Value, + errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = { + handle.setHandleState(handleState) + assert(handle.getHandleState === handleState) + val ex = intercept[Exception] { + fn(handle) + } + assert(ex.getMessage.contains(errorMsg)) + } + + private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = { + handle.getValueState[Long]("testState") + } + + private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = { + handle.registerTimer(1000L) + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode " + + "and invalid state should fail") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + + verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.DATA_PROCESSED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.TIMER_PROCESSED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.CLOSED, + "Cannot create state variable") { handle 
=> + createValueStateInstance(handle) + } + } + } + } + + test("registering processing/event time timeouts with NoTimeout mode should fail") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, TimeoutMode.NoTimeouts()) + val ex = intercept[Exception] { + handle.registerTimer(10000L) + } + assert(ex.getMessage.contains("Cannot use timers")) + + val ex2 = intercept[Exception] { + handle.deleteTimer(10000L) + } + assert(ex2.getMessage.contains("Cannot use timers")) + } + } + + Seq("ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"registering timeouts with timeoutMode=$timeoutMode should succeed") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + handle.setHandleState(StatefulProcessorHandleState.INITIALIZED) + assert(handle.getHandleState === StatefulProcessorHandleState.INITIALIZED) + + ImplicitGroupingKeyTracker.setImplicitKey("test_key") + assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined) + + handle.registerTimer(10000L) + handle.deleteTimer(10000L) + + ImplicitGroupingKeyTracker.removeImplicitKey() + assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty) + } + } + } + + Seq("ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"verify listing of registered timers with timeoutMode=$timeoutMode") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + handle.setHandleState(StatefulProcessorHandleState.DATA_PROCESSED) + assert(handle.getHandleState === StatefulProcessorHandleState.DATA_PROCESSED) + + 
ImplicitGroupingKeyTracker.setImplicitKey("test_key1") + assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined) + + // Generate some random timer timestamps in arbitrary sorted order + val timerTimestamps1 = Seq(931L, 8000L, 452300L, 4200L, 90L, + 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L) + timerTimestamps1.foreach { timestamp => + handle.registerTimer(timestamp) + } + + val timers1 = handle.listTimers() + assert(timers1.toSeq.sorted === timerTimestamps1.sorted) + ImplicitGroupingKeyTracker.removeImplicitKey() + + ImplicitGroupingKeyTracker.setImplicitKey("test_key2") + assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined) + + // Generate some random timer timestamps in arbitrary sorted order + val timerTimestamps2 = Seq(12000L, 14500L, 16000L) + timerTimestamps2.foreach { timestamp => + handle.registerTimer(timestamp) + } + + val timers2 = handle.listTimers() + assert(timers2.toSeq.sorted === timerTimestamps2.sorted) + ImplicitGroupingKeyTracker.removeImplicitKey() + assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty) + } + } + } + + Seq("ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"verify that expired timers are returned in sorted order with timeoutMode=$timeoutMode") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + handle.setHandleState(StatefulProcessorHandleState.DATA_PROCESSED) + assert(handle.getHandleState === StatefulProcessorHandleState.DATA_PROCESSED) + + ImplicitGroupingKeyTracker.setImplicitKey("test_key") + assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined) + + // Generate some random timer timestamps in arbitrary sorted order + val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L) + timerTimestamps.foreach { timestamp => + handle.registerTimer(timestamp) + } + + // Ensure 
that the expired timers are returned in sorted order + var expiredTimers = handle.getExpiredTimers(1000000L).map(_._2).toSeq + assert(expiredTimers === timerTimestamps.sorted) + + expiredTimers = handle.getExpiredTimers(5L).map(_._2).toSeq + assert(expiredTimers === Seq(1L, 2L, 3L, 5L)) + + expiredTimers = handle.getExpiredTimers(10L).map(_._2).toSeq + assert(expiredTimers === Seq(1L, 2L, 3L, 5L, 6L, 8L, 9L)) + + ImplicitGroupingKeyTracker.removeImplicitKey() + assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty) + } + } + } + + Seq("ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"registering timeouts with timeoutMode=$timeoutMode and invalid state should fail") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + + verifyInvalidOperation(handle, StatefulProcessorHandleState.CREATED, Review Comment: same, Seq and foreach ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. + */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + after { + StateStore.stop() + require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): + RocksDBStateStoreProvider = { + newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( + storeId: StateStoreId, + numColsPrefixKey: Int, + sqlConf: Option[SQLConf] = None, + conf: Configuration = new Configuration, + useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = { + val provider = new RocksDBStateStoreProvider() + provider.init( + storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + 
useColumnFamilies, + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) + provider + } + + private def tryWithProviderResource[T]( + provider: StateStoreProvider)(f: StateStoreProvider => T): T = { + try { + f(provider) + } finally { + provider.close() + } + } + + private def getTimeoutMode(timeoutMode: String): TimeoutMode = { + timeoutMode match { + case "NoTimeouts" => TimeoutMode.NoTimeouts() + case "ProcessingTime" => TimeoutMode.ProcessingTime() + case "EventTime" => TimeoutMode.EventTime() + case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode") + } + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode should succeed") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + assert(handle.getHandleState === StatefulProcessorHandleState.CREATED) + handle.getValueState[Long]("testState") + } + } + } + + private def verifyInvalidOperation( + handle: StatefulProcessorHandleImpl, + handleState: StatefulProcessorHandleState.Value, + errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = { + handle.setHandleState(handleState) + assert(handle.getHandleState === handleState) + val ex = intercept[Exception] { + fn(handle) + } + assert(ex.getMessage.contains(errorMsg)) + } + + private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = { + handle.getValueState[Long]("testState") + } + + private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = { + handle.registerTimer(1000L) + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => + test(s"value state creation with timeoutMode=$timeoutMode " + + "and invalid state should fail") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + 
val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) + + verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.DATA_PROCESSED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.TIMER_PROCESSED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + + verifyInvalidOperation(handle, StatefulProcessorHandleState.CLOSED, + "Cannot create state variable") { handle => + createValueStateInstance(handle) + } + } + } + } + + test("registering processing/event time timeouts with NoTimeout mode should fail") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { provider => + val store = provider.getStore(0) + val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, TimeoutMode.NoTimeouts()) + val ex = intercept[Exception] { + handle.registerTimer(10000L) + } + assert(ex.getMessage.contains("Cannot use timers")) + + val ex2 = intercept[Exception] { Review Comment: same, checkError if applicable ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ########## @@ -195,6 +327,115 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } + test("transformWithState - streaming with rocksdb and processing time timer " + + "should succeed") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName) { + val clock = new StreamManualClock + + val inputData = MemoryStream[String] + val result = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(), + TimeoutMode.ProcessingTime(), + 
OutputMode.Update()) + + testStream(result, OutputMode.Update())( + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, "a"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("a", "1")), + + AddData(inputData, "b"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("b", "1")), + + AddData(inputData, "b"), + AdvanceManualClock(10 * 1000), + CheckNewAnswer(("a", "-1"), ("b", "2")), + + StopStream, + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, "b"), + AddData(inputData, "c"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("c", "1")), + AddData(inputData, "d"), + AdvanceManualClock(10 * 1000), + CheckNewAnswer(("c", "-1"), ("d", "1")), + StopStream + ) + } + } + + test("transformWithState - streaming with rocksdb and processing time timer " + + "and add/remove timers should succeed") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName) { + val clock = new StreamManualClock + + val inputData = MemoryStream[String] + val result = inputData.toDS() + .groupByKey(x => x) + .transformWithState( + new RunningCountStatefulProcessorWithAddRemoveProcTimeTimer(), + TimeoutMode.ProcessingTime(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, "a"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("a", "1")), + + AddData(inputData, "a"), + AdvanceManualClock(2 * 1000), + CheckNewAnswer(("a", "2")), + StopStream, + + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, "d"), + AdvanceManualClock(10 * 1000), + CheckNewAnswer(("a", "-1"), ("d", "1")), Review Comment: Shall we add a code comment elaborating on the expectation here, or add step-by-step code comments for each microbatch? It's hard to follow what is expected because we don't check the timer values.
It's totally OK not to check the timer values directly, but I wanted to understand the reasoning. My reasoning: at batch 0, ts = 1, so the timer for "a" is [6] (= 1 + 5); at batch 1, ts = 2, so the timer for "a" is [9.5] (= 2 + 7.5); at batch 2, ts = 12, so the timer for "a" has expired. If the timer for "a" were not replaced (i.e., timers were purely added), it would have fired twice here and produced ("a", "-1") twice. Do I understand correctly? ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ########## @@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S Iterator((key, count.toString)) } } +} + +// Class to verify stateful processor usage with adding processing time timers Review Comment: I understand it's not easy to come up with a "reasonable" scenario where we maintain multiple timers for a single grouping key, but we don't seem to have e2e test coverage for it. Do you think the unit test is sufficient, or would you add a new test demonstrating multiple timers?
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ########## @@ -195,6 +327,115 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } + test("transformWithState - streaming with rocksdb and processing time timer " + + "should succeed") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName) { + val clock = new StreamManualClock + + val inputData = MemoryStream[String] + val result = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(), + TimeoutMode.ProcessingTime(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, "a"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("a", "1")), + + AddData(inputData, "b"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("b", "1")), + + AddData(inputData, "b"), + AdvanceManualClock(10 * 1000), + CheckNewAnswer(("a", "-1"), ("b", "2")), + + StopStream, + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, "b"), + AddData(inputData, "c"), + AdvanceManualClock(1 * 1000), + CheckNewAnswer(("c", "1")), Review Comment: nit: I'd leave a comment to explicitly say that b is removed as count == 3. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org