This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9ffdbc65029a [SPARK-47784][SS] Merge TTLMode and TimeoutMode into a single TimeMode 9ffdbc65029a is described below commit 9ffdbc65029a08ce621ca50037683db05dc55761 Author: Bhuwan Sahni <bhuwan.sa...@databricks.com> AuthorDate: Fri Apr 12 13:47:07 2024 +0900 [SPARK-47784][SS] Merge TTLMode and TimeoutMode into a single TimeMode ### What changes were proposed in this pull request? This PR merges the `TimeoutMode` and `TTLMode` parameters for `transformWithState` into a single `TimeMode`. Currently, users need to specify the notion of time (ProcessingTime/EventTime) for timers and ttl separately. This allows users to use a single parameter. We do not expect users to mix and match EventTime/ProcessingTime for timers and ttl in a single query because it makes it hard to reason about the time semantics (when will the timer be fired?, when will the state be evicted? etc.). It's simpler to stick to one notion of time throughout timers and ttl. ### Why are the changes needed? Changes are needed to simplify the Arbitrary State API `transformWithState` interface by merging TTLMode/TimeoutMode into a single TimeMode. ### Does this PR introduce _any_ user-facing change? Yes, this PR changes the API parameters for `transformWithState`. ### How was this patch tested? All existing test cases for the `transformWithState` API pass. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45960 from sahnib/introduce-timeMode. 
Authored-by: Bhuwan Sahni <bhuwan.sa...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-classes.json | 16 +++--- .../apache/spark/sql/KeyValueGroupedDataset.scala | 38 +++++-------- dev/checkstyle-suppressions.xml | 4 +- docs/sql-error-conditions.md | 16 +++--- .../sql/streaming/{TTLMode.java => TimeMode.java} | 34 ++++++----- .../apache/spark/sql/streaming/TimeoutMode.java | 51 ----------------- .../logical/{TTLMode.scala => TimeMode.scala} | 10 ++-- .../logical/TransformWithStateTimeoutModes.scala | 24 -------- .../spark/sql/streaming/StatefulProcessor.scala | 5 +- .../spark/sql/catalyst/plans/logical/object.scala | 17 ++---- .../apache/spark/sql/KeyValueGroupedDataset.scala | 36 +++++------- .../spark/sql/execution/SparkStrategies.scala | 9 ++- .../execution/streaming/ExpiredTimerInfoImpl.scala | 4 +- .../streaming/StatefulProcessorHandleImpl.scala | 17 +++--- .../sql/execution/streaming/TimerStateImpl.scala | 14 ++--- .../streaming/TransformWithStateExec.scala | 65 +++++++++------------- .../streaming/state/StateStoreErrors.scala | 37 +++++------- .../org/apache/spark/sql/JavaDatasetSuite.java | 6 +- .../apache/spark/sql/TestStatefulProcessor.java | 3 +- .../sql/TestStatefulProcessorWithInitialState.java | 3 +- .../execution/streaming/state/ListStateSuite.scala | 14 ++--- .../execution/streaming/state/MapStateSuite.scala | 11 ++-- .../state/StatefulProcessorHandleSuite.scala | 64 ++++++++++----------- .../sql/execution/streaming/state/TimerSuite.scala | 42 +++++++------- .../streaming/state/ValueStateSuite.scala | 35 ++++-------- .../streaming/TransformWithListStateSuite.scala | 30 ++++------ .../sql/streaming/TransformWithMapStateSuite.scala | 18 ++---- .../TransformWithStateInitialStateSuite.scala | 25 +++------ .../sql/streaming/TransformWithStateSuite.scala | 61 +++++++------------- .../TransformWithValueStateTTLSuite.scala | 21 +++---- 30 files changed, 267 insertions(+), 463 deletions(-) 
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 62581116000b..7b13fa4278e4 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3591,21 +3591,15 @@ ], "sqlState" : "0A000" }, - "STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE" : { - "message" : [ - "Cannot use TTL for state=<stateName> in NoTTL() mode." - ], - "sqlState" : "42802" - }, "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE" : { "message" : [ "Failed to perform stateful processor operation=<operationType> with invalid handle state=<handleState>." ], "sqlState" : "42802" }, - "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE" : { + "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE" : { "message" : [ - "Failed to perform stateful processor operation=<operationType> with invalid timeoutMode=<timeoutMode>" + "Failed to perform stateful processor operation=<operationType> with invalid timeMode=<timeMode>" ], "sqlState" : "42802" }, @@ -3615,6 +3609,12 @@ ], "sqlState" : "42802" }, + "STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL" : { + "message" : [ + "Cannot use TTL for state=<stateName> in timeMode=<timeMode>, use TimeMode.ProcessingTime() instead." + ], + "sqlState" : "42802" + }, "STATEFUL_PROCESSOR_TTL_DURATION_MUST_BE_POSITIVE" : { "message" : [ "TTL duration must be greater than zero for State store operation=<operationType> on state=<stateName>." 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 39e0c429046d..e38adb9b0b27 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.ProductEncoder import org.apache.spark.sql.connect.common.UdfUtils import org.apache.spark.sql.expressions.ScalarUserDefinedFunction import org.apache.spark.sql.functions.col -import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeoutMode, TTLMode} +import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeMode} /** * A [[Dataset]] has been logically grouped by a user specified grouping key. Users should not @@ -827,17 +827,14 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable { * The type of the output objects. Must be encodable to Spark SQL types. * @param statefulProcessor * Instance of statefulProcessor whose functions will be invoked by the operator. - * @param timeoutMode - * The timeout mode of the stateful processor. - * @param ttlMode - * The ttlMode to evict user state on ttl expiration. + * @param timeMode + * The time mode semantics of the stateful processor for timers and TTL. * @param outputMode * The output mode of the stateful processor. 
*/ def transformWithState[U: Encoder]( statefulProcessor: StatefulProcessor[K, V, U], - timeoutMode: TimeoutMode, - ttlMode: TTLMode, + timeMode: TimeMode, outputMode: OutputMode): Dataset[U] = { throw new UnsupportedOperationException } @@ -854,10 +851,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable { * The type of the output objects. Must be encodable to Spark SQL types. * @param statefulProcessor * Instance of statefulProcessor whose functions will be invoked by the operator. - * @param timeoutMode - * The timeout mode of the stateful processor. - * @param ttlMode - * The ttlMode to evict user state on ttl expiration. + * @param timeMode + * The time mode semantics of the stateful processor for timers and TTL. * @param outputMode * The output mode of the stateful processor. * @param outputEncoder @@ -865,8 +860,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable { */ def transformWithState[U: Encoder]( statefulProcessor: StatefulProcessor[K, V, U], - timeoutMode: TimeoutMode, - ttlMode: TTLMode, + timeMode: TimeMode, outputMode: OutputMode, outputEncoder: Encoder[U]): Dataset[U] = { throw new UnsupportedOperationException @@ -883,10 +877,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable { * The type of initial state objects. Must be encodable to Spark SQL types. * @param statefulProcessor * Instance of statefulProcessor whose functions will be invoked by the operator. - * @param timeoutMode - * The timeout mode of the stateful processor. - * @param ttlMode - * The ttlMode to evict user state on ttl expiration. + * @param timeMode + * The time mode semantics of the stateful processor for timers and TTL. * @param outputMode * The output mode of the stateful processor. 
* @param initialState @@ -897,8 +889,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable { */ def transformWithState[U: Encoder, S: Encoder]( statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S], - timeoutMode: TimeoutMode, - ttlMode: TTLMode, + timeMode: TimeMode, outputMode: OutputMode, initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = { throw new UnsupportedOperationException @@ -915,10 +906,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable { * The type of initial state objects. Must be encodable to Spark SQL types. * @param statefulProcessor * Instance of statefulProcessor whose functions will be invoked by the operator. - * @param timeoutMode - * The timeout mode of the stateful processor. - * @param ttlMode - * The ttlMode to evict user state on ttl expiration + * @param timeMode + * The time mode semantics of the stateful processor for timers and TTL. * @param outputMode * The output mode of the stateful processor. 
* @param initialState @@ -933,8 +922,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable { */ private[sql] def transformWithState[U: Encoder, S: Encoder]( statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S], - timeoutMode: TimeoutMode, - ttlMode: TTLMode, + timeMode: TimeMode, outputMode: OutputMode, initialState: KeyValueGroupedDataset[K, S], outputEncoder: Encoder[U], diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml index 94dfe20af56e..834265f48aa8 100644 --- a/dev/checkstyle-suppressions.xml +++ b/dev/checkstyle-suppressions.xml @@ -57,11 +57,9 @@ <suppress checks="MethodName" files="sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/> <suppress checks="MethodName" - files="sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java"/> + files="sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java"/> <suppress checks="MethodName" files="sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/> - <suppress checks="MethodName" - files="sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java"/> <suppress checks="LineLength" files="src/main/java/org/apache/spark/sql/api/java/*"/> <suppress checks="IllegalImport" diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 827ee04b7606..3f9ae72a50a6 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -2185,23 +2185,17 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists Star (*) is not allowed in a select list when GROUP BY an ordinal position is used. -### STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE - -[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) - -Cannot use TTL for state=`<stateName>` in NoTTL() mode. 
- ### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) Failed to perform stateful processor operation=`<operationType>` with invalid handle state=`<handleState>`. -### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE +### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) -Failed to perform stateful processor operation=`<operationType>` with invalid timeoutMode=`<timeoutMode>` +Failed to perform stateful processor operation=`<operationType>` with invalid timeMode=`<timeMode>` ### STATEFUL_PROCESSOR_CANNOT_REINITIALIZE_STATE_ON_KEY @@ -2209,6 +2203,12 @@ Failed to perform stateful processor operation=`<operationType>` with invalid ti Cannot re-initialize state on the same grouping key during initial state handling for stateful processor. Invalid grouping key=`<groupingKey>`. +### STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL + +[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Cannot use TTL for state=`<stateName>` in timeMode=`<timeMode>`, use TimeMode.ProcessingTime() instead. 
+ ### STATEFUL_PROCESSOR_TTL_DURATION_MUST_BE_POSITIVE [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) diff --git a/sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java similarity index 51% rename from sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java rename to sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java index 30594770b3e1..a45a31bd1a05 100644 --- a/sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java +++ b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java @@ -19,24 +19,32 @@ package org.apache.spark.sql.streaming; import org.apache.spark.annotation.Evolving; import org.apache.spark.annotation.Experimental; -import org.apache.spark.sql.catalyst.plans.logical.*; +import org.apache.spark.sql.catalyst.plans.logical.EventTime$; +import org.apache.spark.sql.catalyst.plans.logical.NoTime$; +import org.apache.spark.sql.catalyst.plans.logical.ProcessingTime$; /** - * Represents the type of ttl modes possible for the Dataset operations - * {@code transformWithState}. + * Represents the time modes (used for specifying timers and ttl) possible for + * the Dataset operations {@code transformWithState}. */ @Experimental @Evolving -public class TTLMode { +public class TimeMode { - /** - * Specifies that there is no TTL for the user state. User state would not - * be cleaned up by Spark automatically. - */ - public static final TTLMode NoTTL() { return NoTTL$.MODULE$; } + /** + * Neither timers nor ttl is supported in this mode. + */ + public static final TimeMode None() { return NoTime$.MODULE$; } - /** - * Specifies that all ttl durations for user state are in processing time. 
- */ - public static final TTLMode ProcessingTimeTTL() { return ProcessingTimeTTL$.MODULE$; } + /** + * Stateful processor that uses query processing time to register timers and + * calculate ttl expiration. + */ + public static final TimeMode ProcessingTime() { return ProcessingTime$.MODULE$; } + + /** + * Stateful processor that uses event time to register timers. Note that ttl is not + * supported in this TimeMode. + */ + public static final TimeMode EventTime() { return EventTime$.MODULE$; } } diff --git a/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java deleted file mode 100644 index 68b8134cda6c..000000000000 --- a/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.annotation.Experimental; -import org.apache.spark.sql.catalyst.plans.logical.*; - -/** - * Represents the type of timeouts possible for the Dataset operations - * {@code transformWithState}. 
- */ -@Experimental -@Evolving -public class TimeoutMode { - /** - * Stateful processor that does not register timers - */ - public static final TimeoutMode NoTimeouts() { - return NoTimeouts$.MODULE$; - } - - /** - * Stateful processor that only registers processing time based timers - */ - public static final TimeoutMode ProcessingTime() { - return ProcessingTime$.MODULE$; - } - - /** - * Stateful processor that only registers event time based timers - */ - public static final TimeoutMode EventTime() { - return EventTime$.MODULE$; - } -} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala similarity index 79% rename from sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala index be4794a5f40b..b6248e97aa3d 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala @@ -16,9 +16,11 @@ */ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.streaming.TTLMode +import org.apache.spark.sql.streaming.TimeMode -/** TTL types used in tranformWithState operator */ -case object NoTTL extends TTLMode +/** TimeMode types used in transformWithState operator */ +case object NoTime extends TimeMode -case object ProcessingTimeTTL extends TTLMode +case object ProcessingTime extends TimeMode + +case object EventTime extends TimeMode diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala deleted file mode 100644 index e420f7821b50..000000000000 --- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.catalyst.plans.logical - -import org.apache.spark.sql.streaming.TimeoutMode - -/** Types of timeouts used in transformWithState operator */ -case object NoTimeouts extends TimeoutMode -case object ProcessingTime extends TimeoutMode -case object EventTime extends TimeoutMode diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala index 70f9cdfa399a..54e6a9a4ab67 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala @@ -40,12 +40,11 @@ private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable { * Function that will be invoked as the first method that allows for users to * initialize all their state variables and perform other init actions before handling data. 
* @param outputMode - output mode for the stateful processor - * @param timeoutMode - timeout mode for the stateful processor + * @param timeMode - time mode for the stateful processor. */ def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit + timeMode: TimeMode): Unit /** * Function that will allow users to interact with input data rows along with the grouping key diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index ff7c8fb3df4b..28d52d39093b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StatefulProcessor, TimeoutMode, TTLMode} +import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StatefulProcessor, TimeMode} import org.apache.spark.sql.types._ object CatalystSerde { @@ -574,8 +574,7 @@ object TransformWithState { groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], statefulProcessor: StatefulProcessor[K, V, U], - ttlMode: TTLMode, - timeoutMode: TimeoutMode, + timeMode: TimeMode, outputMode: OutputMode, child: LogicalPlan): LogicalPlan = { val keyEncoder = encoderFor[K] @@ -585,8 +584,7 @@ object TransformWithState { groupingAttributes, dataAttributes, statefulProcessor.asInstanceOf[StatefulProcessor[Any, Any, Any]], - ttlMode, - timeoutMode, + timeMode, outputMode, keyEncoder.asInstanceOf[ExpressionEncoder[Any]], CatalystSerde.generateObjAttr[U], @@ -607,8 +605,7 @@ object TransformWithState { 
groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], statefulProcessor: StatefulProcessor[K, V, U], - ttlMode: TTLMode, - timeoutMode: TimeoutMode, + timeMode: TimeMode, outputMode: OutputMode, child: LogicalPlan, initialStateGroupingAttrs: Seq[Attribute], @@ -621,8 +618,7 @@ object TransformWithState { groupingAttributes, dataAttributes, statefulProcessor.asInstanceOf[StatefulProcessor[Any, Any, Any]], - ttlMode, - timeoutMode, + timeMode, outputMode, keyEncoder.asInstanceOf[ExpressionEncoder[Any]], CatalystSerde.generateObjAttr[U], @@ -643,8 +639,7 @@ case class TransformWithState( groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], statefulProcessor: StatefulProcessor[Any, Any, Any], - ttlMode: TTLMode, - timeoutMode: TimeoutMode, + timeMode: TimeMode, outputMode: OutputMode, keyEncoder: ExpressionEncoder[Any], outputObjAttr: Attribute, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index f3713edd0ec0..862268eba666 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.expressions.ReduceAggregator import org.apache.spark.sql.internal.TypedAggUtils -import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeoutMode, TTLMode} +import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeMode} /** * A [[Dataset]] has been logically grouped by a user specified grouping key. Users should not @@ -654,16 +654,14 @@ class KeyValueGroupedDataset[K, V] private[sql]( * @tparam U The type of the output objects. 
Must be encodable to Spark SQL types. * @param statefulProcessor Instance of statefulProcessor whose functions will be invoked * by the operator. - * @param timeoutMode The timeout mode of the stateful processor. - * @param ttlMode The ttlMode to evict user state on ttl expiration + * @param timeMode The time mode semantics of the stateful processor for timers and TTL. * @param outputMode The output mode of the stateful processor. * * See [[Encoder]] for more details on what types are encodable to Spark SQL. */ private[sql] def transformWithState[U: Encoder]( statefulProcessor: StatefulProcessor[K, V, U], - timeoutMode: TimeoutMode, - ttlMode: TTLMode, + timeMode: TimeMode, outputMode: OutputMode): Dataset[U] = { Dataset[U]( sparkSession, @@ -671,8 +669,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( groupingAttributes, dataAttributes, statefulProcessor, - ttlMode, - timeoutMode, + timeMode, outputMode, child = logicalPlan ) @@ -691,8 +688,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( * @tparam U The type of the output objects. Must be encodable to Spark SQL types. * @param statefulProcessor Instance of statefulProcessor whose functions will be invoked by the * operator. - * @param timeoutMode The timeout mode of the stateful processor. - * @param ttlMode The ttlMode to evict user state on ttl expiration + * @param timeMode The time mode semantics of the stateful processor for timers and TTL. * @param outputMode The output mode of the stateful processor. * @param outputEncoder Encoder for the output type. 
* @@ -700,11 +696,10 @@ class KeyValueGroupedDataset[K, V] private[sql]( */ private[sql] def transformWithState[U: Encoder]( statefulProcessor: StatefulProcessor[K, V, U], - timeoutMode: TimeoutMode, - ttlMode: TTLMode, + timeMode: TimeMode, outputMode: OutputMode, outputEncoder: Encoder[U]): Dataset[U] = { - transformWithState(statefulProcessor, timeoutMode, ttlMode, outputMode)(outputEncoder) + transformWithState(statefulProcessor, timeMode, outputMode)(outputEncoder) } /** @@ -716,8 +711,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( * @tparam S The type of initial state objects. Must be encodable to Spark SQL types. * @param statefulProcessor Instance of statefulProcessor whose functions will * be invoked by the operator. - * @param timeoutMode The timeout mode of the stateful processor. - * @param ttlMode The ttlMode to evict user state on ttl expiration + * @param timeMode The time mode semantics of the stateful processor for timers and TTL. * @param outputMode The output mode of the stateful processor. * @param initialState User provided initial state that will be used to initiate state for * the query in the first batch. @@ -726,8 +720,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( */ private[sql] def transformWithState[U: Encoder, S: Encoder]( statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S], - timeoutMode: TimeoutMode, - ttlMode: TTLMode, + timeMode: TimeMode, outputMode: OutputMode, initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = { Dataset[U]( @@ -736,8 +729,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( groupingAttributes, dataAttributes, statefulProcessor, - ttlMode, - timeoutMode, + timeMode, outputMode, child = logicalPlan, initialState.groupingAttributes, @@ -756,8 +748,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( * @tparam S The type of initial state objects. Must be encodable to Spark SQL types. 
* @param statefulProcessor Instance of statefulProcessor whose functions will * be invoked by the operator. - * @param timeoutMode The timeout mode of the stateful processor. - * @param ttlMode The ttlMode to evict user state on ttl expiration + * @param timeMode The time mode semantics of the stateful processor for timers and TTL. * @param outputMode The output mode of the stateful processor. * @param initialState User provided initial state that will be used to initiate state for * the query in the first batch. @@ -768,13 +759,12 @@ class KeyValueGroupedDataset[K, V] private[sql]( */ private[sql] def transformWithState[U: Encoder, S: Encoder]( statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S], - timeoutMode: TimeoutMode, - ttlMode: TTLMode, + timeMode: TimeMode, outputMode: OutputMode, initialState: KeyValueGroupedDataset[K, S], outputEncoder: Encoder[U], initialStateEncoder: Encoder[S]): Dataset[U] = { - transformWithState(statefulProcessor, timeoutMode, ttlMode, + transformWithState(statefulProcessor, timeMode, outputMode, initialState)(outputEncoder, initialStateEncoder) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2c534eb36f9d..d7ebf786168b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -751,7 +751,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case TransformWithState( keyDeserializer, valueDeserializer, groupingAttributes, - dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode, + dataAttributes, statefulProcessor, timeMode, outputMode, keyEncoder, outputAttr, child, hasInitialState, initialStateGroupingAttrs, initialStateDataAttrs, initialStateDeserializer, initialState) => @@ 
-761,8 +761,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { groupingAttributes, dataAttributes, statefulProcessor, - ttlMode, - timeoutMode, + timeMode, outputMode, keyEncoder, outputAttr, @@ -926,12 +925,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { hasInitialState, planLater(initialState), planLater(child) ) :: Nil case logical.TransformWithState(keyDeserializer, valueDeserializer, groupingAttributes, - dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode, keyEncoder, + dataAttributes, statefulProcessor, timeMode, outputMode, keyEncoder, outputObjAttr, child, hasInitialState, initialStateGroupingAttrs, initialStateDataAttrs, initialStateDeserializer, initialState) => TransformWithStateExec.generateSparkPlanForBatchQueries(keyDeserializer, valueDeserializer, - groupingAttributes, dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode, + groupingAttributes, dataAttributes, statefulProcessor, timeMode, outputMode, keyEncoder, outputObjAttr, planLater(child), hasInitialState, initialStateGroupingAttrs, initialStateDataAttrs, initialStateDeserializer, planLater(initialState)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala index 8ab05ef852b8..e0bfc684585d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeoutMode} +import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeMode} /** * Class that provides a concrete implementation that can be used to provide access to expired @@ -28,7 +28,7 @@ import org.apache.spark.sql.streaming.{ExpiredTimerInfo, 
TimeoutMode} class ExpiredTimerInfoImpl( isValid: Boolean, expiryTimeInMsOpt: Option[Long] = None, - timeoutMode: TimeoutMode = TimeoutMode.NoTimeouts()) extends ExpiredTimerInfo { + timeMode: TimeMode = TimeMode.None()) extends ExpiredTimerInfo { override def isValid(): Boolean = isValid diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 7bef62b7fcce..2b51361f651d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.streaming.state._ -import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, StatefulProcessorHandle, TimeoutMode, TTLConfig, TTLMode, ValueState} +import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, StatefulProcessorHandle, TimeMode, TTLConfig, ValueState} import org.apache.spark.util.Utils /** @@ -78,8 +78,7 @@ class StatefulProcessorHandleImpl( store: StateStore, runId: UUID, keyEncoder: ExpressionEncoder[Any], - ttlMode: TTLMode, - timeoutMode: TimeoutMode, + timeMode: TimeMode, isStreaming: Boolean = true, batchTimestampMs: Option[Long] = None) extends StatefulProcessorHandle with Logging { @@ -143,7 +142,7 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo - private lazy val timerState = new TimerStateImpl(store, timeoutMode, keyEncoder) + private lazy val timerState = new TimerStateImpl(store, timeMode, keyEncoder) private def verifyStateVarOperations(operationType: String): Unit = { if (currState != CREATED) { @@ -153,9 +152,9 @@ class 
StatefulProcessorHandleImpl( } private def verifyTimerOperations(operationType: String): Unit = { - if (timeoutMode == NoTimeouts) { - throw StateStoreErrors.cannotPerformOperationWithInvalidTimeoutMode(operationType, - timeoutMode.toString) + if (timeMode == NoTime) { + throw StateStoreErrors.cannotPerformOperationWithInvalidTimeMode(operationType, + timeMode.toString) } if (currState < INITIALIZED || currState >= TIMER_PROCESSED) { @@ -242,8 +241,8 @@ class StatefulProcessorHandleImpl( private def validateTTLConfig(ttlConfig: TTLConfig, stateName: String): Unit = { val ttlDuration = ttlConfig.ttlDuration - if (ttlMode != TTLMode.ProcessingTimeTTL()) { - throw StateStoreErrors.cannotProvideTTLConfigForNoTTLMode(stateName) + if (timeMode != TimeMode.ProcessingTime()) { + throw StateStoreErrors.cannotProvideTTLConfigForTimeMode(stateName, timeMode.toString) } else if (ttlDuration == null || ttlDuration.isNegative || ttlDuration.isZero) { throw StateStoreErrors.ttlMustBePositive("update", stateName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala index 55acc4953c50..e83c83df5322 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala @@ -16,14 +16,12 @@ */ package org.apache.spark.sql.execution.streaming -import java.io.Serializable - import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.streaming.state._ -import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.streaming.TimeMode import org.apache.spark.sql.types._ import org.apache.spark.util.NextIterator @@ -31,10 +29,6 @@ import 
org.apache.spark.util.NextIterator * Singleton utils class used primarily while interacting with TimerState */ object TimerStateUtils { - case class TimestampWithKey( - key: Any, - expiryTimestampMs: Long) extends Serializable - val PROC_TIMERS_STATE_NAME = "_procTimers" val EVENT_TIMERS_STATE_NAME = "_eventTimers" val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp" @@ -45,12 +39,12 @@ object TimerStateUtils { * Class that provides the implementation for storing timers * used within the `transformWithState` operator. * @param store - state store to be used for storing timer data - * @param timeoutMode - mode of timeout (event time or processing time) + * @param timeMode - mode of timeout (event time or processing time) * @param keyExprEnc - encoder for key expression */ class TimerStateImpl( store: StateStore, - timeoutMode: TimeoutMode, + timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any]) extends Logging { private val EMPTY_ROW = @@ -78,7 +72,7 @@ class TimerStateImpl( private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex) - private val timerCFName = if (timeoutMode == TimeoutMode.ProcessingTime) { + private val timerCFName = if (timeMode == TimeMode.ProcessingTime) { TimerStateUtils.PROC_TIMERS_STATE_NAME } else { TimerStateUtils.EVENT_TIMERS_STATE_NAME diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index eaf51614d7cb..9f4cad1e348d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -42,8 +42,7 @@ import org.apache.spark.util.{CompletionIterator, SerializableConfiguration, Uti * @param groupingAttributes used to group the data * @param dataAttributes used to read the data * @param statefulProcessor processor methods called on 
underlying data - * @param ttlMode defines the ttl Mode for user state - * @param timeoutMode defines the timeout mode + * @param timeMode The time mode semantics of the stateful processor for timers and TTL. * @param outputMode defines the output mode for the statefulProcessor * @param keyEncoder expression encoder for the key type * @param outputObjAttr Defines the output object @@ -59,8 +58,7 @@ case class TransformWithStateExec( groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], statefulProcessor: StatefulProcessor[Any, Any, Any], - ttlMode: TTLMode, - timeoutMode: TimeoutMode, + timeMode: TimeMode, outputMode: OutputMode, keyEncoder: ExpressionEncoder[Any], outputObjAttr: Attribute, @@ -80,14 +78,15 @@ case class TransformWithStateExec( override def shortName: String = "transformWithStateExec" override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = { - if (ttlMode == TTLMode.ProcessingTimeTTL() || timeoutMode == TimeoutMode.ProcessingTime()) { - // TODO: check if we can return true only if actual timers are registered - true - } else if (timeoutMode == TimeoutMode.EventTime()) { - eventTimeWatermarkForEviction.isDefined && - newInputWatermark > eventTimeWatermarkForEviction.get - } else { - false + timeMode match { + case ProcessingTime => + // TODO: check if we can return true only if actual timers are registered, or there is + // expired state + true + case EventTime => + eventTimeWatermarkForEviction.isDefined && + newInputWatermark > eventTimeWatermarkForEviction.get + case _ => false } } @@ -200,9 +199,9 @@ case class TransformWithStateExec( } private def processTimers( - timeoutMode: TimeoutMode, + timeMode: TimeMode, processorHandle: StatefulProcessorHandleImpl): Iterator[InternalRow] = { - timeoutMode match { + timeMode match { case ProcessingTime => assert(batchTimestampMs.isDefined) val batchTimestamp = batchTimestampMs.get @@ -262,7 +261,7 @@ case class TransformWithStateExec( override def next() = itr.next() private 
def getIterator(): Iterator[InternalRow] = CompletionIterator[InternalRow, Iterator[InternalRow]]( - processTimers(timeoutMode, processorHandle), { + processTimers(timeMode, processorHandle), { // Note: `timeoutLatencyMs` also includes the time the parent operator took for // processing output returned through iterator. timeoutLatencyMs += NANOSECONDS.toMillis(System.nanoTime - timeoutProcessingStartTimeNs) @@ -297,8 +296,7 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver - validateTTLMode() - validateTimeoutMode() + validateTimeMode() if (hasInitialState) { val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf) @@ -413,11 +411,11 @@ case class TransformWithStateExec( private def processData(store: StateStore, singleIterator: Iterator[InternalRow]): CompletionIterator[InternalRow, Iterator[InternalRow]] = { val processorHandle = new StatefulProcessorHandleImpl( - store, getStateInfo.queryRunId, keyEncoder, ttlMode, timeoutMode, + store, getStateInfo.queryRunId, keyEncoder, timeMode, isStreaming, batchTimestampMs) assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) statefulProcessor.setHandle(processorHandle) - statefulProcessor.init(outputMode, timeoutMode, ttlMode) + statefulProcessor.init(outputMode, timeMode) processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) processDataWithPartition(singleIterator, store, processorHandle) } @@ -428,10 +426,10 @@ case class TransformWithStateExec( initStateIterator: Iterator[InternalRow]): CompletionIterator[InternalRow, Iterator[InternalRow]] = { val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder, ttlMode, timeoutMode, isStreaming) + keyEncoder, timeMode, isStreaming) assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) statefulProcessor.setHandle(processorHandle) - statefulProcessor.init(outputMode, 
timeoutMode, ttlMode) + statefulProcessor.init(outputMode, timeMode) processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) // Check if is first batch @@ -450,27 +448,16 @@ case class TransformWithStateExec( processDataWithPartition(childDataIterator, store, processorHandle) } - private def validateTimeoutMode(): Unit = { - timeoutMode match { + private def validateTimeMode(): Unit = { + timeMode match { case ProcessingTime => if (batchTimestampMs.isEmpty) { - StateStoreErrors.missingTimeoutValues(timeoutMode.toString) + StateStoreErrors.missingTimeValues(timeMode.toString) } case EventTime => if (eventTimeWatermarkForEviction.isEmpty) { - StateStoreErrors.missingTimeoutValues(timeoutMode.toString) - } - - case _ => - } - } - - private def validateTTLMode(): Unit = { - ttlMode match { - case ProcessingTimeTTL => - if (batchTimestampMs.isEmpty) { - StateStoreErrors.missingTTLValues(timeoutMode.toString) + StateStoreErrors.missingTimeValues(timeMode.toString) } case _ => @@ -488,8 +475,7 @@ object TransformWithStateExec { groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], statefulProcessor: StatefulProcessor[Any, Any, Any], - ttlMode: TTLMode, - timeoutMode: TimeoutMode, + timeMode: TimeMode, outputMode: OutputMode, keyEncoder: ExpressionEncoder[Any], outputObjAttr: Attribute, @@ -514,8 +500,7 @@ object TransformWithStateExec { groupingAttributes, dataAttributes, statefulProcessor, - ttlMode, - timeoutMode, + timeMode, outputMode, keyEncoder, outputObjAttr, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 6c63aa94e75b..b8ab32a00851 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -32,16 +32,9 @@ object 
StateStoreErrors { ) } - def missingTimeoutValues(timeoutMode: String): SparkException = { + def missingTimeValues(timeMode: String): SparkException = { SparkException.internalError( - msg = s"Failed to find timeout values for timeoutMode=$timeoutMode", - category = "TWS" - ) - } - - def missingTTLValues(ttlMode: String): SparkException = { - SparkException.internalError( - msg = s"Failed to find timeout values for ttlMode=$ttlMode", + msg = s"Failed to find time values for timeMode=$timeMode", category = "TWS" ) } @@ -108,10 +101,10 @@ object StateStoreErrors { new StateStoreCannotCreateColumnFamilyWithReservedChars(colFamilyName) } - def cannotPerformOperationWithInvalidTimeoutMode( + def cannotPerformOperationWithInvalidTimeMode( operationType: String, - timeoutMode: String): StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode = { - new StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode(operationType, timeoutMode) + timeMode: String): StatefulProcessorCannotPerformOperationWithInvalidTimeMode = { + new StatefulProcessorCannotPerformOperationWithInvalidTimeMode(operationType, timeMode) } def cannotPerformOperationWithInvalidHandleState( @@ -125,9 +118,9 @@ object StateStoreErrors { new StatefulProcessorCannotReInitializeState(groupingKey) } - def cannotProvideTTLConfigForNoTTLMode(stateName: String): - StatefulProcessorCannotAssignTTLInNoTTLMode = { - new StatefulProcessorCannotAssignTTLInNoTTLMode(stateName) + def cannotProvideTTLConfigForTimeMode(stateName: String, timeMode: String): + StatefulProcessorCannotAssignTTLInTimeMode = { + new StatefulProcessorCannotAssignTTLInTimeMode(stateName, timeMode) } def ttlMustBePositive(operationType: String, @@ -163,12 +156,12 @@ class StateStoreUnsupportedOperationException(operationType: String, entity: Str messageParameters = Map("operationType" -> operationType, "entity" -> entity) ) -class StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode( +class 
StatefulProcessorCannotPerformOperationWithInvalidTimeMode( operationType: String, - timeoutMode: String) + timeMode: String) extends SparkUnsupportedOperationException( - errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE", - messageParameters = Map("operationType" -> operationType, "timeoutMode" -> timeoutMode) + errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE", + messageParameters = Map("operationType" -> operationType, "timeMode" -> timeMode) ) class StatefulProcessorCannotPerformOperationWithInvalidHandleState( @@ -210,10 +203,10 @@ class StateStoreNullTypeOrderingColsNotSupported(fieldName: String, index: Strin errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED", messageParameters = Map("fieldName" -> fieldName, "index" -> index)) -class StatefulProcessorCannotAssignTTLInNoTTLMode(stateName: String) +class StatefulProcessorCannotAssignTTLInTimeMode(stateName: String, timeMode: String) extends SparkUnsupportedOperationException( - errorClass = "STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE", - messageParameters = Map("stateName" -> stateName)) + errorClass = "STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL", + messageParameters = Map("stateName" -> stateName, "timeMode" -> timeMode)) class StatefulProcessorTTLMustBePositive( operationType: String, diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index f9f075f4468d..5d7ae477e089 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -206,8 +206,7 @@ public class JavaDatasetSuite implements Serializable { Dataset<String> transformWithStateMapped = grouped.transformWithState( new TestStatefulProcessorWithInitialState(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Append(), 
kvInitStateMappedDS, Encoders.STRING(), @@ -362,8 +361,7 @@ public class JavaDatasetSuite implements Serializable { StatefulProcessor<Integer, String, String> testStatefulProcessor = new TestStatefulProcessor(); Dataset<String> transformWithStateMapped = grouped.transformWithState( testStatefulProcessor, - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Append(), Encoders.STRING()); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java index c6d705af5f2d..e53e977da149 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java @@ -38,8 +38,7 @@ public class TestStatefulProcessor extends StatefulProcessor<Integer, String, St @Override public void init( OutputMode outputMode, - TimeoutMode timeoutMode, - TTLMode ttlMode) { + TimeMode timeMode) { countState = this.getHandle().getValueState("countState", Encoders.LONG()); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java index db0b222145c4..bfa542e81e35 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java @@ -37,8 +37,7 @@ public class TestStatefulProcessorWithInitialState @Override public void init( OutputMode outputMode, - TimeoutMode timeoutMode, - TTLMode ttlMode) { + TimeMode timeMode) { testState = this.getHandle().getValueState("testState", Encoders.STRING()); } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala index 
51cfc1548b39..5eb48a86e342 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl} -import org.apache.spark.sql.streaming.{ListState, TimeoutMode, TTLMode, ValueState} +import org.apache.spark.sql.streaming.{ListState, TimeMode, ValueState} /** * Class that adds unit tests for ListState types used in arbitrary stateful @@ -37,8 +37,7 @@ class ListStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val listState: ListState[Long] = handle.getListState[Long]("listState", Encoders.scalaLong) @@ -71,8 +70,7 @@ class ListStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState: ListState[Long] = handle.getListState[Long]("testState", Encoders.scalaLong) ImplicitGroupingKeyTracker.setImplicitKey("test_key") @@ -100,8 +98,7 @@ class ListStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val 
store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState1: ListState[Long] = handle.getListState[Long]("testState1", Encoders.scalaLong) val testState2: ListState[Long] = handle.getListState[Long]("testState2", Encoders.scalaLong) @@ -139,8 +136,7 @@ class ListStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val listState1: ListState[Long] = handle.getListState[Long]("listState1", Encoders.scalaLong) val listState2: ListState[Long] = handle.getListState[Long]("listState2", Encoders.scalaLong) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala index 7fa41b12795e..572fc2429273 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala @@ -22,7 +22,7 @@ import java.util.UUID import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl} -import org.apache.spark.sql.streaming.{ListState, MapState, TimeoutMode, TTLMode, ValueState} +import org.apache.spark.sql.streaming.{ListState, MapState, TimeMode, ValueState} import org.apache.spark.sql.types.{BinaryType, StructType} /** 
@@ -39,8 +39,7 @@ class MapStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState: MapState[String, Double] = handle.getMapState[String, Double]("testState", Encoders.STRING, Encoders.scalaDouble) @@ -74,8 +73,7 @@ class MapStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState1: MapState[Long, Double] = handle.getMapState[Long, Double]("testState1", Encoders.scalaLong, Encoders.scalaDouble) @@ -114,8 +112,7 @@ class MapStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val mapTestState1: MapState[String, Int] = handle.getMapState[String, Int]("mapTestState1", Encoders.STRING, Encoders.scalaInt) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala index a32b4111eae8..e9ffe4ca9269 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} -import org.apache.spark.sql.streaming.{TimeoutMode, TTLConfig, TTLMode} +import org.apache.spark.sql.streaming.{TimeMode, TTLConfig} /** @@ -36,21 +36,21 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase { private def keyExprEncoder: ExpressionEncoder[Any] = Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] - private def getTimeoutMode(timeoutMode: String): TimeoutMode = { - timeoutMode match { - case "NoTimeouts" => TimeoutMode.NoTimeouts() - case "ProcessingTime" => TimeoutMode.ProcessingTime() - case "EventTime" => TimeoutMode.EventTime() - case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode") + private def getTimeMode(timeMode: String): TimeMode = { + timeMode match { + case "None" => TimeMode.None() + case "ProcessingTime" => TimeMode.ProcessingTime() + case "EventTime" => TimeMode.EventTime() + case _ => throw new IllegalArgumentException(s"Invalid timeMode=$timeMode") } } - Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => - test(s"value state creation with timeoutMode=$timeoutMode should succeed") { + Seq("None", "ProcessingTime", "EventTime").foreach { timeMode => + test(s"value state creation with timeMode=$timeMode should succeed") { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), 
getTimeoutMode(timeoutMode)) + UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode)) assert(handle.getHandleState === StatefulProcessorHandleState.CREATED) handle.getValueState[Long]("testState", Encoders.scalaLong) } @@ -85,13 +85,13 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase { handle.registerTimer(1000L) } - Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => - test(s"value state creation with timeoutMode=$timeoutMode " + + Seq("None", "ProcessingTime", "EventTime").foreach { timeMode => + test(s"value state creation with timeMode=$timeMode " + "and invalid state should fail") { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode)) + UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode)) Seq(StatefulProcessorHandleState.INITIALIZED, StatefulProcessorHandleState.DATA_PROCESSED, @@ -105,21 +105,21 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase { } } - test("registering processing/event time timeouts with NoTimeout mode should fail") { + test("registering processing/event time timeouts with None timeMode should fail") { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + UUID.randomUUID(), keyExprEncoder, TimeMode.None()) val ex = intercept[SparkUnsupportedOperationException] { handle.registerTimer(10000L) } checkError( ex, - errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE", + errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE", parameters = Map( "operationType" -> "register_timer", - "timeoutMode" -> TimeoutMode.NoTimeouts().toString + 
"timeMode" -> TimeMode.None().toString ), matchPVals = true ) @@ -130,22 +130,22 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase { checkError( ex2, - errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE", + errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE", parameters = Map( "operationType" -> "delete_timer", - "timeoutMode" -> TimeoutMode.NoTimeouts().toString + "timeMode" -> TimeMode.None().toString ), matchPVals = true ) } } - Seq("ProcessingTime", "EventTime").foreach { timeoutMode => - test(s"registering timeouts with timeoutMode=$timeoutMode should succeed") { + Seq("ProcessingTime", "EventTime").foreach { timeMode => + test(s"registering timeouts with timeMode=$timeMode should succeed") { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode)) + UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode)) handle.setHandleState(StatefulProcessorHandleState.INITIALIZED) assert(handle.getHandleState === StatefulProcessorHandleState.INITIALIZED) @@ -161,12 +161,12 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase { } } - Seq("ProcessingTime", "EventTime").foreach { timeoutMode => - test(s"verify listing of registered timers with timeoutMode=$timeoutMode") { + Seq("ProcessingTime", "EventTime").foreach { timeMode => + test(s"verify listing of registered timers with timeMode=$timeMode") { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode)) + UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode)) handle.setHandleState(StatefulProcessorHandleState.DATA_PROCESSED) 
assert(handle.getHandleState === StatefulProcessorHandleState.DATA_PROCESSED) @@ -201,12 +201,12 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase { } } - Seq("ProcessingTime", "EventTime").foreach { timeoutMode => - test(s"registering timeouts with timeoutMode=$timeoutMode and invalid state should fail") { + Seq("ProcessingTime", "EventTime").foreach { timeMode => + test(s"registering timeouts with timeMode=$timeMode and invalid state should fail") { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode)) + UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode)) Seq(StatefulProcessorHandleState.CREATED, StatefulProcessorHandleState.TIMER_PROCESSED, @@ -219,11 +219,11 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase { } } - test(s"ttl States are populated for ttlMode=ProcessingTime") { + test(s"ttl States are populated for timeMode=ProcessingTime") { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), keyExprEncoder, TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(), + UUID.randomUUID(), keyExprEncoder, TimeMode.ProcessingTime(), batchTimestampMs = Some(10)) val valueStateWithTTL = handle.getValueState("testState", @@ -237,11 +237,11 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase { } } - test(s"ttl States are not populated for ttlMode=NoTTL") { + test(s"ttl States are not populated for timeMode=None") { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + UUID.randomUUID(), keyExprEncoder, 
TimeMode.None()) handle.getValueState("testState", Encoders.STRING) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala index 1af33aa7b5ad..0bf160d8b321 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala @@ -20,31 +20,31 @@ package org.apache.spark.sql.execution.streaming.state import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, TimerStateImpl} -import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.streaming.TimeMode /** * Class that adds unit tests for Timer State used in arbitrary stateful * operators such as transformWithState */ class TimerSuite extends StateVariableSuiteBase { - private def testWithTimeOutMode(testName: String) - (testFunc: TimeoutMode => Unit): Unit = { + private def testWithTimeMode(testName: String) + (testFunc: TimeMode => Unit): Unit = { Seq("Processing", "Event").foreach { timeoutMode => test(s"$timeoutMode timer - " + testName) { timeoutMode match { - case "Processing" => testFunc(TimeoutMode.ProcessingTime()) - case "Event" => testFunc(TimeoutMode.EventTime()) + case "Processing" => testFunc(TimeMode.ProcessingTime()) + case "Event" => testFunc(TimeMode.EventTime()) } } } } - testWithTimeOutMode("single instance with single key") { timeoutMode => + testWithTimeMode("single instance with single key") { timeMode => tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) ImplicitGroupingKeyTracker.setImplicitKey("test_key") - val timerState = new TimerStateImpl(store, timeoutMode, + val timerState = new TimerStateImpl(store, timeMode, 
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) timerState.registerTimer(1L * 1000) assert(timerState.listTimers().toSet === Set(1000L)) @@ -58,14 +58,14 @@ class TimerSuite extends StateVariableSuiteBase { } } - testWithTimeOutMode("multiple instances with single key") { timeoutMode => + testWithTimeMode("multiple instances with single key") { timeMode => tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) ImplicitGroupingKeyTracker.setImplicitKey("test_key") - val timerState1 = new TimerStateImpl(store, timeoutMode, + val timerState1 = new TimerStateImpl(store, timeMode, Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) - val timerState2 = new TimerStateImpl(store, timeoutMode, + val timerState2 = new TimerStateImpl(store, timeMode, Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) timerState1.registerTimer(1L * 1000) timerState2.registerTimer(15L * 1000) @@ -83,12 +83,12 @@ class TimerSuite extends StateVariableSuiteBase { } } - testWithTimeOutMode("multiple instances with multiple keys") { timeoutMode => + testWithTimeMode("multiple instances with multiple keys") { timeMode => tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) ImplicitGroupingKeyTracker.setImplicitKey("test_key1") - val timerState1 = new TimerStateImpl(store, timeoutMode, + val timerState1 = new TimerStateImpl(store, timeMode, Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) timerState1.registerTimer(1L * 1000) timerState1.registerTimer(2L * 1000) @@ -96,7 +96,7 @@ class TimerSuite extends StateVariableSuiteBase { ImplicitGroupingKeyTracker.removeImplicitKey() ImplicitGroupingKeyTracker.setImplicitKey("test_key2") - val timerState2 = new TimerStateImpl(store, timeoutMode, + val timerState2 = new TimerStateImpl(store, timeMode, Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) timerState2.registerTimer(15L * 1000) 
ImplicitGroupingKeyTracker.removeImplicitKey() @@ -115,13 +115,13 @@ class TimerSuite extends StateVariableSuiteBase { } } - testWithTimeOutMode("Range scan on second index timer key - " + - "verify timestamp is sorted for single instance") { timeoutMode => + testWithTimeMode("Range scan on second index timer key - " + + "verify timestamp is sorted for single instance") { timeMode => tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) ImplicitGroupingKeyTracker.setImplicitKey("test_key") - val timerState = new TimerStateImpl(store, timeoutMode, + val timerState = new TimerStateImpl(store, timeMode, Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) val timerTimerstamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L) // register/put unordered timestamp into rocksDB @@ -134,25 +134,25 @@ class TimerSuite extends StateVariableSuiteBase { } } - testWithTimeOutMode("test range scan on second index timer key - " + - "verify timestamp is sorted for multiple instances") { timeoutMode => + testWithTimeMode("test range scan on second index timer key - " + + "verify timestamp is sorted for multiple instances") { timeMode => tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) ImplicitGroupingKeyTracker.setImplicitKey("test_key1") - val timerState1 = new TimerStateImpl(store, timeoutMode, + val timerState1 = new TimerStateImpl(store, timeMode, Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L, 1L) timerTimestamps1.foreach(timerState1.registerTimer) - val timerState2 = new TimerStateImpl(store, timeoutMode, + val timerState2 = new TimerStateImpl(store, timeMode, Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) val timerTimestamps2 = Seq(931L, 8000L, 452300L, 4200L) timerTimestamps2.foreach(timerState2.registerTimer) ImplicitGroupingKeyTracker.removeImplicitKey() 
ImplicitGroupingKeyTracker.setImplicitKey("test_key3") - val timerState3 = new TimerStateImpl(store, timeoutMode, + val timerState3 = new TimerStateImpl(store, timeMode, Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) val timerTimerStamps3 = Seq(1L, 2L, 8L, 3L) timerTimerStamps3.foreach(timerState3.registerTimer) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala index 102164d9c15f..d2747e2976f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, ValueStateImplWithTTL} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.{TimeoutMode, TTLConfig, TTLMode, ValueState} +import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -49,8 +49,7 @@ class ValueStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val stateName = "testState" val testState: ValueState[Long] = handle.getValueState[Long]("testState", Encoders.scalaLong) @@ -94,8 +93,7 @@ class ValueStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider 
=> val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState: ValueState[Long] = handle.getValueState[Long]("testState", Encoders.scalaLong) ImplicitGroupingKeyTracker.setImplicitKey("test_key") @@ -121,8 +119,7 @@ class ValueStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState1: ValueState[Long] = handle.getValueState[Long]( "testState1", Encoders.scalaLong) @@ -167,8 +164,7 @@ class ValueStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, - UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val cfName = "_testState" val ex = intercept[SparkUnsupportedOperationException] { @@ -208,8 +204,7 @@ class ValueStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState: ValueState[Double] = 
handle.getValueState[Double]("testState", Encoders.scalaDouble) @@ -235,8 +230,7 @@ class ValueStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState: ValueState[Long] = handle.getValueState[Long]("testState", Encoders.scalaLong) @@ -262,8 +256,7 @@ class ValueStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState: ValueState[TestClass] = handle.getValueState[TestClass]("testState", Encoders.product[TestClass]) @@ -289,8 +282,7 @@ class ValueStateSuite extends StateVariableSuiteBase { tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider => val store = provider.getStore(0) val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.NoTTL(), TimeoutMode.NoTimeouts()) + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None()) val testState: ValueState[POJOTestClass] = handle.getValueState[POJOTestClass]("testState", Encoders.bean(classOf[POJOTestClass])) @@ -318,8 +310,7 @@ class ValueStateSuite extends StateVariableSuiteBase { val store = provider.getStore(0) val timestampMs = 10 val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), - Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.ProcessingTimeTTL(), 
TimeoutMode.NoTimeouts(), + Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.ProcessingTime(), batchTimestampMs = Some(timestampMs)) val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(1)) @@ -340,8 +331,7 @@ class ValueStateSuite extends StateVariableSuiteBase { // increment batchProcessingTime, or watermark and ensure expired value is not returned val nextBatchHandle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(), - batchTimestampMs = Some(ttlExpirationMs)) + TimeMode.ProcessingTime(), batchTimestampMs = Some(ttlExpirationMs)) val nextBatchTestState: ValueStateImplWithTTL[String] = nextBatchHandle.getValueState[String]("testState", Encoders.STRING, ttlConfig) @@ -377,8 +367,7 @@ class ValueStateSuite extends StateVariableSuiteBase { val batchTimestampMs = 10 val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], - TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(), - batchTimestampMs = Some(batchTimestampMs)) + TimeMode.ProcessingTime(), batchTimestampMs = Some(batchTimestampMs)) Seq(null, Duration.ZERO, Duration.ofMinutes(-1)).foreach { ttlDuration => val ttlConfig = TTLConfig(ttlDuration) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala index 5ccc14ab8a77..705226d51332 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala @@ -32,8 +32,7 @@ class TestListStateProcessor override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _listState = getHandle.getListState("testListState", Encoders.STRING) 
} @@ -90,8 +89,7 @@ class ToggleSaveAndEmitProcessor override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _listState = getHandle.getListState("testListState", Encoders.STRING) _valueState = getHandle.getValueState("testValueState", Encoders.scalaBoolean) } @@ -141,8 +139,7 @@ class TransformWithListStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestListStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update()) ( @@ -162,8 +159,7 @@ class TransformWithListStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestListStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -183,8 +179,7 @@ class TransformWithListStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestListStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -204,8 +199,7 @@ class TransformWithListStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestListStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -225,8 +219,7 @@ class TransformWithListStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestListStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -246,8 +239,7 @@ class TransformWithListStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) 
.transformWithState(new TestListStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -267,8 +259,7 @@ class TransformWithListStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestListStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update()) ( @@ -320,8 +311,7 @@ class TransformWithListStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x) .transformWithState(new ToggleSaveAndEmitProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala index d32b9687d95f..0eafb16a5350 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala @@ -32,8 +32,7 @@ class TestMapStateProcessor override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _mapState = getHandle.getMapState("sessionState", Encoders.STRING, Encoders.STRING) } @@ -95,8 +94,7 @@ class TransformWithMapStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestMapStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) @@ -122,8 +120,7 @@ class TransformWithMapStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestMapStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) 
testStream(result, OutputMode.Update())( @@ -147,8 +144,7 @@ class TransformWithMapStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestMapStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -171,8 +167,7 @@ class TransformWithMapStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestMapStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Append()) testStream(result, OutputMode.Append())( // Test exists() @@ -226,8 +221,7 @@ class TransformWithMapStateSuite extends StreamTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new TestMapStateProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Append()) val df = result.toDF() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala index 106f228ba78b..54cff6fc44c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala @@ -38,8 +38,7 @@ abstract class StatefulProcessorWithInitialStateTestClass[V] override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _valState = getHandle.getValueState[Double]("testValueInit", Encoders.scalaDouble) _listState = getHandle.getListState[Double]("testListInit", Encoders.scalaDouble) _mapState = getHandle.getMapState[Double, Int]( @@ -171,8 +170,7 @@ class StatefulProcessorWithInitialStateProcTimerClass override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode) : 
Unit = { + timeMode: TimeMode) : Unit = { _countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong) _timerState = getHandle.getValueState[Long]("timerState", Encoders.scalaLong) } @@ -215,8 +213,7 @@ class StatefulProcessorWithInitialStateEventTimerClass override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _maxEventTimeState = getHandle.getValueState[Long]("maxEventTimeState", Encoders.scalaLong) _timerState = getHandle.getValueState[Long]("timerState", Encoders.scalaLong) @@ -293,7 +290,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest InputRowForInitialState("init_2", 100.0, List(100.0), Map(100.0 -> 1))) .toDS().groupByKey(x => x.key).mapValues(x => x) val query = kvDataSet.transformWithState(new InitialStateInMemoryTestClass(), - TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(), initStateDf) + TimeMode.None(), OutputMode.Append(), initStateDf) testStream(query, OutputMode.Update())( // non-exist key test @@ -371,7 +368,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest val query = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new AccumulateStatefulProcessorWithInitState(), - TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(), initStateDf + TimeMode.None(), OutputMode.Append(), initStateDf ) testStream(query, OutputMode.Update())( AddData(inputData, InitInputRow("init_1", "add", 50.0)), @@ -391,8 +388,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new AccumulateStatefulProcessorWithInitState(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Append(), createInitialDfForTest) @@ -410,8 +406,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest val query = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new 
AccumulateStatefulProcessorWithInitState(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Append(), initDf) @@ -443,8 +438,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest val result = inputData.toDS().groupByKey(x => x) .transformWithState( new StatefulProcessorWithInitialStateProcTimerClass(), - TimeoutMode.ProcessingTime(), - TTLMode.NoTTL(), + TimeMode.ProcessingTime(), OutputMode.Update(), initDf) @@ -488,8 +482,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest val result = eventTimeDf(inputData.toDS()) .transformWithState( new StatefulProcessorWithInitialStateEventTimerClass(), - TimeoutMode.EventTime(), - TTLMode.NoTTL(), + TimeMode.EventTime(), OutputMode.Update(), initDf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 735c53bf3c91..7dec14d3e435 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -40,8 +40,7 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong) } @@ -104,9 +103,8 @@ class RunningCountStatefulProcessorWithProcTimeTimerUpdates override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode) : Unit = { - super.init(outputMode, timeoutMode, ttlMode) + timeMode: TimeMode) : Unit = { + super.init(outputMode, timeMode) _timerState = getHandle.getValueState[Long]("timerState", Encoders.scalaLong) } @@ -196,8 +194,7 @@ class MaxEventTimeStatefulProcessor override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, 
- ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _maxEventTimeState = getHandle.getValueState[Long]("maxEventTimeState", Encoders.scalaLong) _timerState = getHandle.getValueState[Long]("timerState", Encoders.scalaLong) @@ -242,8 +239,7 @@ class RunningCountMostRecentStatefulProcessor override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong) _mostRecent = getHandle.getValueState[String]("mostRecent", Encoders.STRING) } @@ -273,8 +269,7 @@ class MostRecentStatefulProcessorWithDeletion override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { getHandle.deleteIfExists("countState") _mostRecent = getHandle.getValueState[String]("mostRecent", Encoders.STRING) } @@ -327,8 +322,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest val result = inputData.toDS() .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessorWithError(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -349,8 +343,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest val result = inputData.toDS() .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -380,8 +373,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest val result = inputData.toDS() .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(), - TimeoutMode.ProcessingTime(), - TTLMode.NoTTL(), + TimeMode.ProcessingTime(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -424,8 +416,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest .groupByKey(x => x) 
.transformWithState( new RunningCountStatefulProcessorWithProcTimeTimerUpdates(), - TimeoutMode.ProcessingTime(), - TTLMode.NoTTL(), + TimeMode.ProcessingTime(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -461,8 +452,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest .groupByKey(x => x) .transformWithState( new RunningCountStatefulProcessorWithMultipleTimers(), - TimeoutMode.ProcessingTime(), - TTLMode.NoTTL(), + TimeMode.ProcessingTime(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -497,8 +487,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest .groupByKey(_._1) .transformWithState( new MaxEventTimeStatefulProcessor(), - TimeoutMode.EventTime(), - TTLMode.NoTTL(), + TimeMode.EventTime(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -539,8 +528,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest val result = inputData.toDS() .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Append()) val df = result.toDF() @@ -558,15 +546,13 @@ class TransformWithStateSuite extends StateStoreMetricsTest val stream1 = inputData.toDS() .groupByKey(x => x._1) .transformWithState(new RunningCountMostRecentStatefulProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) val stream2 = inputData.toDS() .groupByKey(x => x._1) .transformWithState(new MostRecentStatefulProcessorWithDeletion(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(stream1, OutputMode.Update())( @@ -598,8 +584,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest .union(inputData2.toDS()) .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -632,8 +617,7 @@ 
class TransformWithStateSuite extends StateStoreMetricsTest .union(inputData3.toDS()) .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -666,8 +650,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest .union(inputData2.toDS().map(_.toString)) .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -697,8 +680,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest .select("value").as[String] .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) } @@ -790,8 +772,7 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest { val result = inputData.toDS() .groupByKey(x => x) .transformWithState(new RunningCountStatefulProcessor(), - TimeoutMode.NoTimeouts(), - TTLMode.NoTTL(), + TimeMode.None(), OutputMode.Update()) testStream(result, OutputMode.Update())( @@ -810,7 +791,7 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest { val result = inputData.toDS() .groupByKey(x => x.key) .transformWithState(new AccumulateStatefulProcessorWithInitState(), - TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(), initDf + TimeMode.None(), OutputMode.Append(), initDf ) testStream(result, OutputMode.Update())( AddData(inputData, InitInputRow("a", "add", -1.0)), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala index 759d535c18a3..e6dd0ace766a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala @@ -98,8 +98,7 @@ class ValueStateTTLProcessor(ttlConfig: TTLConfig) override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _valueState = getHandle .getValueState("valueState", Encoders.scalaInt, ttlConfig) .asInstanceOf[ValueStateImplWithTTL[Int]] @@ -135,8 +134,7 @@ case class MultipleValueStatesTTLProcessor( override def init( outputMode: OutputMode, - timeoutMode: TimeoutMode, - ttlMode: TTLMode): Unit = { + timeMode: TimeMode): Unit = { _valueStateWithTTL = getHandle .getValueState("valueState", Encoders.scalaInt, ttlConfig) .asInstanceOf[ValueStateImplWithTTL[Int]] @@ -191,8 +189,7 @@ class TransformWithValueStateTTLSuite .groupByKey(x => x.key) .transformWithState( new ValueStateTTLProcessor(ttlConfig), - TimeoutMode.NoTimeouts(), - TTLMode.ProcessingTimeTTL(), + TimeMode.ProcessingTime(), OutputMode.Append()) val clock = new StreamManualClock @@ -258,8 +255,7 @@ class TransformWithValueStateTTLSuite .groupByKey(x => x.key) .transformWithState( new ValueStateTTLProcessor(ttlConfig), - TimeoutMode.NoTimeouts(), - TTLMode.ProcessingTimeTTL(), + TimeMode.ProcessingTime(), OutputMode.Append()) val clock = new StreamManualClock @@ -321,8 +317,7 @@ class TransformWithValueStateTTLSuite .groupByKey(x => x.key) .transformWithState( new ValueStateTTLProcessor(ttlConfig), - TimeoutMode.NoTimeouts(), - TTLMode.ProcessingTimeTTL(), + TimeMode.ProcessingTime(), OutputMode.Append()) val clock = new StreamManualClock @@ -375,8 +370,7 @@ class TransformWithValueStateTTLSuite .groupByKey(x => x.key) .transformWithState( MultipleValueStatesTTLProcessor(ttlKey, noTtlKey, ttlConfig), - TimeoutMode.NoTimeouts(), - TTLMode.ProcessingTimeTTL(), + TimeMode.ProcessingTime(), OutputMode.Append()) val clock = new StreamManualClock @@ -430,8 +424,7 @@ class TransformWithValueStateTTLSuite .groupByKey(x => x.key) 
.transformWithState( new ValueStateTTLProcessor(ttlConfig), - TimeoutMode.NoTimeouts(), - TTLMode.ProcessingTimeTTL(), + TimeMode.ProcessingTime(), OutputMode.Append()) val clock = new StreamManualClock --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org