Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR closed pull request #45051: [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator URL: https://github.com/apache/spark/pull/45051 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on PR #45051: URL: https://github.com/apache/spark/pull/45051#issuecomment-1995685935 Thanks! Merging to master.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1522327291

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+      key: Any,
+      expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+    store: StateStore,
+    timeoutMode: TimeoutMode,
+    keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+    .add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+    .add("key", BinaryType)
+    .add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+    .add("expiryTimestampMs", BinaryType, nullable = false)
+    .add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+    StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+    TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+    TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+    schemaForKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+    keySchemaForSecIndex, numColsPrefixKey = 0,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+    }
+
+    val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+    val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+    keyRow
+  }
+
+  // We maintain a secondary index that inverts the ordering of the timestamp
+  // and grouping key and maintains the list of (expiry) timestamps in sorted order
+  // (using BIG_ENDIAN encoding) within RocksDB.
+  // This is because RocksDB uses byte-wise comparison using the default comparator to
+  // determine sorted order of keys. This is used to read expired timers at any given
+  // processing time/event time timestamp threshold by performing a range scan
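The code comment above depends on one property. For non-negative longs, big-endian bytes compared as unsigned values sort in the same order as the numbers themselves. Little-endian bytes do not.

The minimal standalone sketch below checks that property. `compareBytes` is a hypothetical helper that imitates the unsigned lexicographic order of RocksDB's default bytewise comparator. The sketch assumes non-negative epoch-millisecond timestamps; a negative value's sign bit would make it sort after every positive one.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object ByteOrderDemo {
  // Encode a timestamp into 8 bytes using the requested byte order.
  def encode(ts: Long, order: ByteOrder): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).order(order).putLong(ts).array()

  // Decode 8 bytes back into a Long using the same byte order.
  def decode(bytes: Array[Byte], order: ByteOrder): Long =
    ByteBuffer.wrap(bytes).order(order).getLong()

  // Unsigned lexicographic comparison (hypothetical helper meant to mimic
  // the key ordering of RocksDB's default bytewise comparator).
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val diff = (a(i) & 0xff) - (b(i) & 0xff)
      if (diff != 0) return diff
      i += 1
    }
    a.length - b.length
  }

  // Sort timestamps by their encoded bytes, then decode them again.
  def sortByEncodedBytes(ts: Seq[Long], order: ByteOrder): Seq[Long] =
    ts.map(encode(_, order))
      .sortWith((x, y) => compareBytes(x, y) < 0)
      .map(decode(_, order))
}
```

With big-endian encoding, byte order matches numeric order, so a scan over the index can stop at the first key past the threshold. With little-endian encoding, `Seq(256L, 1L, 65536L, 2L)` comes back as `65536, 256, 1, 2`.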
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1522188039

## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ##
@@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
       Iterator((key, count.toString))
     }
   }
+}
+
+// Class to verify stateful processor usage with adding processing time timers
+class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor {
+  private def handleProcessingTimeBasedTimers(
+      key: String,
+      expiryTimestampMs: Long): Iterator[(String, String)] = {
+    _countState.clear()
+    Iterator((key, "-1"))
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
+
+    if (expiredTimerInfo.isValid()) {
+      handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs())
+    } else {
+      val currCount = _countState.getOption().getOrElse(0L)
+      if (currCount == 0 && (key == "a" || key == "c")) {
+        _processorHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() +
+          5000)
+      }
+
+      val count = currCount + 1
+      if (count == 3) {
+        _countState.clear()
+        Iterator.empty
+      } else {
+        _countState.update(count)
+        Iterator((key, count.toString))
+      }
+    }
+  }
+}
-  override def close(): Unit = {}
+// Class to verify stateful processor usage with adding/deleting processing time timers
+class RunningCountStatefulProcessorWithAddRemoveProcTimeTimer
+  extends RunningCountStatefulProcessor {
+  @transient private var _timerState: ValueState[Long] = _
+
+  override def init(
+      handle: StatefulProcessorHandle,
+      outputMode: OutputMode,
+      timeoutMode: TimeoutMode) : Unit = {
+    super.init(handle, outputMode, timeoutMode)
+    _timerState = _processorHandle.getValueState[Long]("timerState")
+  }
+
+  private def handleProcessingTimeBasedTimers(
+      key: String,
+      expiryTimestampMs: Long): Iterator[(String, String)] = {
+    _timerState.clear()
+    Iterator((key, "-1"))
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
+    if (expiredTimerInfo.isValid()) {
+      handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs())
+    } else {
+      val currCount = _countState.getOption().getOrElse(0L)
+      val count = currCount + inputRows.size
+      _countState.update(count)
+      if (key == "a") {
+        var nextTimerTs: Long = 0L
+        if (currCount == 0) {
+          nextTimerTs = timerValues.getCurrentProcessingTimeInMs() + 5000
+          _processorHandle.registerTimer(nextTimerTs)
+          _timerState.update(nextTimerTs)
+        } else if (currCount == 1) {
+          _processorHandle.deleteTimer(_timerState.get())

Review Comment:
Yes - added it in a different test
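The add/remove processor stores the expiry timestamp it most recently registered in a `ValueState[Long]`, so it can pass that value to `deleteTimer` later.

Below is a rough in-memory sketch of the same bookkeeping, written as a sliding inactivity timeout. `FakeTimerRegistry` and `InactivityTimeout` are hypothetical stand-ins for the processor handle and the value state, not Spark APIs. The "delete, then re-register on every input" policy is an illustrative assumption, not what the truncated test actually does.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the handle's timer registry, keyed by grouping key.
final class FakeTimerRegistry {
  private val timers = mutable.Map.empty[String, mutable.SortedSet[Long]]

  def registerTimer(key: String, expiryMs: Long): Unit = {
    timers.getOrElseUpdate(key, mutable.SortedSet.empty[Long]) += expiryMs
    ()
  }

  def deleteTimer(key: String, expiryMs: Long): Unit =
    timers.get(key).foreach(set => set -= expiryMs)

  def listTimers(key: String): List[Long] =
    timers.get(key).map(_.toList).getOrElse(Nil)
}

// Sliding inactivity timeout: every input replaces the key's pending timer.
final class InactivityTimeout(registry: FakeTimerRegistry, timeoutMs: Long) {
  // Plays the role of the ValueState[Long] ("timerState") in the test.
  private val lastTimer = mutable.Map.empty[String, Long]

  def onInput(key: String, nowMs: Long): Unit = {
    // Delete the previously registered timer, if any, before adding a new one.
    lastTimer.get(key).foreach(old => registry.deleteTimer(key, old))
    val next = nowMs + timeoutMs
    registry.registerTimer(key, next)
    lastTimer.update(key, next)
  }
}
```

Storing the timestamp is what makes deletion possible: timers are identified only by (grouping key, expiry), so the processor has to remember which expiry it used.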
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1522187745

## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ##
@@ -195,6 +327,115 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       }
     }
+  test("transformWithState - streaming with rocksdb and processing time timer " +
+    "should succeed") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val clock = new StreamManualClock
+
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(),
+          TimeoutMode.ProcessingTime(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "a"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("a", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("b", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("a", "-1"), ("b", "2")),
+
+        StopStream,
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "b"),
+        AddData(inputData, "c"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("c", "1")),
+        AddData(inputData, "d"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("c", "-1"), ("d", "1")),
+        StopStream
+      )
+    }
+  }
+
+  test("transformWithState - streaming with rocksdb and processing time timer " +
+    "and add/remove timers should succeed") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val clock = new StreamManualClock
+
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(
+          new RunningCountStatefulProcessorWithAddRemoveProcTimeTimer(),
+          TimeoutMode.ProcessingTime(),
+          OutputMode.Update())
+      testStream(result, OutputMode.Update())(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "a"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("a", "1")),
+
+        AddData(inputData, "a"),
+        AdvanceManualClock(2 * 1000),
+        CheckNewAnswer(("a", "2")),
+        StopStream,
+
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "d"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("a", "-1"), ("d", "1")),

Review Comment:
Done

## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ##
@@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
       Iterator((key, count.toString))
     }
   }
+}
+
+// Class to verify stateful processor usage with adding processing time timers

Review Comment:
Done
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520644679

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ##
@@ -163,6 +249,16 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
+    timeoutMode match {
+      case ProcessingTime =>
+        require(batchTimestampMs.nonEmpty)

Review Comment:
Done
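The `doExecute` check fails fast when processing-time timers are requested but no batch timestamp is available.

Below is a hedged sketch of a per-mode precondition in the same spirit. The mode names are hypothetical and deliberately avoid Spark's own `TimeoutMode`. The event-time branch, which requires a watermark, is an assumption: the quoted hunk stops after the processing-time case.

```scala
// Hypothetical timeout modes; Spark's own TimeoutMode is not used here.
sealed trait Mode
case object NoTimers extends Mode
case object ProcTimeTimers extends Mode
case object EventTimeTimers extends Mode

object ExecPreconditions {
  // Fail fast when a timer mode lacks the clock it needs. The event-time
  // branch is an assumption: the quoted hunk stops after the processing-time case.
  def check(
      mode: Mode,
      batchTimestampMs: Option[Long],
      evictionWatermarkMs: Option[Long]): Unit =
    mode match {
      case ProcTimeTimers =>
        require(batchTimestampMs.nonEmpty, "processing-time timers need a batch timestamp")
      case EventTimeTimers =>
        require(evictionWatermarkMs.nonEmpty, "event-time timers need a watermark")
      case NoTimers => ()
    }
}
```

`require` throws `IllegalArgumentException`, so a misconfigured plan fails on the driver before any task runs.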
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520644342

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
   override def getQueryInfo(): QueryInfo = currQueryInfo
+  private def getTimerState[T](): TimerStateImpl[T] = {
+    new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+    if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) {
+      throw StateStoreErrors.cannotUseTimersWithInvalidTimeoutMode(timeoutMode.toString)
+    }
+
+    if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) {
+      throw StateStoreErrors.cannotUseTimersWithInvalidHandleState(currState.toString)
+    }
+
+    if (timerState.exists(expiryTimestampMs)) {
+      logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")

Review Comment:
Done
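When `timerState.exists(expiryTimestampMs)` is true, `registerTimer` only logs a warning. If the timer is not added again after that warning (the hunk is cut off at this point), registering the same expiry twice is an idempotent no-op.

Below is a small sketch of that behaviour. `DedupingTimerRegistry` is hypothetical. It keys timers by (grouping key, expiry) and collects warnings in a buffer instead of calling a logger.

```scala
import scala.collection.mutable

// Hypothetical registry mirroring the duplicate check in registerTimer:
// an existing (key, expiry) pair only produces a warning.
final class DedupingTimerRegistry {
  private val timers = mutable.Set.empty[(String, Long)]
  val warnings: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  def registerTimer(key: String, expiryTimestampMs: Long): Unit = {
    if (timers.contains((key, expiryTimestampMs))) {
      warnings += s"Timer already exists for expiryTimestampMs=$expiryTimestampMs"
    } else {
      timers += ((key, expiryTimestampMs))
    }
    ()
  }

  def size: Int = timers.size
}
```

The same expiry under a different grouping key is a different timer, because timers are scoped to the implicit grouping key.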
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520520070

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ##
@@ -103,8 +116,12 @@ case class TransformWithStateExec(
     val keyObj = getKeyObj(keyRow)  // convert key to objects
     ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
     val valueObjIter = valueRowIter.map(getValueObj.apply)
-    val mappedIterator = statefulProcessor.handleInputRows(keyObj, valueObjIter,
-      new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map { obj =>
+    val mappedIterator = statefulProcessor.handleInputRows(
+      keyObj,
+      valueObjIter,
+      new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents),

Review Comment:
Done - updated
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520519676

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
   override def getQueryInfo(): QueryInfo = currQueryInfo
+  private def getTimerState[T](): TimerStateImpl[T] = {
+    new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+    if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) {
+      throw StateStoreErrors.cannotUseTimersWithInvalidTimeoutMode(timeoutMode.toString)
+    }
+
+    if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) {

Review Comment:
Well - we don't want to allow creation when the `currState` is `CREATED` either. Updated the condition a little to make it more readable. Please take a look.
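The point of this reply is that timer operations are valid only after the processor has been initialized, so `CREATED` must also be rejected. Writing the allowed states as an explicit set is one readable way to express that condition.

The sketch below is hypothetical and models only the three handle states named in this thread.

```scala
object HandleStateCheck {
  // The three handle states named in this review thread.
  sealed trait HandleState
  case object CREATED extends HandleState
  case object INITIALIZED extends HandleState
  case object DATA_PROCESSED extends HandleState

  // Timers may only be touched after init(), i.e. never in CREATED.
  val timerOpsAllowed: Set[HandleState] = Set(INITIALIZED, DATA_PROCESSED)

  def verifyTimerOpsAllowed(state: HandleState): Unit =
    if (!timerOpsAllowed.contains(state)) {
      throw new IllegalStateException(s"Cannot use timers in handle state $state")
    }
}
```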
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520472228

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
   override def getQueryInfo(): QueryInfo = currQueryInfo
+  private def getTimerState[T](): TimerStateImpl[T] = {

Review Comment:
Done

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
   override def getQueryInfo(): QueryInfo = currQueryInfo
+  private def getTimerState[T](): TimerStateImpl[T] = {
+    new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+    if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) {

Review Comment:
Done
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
sahnib commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519943584

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ##
@@ -103,8 +116,12 @@ case class TransformWithStateExec(
     val keyObj = getKeyObj(keyRow)  // convert key to objects
     ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
     val valueObjIter = valueRowIter.map(getValueObj.apply)
-    val mappedIterator = statefulProcessor.handleInputRows(keyObj, valueObjIter,
-      new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map { obj =>
+    val mappedIterator = statefulProcessor.handleInputRows(
+      keyObj,
+      valueObjIter,
+      new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents),

Review Comment:
I discovered this as well and updated it to `eventTimeForEviction` in my PR. https://github.com/apache/spark/pull/45376/files#diff-2bac1c42eb2edac75b4d725015d7a690269eb0869389e0347b8b6c01d222
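The distinction here is between two watermarks a stateful operator sees within a batch. One is used to drop late input; in Spark's multi-stateful-operator design, roughly, this is the previous batch's watermark. The other is used to evict state, and is the current batch's watermark. Event-time timers are an eviction concern, so checking them against the late-events watermark could make them fire a batch later than necessary.

Below is a minimal sketch with hypothetical names. It assumes a timer fires once the threshold is greater than or equal to its expiry.

```scala
// Hypothetical per-batch watermarks, in epoch milliseconds.
final case class BatchWatermarks(forLateEventsMs: Long, forEvictionMs: Long)

object EventTimeTimerFiring {
  // Timers due at the given threshold, earliest first. Assumes a timer
  // fires once the threshold is >= its expiry timestamp.
  def dueTimers(expiriesMs: Seq[Long], thresholdMs: Long): Seq[Long] =
    expiriesMs.filter(_ <= thresholdMs).sorted
}
```

With `BatchWatermarks(10000L, 15000L)` and expiries `Seq(12000L, 9000L, 20000L)`, the 12000 ms timer is due this batch under the eviction watermark. Under the late-events watermark it would wait.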
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519251919

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+      key: Any,
+      expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+    store: StateStore,
+    timeoutMode: TimeoutMode,
+    keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+    .add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+    .add("key", BinaryType)
+    .add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+    .add("expiryTimestampMs", BinaryType, nullable = false)
+    .add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+    StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+    TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+    TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+    schemaForKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+    keySchemaForSecIndex, numColsPrefixKey = 0,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+    }
+
+    val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+    val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+    keyRow
+  }
+
+  // We maintain a secondary index that inverts the ordering of the timestamp
+  // and grouping key and maintains the list of (expiry) timestamps in sorted order
+  // (using BIG_ENDIAN encoding) within RocksDB.
+  // This is because RocksDB uses byte-wise comparison using the default comparator to
+  // determine sorted order of keys. This is used to read expired timers at any given
+  // processing time/event time timestamp threshold by performing a range scan.
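The two column families created above can be modelled in memory to show why both are needed:

- `_keyToTimestamp` is ordered by (grouping key, expiry), so listing one key's timers is a prefix scan.
- `_timestampToKey` is ordered by (expiry, grouping key), so "find all expired timers" becomes a range scan that stops at the first unexpired entry.

The sketch below uses sorted sets in place of RocksDB column families and a `String` in place of the serialized grouping key. It is an illustration only, not the `TimerStateImpl` API.

```scala
import scala.collection.mutable

final class DualIndexTimerStore {
  // Mirrors "_keyToTimestamp": ordered by (grouping key, expiry).
  private val keyToTs = mutable.TreeSet.empty[(String, Long)]
  // Mirrors the "_timestampToKey" secondary index: ordered by (expiry, grouping key).
  private val tsToKey = mutable.TreeSet.empty[(Long, String)]

  // Both indexes are updated together so they never disagree.
  def register(key: String, expiryMs: Long): Unit = {
    keyToTs += ((key, expiryMs))
    tsToKey += ((expiryMs, key))
    ()
  }

  def delete(key: String, expiryMs: Long): Unit = {
    keyToTs -= ((key, expiryMs))
    tsToKey -= ((expiryMs, key))
    ()
  }

  def exists(key: String, expiryMs: Long): Boolean = keyToTs.contains((key, expiryMs))

  // Prefix scan on the primary index: all timers for one grouping key.
  def listTimers(key: String): List[Long] =
    keyToTs.iteratorFrom((key, Long.MinValue)).takeWhile(_._1 == key).map(_._2).toList

  // Range scan on the secondary index, stopping at the first unexpired entry.
  def expiredTimers(thresholdMs: Long): List[(String, Long)] =
    tsToKey.iterator.takeWhile(_._1 <= thresholdMs).map { case (ts, k) => (k, ts) }.toList
}
```

Without the secondary index, finding expired timers would mean scanning every key's timers. Without the primary index, a per-key lookup would mean scanning the whole timeline.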
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519192303

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+      key: Any,
+      expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+    store: StateStore,
+    timeoutMode: TimeoutMode,
+    keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+    .add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+    .add("key", BinaryType)
+    .add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+    .add("expiryTimestampMs", BinaryType, nullable = false)
+    .add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+    StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+    TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+    TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+    schemaForKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+    keySchemaForSecIndex, numColsPrefixKey = 0,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+    }
+
+    val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+    val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+    keyRow
+  }
+
+  // We maintain a secondary index that inverts the ordering of the timestamp
+  // and grouping key and maintains the list of (expiry) timestamps in sorted order
+  // (using BIG_ENDIAN encoding) within RocksDB.
+  // This is because RocksDB uses byte-wise comparison using the default comparator to
+  // determine sorted order of keys. This is used to read expired timers at any given
+  // processing time/event time timestamp threshold by performing a range scan.
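One way to picture the secondary-index key is as the 8-byte big-endian expiry followed by the grouping-key bytes. With that layout, a byte-wise comparator orders entries by expiry first and grouping key second.

The sketch below makes two simplifying assumptions: this flat layout, and UTF-8 string keys. The PR itself stores both fields in an `UnsafeRow` built by `secIndexKeyEncoder`, whose byte layout differs.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object SecIndexKey {
  // Hypothetical flat layout: [8-byte big-endian expiry][grouping-key bytes].
  def encode(expiryMs: Long, groupingKey: String): Array[Byte] = {
    val keyBytes = groupingKey.getBytes(StandardCharsets.UTF_8)
    ByteBuffer.allocate(java.lang.Long.BYTES + keyBytes.length)
      .putLong(expiryMs) // ByteBuffer writes BIG_ENDIAN by default
      .put(keyBytes)
      .array()
  }

  // Split the composite key back into (expiry, grouping key).
  def decode(bytes: Array[Byte]): (Long, String) = {
    val buf = ByteBuffer.wrap(bytes)
    val expiryMs = buf.getLong()
    val rest = new Array[Byte](buf.remaining())
    buf.get(rest)
    (expiryMs, new String(rest, StandardCharsets.UTF_8))
  }
}
```

Putting the timestamp first is what "inverts the ordering" means here: the primary column family leads with the grouping key, while this index leads with the expiry.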
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range scan.
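The diff above creates two internal column families per timeout mode, and each ordering serves a different lookup:

- `keyToTsCFName` uses `numColsPrefixKey = 1`. Ordering by grouping key first keeps one key's timers contiguous, so they can be found with a prefix scan.
- `tsToKeyCFName` uses `numColsPrefixKey = 0`. Ordering by expiry timestamp first lets expired timers be found with a range scan.

The following is a rough in-memory analogy using sorted sets in place of the state store. The class and method names (`TwoIndexTimerSketch`, `register`, `delete`, `listTimers`, `expired`) are hypothetical and are not `TimerStateImpl`'s API.

```scala
import scala.collection.mutable

// Hypothetical in-memory analogy of the two timer column families.
class TwoIndexTimerSketch {
  // Like keyToTimestamp: (groupingKey, expiryMs), so one key's timers are contiguous.
  private val keyToTs = mutable.TreeSet.empty[(String, Long)]
  // Like timestampToKey: (expiryMs, groupingKey), so the earliest timers come first.
  private val tsToKey = mutable.TreeSet.empty[(Long, String)]

  // Every write goes to both indexes so they stay consistent.
  def register(key: String, expiryMs: Long): Unit = {
    keyToTs += ((key, expiryMs))
    tsToKey += ((expiryMs, key))
  }

  def delete(key: String, expiryMs: Long): Unit = {
    keyToTs -= ((key, expiryMs))
    tsToKey -= ((expiryMs, key))
  }

  // Prefix scan: seek to the first entry for `key`, stop when the key changes.
  def listTimers(key: String): List[Long] =
    keyToTs.iteratorFrom((key, Long.MinValue)).takeWhile(_._1 == key).map(_._2).toList

  // Range scan: read in timestamp order, stop at the first timer past the threshold.
  def expired(thresholdMs: Long): List[(String, Long)] =
    tsToKey.iterator.takeWhile(_._1 <= thresholdMs).map { case (ts, k) => (k, ts) }.toList
}
```

Keeping both indexes doubles the writes per timer. In exchange, neither the per-key lookup nor the expiry lookup has to scan the whole timer set.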
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1519187563 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range scan
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1519185494 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range scan.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1519178347 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range scan
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1519006106 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range scan.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1519009915 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range scan.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1519006106 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+      key: Any,
+      expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+    store: StateStore,
+    timeoutMode: TimeoutMode,
+    keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+    .add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+    .add("key", BinaryType)
+    .add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+    .add("expiryTimestampMs", BinaryType, nullable = false)
+    .add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+    StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+    TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+    TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+    schemaForKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+    keySchemaForSecIndex, numColsPrefixKey = 0,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+    }
+
+    val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+    val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+    keyRow
+  }
+
+  // We maintain a secondary index that inverts the ordering of the timestamp
+  // and grouping key and maintains the list of (expiry) timestamps in sorted order
+  // (using BIG_ENDIAN encoding) within RocksDB.
+  // This is because RocksDB uses byte-wise comparison using the default comparator to
+  // determine sorted order of keys. This is used to read expired timers at any given
+  // processing time/event time timestamp threshold by performing a range scan.
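The comment above rests on one property: when a non-negative `Long` is written in big-endian order, unsigned byte-by-byte comparison of the encodings agrees with numeric comparison, so a store that sorts keys byte-wise (as RocksDB does with its default comparator) keeps the `(expiryTimestampMs, key)` secondary index in expiry order. The Scala sketch below illustrates this outside Spark. The object and helper names (`BigEndianOrderingSketch`, `compareBytes`, `indexKey`, `expired`) are hypothetical, and a sorted `Seq` stands in for the actual RocksDB range scan.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

// Hypothetical sketch: shows why big-endian Long encoding keeps numeric order
// under the unsigned, byte-wise comparison a store like RocksDB applies by default.
object BigEndianOrderingSketch {

  // Encode a Long into 8 bytes using the given byte order.
  def encode(ts: Long, order: ByteOrder): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).order(order).putLong(ts).array()

  // Unsigned lexicographic comparison: bytes compared as 0..255, shorter prefix first.
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = (a(i) & 0xff) - (b(i) & 0xff)
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }

  val byteWise: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    def compare(a: Array[Byte], b: Array[Byte]): Int = compareBytes(a, b)
  }

  // Sort timestamps by their encoded bytes, then decode them back to Longs.
  def sortedByBytes(ts: Seq[Long], order: ByteOrder): Seq[Long] =
    ts.map(t => encode(t, order))
      .sorted(byteWise)
      .map(bytes => ByteBuffer.wrap(bytes).order(order).getLong)

  // Secondary-index style key: 8 big-endian timestamp bytes, then the grouping key bytes.
  def indexKey(ts: Long, groupingKey: String): Array[Byte] =
    encode(ts, ByteOrder.BIG_ENDIAN) ++ groupingKey.getBytes(StandardCharsets.UTF_8)

  // Stand-in for a range scan: iterate in byte-wise order and stop at the first
  // entry whose timestamp exceeds the threshold.
  def expired(entries: Seq[(Long, String)], thresholdMs: Long): Seq[(Long, String)] =
    entries
      .map { case (ts, key) => indexKey(ts, key) }
      .sorted(byteWise)
      .map { bytes =>
        val ts = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong
        (ts, new String(bytes, 8, bytes.length - 8, StandardCharsets.UTF_8))
      }
      .takeWhile { case (ts, _) => ts <= thresholdMs }
}
```

Two caveats follow from the encoding itself. Negative timestamps would sort after all non-negative ones, because the sign bit makes their first byte `0x80` or higher. And the comparison must treat bytes as unsigned: with Java's signed `Byte`, `200L` (last byte `0xC8`) would compare below `100L` (last byte `0x64`).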
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1519006106 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@
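`TimerStateImpl` registers two internal column families for the same set of timers: `keyToTsCFName`, keyed by `(key, expiryTimestampMs)` with `numColsPrefixKey = 1` so that all timers of one grouping key can be found by a prefix scan, and `tsToKeyCFName`, keyed by `(expiryTimestampMs, key)` for the expiry range scan. The in-memory Scala sketch below models that dual-index idea with two sorted sets. The class and method names are hypothetical and are not Spark's API, and the sketch ignores row encoding, persistence, and the implicit grouping key.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the two timer indexes; not Spark's API.
final class DualIndexTimers {
  // Like keyToTimestamp: ordered by (groupingKey, expiryMs), so one key's
  // timers are contiguous and can be read with a prefix scan.
  private val byKey = mutable.TreeSet.empty[(String, Long)]
  // Like timestampToKey: ordered by (expiryMs, groupingKey), so all timers
  // expired at a threshold form a prefix of the iteration order.
  private val byTime = mutable.TreeSet.empty[(Long, String)]

  // Every write updates both indexes so they stay consistent.
  def register(key: String, expiryMs: Long): Unit = {
    byKey += ((key, expiryMs))
    byTime += ((expiryMs, key))
  }

  def delete(key: String, expiryMs: Long): Unit = {
    byKey -= ((key, expiryMs))
    byTime -= ((expiryMs, key))
  }

  // Prefix scan: start at (key, Long.MinValue) and stop when the key changes.
  def listTimers(key: String): List[Long] =
    byKey.iteratorFrom((key, Long.MinValue)).takeWhile(_._1 == key).map(_._2).toList

  // Range scan: walk in expiry order and stop at the first unexpired timer.
  def expiredTimers(thresholdMs: Long): List[(String, Long)] =
    byTime.iterator
      .takeWhile { case (ts, _) => ts <= thresholdMs }
      .map { case (ts, key) => (key, ts) }
      .toList
}
```

The trade-off is one extra write per registration or deletion, in exchange for each read path scanning only the entries it needs instead of the whole timer set.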
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
neilramaswamy commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518995350 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518795524 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518795524 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range scan.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518795524 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range scan.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518795524 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+      key: Any,
+      expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+    store: StateStore,
+    timeoutMode: TimeoutMode,
+    keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+    .add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+    .add("key", BinaryType)
+    .add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+    .add("expiryTimestampMs", BinaryType, nullable = false)
+    .add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+    StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+    TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+    TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+    schemaForKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+    keySchemaForSecIndex, numColsPrefixKey = 0,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+    }
+
+    val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+    val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+    keyRow
+  }
+
+  // We maintain a secondary index that inverts the ordering of the timestamp
+  // and grouping key and maintains the list of (expiry) timestamps in sorted order
+  // (using BIG_ENDIAN encoding) within RocksDB.
+  // This is because RocksDB uses byte-wise comparison using the default comparator to
+  // determine sorted order of keys. This is used to read expired timers at any given
+  // processing time/event time timestamp threshold by performing a range scan.
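The secondary-index comment in the hunk above rests on one property: when a non-negative `Long` is written as 8 BIG_ENDIAN bytes, unsigned byte-wise key comparison (the ordering RocksDB's default comparator applies) sorts keys in numeric timestamp order, so all timers expiring at or before a threshold form a contiguous prefix of the index that a range scan can read and then stop. The following is a minimal Scala sketch of that idea, assuming an in-memory `mutable.TreeMap` with a hand-written byte-wise `Ordering` stands in for the timestamp-to-key column family (`tsToKeyCFName`). `BytewiseOrdering`, `TimerIndexSketch`, `register`, and `expiredTimers` are hypothetical names, not Spark APIs, and treating a timer as expired when its timestamp is at or before the threshold is an assumption of the sketch.

```scala
import java.nio.charset.StandardCharsets
import java.nio.{ByteBuffer, ByteOrder}
import scala.collection.mutable

// Unsigned lexicographic comparison of raw key bytes (memcmp-style),
// the ordering RocksDB's default comparator applies.
object BytewiseOrdering extends Ordering[Array[Byte]] {
  def compare(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = (a(i) & 0xff) - (b(i) & 0xff) // compare as unsigned bytes
      if (c != 0) return c
      i += 1
    }
    a.length - b.length // a strict prefix sorts first
  }
}

object TimerIndexSketch {
  // Encode a timestamp as 8 bytes; BIG_ENDIAN puts the most significant
  // byte first, so byte-wise order matches numeric order for ts >= 0.
  def encodeTs(ts: Long, order: ByteOrder = ByteOrder.BIG_ENDIAN): Array[Byte] =
    ByteBuffer.allocate(8).order(order).putLong(ts).array()

  // Read the timestamp back from the first 8 bytes of an index key.
  def decodeTs(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes, 0, 8).order(ByteOrder.BIG_ENDIAN).getLong
}

// Hypothetical stand-in for the secondary index: the key is the encoded
// expiry timestamp followed by the grouping-key bytes (timestamp first,
// key second, as in keySchemaForSecIndex); the value is the grouping key.
class TimerIndexSketch {
  import TimerIndexSketch._

  private implicit val ord: Ordering[Array[Byte]] = BytewiseOrdering
  private val tsToKey = mutable.TreeMap.empty[Array[Byte], String]

  def register(groupingKey: String, expiryMs: Long): Unit =
    tsToKey.put(encodeTs(expiryMs) ++ groupingKey.getBytes(StandardCharsets.UTF_8), groupingKey)

  // Range scan: iterate in byte-wise key order and stop at the first entry
  // whose timestamp is past the threshold; later entries are never visited.
  def expiredTimers(thresholdMs: Long): List[(String, Long)] =
    tsToKey.iterator
      .map { case (k, v) => (v, decodeTs(k)) }
      .takeWhile { case (_, ts) => ts <= thresholdMs }
      .toList
}
```

After `register("b", 300L)`, `register("a", 100L)`, `register("c", 256L)` and `register("d", 1L)`, calling `expiredTimers(256L)` returns `d`, `a`, `c` in timestamp order and reads at most one entry beyond them. The encoding is order-preserving only for non-negative timestamps (a negative `Long` has its sign bit set and sorts after every positive one), and a little-endian layout breaks ordering outright: 1 encodes as `01 00 .. 00` and 256 as `00 01 .. 00`, so 256 would sort first.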
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
neilramaswamy commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518738187 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518723182 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518698125 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
neilramaswamy commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518669207 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
neilramaswamy commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1518669207 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + // We maintain a secondary index that inverts the ordering of the timestamp + // and grouping key and maintains the list of (expiry) timestamps in sorted order + // (using BIG_ENDIAN encoding) within RocksDB. + // This is because RocksDB uses byte-wise comparison using the default comparator to + // determine sorted order of keys. This is used to read expired timers at any given + // processing time/event time timestamp threshold by performing a range sca
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1517217945 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. 
+ */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { +StateStore.stop() +require(!StateStore.isMaintenanceRunning) + } + + after { +StateStore.stop() +require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = +Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): +RocksDBStateStoreProvider = { +newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( +storeId: StateStoreId, +numColsPrefixKey: Int, +sqlConf: Option[SQLConf] = None, +conf: Configuration = new Configuration, +useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = { +val provider = new RocksDBStateStoreProvider() +provider.init( + storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + useColumnFamilies, + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) +provider + } + + private def tryWithProviderResource[T]( +provider: StateStoreProvider)(f: StateStoreProvider => T): T = { +try { + f(provider) +} finally { + provider.close() +} + } + + private def getTimeoutMode(timeoutMode: String): TimeoutMode = { +timeoutMode match { + case "NoTimeouts" => TimeoutMode.NoTimeouts() + case "ProcessingTime" => TimeoutMode.ProcessingTime() + case "EventTime" => TimeoutMode.EventTime() + case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode") +} + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => +test(s"value state creation with timeoutMode=$timeoutMode should succeed") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { 
provider => +val store = provider.getStore(0) +val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) +assert(handle.getHandleState === StatefulProcessorHandleState.CREATED) +handle.getValueState[Long]("testState") + } +} + } + + private def verifyInvalidOperation( + handle: StatefulProcessorHandleImpl, + handleState: StatefulProcessorHandleState.Value, + errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = { +handle.setHandleState(handleState) +assert(handle.getHandleState === handleState) +val ex = intercept[Exception] { + fn(handle) +} +assert(ex.getMessage.contains(errorMsg)) + } + + private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = { +handle.getValueState[Long]("testState") + } + + private def registerTimer(handle: StatefulProcessorHan
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1517119556 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.Serializable + +import org.apache.spark.annotation.{Evolving, Experimental} + +/** + * Class used to provide access to expired timer's expiry time and timeout mode. These values Review Comment: nit: Technically, the timeout mode is not visible through the trait. If that's intentional, probably remove that part from the interface doc. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeoutMode} + +/** + * Class that provides a concrete implementation that can be used to provide access to expired + * timer's expiry time and timeout mode. These values are only relevant if the ExpiredTimerInfo Review Comment: nit: same; the timeout mode is not visible to the user function AFAIK. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -103,8 +116,12 @@ case class TransformWithStateExec( val keyObj = getKeyObj(keyRow) // convert key to objects ImplicitGroupingKeyTracker.setImplicitKey(keyObj) val valueObjIter = valueRowIter.map(getValueObj.apply) -val mappedIterator = statefulProcessor.handleInputRows(keyObj, valueObjIter, - new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map { obj => +val mappedIterator = statefulProcessor.handleInputRows( + keyObj, + valueObjIter, + new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents), + new ExpiredTimerInfoImpl(false) Review Comment: super nit / 2 cents: naming the boolean parameter would give much better readability in a non-IDE environment. It's really more of a general suggestion and preference, so you can ignore it.
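Both nits can be illustrated with a toy version of the interface. This is a hypothetical sketch. `ExpiredTimerInfoSketch`, `ExpiredTimerInfoSketchImpl` and the parameter names `isValidTimer` and `expiryTimeInMsOpt` are made up, because the real `ExpiredTimerInfoImpl` constructor is not visible in the hunk.

The sketch shows two things:

- The trait exposes no timeout mode, so its doc should not promise one.
- A named argument makes a bare `false` at the call site self-explanatory.

```scala
// Hypothetical mirror of the ExpiredTimerInfo contract (not Spark's actual trait).
trait ExpiredTimerInfoSketch extends java.io.Serializable {
  def isValid(): Boolean
  def getExpiryTimeInMs(): Long
}

// Hypothetical implementation; the parameter names are illustrative assumptions.
class ExpiredTimerInfoSketchImpl(
    isValidTimer: Boolean,
    expiryTimeInMsOpt: Option[Long] = None) extends ExpiredTimerInfoSketch {
  override def isValid(): Boolean = isValidTimer
  // Only meaningful when isValid() is true; -1 is an arbitrary sentinel here.
  override def getExpiryTimeInMs(): Long = expiryTimeInMsOpt.getOrElse(-1L)
}

object NamedArgumentDemo {
  // Positional: a reader without an IDE has to look up what `false` means.
  val positional = new ExpiredTimerInfoSketchImpl(false)
  // Named: the intent is visible at the call site.
  val named = new ExpiredTimerInfoSketchImpl(isValidTimer = false)
  // A timer that actually fired, passing both arguments by name.
  val fired = new ExpiredTimerInfoSketchImpl(isValidTimer = true, expiryTimeInMsOpt = Some(5000L))
}
```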
## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) { + throw StateStoreErrors.cannotUseTimersWithInvalidTimeoutMode(timeoutMode.toString) +} + +if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) { + throw StateStoreErrors.cannotUseTimersWithInvalidHandleState(currState.toString) +} + +if (timerState.exists(expiryTimestampMs)) { + logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs") Review Comment: I'm OK with moving the logging to TimerStateImpl if it helps to solve this. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
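Below is a minimal, self-contained sketch of the checks in the quoted `registerTimer` hunk, with the duplicate-timer warning moved into the timer store as suggested. Every type here is a hypothetical stand-in:

| Sketch type | Stands in for |
|---|---|
| `Mode` | `TimeoutMode` |
| `HandleState` | `StatefulProcessorHandleState` |
| `InMemoryTimerStore` | `TimerStateImpl` |
| `HandleSketch` | `StatefulProcessorHandleImpl` |

A few other liberties are taken:

- The exception types are assumptions, not the `StateStoreErrors` the PR uses.
- `println` stands in for `logWarning`.
- The grouping key is passed explicitly instead of through `ImplicitGroupingKeyTracker`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for TimeoutMode and StatefulProcessorHandleState.
sealed trait Mode
case object NoTimeouts extends Mode
case object ProcessingTime extends Mode
case object EventTime extends Mode

sealed trait HandleState
case object Created extends HandleState
case object Initialized extends HandleState
case object DataProcessed extends HandleState

// Hypothetical timer store that owns the duplicate check and its warning.
class InMemoryTimerStore {
  private val timers = mutable.Set.empty[(String, Long)]

  def registerTimer(key: String, expiryMs: Long): Unit = {
    // Set.add returns false when the (key, expiry) pair is already present.
    if (!timers.add((key, expiryMs))) {
      println(s"WARN: Timer already exists for expiryTimestampMs=$expiryMs")
    }
  }

  def exists(key: String, expiryMs: Long): Boolean = timers.contains((key, expiryMs))
}

// Hypothetical handle: validates mode and lifecycle state, then delegates.
class HandleSketch(mode: Mode, store: InMemoryTimerStore) {
  var currState: HandleState = Created

  def registerTimer(key: String, expiryMs: Long): Unit = {
    if (!(mode == ProcessingTime || mode == EventTime)) {
      throw new UnsupportedOperationException(s"Cannot use timers with timeoutMode=$mode")
    }
    if (!(currState == Initialized || currState == DataProcessed)) {
      throw new IllegalStateException(s"Cannot use timers in handle state=$currState")
    }
    store.registerTimer(key, expiryMs)
  }
}
```

With the existence check and its warning kept in the store, the handle only validates mode and lifecycle state. Any other caller of the store gets the same duplicate handling.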
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1507971789 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +verify(timeoutMode == ProcessingTime || timeoutMode == EventTime, +s"Cannot register timers with incorrect TimeoutMode") +verify(currState == INITIALIZED || currState == DATA_PROCESSED, +s"Cannot register timers with " + + s"expiryTimestampMs=$expiryTimestampMs in current state=$currState") + +if (timerState.exists(expiryTimestampMs)) { + logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs") Review Comment: Yes, that's correct. It will fire within the same microbatch.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
sahnib commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1507925323 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +verify(timeoutMode == ProcessingTime || timeoutMode == EventTime, +s"Cannot register timers with incorrect TimeoutMode") +verify(currState == INITIALIZED || currState == DATA_PROCESSED, +s"Cannot register timers with " + + s"expiryTimestampMs=$expiryTimestampMs in current state=$currState") + +if (timerState.exists(expiryTimestampMs)) { + logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs") Review Comment: As we process timers after the data, this would result in the timer firing right away in the same micro-batch, after the data is processed. I think that is fine; just pointing it out. Also, equality should take care of firing the timer right away, shouldn't it?
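The semantics agreed on in this thread can be modelled in a few lines. Input rows are handled first and expired timers second. So a timer registered with an expiry at or before the current threshold fires in the same micro-batch.

This is a simplified sketch, not the operator's code. It makes two assumptions:

- The threshold is `batchTimestampMs` for processing-time timers. The watermark would play that role for event time.
- A timer fires when `expiry <= threshold`.

`MicroBatchSketch` and `runBatch` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical model of one micro-batch: handle input rows first, then timers.
class MicroBatchSketch {
  // Pending timers as (expiryMs, key), ordered by expiry first.
  private val timers = mutable.TreeSet.empty[(Long, String)]

  def runBatch(
      thresholdMs: Long,
      input: Seq[String],
      expiryFor: String => Long): List[(String, Long)] = {
    // 1. Input rows: the user function registers a timer per key, possibly in the past.
    input.foreach(key => timers += ((expiryFor(key), key)))
    // 2. Expired timers: everything at or before the threshold fires in this batch.
    val fired = timers.iterator.takeWhile(_._1 <= thresholdMs).toList
    timers --= fired
    fired.map { case (ts, key) => (key, ts) }
  }
}
```

Consider a batch with threshold 1000 whose input keys `a`, `b` and `c` register expiries 500, 5000 and 1000. That batch fires `("a", 500L)` and `("c", 1000L)`, including the timer that was already in the past when it was registered. `("b", 5000L)` fires only in a later batch whose threshold is at least 5000.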
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1505248645 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +verify(timeoutMode == ProcessingTime || timeoutMode == EventTime, +s"Cannot register timers with incorrect TimeoutMode") +verify(currState == INITIALIZED || currState == DATA_PROCESSED, +s"Cannot register timers with " + + s"expiryTimestampMs=$expiryTimestampMs in current state=$currState") Review Comment: Done
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1505126889 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +verify(timeoutMode == ProcessingTime || timeoutMode == EventTime, +s"Cannot register timers with incorrect TimeoutMode") +verify(currState == INITIALIZED || currState == DATA_PROCESSED, +s"Cannot register timers with " + + s"expiryTimestampMs=$expiryTimestampMs in current state=$currState") + +if (timerState.exists(expiryTimestampMs)) { + logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs") Review Comment: I thought we decided not to do this? Basically, we allow a timestamp earlier than the latest one to be registered, which would fire the timer right away in the next invocation.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1505126284 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + private def encodeSecIndexKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(tsToKeyCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val bbuf = ByteBuffer.allocate(8) +bbuf.order(ByteOrder.BIG_ENDIAN) +bbuf.putLong(expiryTimestampMs) Review Comment: Do
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1505125882 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.Serializable + +import org.apache.spark.annotation.{Evolving, Experimental} + +/** + * Class used to provide access to expired timer's expiry time and timeout mode. These values + * are only relevant if the ExpiredTimerInfo is valid. + */ +@Experimental +@Evolving +private[sql] trait ExpiredTimerInfo extends Serializable { + /** + * Check if provided ExpiredTimerInfo is valid. + */ + def isValid(): Boolean + + /** + * Get the expired timer's expiry time as milliseconds in epoch time. + */ + def getExpiryTimeInMs(): Long + + /** + * Get the expired timer's timeout mode. + */ + def getTimeoutMode(): TimeoutMode Review Comment: Done; modified this. We discussed offline and decided to pass the mode to the `init` method.
If we decide to support both timer types in the future, the mode can be changed, and we will also add the ability to specify the timer type when registering/deleting timers. ## sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala: ## @@ -51,6 +51,25 @@ private[sql] trait StatefulProcessorHandle extends Serializable { /** Function to return queryInfo for currently running task */ def getQueryInfo(): QueryInfo + /** + * Function to register a processing/event time based timer for given implicit key Review Comment: Done
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
sahnib commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1504503668 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -252,16 +252,30 @@ class RocksDB( } } + /** + * Check whether the column family name is for internal column families. + * @param cfName - column family name + * @return - true if the column family is for internal use, false otherwise + */ + private def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_' + /** * Create RocksDB column family, if not created already */ - def createColFamilyIfAbsent(colFamilyName: String): Unit = { -if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) { + def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): Unit = { +// Remove leading and trailing whitespaces +val cfName = colFamilyName.trim + +if (cfName == StateStore.DEFAULT_COL_FAMILY_NAME) { throw new SparkUnsupportedOperationException( errorClass = "_LEGACY_ERROR_TEMP_3197", messageParameters = Map("colFamilyName" -> colFamilyName).toMap) } +if (!isInternal && cfName.charAt(0) == '_') { Review Comment: We should add a check that the cfName is not an empty string, and throw an error.
(Right now we will end up with an IndexOutOfBoundsException.) ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +verify(timeoutMode == ProcessingTime || timeoutMode == EventTime, +s"Cannot register timers with incorrect TimeoutMode") +verify(currState == INITIALIZED || currState == DATA_PROCESSED, +s"Cannot register timers with " + + s"expiryTimestampMs=$expiryTimestampMs in current state=$currState") + +if (timerState.exists(expiryTimestampMs)) { + logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs") Review Comment: We should validate here that `expiryTimestampMs` is >= `batchTimestampMs` or the watermark, depending on the timeout mode.
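The empty-name check suggested in the `RocksDB.scala` comment above could look roughly like the helper below. It is a hypothetical sketch, not the PR's code:

- `ColFamilyNameValidator` is a made-up name.
- `"default"` is assumed to be the value of `StateStore.DEFAULT_COL_FAMILY_NAME`.
- `require`, which throws `IllegalArgumentException`, replaces the Spark error classes.

Using `startsWith("_")` instead of `charAt(0)` also removes the out-of-bounds risk on an empty string.

```scala
object ColFamilyNameValidator {
  // Assumed value, standing in for StateStore.DEFAULT_COL_FAMILY_NAME.
  val DefaultColFamilyName = "default"

  /** Returns the trimmed name, or throws IllegalArgumentException if it is not allowed. */
  def validate(colFamilyName: String, isInternal: Boolean = false): String = {
    val cfName = colFamilyName.trim
    // Check emptiness first; "".charAt(0) would throw StringIndexOutOfBoundsException.
    require(cfName.nonEmpty, "Column family name must not be empty or whitespace-only")
    require(cfName != DefaultColFamilyName, s"Column family name '$cfName' is reserved")
    // Names starting with '_' are reserved for internal column families such as timers.
    require(isInternal || !cfName.startsWith("_"),
      s"Column family name '$cfName' must not start with '_' unless it is internal")
    cfName
  }
}
```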
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
sahnib commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1504459373 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.Serializable +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.types._ +import org.apache.spark.util.NextIterator + +/** + * Singleton utils class used primarily while interacting with TimerState + */ +object TimerStateUtils { + case class TimestampWithKey( + key: Any, + expiryTimestampMs: Long) extends Serializable + + val PROC_TIMERS_STATE_NAME = "_procTimers" + val EVENT_TIMERS_STATE_NAME = "_eventTimers" + val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" + val TIMESTAMP_TO_KEY_CF = "_timestampToKey" +} + +/** + * Class that provides the implementation for storing timers + * used within the `transformWithState` operator. 
+ * @param store - state store to be used for storing timer data + * @param timeoutMode - mode of timeout (event time or processing time) + * @param keyExprEnc - encoder for key expression + * @tparam S - type of timer value + */ +class TimerStateImpl[S]( +store: StateStore, +timeoutMode: TimeoutMode, +keyExprEnc: ExpressionEncoder[Any]) extends Logging { + + private val EMPTY_ROW = + UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) + + private val schemaForPrefixKey: StructType = new StructType() +.add("key", BinaryType) + + private val schemaForKeyRow: StructType = new StructType() +.add("key", BinaryType) +.add("expiryTimestampMs", LongType, nullable = false) + + private val keySchemaForSecIndex: StructType = new StructType() +.add("expiryTimestampMs", BinaryType, nullable = false) +.add("key", BinaryType) + + private val schemaForValueRow: StructType = +StructType(Array(StructField("__dummy__", NullType))) + + private val keySerializer = keyExprEnc.createSerializer() + + private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey) + + private val keyEncoder = UnsafeProjection.create(schemaForKeyRow) + + private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) + + val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) { +TimerStateUtils.PROC_TIMERS_STATE_NAME + } else { +TimerStateUtils.EVENT_TIMERS_STATE_NAME + } + + val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF + store.createColFamilyIfAbsent(keyToTsCFName, +schemaForKeyRow, numColsPrefixKey = 1, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF + store.createColFamilyIfAbsent(tsToKeyCFName, +keySchemaForSecIndex, numColsPrefixKey = 0, +schemaForValueRow, useMultipleValuesPerKey = false, +isInternal = true) + + private def encodeKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = 
ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs)) +keyRow + } + + private def encodeSecIndexKey(expiryTimestampMs: Long): UnsafeRow = { +val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption +if (!keyOption.isDefined) { + throw StateStoreErrors.implicitKeyNotFound(tsToKeyCFName) +} + +val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() +val bbuf = ByteBuffer.allocate(8) +bbuf.order(ByteOrder.BIG_ENDIAN) +bbuf.putLong(expiryTimestampMs) Review Comment: Found th
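The `encodeSecIndexKey` code above writes `expiryTimestampMs` into the secondary-index key with `ByteOrder.BIG_ENDIAN`. Big-endian layout puts the most significant byte first, so for non-negative timestamps an unsigned byte-wise comparison of keys (the kind of ordering RocksDB uses by default) sorts timers by expiry time. Below is a hypothetical in-memory model of the two timer column families (`keyToTimestamp` and `timestampToKey`), using `String` grouping keys and a `java.util.TreeMap` in place of RocksDB; the class and method names are illustrative, not Spark APIs.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets
import java.util.{Comparator, TreeMap}

import scala.collection.mutable

object TimerIndexSketch {
  // Big-endian: most significant byte first, so unsigned byte-wise order
  // matches numeric order for non-negative timestamps.
  def encodeTs(ts: Long): Array[Byte] =
    ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putLong(ts).array()

  def decodeTs(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes, 0, 8).order(ByteOrder.BIG_ENDIAN).getLong()

  // Unsigned lexicographic comparison, similar to a bytewise key comparator.
  val unsignedBytes: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
    override def compare(a: Array[Byte], b: Array[Byte]): Int = {
      val n = math.min(a.length, b.length)
      var i = 0
      while (i < n) {
        val cmp = (a(i) & 0xff) - (b(i) & 0xff)
        if (cmp != 0) return cmp
        i += 1
      }
      a.length - b.length
    }
  }
}

class TimerIndexSketch {
  import TimerIndexSketch._

  // Models "keyToTimestamp": grouping key -> registered expiry timestamps.
  private val keyToTimestamps = mutable.Map.empty[String, mutable.Set[Long]]
  // Models "timestampToKey": (expiry bytes ++ key bytes) -> grouping key, ordered by expiry.
  private val timestampToKey = new TreeMap[Array[Byte], String](unsignedBytes)

  private def secIndexKey(key: String, ts: Long): Array[Byte] =
    encodeTs(ts) ++ key.getBytes(StandardCharsets.UTF_8)

  def registerTimer(key: String, ts: Long): Unit = {
    keyToTimestamps.getOrElseUpdate(key, mutable.Set.empty[Long]) += ts
    timestampToKey.put(secIndexKey(key, ts), key)
  }

  def deleteTimer(key: String, ts: Long): Unit = {
    keyToTimestamps.get(key).foreach { set =>
      set -= ts
      if (set.isEmpty) keyToTimestamps.remove(key)
    }
    timestampToKey.remove(secIndexKey(key, ts))
  }

  /** All timers for one key, via the primary index. */
  def listTimers(key: String): Seq[Long] =
    keyToTimestamps.get(key).map(_.toSeq.sorted).getOrElse(Seq.empty)

  /** Timers with expiry <= thresholdMs, in expiry order, via the secondary index. */
  def expiredTimers(thresholdMs: Long): Seq[(String, Long)] = {
    val out = mutable.ArrayBuffer.empty[(String, Long)]
    val it = timestampToKey.entrySet().iterator()
    var done = false
    while (!done && it.hasNext) {
      val entry = it.next()
      val ts = decodeTs(entry.getKey)
      if (ts <= thresholdMs) out += ((entry.getValue, ts)) else done = true
    }
    out.toSeq
  }
}
```

Because the expiry bytes come first in the secondary key, the scan stops at the first entry past the threshold instead of visiting every timer. Negative timestamps would break this ordering, since their sign bit makes them sort after positive values under unsigned comparison.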
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1503733977 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -69,8 +70,20 @@ case class TransformWithStateExec( override def shortName: String = "transformWithStateExec" - // TODO: update this to run no-data batches when timer support is added - override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = false + override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = { +timeoutMode match { + // TODO: check if we can return true only if actual timers are registered Review Comment: Yeah, but at the time this is called, I believe we don't have the storeRDD or store instance directly available. We could potentially track a count of registered timers, but haven't tried that change/optimization yet.
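A rough sketch of the count-based idea mentioned in the reply above, assuming some tracked count of registered timers is available wherever `shouldRunAnotherBatch` is evaluated (how that count would be collected is not shown and is an open assumption). `TimerCountTracker` and the local `TimeoutMode` objects are hypothetical stand-ins, not Spark APIs.

```scala
// Hypothetical sketch: decide whether to schedule a no-data batch from a tracked
// count of registered timers instead of reading the state store directly.
object NoDataBatchSketch {
  // Local stand-ins mirroring the timeout mode names used in the PR.
  sealed trait TimeoutMode
  case object NoTimeouts extends TimeoutMode
  case object ProcessingTime extends TimeoutMode
  case object EventTime extends TimeoutMode

  final class TimerCountTracker {
    private var registered = 0L
    def onRegister(): Unit = registered += 1
    // Never let the count go negative, even if deletes are over-reported.
    def onDelete(): Unit = registered = math.max(0L, registered - 1)
    def count: Long = registered
  }

  def shouldRunAnotherBatch(mode: TimeoutMode, tracker: TimerCountTracker): Boolean =
    mode match {
      // Only timer-based modes can need a no-data batch, and only if timers exist.
      case ProcessingTime | EventTime => tracker.count > 0
      case NoTimeouts => false
    }
}
```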
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1503733123 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala: ## @@ -48,16 +48,19 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable { * @param inputRows - iterator of input rows associated with grouping key * @param timerValues - instance of TimerValues that provides access to current processing/event *time if available + * @param expiredTimerInfo - instance of ExpiredTimerInfo that provides access to expired timer + * if applicable * @return - Zero or more output rows */ def handleInputRows( key: K, inputRows: Iterator[I], - timerValues: TimerValues): Iterator[O] + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[O] Review Comment: Yeah, thought of that, but we would have the same problem with the Java API in the future.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1503732422 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +verify(timeoutMode == ProcessingTime || timeoutMode == EventTime, Review Comment: The existing one is tied to `GroupState`, so I did not want to reuse it. Also, in case we add more modes here in the future, I thought it's better to keep this generic and separate for this API.
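A minimal sketch of the checks `registerTimer` performs before accepting a timer. The mode and handle-state checks mirror the `verify` calls in the diff; the lower-bound checks implement the earlier review suggestion (`expiryTimestampMs` >= `batchTimestampMs` for processing time, >= the watermark for event time). All names are hypothetical local stand-ins, and returning an `Either` instead of calling `verify` is an assumption made to keep the sketch self-contained.

```scala
// Hypothetical sketch of timer registration checks; not Spark's implementation.
object RegisterTimerChecks {
  sealed trait TimeoutMode
  case object NoTimeouts extends TimeoutMode
  case object ProcessingTime extends TimeoutMode
  case object EventTime extends TimeoutMode

  // Stand-ins for the handle states referenced in the diff (INITIALIZED, DATA_PROCESSED, ...).
  sealed trait HandleState
  case object Created extends HandleState
  case object Initialized extends HandleState
  case object DataProcessed extends HandleState
  case object Closed extends HandleState

  /** Left(error) if the timer must be rejected, Right(()) otherwise. */
  def validate(
      mode: TimeoutMode,
      state: HandleState,
      expiryTimestampMs: Long,
      batchTimestampMs: Long,
      watermarkMs: Option[Long]): Either[String, Unit] = mode match {
    case NoTimeouts =>
      Left("Cannot register timers with incorrect TimeoutMode")
    case _ if state != Initialized && state != DataProcessed =>
      Left(s"Cannot register timers with expiryTimestampMs=$expiryTimestampMs " +
        s"in current state=$state")
    // Processing time: the timer must not already be in the past for this batch.
    case ProcessingTime if expiryTimestampMs < batchTimestampMs =>
      Left(s"expiryTimestampMs=$expiryTimestampMs is before batchTimestampMs=$batchTimestampMs")
    // Event time: the timer must not be behind the current watermark, if one is known.
    case EventTime if watermarkMs.exists(expiryTimestampMs < _) =>
      Left(s"expiryTimestampMs=$expiryTimestampMs is before watermark=${watermarkMs.get}")
    case _ => Right(())
  }
}
```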
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1503731606 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.Serializable + +import org.apache.spark.annotation.{Evolving, Experimental} + +/** + * Class used to provide access to expired timer's expiry time and timeout mode. These values + * are only relevant if the ExpiredTimerInfo is valid. + */ +@Experimental +@Evolving +private[sql] trait ExpiredTimerInfo extends Serializable { + /** + * Check if provided ExpiredTimerInfo is valid. + */ + def isValid(): Boolean + + /** + * Get the expired timer's expiry time as milliseconds in epoch time. + */ + def getExpiryTimeInMs(): Long + + /** + * Get the expired timer's timeout mode. + */ + def getTimeoutMode(): TimeoutMode Review Comment: But that is not available to the `StatefulProcessor`, though. Do you prefer to pass the timeout mode in the `init` method instead?
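To illustrate how a processor might consume the new `expiredTimerInfo` argument, here is a sketch with simplified, hypothetical stand-ins for the `private[sql]` traits quoted in this thread. `TimerValues` and `getTimeoutMode` are omitted for brevity, and `close()` gets the default no-op body proposed elsewhere in the review. The branch on `isValid()` follows the trait's doc comment: the expiry value is only meaningful when the info is valid.

```scala
// Simplified, hypothetical stand-ins for the traits in the diff; not Spark's API.
object ExpiredTimerSketch {
  trait ExpiredTimerInfo extends Serializable {
    def isValid(): Boolean
    def getExpiryTimeInMs(): Long
  }

  trait StatefulProcessor[K, I, O] extends Serializable {
    def handleInputRows(
        key: K,
        inputRows: Iterator[I],
        expiredTimerInfo: ExpiredTimerInfo): Iterator[O]

    // Default no-op, so processors without cleanup need not override it.
    def close(): Unit = {}
  }

  case class TimerInfo(valid: Boolean, expiryMs: Long) extends ExpiredTimerInfo {
    override def isValid(): Boolean = valid
    override def getExpiryTimeInMs(): Long = expiryMs
  }

  // Emits a count for regular input, or a timeout marker when called for an expired timer.
  class CountOrTimeout extends StatefulProcessor[String, Int, String] {
    override def handleInputRows(
        key: String,
        inputRows: Iterator[Int],
        expiredTimerInfo: ExpiredTimerInfo): Iterator[String] = {
      if (expiredTimerInfo.isValid()) {
        Iterator.single(s"$key timed out at ${expiredTimerInfo.getExpiryTimeInMs()}")
      } else {
        Iterator.single(s"$key count=${inputRows.size}")
      }
    }
  }
}
```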
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
sahnib commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1503581091 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.Serializable + +import org.apache.spark.annotation.{Evolving, Experimental} + +/** + * Class used to provide access to expired timer's expiry time and timeout mode. These values + * are only relevant if the ExpiredTimerInfo is valid. + */ +@Experimental +@Evolving +private[sql] trait ExpiredTimerInfo extends Serializable { + /** + * Check if provided ExpiredTimerInfo is valid. + */ + def isValid(): Boolean + + /** + * Get the expired timer's expiry time as milliseconds in epoch time. + */ + def getExpiryTimeInMs(): Long + + /** + * Get the expired timer's timeout mode. + */ + def getTimeoutMode(): TimeoutMode Review Comment: Would this ever be different from the timeout mode provided to the `transformWithState` API? If not, do we need this here?
## sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala: ## @@ -48,16 +48,19 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable { * @param inputRows - iterator of input rows associated with grouping key * @param timerValues - instance of TimerValues that provides access to current processing/event *time if available + * @param expiredTimerInfo - instance of ExpiredTimerInfo that provides access to expired timer + * if applicable * @return - Zero or more output rows */ def handleInputRows( key: K, inputRows: Iterator[I], - timerValues: TimerValues): Iterator[O] + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[O] /** * Function called as the last method that allows for users to perform * any cleanup or teardown operations. */ - def close (): Unit + def close (): Unit = {} Review Comment: Nice idea to provide a default implementation here. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +verify(timeoutMode == ProcessingTime || timeoutMode == EventTime, +s"Cannot register timers with incorrect TimeoutMode") +verify(currState == INITIALIZED || currState == DATA_PROCESSED, +s"Cannot register timers with " + + s"expiryTimestampMs=$expiryTimestampMs in current state=$currState") Review Comment: We should use the NERF framework for these user errors. 
## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +verify(timeoutMode == ProcessingTime || timeoutMode == EventTime, Review Comment: Shouldn't this be the same as the timeoutMode provided to the `transformWithState` API? ## sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala: ## @@ -51,6 +51,25 @@ private[sql] trait StatefulProcessorHandle extends Serializable { /** Function to return queryInfo for currently running task */ def getQueryInfo(): QueryInfo + /** + * Function to register a processing/event time based timer for given implicit key Review Comment: [nit] `implicit key` -> `implicit grouping key`. ## sql/api/src/main/scala/org/apache/spark/sql/stream
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db commented on PR #45051: URL: https://github.com/apache/spark/pull/45051#issuecomment-1960408558 @HeartSaVioR PTAL, thx!