This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ffdbc65029a [SPARK-47784][SS] Merge TTLMode and TimeoutMode into a 
single TimeMode
9ffdbc65029a is described below

commit 9ffdbc65029a08ce621ca50037683db05dc55761
Author: Bhuwan Sahni <bhuwan.sa...@databricks.com>
AuthorDate: Fri Apr 12 13:47:07 2024 +0900

    [SPARK-47784][SS] Merge TTLMode and TimeoutMode into a single TimeMode
    
    ### What changes were proposed in this pull request?
    
    This PR merges the `TimeoutMode` and `TTLMode` parameters for
    `transformWithState` into a single `TimeMode`. Currently, users have to
    specify the notion of time (ProcessingTime/EventTime) for timers and TTL
    separately; this change lets them specify a single parameter.
    
    We do not expect users to mix and match EventTime/ProcessingTime for
    timers and TTL within a single query, because doing so makes it hard to
    reason about the time semantics (when will a timer fire? when will state
    be evicted? etc.). It is simpler to stick to one notion of time for both
    timers and TTL.
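
    As an illustration, here is a minimal sketch of the merged API based on
    the new signatures in this patch (`grouped` and `myProcessor` are
    hypothetical names):

    ```scala
    import org.apache.spark.sql.streaming.{OutputMode, TimeMode}

    // Before: separate modes for timers and TTL
    //   grouped.transformWithState(myProcessor, TimeoutMode.ProcessingTime(),
    //     TTLMode.ProcessingTimeTTL(), OutputMode.Append())

    // After: a single TimeMode governs both timers and TTL; the options
    // are TimeMode.None(), TimeMode.ProcessingTime(), TimeMode.EventTime()
    val result = grouped.transformWithState(
      myProcessor,               // a StatefulProcessor[K, V, U]
      TimeMode.ProcessingTime(),
      OutputMode.Append())
    ```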
    
    ### Why are the changes needed?
    
    Changes are needed to simplify the arbitrary state API
    `transformWithState` interface by merging `TTLMode`/`TimeoutMode` into a
    single `TimeMode`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this PR changes the API parameters for `transformWithState`.
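
    For example, processor implementations now override a two-argument
    `init` instead of the previous three-argument one (a sketch mirroring
    the updated test processors in this patch; `countState` is illustrative):

    ```scala
    override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
      // State variables are still created through the processor handle.
      countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong)
    }
    ```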
    
    ### How was this patch tested?
    
    All existing test cases for the `transformWithState` API pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45960 from sahnib/introduce-timeMode.
    
    Authored-by: Bhuwan Sahni <bhuwan.sa...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    | 16 +++---
 .../apache/spark/sql/KeyValueGroupedDataset.scala  | 38 +++++--------
 dev/checkstyle-suppressions.xml                    |  4 +-
 docs/sql-error-conditions.md                       | 16 +++---
 .../sql/streaming/{TTLMode.java => TimeMode.java}  | 34 ++++++-----
 .../apache/spark/sql/streaming/TimeoutMode.java    | 51 -----------------
 .../logical/{TTLMode.scala => TimeMode.scala}      | 10 ++--
 .../logical/TransformWithStateTimeoutModes.scala   | 24 --------
 .../spark/sql/streaming/StatefulProcessor.scala    |  5 +-
 .../spark/sql/catalyst/plans/logical/object.scala  | 17 ++----
 .../apache/spark/sql/KeyValueGroupedDataset.scala  | 36 +++++-------
 .../spark/sql/execution/SparkStrategies.scala      |  9 ++-
 .../execution/streaming/ExpiredTimerInfoImpl.scala |  4 +-
 .../streaming/StatefulProcessorHandleImpl.scala    | 17 +++---
 .../sql/execution/streaming/TimerStateImpl.scala   | 14 ++---
 .../streaming/TransformWithStateExec.scala         | 65 +++++++++-------------
 .../streaming/state/StateStoreErrors.scala         | 37 +++++-------
 .../org/apache/spark/sql/JavaDatasetSuite.java     |  6 +-
 .../apache/spark/sql/TestStatefulProcessor.java    |  3 +-
 .../sql/TestStatefulProcessorWithInitialState.java |  3 +-
 .../execution/streaming/state/ListStateSuite.scala | 14 ++---
 .../execution/streaming/state/MapStateSuite.scala  | 11 ++--
 .../state/StatefulProcessorHandleSuite.scala       | 64 ++++++++++-----------
 .../sql/execution/streaming/state/TimerSuite.scala | 42 +++++++-------
 .../streaming/state/ValueStateSuite.scala          | 35 ++++--------
 .../streaming/TransformWithListStateSuite.scala    | 30 ++++------
 .../sql/streaming/TransformWithMapStateSuite.scala | 18 ++----
 .../TransformWithStateInitialStateSuite.scala      | 25 +++------
 .../sql/streaming/TransformWithStateSuite.scala    | 61 +++++++-------------
 .../TransformWithValueStateTTLSuite.scala          | 21 +++----
 30 files changed, 267 insertions(+), 463 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 62581116000b..7b13fa4278e4 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3591,21 +3591,15 @@
     ],
     "sqlState" : "0A000"
   },
-  "STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE" : {
-    "message" : [
-      "Cannot use TTL for state=<stateName> in NoTTL() mode."
-    ],
-    "sqlState" : "42802"
-  },
   "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE" : {
     "message" : [
       "Failed to perform stateful processor operation=<operationType> with 
invalid handle state=<handleState>."
     ],
     "sqlState" : "42802"
   },
-  "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE" : {
+  "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE" : {
     "message" : [
-      "Failed to perform stateful processor operation=<operationType> with 
invalid timeoutMode=<timeoutMode>"
+      "Failed to perform stateful processor operation=<operationType> with 
invalid timeMode=<timeMode>"
     ],
     "sqlState" : "42802"
   },
@@ -3615,6 +3609,12 @@
     ],
     "sqlState" : "42802"
   },
+  "STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL" : {
+    "message" : [
+      "Cannot use TTL for state=<stateName> in timeMode=<timeMode>, use 
TimeMode.ProcessingTime() instead."
+    ],
+    "sqlState" : "42802"
+  },
   "STATEFUL_PROCESSOR_TTL_DURATION_MUST_BE_POSITIVE" : {
     "message" : [
       "TTL duration must be greater than zero for State store 
operation=<operationType> on state=<stateName>."
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 39e0c429046d..e38adb9b0b27 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.ProductEncoder
 import org.apache.spark.sql.connect.common.UdfUtils
 import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, 
OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeoutMode, 
TTLMode}
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, 
OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeMode}
 
 /**
  * A [[Dataset]] has been logically grouped by a user specified grouping key. 
Users should not
@@ -827,17 +827,14 @@ class KeyValueGroupedDataset[K, V] private[sql] () 
extends Serializable {
    *   The type of the output objects. Must be encodable to Spark SQL types.
    * @param statefulProcessor
    *   Instance of statefulProcessor whose functions will be invoked by the 
operator.
-   * @param timeoutMode
-   *   The timeout mode of the stateful processor.
-   * @param ttlMode
-   *   The ttlMode to evict user state on ttl expiration.
+   * @param timeMode
+   *   The time mode semantics of the stateful processor for timers and TTL.
    * @param outputMode
    *   The output mode of the stateful processor.
    */
   def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode,
+      timeMode: TimeMode,
       outputMode: OutputMode): Dataset[U] = {
     throw new UnsupportedOperationException
   }
@@ -854,10 +851,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    *   The type of the output objects. Must be encodable to Spark SQL types.
    * @param statefulProcessor
    *   Instance of statefulProcessor whose functions will be invoked by the 
operator.
-   * @param timeoutMode
-   *   The timeout mode of the stateful processor.
-   * @param ttlMode
-   *   The ttlMode to evict user state on ttl expiration.
+   * @param timeMode
+   *   The time mode semantics of the stateful processor for timers and TTL.
    * @param outputMode
    *   The output mode of the stateful processor.
    * @param outputEncoder
@@ -865,8 +860,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    */
   def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       outputEncoder: Encoder[U]): Dataset[U] = {
     throw new UnsupportedOperationException
@@ -883,10 +877,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    *   The type of initial state objects. Must be encodable to Spark SQL types.
    * @param statefulProcessor
    *   Instance of statefulProcessor whose functions will be invoked by the 
operator.
-   * @param timeoutMode
-   *   The timeout mode of the stateful processor.
-   * @param ttlMode
-   *   The ttlMode to evict user state on ttl expiration.
+   * @param timeMode
+   *   The time mode semantics of the stateful processor for timers and TTL.
    * @param outputMode
    *   The output mode of the stateful processor.
    * @param initialState
@@ -897,8 +889,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    */
   def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = {
     throw new UnsupportedOperationException
@@ -915,10 +906,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    *   The type of initial state objects. Must be encodable to Spark SQL types.
    * @param statefulProcessor
    *   Instance of statefulProcessor whose functions will be invoked by the 
operator.
-   * @param timeoutMode
-   *   The timeout mode of the stateful processor.
-   * @param ttlMode
-   *   The ttlMode to evict user state on ttl expiration
+   * @param timeMode
+   *   The time mode semantics of the stateful processor for timers and TTL.
    * @param outputMode
    *   The output mode of the stateful processor.
    * @param initialState
@@ -933,8 +922,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    */
   private[sql] def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       initialState: KeyValueGroupedDataset[K, S],
       outputEncoder: Encoder[U],
diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml
index 94dfe20af56e..834265f48aa8 100644
--- a/dev/checkstyle-suppressions.xml
+++ b/dev/checkstyle-suppressions.xml
@@ -57,11 +57,9 @@
     <suppress checks="MethodName"
               
files="sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
     <suppress checks="MethodName"
-              
files="sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java"/>
+              
files="sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java"/>
     <suppress checks="MethodName"
               
files="sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
-    <suppress checks="MethodName"
-              
files="sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java"/>
     <suppress checks="LineLength"
               files="src/main/java/org/apache/spark/sql/api/java/*"/>
     <suppress checks="IllegalImport"
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 827ee04b7606..3f9ae72a50a6 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2185,23 +2185,17 @@ The SQL config `<sqlConf>` cannot be found. Please 
verify that the config exists
 
 Star (*) is not allowed in a select list when GROUP BY an ordinal position is 
used.
 
-### STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE
-
-[SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
-
-Cannot use TTL for state=`<stateName>` in NoTTL() mode.
-
 ### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE
 
 [SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
 
 Failed to perform stateful processor operation=`<operationType>` with invalid 
handle state=`<handleState>`.
 
-### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE
+### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE
 
 [SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
 
-Failed to perform stateful processor operation=`<operationType>` with invalid 
timeoutMode=`<timeoutMode>`
+Failed to perform stateful processor operation=`<operationType>` with invalid 
timeMode=`<timeMode>`
 
 ### STATEFUL_PROCESSOR_CANNOT_REINITIALIZE_STATE_ON_KEY
 
@@ -2209,6 +2203,12 @@ Failed to perform stateful processor 
operation=`<operationType>` with invalid ti
 
 Cannot re-initialize state on the same grouping key during initial state 
handling for stateful processor. Invalid grouping key=`<groupingKey>`.
 
+### STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL
+
+[SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot use TTL for state=`<stateName>` in timeMode=`<timeMode>`, use 
TimeMode.ProcessingTime() instead.
+
 ### STATEFUL_PROCESSOR_TTL_DURATION_MUST_BE_POSITIVE
 
 [SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java 
b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java
similarity index 51%
rename from sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java
rename to sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java
index 30594770b3e1..a45a31bd1a05 100644
--- a/sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java
+++ b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java
@@ -19,24 +19,32 @@ package org.apache.spark.sql.streaming;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.annotation.Experimental;
-import org.apache.spark.sql.catalyst.plans.logical.*;
+import org.apache.spark.sql.catalyst.plans.logical.EventTime$;
+import org.apache.spark.sql.catalyst.plans.logical.NoTime$;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTime$;
 
 /**
- * Represents the type of ttl modes possible for the Dataset operations
- * {@code transformWithState}.
+ * Represents the time modes (used for specifying timers and TTL) possible for
+ * the Dataset operation {@code transformWithState}.
  */
 @Experimental
 @Evolving
-public class TTLMode {
+public class TimeMode {
 
-  /**
-   * Specifies that there is no TTL for the user state. User state would not
-   * be cleaned up by Spark automatically.
-   */
-  public static final TTLMode NoTTL() { return NoTTL$.MODULE$; }
+    /**
+     * Neither timers nor ttl is supported in this mode.
+     */
+    public static final TimeMode None() { return NoTime$.MODULE$; }
 
-  /**
-   * Specifies that all ttl durations for user state are in processing time.
-   */
-  public static final TTLMode ProcessingTimeTTL() { return 
ProcessingTimeTTL$.MODULE$; }
+    /**
+     * Stateful processor that uses query processing time to register timers 
and
+     * calculate ttl expiration.
+     */
+    public static final TimeMode ProcessingTime() { return 
ProcessingTime$.MODULE$; }
+
+    /**
+     * Stateful processor that uses event time to register timers. Note that 
ttl is not
+     * supported in this TimeMode.
+     */
+    public static final TimeMode EventTime() { return EventTime$.MODULE$; }
 }
diff --git 
a/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java 
b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java
deleted file mode 100644
index 68b8134cda6c..000000000000
--- a/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
-import org.apache.spark.sql.catalyst.plans.logical.*;
-
-/**
- * Represents the type of timeouts possible for the Dataset operations
- * {@code transformWithState}.
- */
-@Experimental
-@Evolving
-public class TimeoutMode {
-  /**
-   * Stateful processor that does not register timers
-   */
-  public static final TimeoutMode NoTimeouts() {
-    return NoTimeouts$.MODULE$;
-  }
-
-  /**
-   * Stateful processor that only registers processing time based timers
-   */
-  public static final TimeoutMode ProcessingTime() {
-    return ProcessingTime$.MODULE$;
-  }
-
-  /**
-   * Stateful processor that only registers event time based timers
-   */
-  public static final TimeoutMode EventTime() {
-    return EventTime$.MODULE$;
-  }
-}
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala
similarity index 79%
rename from 
sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala
rename to 
sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala
index be4794a5f40b..b6248e97aa3d 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala
@@ -16,9 +16,11 @@
  */
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.streaming.TTLMode
+import org.apache.spark.sql.streaming.TimeMode
 
-/** TTL types used in tranformWithState operator */
-case object NoTTL extends TTLMode
+/** TimeMode types used in transformWithState operator */
+case object NoTime extends TimeMode
 
-case object ProcessingTimeTTL extends TTLMode
+case object ProcessingTime extends TimeMode
+
+case object EventTime extends TimeMode
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala
deleted file mode 100644
index e420f7821b50..000000000000
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.catalyst.plans.logical
-
-import org.apache.spark.sql.streaming.TimeoutMode
-
-/** Types of timeouts used in transformWithState operator */
-case object NoTimeouts extends TimeoutMode
-case object ProcessingTime extends TimeoutMode
-case object EventTime extends TimeoutMode
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
index 70f9cdfa399a..54e6a9a4ab67 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
@@ -40,12 +40,11 @@ private[sql] abstract class StatefulProcessor[K, I, O] 
extends Serializable {
    * Function that will be invoked as the first method that allows for users to
    * initialize all their state variables and perform other init actions 
before handling data.
    * @param outputMode - output mode for the stateful processor
-   * @param timeoutMode - timeout mode for the stateful processor
+   * @param timeMode - time mode for the stateful processor.
    */
   def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit
+      timeMode: TimeMode): Unit
 
   /**
    * Function that will allow users to interact with input data rows along 
with the grouping key
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index ff7c8fb3df4b..28d52d39093b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, 
StatefulProcessor, TimeoutMode, TTLMode}
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, 
StatefulProcessor, TimeMode}
 import org.apache.spark.sql.types._
 
 object CatalystSerde {
@@ -574,8 +574,7 @@ object TransformWithState {
       groupingAttributes: Seq[Attribute],
       dataAttributes: Seq[Attribute],
       statefulProcessor: StatefulProcessor[K, V, U],
-      ttlMode: TTLMode,
-      timeoutMode: TimeoutMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       child: LogicalPlan): LogicalPlan = {
     val keyEncoder = encoderFor[K]
@@ -585,8 +584,7 @@ object TransformWithState {
       groupingAttributes,
       dataAttributes,
       statefulProcessor.asInstanceOf[StatefulProcessor[Any, Any, Any]],
-      ttlMode,
-      timeoutMode,
+      timeMode,
       outputMode,
       keyEncoder.asInstanceOf[ExpressionEncoder[Any]],
       CatalystSerde.generateObjAttr[U],
@@ -607,8 +605,7 @@ object TransformWithState {
       groupingAttributes: Seq[Attribute],
       dataAttributes: Seq[Attribute],
       statefulProcessor: StatefulProcessor[K, V, U],
-      ttlMode: TTLMode,
-      timeoutMode: TimeoutMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       child: LogicalPlan,
       initialStateGroupingAttrs: Seq[Attribute],
@@ -621,8 +618,7 @@ object TransformWithState {
       groupingAttributes,
       dataAttributes,
       statefulProcessor.asInstanceOf[StatefulProcessor[Any, Any, Any]],
-      ttlMode,
-      timeoutMode,
+      timeMode,
       outputMode,
       keyEncoder.asInstanceOf[ExpressionEncoder[Any]],
       CatalystSerde.generateObjAttr[U],
@@ -643,8 +639,7 @@ case class TransformWithState(
     groupingAttributes: Seq[Attribute],
     dataAttributes: Seq[Attribute],
     statefulProcessor: StatefulProcessor[Any, Any, Any],
-    ttlMode: TTLMode,
-    timeoutMode: TimeoutMode,
+    timeMode: TimeMode,
     outputMode: OutputMode,
     keyEncoder: ExpressionEncoder[Any],
     outputObjAttr: Attribute,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index f3713edd0ec0..862268eba666 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.expressions.ReduceAggregator
 import org.apache.spark.sql.internal.TypedAggUtils
-import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, 
OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeoutMode, 
TTLMode}
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, 
OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeMode}
 
 /**
  * A [[Dataset]] has been logically grouped by a user specified grouping key.  
Users should not
@@ -654,16 +654,14 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * @tparam U The type of the output objects. Must be encodable to Spark SQL 
types.
    * @param statefulProcessor Instance of statefulProcessor whose functions 
will be invoked
    *                          by the operator.
-   * @param timeoutMode       The timeout mode of the stateful processor.
-   * @param ttlMode           The ttlMode to evict user state on ttl expiration
+   * @param timeMode          The time mode semantics of the stateful 
processor for timers and TTL.
    * @param outputMode        The output mode of the stateful processor.
    *
    * See [[Encoder]] for more details on what types are encodable to Spark SQL.
    */
   private[sql] def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode,
+      timeMode: TimeMode,
       outputMode: OutputMode): Dataset[U] = {
     Dataset[U](
       sparkSession,
@@ -671,8 +669,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
         groupingAttributes,
         dataAttributes,
         statefulProcessor,
-        ttlMode,
-        timeoutMode,
+        timeMode,
         outputMode,
         child = logicalPlan
       )
@@ -691,8 +688,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * @tparam U The type of the output objects. Must be encodable to Spark SQL 
types.
    * @param statefulProcessor Instance of statefulProcessor whose functions 
will be invoked by the
    *                          operator.
-   * @param timeoutMode The timeout mode of the stateful processor.
-   * @param ttlMode The ttlMode to evict user state on ttl expiration
+   * @param timeMode The time mode semantics of the stateful processor for 
timers and TTL.
    * @param outputMode The output mode of the stateful processor.
    * @param outputEncoder Encoder for the output type.
    *
@@ -700,11 +696,10 @@ class KeyValueGroupedDataset[K, V] private[sql](
    */
   private[sql] def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       outputEncoder: Encoder[U]): Dataset[U] = {
-    transformWithState(statefulProcessor, timeoutMode, ttlMode, 
outputMode)(outputEncoder)
+    transformWithState(statefulProcessor, timeMode, outputMode)(outputEncoder)
   }
 
   /**
@@ -716,8 +711,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * @tparam S The type of initial state objects. Must be encodable to Spark 
SQL types.
    * @param statefulProcessor Instance of statefulProcessor whose functions 
will
    *                          be invoked by the operator.
-   * @param timeoutMode       The timeout mode of the stateful processor.
-   * @param ttlMode           The ttlMode to evict user state on ttl expiration
+   * @param timeMode          The time mode semantics of the stateful 
processor for timers and TTL.
    * @param outputMode        The output mode of the stateful processor.
    * @param initialState      User provided initial state that will be used to 
initiate state for
    *                          the query in the first batch.
@@ -726,8 +720,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    */
   private[sql] def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = {
     Dataset[U](
@@ -736,8 +729,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
         groupingAttributes,
         dataAttributes,
         statefulProcessor,
-        ttlMode,
-        timeoutMode,
+        timeMode,
         outputMode,
         child = logicalPlan,
         initialState.groupingAttributes,
@@ -756,8 +748,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * @tparam S The type of initial state objects. Must be encodable to Spark 
SQL types.
    * @param statefulProcessor Instance of statefulProcessor whose functions 
will
    *                          be invoked by the operator.
-   * @param timeoutMode       The timeout mode of the stateful processor.
-   * @param ttlMode           The ttlMode to evict user state on ttl expiration
+   * @param timeMode          The time mode semantics of the stateful 
processor for timers and TTL.
    * @param outputMode        The output mode of the stateful processor.
    * @param initialState      User provided initial state that will be used to 
initiate state for
    *                          the query in the first batch.
@@ -768,13 +759,12 @@ class KeyValueGroupedDataset[K, V] private[sql](
    */
   private[sql] def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       initialState: KeyValueGroupedDataset[K, S],
       outputEncoder: Encoder[U],
       initialStateEncoder: Encoder[S]): Dataset[U] = {
-    transformWithState(statefulProcessor, timeoutMode, ttlMode,
+    transformWithState(statefulProcessor, timeMode,
       outputMode, initialState)(outputEncoder, initialStateEncoder)
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2c534eb36f9d..d7ebf786168b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -751,7 +751,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case TransformWithState(
         keyDeserializer, valueDeserializer, groupingAttributes,
-        dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode,
+        dataAttributes, statefulProcessor, timeMode, outputMode,
         keyEncoder, outputAttr, child, hasInitialState,
         initialStateGroupingAttrs, initialStateDataAttrs,
         initialStateDeserializer, initialState) =>
@@ -761,8 +761,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           groupingAttributes,
           dataAttributes,
           statefulProcessor,
-          ttlMode,
-          timeoutMode,
+          timeMode,
           outputMode,
           keyEncoder,
           outputAttr,
@@ -926,12 +925,12 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           hasInitialState, planLater(initialState), planLater(child)
         ) :: Nil
       case logical.TransformWithState(keyDeserializer, valueDeserializer, 
groupingAttributes,
-          dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode, 
keyEncoder,
+          dataAttributes, statefulProcessor, timeMode, outputMode, keyEncoder,
           outputObjAttr, child, hasInitialState,
           initialStateGroupingAttrs, initialStateDataAttrs,
           initialStateDeserializer, initialState) =>
         
TransformWithStateExec.generateSparkPlanForBatchQueries(keyDeserializer, 
valueDeserializer,
-          groupingAttributes, dataAttributes, statefulProcessor, ttlMode, 
timeoutMode, outputMode,
+          groupingAttributes, dataAttributes, statefulProcessor, timeMode, 
outputMode,
           keyEncoder, outputObjAttr, planLater(child), hasInitialState,
           initialStateGroupingAttrs, initialStateDataAttrs,
           initialStateDeserializer, planLater(initialState)) :: Nil
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
index 8ab05ef852b8..e0bfc684585d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeoutMode}
+import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeMode}
 
 /**
  * Class that provides a concrete implementation that can be used to provide 
access to expired
@@ -28,7 +28,7 @@ import org.apache.spark.sql.streaming.{ExpiredTimerInfo, 
TimeoutMode}
 class ExpiredTimerInfoImpl(
     isValid: Boolean,
     expiryTimeInMsOpt: Option[Long] = None,
-    timeoutMode: TimeoutMode = TimeoutMode.NoTimeouts()) extends 
ExpiredTimerInfo {
+    timeMode: TimeMode = TimeMode.None()) extends ExpiredTimerInfo {
 
   override def isValid(): Boolean = isValid
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
index 7bef62b7fcce..2b51361f651d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.streaming.state._
-import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, 
StatefulProcessorHandle, TimeoutMode, TTLConfig, TTLMode, ValueState}
+import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, 
StatefulProcessorHandle, TimeMode, TTLConfig, ValueState}
 import org.apache.spark.util.Utils
 
 /**
@@ -78,8 +78,7 @@ class StatefulProcessorHandleImpl(
     store: StateStore,
     runId: UUID,
     keyEncoder: ExpressionEncoder[Any],
-    ttlMode: TTLMode,
-    timeoutMode: TimeoutMode,
+    timeMode: TimeMode,
     isStreaming: Boolean = true,
     batchTimestampMs: Option[Long] = None)
   extends StatefulProcessorHandle with Logging {
@@ -143,7 +142,7 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
-  private lazy val timerState = new TimerStateImpl(store, timeoutMode, 
keyEncoder)
+  private lazy val timerState = new TimerStateImpl(store, timeMode, keyEncoder)
 
   private def verifyStateVarOperations(operationType: String): Unit = {
     if (currState != CREATED) {
@@ -153,9 +152,9 @@ class StatefulProcessorHandleImpl(
   }
 
   private def verifyTimerOperations(operationType: String): Unit = {
-    if (timeoutMode == NoTimeouts) {
-      throw 
StateStoreErrors.cannotPerformOperationWithInvalidTimeoutMode(operationType,
-        timeoutMode.toString)
+    if (timeMode == NoTime) {
+      throw 
StateStoreErrors.cannotPerformOperationWithInvalidTimeMode(operationType,
+        timeMode.toString)
     }
 
     if (currState < INITIALIZED || currState >= TIMER_PROCESSED) {
@@ -242,8 +241,8 @@ class StatefulProcessorHandleImpl(
 
   private def validateTTLConfig(ttlConfig: TTLConfig, stateName: String): Unit 
= {
     val ttlDuration = ttlConfig.ttlDuration
-    if (ttlMode != TTLMode.ProcessingTimeTTL()) {
-      throw StateStoreErrors.cannotProvideTTLConfigForNoTTLMode(stateName)
+    if (timeMode != TimeMode.ProcessingTime()) {
+      throw StateStoreErrors.cannotProvideTTLConfigForTimeMode(stateName, 
timeMode.toString)
     } else if (ttlDuration == null || ttlDuration.isNegative || 
ttlDuration.isZero) {
       throw StateStoreErrors.ttlMustBePositive("update", stateName)
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
index 55acc4953c50..e83c83df5322 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
@@ -16,14 +16,12 @@
  */
 package org.apache.spark.sql.execution.streaming
 
-import java.io.Serializable
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.streaming.state._
-import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.streaming.TimeMode
 import org.apache.spark.sql.types._
 import org.apache.spark.util.NextIterator
 
@@ -31,10 +29,6 @@ import org.apache.spark.util.NextIterator
  * Singleton utils class used primarily while interacting with TimerState
  */
 object TimerStateUtils {
-  case class TimestampWithKey(
-      key: Any,
-      expiryTimestampMs: Long) extends Serializable
-
   val PROC_TIMERS_STATE_NAME = "_procTimers"
   val EVENT_TIMERS_STATE_NAME = "_eventTimers"
   val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
@@ -45,12 +39,12 @@ object TimerStateUtils {
  * Class that provides the implementation for storing timers
  * used within the `transformWithState` operator.
  * @param store - state store to be used for storing timer data
- * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param timeMode - time mode (event time or processing time)
  * @param keyExprEnc - encoder for key expression
  */
 class TimerStateImpl(
     store: StateStore,
-    timeoutMode: TimeoutMode,
+    timeMode: TimeMode,
     keyExprEnc: ExpressionEncoder[Any]) extends Logging {
 
   private val EMPTY_ROW =
@@ -78,7 +72,7 @@ class TimerStateImpl(
 
   private val secIndexKeyEncoder = 
UnsafeProjection.create(keySchemaForSecIndex)
 
-  private val timerCFName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+  private val timerCFName = if (timeMode == TimeMode.ProcessingTime) {
     TimerStateUtils.PROC_TIMERS_STATE_NAME
   } else {
     TimerStateUtils.EVENT_TIMERS_STATE_NAME
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index eaf51614d7cb..9f4cad1e348d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -42,8 +42,7 @@ import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration, Uti
  * @param groupingAttributes used to group the data
  * @param dataAttributes used to read the data
  * @param statefulProcessor processor methods called on underlying data
- * @param ttlMode defines the ttl Mode for user state
- * @param timeoutMode defines the timeout mode
+ * @param timeMode The time mode semantics of the stateful processor for 
timers and TTL.
  * @param outputMode defines the output mode for the statefulProcessor
  * @param keyEncoder expression encoder for the key type
  * @param outputObjAttr Defines the output object
@@ -59,8 +58,7 @@ case class TransformWithStateExec(
     groupingAttributes: Seq[Attribute],
     dataAttributes: Seq[Attribute],
     statefulProcessor: StatefulProcessor[Any, Any, Any],
-    ttlMode: TTLMode,
-    timeoutMode: TimeoutMode,
+    timeMode: TimeMode,
     outputMode: OutputMode,
     keyEncoder: ExpressionEncoder[Any],
     outputObjAttr: Attribute,
@@ -80,14 +78,15 @@ case class TransformWithStateExec(
   override def shortName: String = "transformWithStateExec"
 
   override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
-    if (ttlMode == TTLMode.ProcessingTimeTTL() || timeoutMode == 
TimeoutMode.ProcessingTime()) {
-      // TODO: check if we can return true only if actual timers are registered
-      true
-    } else if (timeoutMode == TimeoutMode.EventTime()) {
-      eventTimeWatermarkForEviction.isDefined &&
-        newInputWatermark > eventTimeWatermarkForEviction.get
-    } else {
-      false
+    timeMode match {
+      case ProcessingTime =>
+        // TODO: check if we can return true only if actual timers are 
registered, or there is
+        // expired state
+        true
+      case EventTime =>
+        eventTimeWatermarkForEviction.isDefined &&
+          newInputWatermark > eventTimeWatermarkForEviction.get
+      case _ => false
     }
   }
 
@@ -200,9 +199,9 @@ case class TransformWithStateExec(
   }
 
   private def processTimers(
-      timeoutMode: TimeoutMode,
+      timeMode: TimeMode,
       processorHandle: StatefulProcessorHandleImpl): Iterator[InternalRow] = {
-    timeoutMode match {
+    timeMode match {
       case ProcessingTime =>
         assert(batchTimestampMs.isDefined)
         val batchTimestamp = batchTimestampMs.get
@@ -262,7 +261,7 @@ case class TransformWithStateExec(
       override def next() = itr.next()
       private def getIterator(): Iterator[InternalRow] =
         CompletionIterator[InternalRow, Iterator[InternalRow]](
-          processTimers(timeoutMode, processorHandle), {
+          processTimers(timeMode, processorHandle), {
           // Note: `timeoutLatencyMs` also includes the time the parent 
operator took for
           // processing output returned through iterator.
           timeoutLatencyMs += NANOSECONDS.toMillis(System.nanoTime - 
timeoutProcessingStartTimeNs)
@@ -297,8 +296,7 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    validateTTLMode()
-    validateTimeoutMode()
+    validateTimeMode()
 
     if (hasInitialState) {
       val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
@@ -413,11 +411,11 @@ case class TransformWithStateExec(
   private def processData(store: StateStore, singleIterator: 
Iterator[InternalRow]):
     CompletionIterator[InternalRow, Iterator[InternalRow]] = {
     val processorHandle = new StatefulProcessorHandleImpl(
-      store, getStateInfo.queryRunId, keyEncoder, ttlMode, timeoutMode,
+      store, getStateInfo.queryRunId, keyEncoder, timeMode,
       isStreaming, batchTimestampMs)
     assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
     statefulProcessor.setHandle(processorHandle)
-    statefulProcessor.init(outputMode, timeoutMode, ttlMode)
+    statefulProcessor.init(outputMode, timeMode)
     processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
     processDataWithPartition(singleIterator, store, processorHandle)
   }
@@ -428,10 +426,10 @@ case class TransformWithStateExec(
       initStateIterator: Iterator[InternalRow]):
     CompletionIterator[InternalRow, Iterator[InternalRow]] = {
     val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-      keyEncoder, ttlMode, timeoutMode, isStreaming)
+      keyEncoder, timeMode, isStreaming)
     assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
     statefulProcessor.setHandle(processorHandle)
-    statefulProcessor.init(outputMode, timeoutMode, ttlMode)
+    statefulProcessor.init(outputMode, timeMode)
     processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
 
     // Check if is first batch
@@ -450,27 +448,16 @@ case class TransformWithStateExec(
     processDataWithPartition(childDataIterator, store, processorHandle)
   }
 
-  private def validateTimeoutMode(): Unit = {
-    timeoutMode match {
+  private def validateTimeMode(): Unit = {
+    timeMode match {
       case ProcessingTime =>
         if (batchTimestampMs.isEmpty) {
-          StateStoreErrors.missingTimeoutValues(timeoutMode.toString)
+          StateStoreErrors.missingTimeValues(timeMode.toString)
         }
 
       case EventTime =>
         if (eventTimeWatermarkForEviction.isEmpty) {
-          StateStoreErrors.missingTimeoutValues(timeoutMode.toString)
-        }
-
-      case _ =>
-    }
-  }
-
-  private def validateTTLMode(): Unit = {
-    ttlMode match {
-      case ProcessingTimeTTL =>
-        if (batchTimestampMs.isEmpty) {
-          StateStoreErrors.missingTTLValues(timeoutMode.toString)
+          StateStoreErrors.missingTimeValues(timeMode.toString)
         }
 
       case _ =>
@@ -488,8 +475,7 @@ object TransformWithStateExec {
       groupingAttributes: Seq[Attribute],
       dataAttributes: Seq[Attribute],
       statefulProcessor: StatefulProcessor[Any, Any, Any],
-      ttlMode: TTLMode,
-      timeoutMode: TimeoutMode,
+      timeMode: TimeMode,
       outputMode: OutputMode,
       keyEncoder: ExpressionEncoder[Any],
       outputObjAttr: Attribute,
@@ -514,8 +500,7 @@ object TransformWithStateExec {
       groupingAttributes,
       dataAttributes,
       statefulProcessor,
-      ttlMode,
-      timeoutMode,
+      timeMode,
       outputMode,
       keyEncoder,
       outputObjAttr,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 6c63aa94e75b..b8ab32a00851 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -32,16 +32,9 @@ object StateStoreErrors {
     )
   }
 
-  def missingTimeoutValues(timeoutMode: String): SparkException = {
+  def missingTimeValues(timeMode: String): SparkException = {
     SparkException.internalError(
-      msg = s"Failed to find timeout values for timeoutMode=$timeoutMode",
-      category = "TWS"
-    )
-  }
-
-  def missingTTLValues(ttlMode: String): SparkException = {
-    SparkException.internalError(
-      msg = s"Failed to find timeout values for ttlMode=$ttlMode",
+      msg = s"Failed to find time values for timeMode=$timeMode",
       category = "TWS"
     )
   }
@@ -108,10 +101,10 @@ object StateStoreErrors {
     new StateStoreCannotCreateColumnFamilyWithReservedChars(colFamilyName)
   }
 
-  def cannotPerformOperationWithInvalidTimeoutMode(
+  def cannotPerformOperationWithInvalidTimeMode(
       operationType: String,
-      timeoutMode: String): 
StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode = {
-    new 
StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode(operationType, 
timeoutMode)
+      timeMode: String): 
StatefulProcessorCannotPerformOperationWithInvalidTimeMode = {
+    new 
StatefulProcessorCannotPerformOperationWithInvalidTimeMode(operationType, 
timeMode)
   }
 
   def cannotPerformOperationWithInvalidHandleState(
@@ -125,9 +118,9 @@ object StateStoreErrors {
     new StatefulProcessorCannotReInitializeState(groupingKey)
   }
 
-  def cannotProvideTTLConfigForNoTTLMode(stateName: String):
-    StatefulProcessorCannotAssignTTLInNoTTLMode = {
-    new StatefulProcessorCannotAssignTTLInNoTTLMode(stateName)
+  def cannotProvideTTLConfigForTimeMode(stateName: String, timeMode: String):
+    StatefulProcessorCannotAssignTTLInTimeMode = {
+    new StatefulProcessorCannotAssignTTLInTimeMode(stateName, timeMode)
   }
 
   def ttlMustBePositive(operationType: String,
@@ -163,12 +156,12 @@ class 
StateStoreUnsupportedOperationException(operationType: String, entity: Str
     messageParameters = Map("operationType" -> operationType, "entity" -> 
entity)
   )
 
-class StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode(
+class StatefulProcessorCannotPerformOperationWithInvalidTimeMode(
     operationType: String,
-    timeoutMode: String)
+    timeMode: String)
   extends SparkUnsupportedOperationException(
-    errorClass = 
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE",
-    messageParameters = Map("operationType" -> operationType, "timeoutMode" -> 
timeoutMode)
+    errorClass = 
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE",
+    messageParameters = Map("operationType" -> operationType, "timeMode" -> 
timeMode)
   )
 
 class StatefulProcessorCannotPerformOperationWithInvalidHandleState(
@@ -210,10 +203,10 @@ class 
StateStoreNullTypeOrderingColsNotSupported(fieldName: String, index: Strin
     errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
     messageParameters = Map("fieldName" -> fieldName, "index" -> index))
 
-class StatefulProcessorCannotAssignTTLInNoTTLMode(stateName: String)
+class StatefulProcessorCannotAssignTTLInTimeMode(stateName: String, timeMode: 
String)
   extends SparkUnsupportedOperationException(
-    errorClass = "STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE",
-    messageParameters = Map("stateName" -> stateName))
+    errorClass = "STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL",
+    messageParameters = Map("stateName" -> stateName, "timeMode" -> timeMode))
 
 class StatefulProcessorTTLMustBePositive(
     operationType: String,
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index f9f075f4468d..5d7ae477e089 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -206,8 +206,7 @@ public class JavaDatasetSuite implements Serializable {
 
     Dataset<String> transformWithStateMapped = grouped.transformWithState(
       new TestStatefulProcessorWithInitialState(),
-      TimeoutMode.NoTimeouts(),
-      TTLMode.NoTTL(),
+      TimeMode.None(),
       OutputMode.Append(),
       kvInitStateMappedDS,
       Encoders.STRING(),
@@ -362,8 +361,7 @@ public class JavaDatasetSuite implements Serializable {
     StatefulProcessor<Integer, String, String> testStatefulProcessor = new 
TestStatefulProcessor();
     Dataset<String> transformWithStateMapped = grouped.transformWithState(
       testStatefulProcessor,
-      TimeoutMode.NoTimeouts(),
-      TTLMode.NoTTL(),
+      TimeMode.None(),
       OutputMode.Append(),
       Encoders.STRING());
 
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java 
b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java
index c6d705af5f2d..e53e977da149 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java
@@ -38,8 +38,7 @@ public class TestStatefulProcessor extends 
StatefulProcessor<Integer, String, St
   @Override
   public void init(
       OutputMode outputMode,
-      TimeoutMode timeoutMode,
-      TTLMode ttlMode) {
+      TimeMode timeMode) {
     countState = this.getHandle().getValueState("countState",
       Encoders.LONG());
 
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java
index db0b222145c4..bfa542e81e35 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java
@@ -37,8 +37,7 @@ public class TestStatefulProcessorWithInitialState
   @Override
   public void init(
       OutputMode outputMode,
-      TimeoutMode timeoutMode,
-      TTLMode ttlMode) {
+      TimeMode timeMode) {
     testState = this.getHandle().getValueState("testState",
       Encoders.STRING());
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala
index 51cfc1548b39..5eb48a86e342 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, 
StatefulProcessorHandleImpl}
-import org.apache.spark.sql.streaming.{ListState, TimeoutMode, TTLMode, 
ValueState}
+import org.apache.spark.sql.streaming.{ListState, TimeMode, ValueState}
 
 /**
  * Class that adds unit tests for ListState types used in arbitrary stateful
@@ -37,8 +37,7 @@ class ListStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val listState: ListState[Long] = handle.getListState[Long]("listState", 
Encoders.scalaLong)
 
@@ -71,8 +70,7 @@ class ListStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState: ListState[Long] = handle.getListState[Long]("testState", 
Encoders.scalaLong)
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
@@ -100,8 +98,7 @@ class ListStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState1: ListState[Long] = 
handle.getListState[Long]("testState1", Encoders.scalaLong)
       val testState2: ListState[Long] = 
handle.getListState[Long]("testState2", Encoders.scalaLong)
@@ -139,8 +136,7 @@ class ListStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val listState1: ListState[Long] = handle.getListState[Long]("listState1", Encoders.scalaLong)
       val listState2: ListState[Long] = handle.getListState[Long]("listState2", Encoders.scalaLong)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala
index 7fa41b12795e..572fc2429273 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala
@@ -22,7 +22,7 @@ import java.util.UUID
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl}
-import org.apache.spark.sql.streaming.{ListState, MapState, TimeoutMode, TTLMode, ValueState}
+import org.apache.spark.sql.streaming.{ListState, MapState, TimeMode, ValueState}
 import org.apache.spark.sql.types.{BinaryType, StructType}
 
 /**
@@ -39,8 +39,7 @@ class MapStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState: MapState[String, Double] =
         handle.getMapState[String, Double]("testState", Encoders.STRING, Encoders.scalaDouble)
@@ -74,8 +73,7 @@ class MapStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState1: MapState[Long, Double] =
         handle.getMapState[Long, Double]("testState1", Encoders.scalaLong, Encoders.scalaDouble)
@@ -114,8 +112,7 @@ class MapStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val mapTestState1: MapState[String, Int] =
         handle.getMapState[String, Int]("mapTestState1", Encoders.STRING, Encoders.scalaInt)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala
index a32b4111eae8..e9ffe4ca9269 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
-import org.apache.spark.sql.streaming.{TimeoutMode, TTLConfig, TTLMode}
+import org.apache.spark.sql.streaming.{TimeMode, TTLConfig}
 
 
 /**
@@ -36,21 +36,21 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
   private def keyExprEncoder: ExpressionEncoder[Any] =
     Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
 
-  private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
-    timeoutMode match {
-      case "NoTimeouts" => TimeoutMode.NoTimeouts()
-      case "ProcessingTime" => TimeoutMode.ProcessingTime()
-      case "EventTime" => TimeoutMode.EventTime()
-      case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode")
+  private def getTimeMode(timeMode: String): TimeMode = {
+    timeMode match {
+      case "None" => TimeMode.None()
+      case "ProcessingTime" => TimeMode.ProcessingTime()
+      case "EventTime" => TimeMode.EventTime()
+      case _ => throw new IllegalArgumentException(s"Invalid timeMode=$timeMode")
     }
   }
 
-  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
-    test(s"value state creation with timeoutMode=$timeoutMode should succeed") 
{
+  Seq("None", "ProcessingTime", "EventTime").foreach { timeMode =>
+    test(s"value state creation with timeMode=$timeMode should succeed") {
       tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
         val handle = new StatefulProcessorHandleImpl(store,
-          UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode))
+          UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
         assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
         handle.getValueState[Long]("testState", Encoders.scalaLong)
       }
@@ -85,13 +85,13 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     handle.registerTimer(1000L)
   }
 
-  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
-    test(s"value state creation with timeoutMode=$timeoutMode " +
+  Seq("None", "ProcessingTime", "EventTime").foreach { timeMode =>
+    test(s"value state creation with timeMode=$timeMode " +
       "and invalid state should fail") {
       tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
         val handle = new StatefulProcessorHandleImpl(store,
-          UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode))
+          UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
 
         Seq(StatefulProcessorHandleState.INITIALIZED,
           StatefulProcessorHandleState.DATA_PROCESSED,
@@ -105,21 +105,21 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     }
   }
 
-  test("registering processing/event time timeouts with NoTimeout mode should 
fail") {
+  test("registering processing/event time timeouts with None timeMode should 
fail") {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store,
-        UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        UUID.randomUUID(), keyExprEncoder, TimeMode.None())
       val ex = intercept[SparkUnsupportedOperationException] {
         handle.registerTimer(10000L)
       }
 
       checkError(
         ex,
-        errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE",
+        errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE",
         parameters = Map(
           "operationType" -> "register_timer",
-          "timeoutMode" -> TimeoutMode.NoTimeouts().toString
+          "timeMode" -> TimeMode.None().toString
         ),
         matchPVals = true
       )
@@ -130,22 +130,22 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
 
       checkError(
         ex2,
-        errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE",
+        errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE",
         parameters = Map(
           "operationType" -> "delete_timer",
-          "timeoutMode" -> TimeoutMode.NoTimeouts().toString
+          "timeMode" -> TimeMode.None().toString
         ),
         matchPVals = true
       )
     }
   }
 
-  Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
-    test(s"registering timeouts with timeoutMode=$timeoutMode should succeed") 
{
+  Seq("ProcessingTime", "EventTime").foreach { timeMode =>
+    test(s"registering timeouts with timeMode=$timeMode should succeed") {
       tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
         val handle = new StatefulProcessorHandleImpl(store,
-          UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode))
+          UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
         handle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
         assert(handle.getHandleState === StatefulProcessorHandleState.INITIALIZED)
 
@@ -161,12 +161,12 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     }
   }
 
-  Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
-    test(s"verify listing of registered timers with timeoutMode=$timeoutMode") 
{
+  Seq("ProcessingTime", "EventTime").foreach { timeMode =>
+    test(s"verify listing of registered timers with timeMode=$timeMode") {
       tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
         val handle = new StatefulProcessorHandleImpl(store,
-          UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode))
+          UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
         handle.setHandleState(StatefulProcessorHandleState.DATA_PROCESSED)
         assert(handle.getHandleState === StatefulProcessorHandleState.DATA_PROCESSED)
 
@@ -201,12 +201,12 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     }
   }
 
-  Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
-    test(s"registering timeouts with timeoutMode=$timeoutMode and invalid 
state should fail") {
+  Seq("ProcessingTime", "EventTime").foreach { timeMode =>
+    test(s"registering timeouts with timeMode=$timeMode and invalid state 
should fail") {
       tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
         val handle = new StatefulProcessorHandleImpl(store,
-          UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), getTimeoutMode(timeoutMode))
+          UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
 
         Seq(StatefulProcessorHandleState.CREATED,
           StatefulProcessorHandleState.TIMER_PROCESSED,
@@ -219,11 +219,11 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     }
   }
 
-  test(s"ttl States are populated for ttlMode=ProcessingTime") {
+  test(s"ttl States are populated for timeMode=ProcessingTime") {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store,
-        UUID.randomUUID(), keyExprEncoder, TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+        UUID.randomUUID(), keyExprEncoder, TimeMode.ProcessingTime(),
         batchTimestampMs = Some(10))
 
       val valueStateWithTTL = handle.getValueState("testState",
@@ -237,11 +237,11 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     }
   }
 
-  test(s"ttl States are not populated for ttlMode=NoTTL") {
+  test(s"ttl States are not populated for timeMode=None") {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store,
-        UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        UUID.randomUUID(), keyExprEncoder, TimeMode.None())
 
       handle.getValueState("testState", Encoders.STRING)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
index 1af33aa7b5ad..0bf160d8b321 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
@@ -20,31 +20,31 @@ package org.apache.spark.sql.execution.streaming.state
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, TimerStateImpl}
-import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.streaming.TimeMode
 
 /**
  * Class that adds unit tests for Timer State used in arbitrary stateful
  * operators such as transformWithState
  */
 class TimerSuite extends StateVariableSuiteBase {
-  private def testWithTimeOutMode(testName: String)
-      (testFunc: TimeoutMode => Unit): Unit = {
+  private def testWithTimeMode(testName: String)
+      (testFunc: TimeMode => Unit): Unit = {
     Seq("Processing", "Event").foreach { timeoutMode =>
       test(s"$timeoutMode timer - " + testName) {
         timeoutMode match {
-          case "Processing" => testFunc(TimeoutMode.ProcessingTime())
-          case "Event" => testFunc(TimeoutMode.EventTime())
+          case "Processing" => testFunc(TimeMode.ProcessingTime())
+          case "Event" => testFunc(TimeMode.EventTime())
         }
       }
     }
   }
 
-  testWithTimeOutMode("single instance with single key") { timeoutMode =>
+  testWithTimeMode("single instance with single key") { timeMode =>
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-      val timerState = new TimerStateImpl(store, timeoutMode,
+      val timerState = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
       timerState.registerTimer(1L * 1000)
       assert(timerState.listTimers().toSet === Set(1000L))
@@ -58,14 +58,14 @@ class TimerSuite extends StateVariableSuiteBase {
     }
   }
 
-  testWithTimeOutMode("multiple instances with single key") { timeoutMode =>
+  testWithTimeMode("multiple instances with single key") { timeMode =>
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-      val timerState1 = new TimerStateImpl(store, timeoutMode,
+      val timerState1 = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
-      val timerState2 = new TimerStateImpl(store, timeoutMode,
+      val timerState2 = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
       timerState1.registerTimer(1L * 1000)
       timerState2.registerTimer(15L * 1000)
@@ -83,12 +83,12 @@ class TimerSuite extends StateVariableSuiteBase {
     }
   }
 
-  testWithTimeOutMode("multiple instances with multiple keys") { timeoutMode =>
+  testWithTimeMode("multiple instances with multiple keys") { timeMode =>
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
-      val timerState1 = new TimerStateImpl(store, timeoutMode,
+      val timerState1 = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
       timerState1.registerTimer(1L * 1000)
       timerState1.registerTimer(2L * 1000)
@@ -96,7 +96,7 @@ class TimerSuite extends StateVariableSuiteBase {
       ImplicitGroupingKeyTracker.removeImplicitKey()
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key2")
-      val timerState2 = new TimerStateImpl(store, timeoutMode,
+      val timerState2 = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
       timerState2.registerTimer(15L * 1000)
       ImplicitGroupingKeyTracker.removeImplicitKey()
@@ -115,13 +115,13 @@ class TimerSuite extends StateVariableSuiteBase {
     }
   }
 
-  testWithTimeOutMode("Range scan on second index timer key - " +
-    "verify timestamp is sorted for single instance") { timeoutMode =>
+  testWithTimeMode("Range scan on second index timer key - " +
+    "verify timestamp is sorted for single instance") { timeMode =>
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-      val timerState = new TimerStateImpl(store, timeoutMode,
+      val timerState = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
       val timerTimerstamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
       // register/put unordered timestamp into rocksDB
@@ -134,25 +134,25 @@ class TimerSuite extends StateVariableSuiteBase {
     }
   }
 
-  testWithTimeOutMode("test range scan on second index timer key - " +
-    "verify timestamp is sorted for multiple instances") { timeoutMode =>
+  testWithTimeMode("test range scan on second index timer key - " +
+    "verify timestamp is sorted for multiple instances") { timeMode =>
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
-      val timerState1 = new TimerStateImpl(store, timeoutMode,
+      val timerState1 = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
       val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L, 1L)
       timerTimestamps1.foreach(timerState1.registerTimer)
 
-      val timerState2 = new TimerStateImpl(store, timeoutMode,
+      val timerState2 = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
       val timerTimestamps2 = Seq(931L, 8000L, 452300L, 4200L)
       timerTimestamps2.foreach(timerState2.registerTimer)
       ImplicitGroupingKeyTracker.removeImplicitKey()
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key3")
-      val timerState3 = new TimerStateImpl(store, timeoutMode,
+      val timerState3 = new TimerStateImpl(store, timeMode,
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
       val timerTimerStamps3 = Seq(1L, 2L, 8L, 3L)
       timerTimerStamps3.foreach(timerState3.registerTimer)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 102164d9c15f..d2747e2976f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, ValueStateImplWithTTL}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.{TimeoutMode, TTLConfig, TTLMode, ValueState}
+import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
@@ -49,8 +49,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val stateName = "testState"
       val testState: ValueState[Long] = handle.getValueState[Long]("testState", Encoders.scalaLong)
@@ -94,8 +93,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState: ValueState[Long] = handle.getValueState[Long]("testState", Encoders.scalaLong)
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
@@ -121,8 +119,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState1: ValueState[Long] = handle.getValueState[Long](
         "testState1", Encoders.scalaLong)
@@ -167,8 +164,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store,
-        UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val cfName = "_testState"
       val ex = intercept[SparkUnsupportedOperationException] {
@@ -208,8 +204,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState: ValueState[Double] = handle.getValueState[Double]("testState",
         Encoders.scalaDouble)
@@ -235,8 +230,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState: ValueState[Long] = handle.getValueState[Long]("testState",
         Encoders.scalaLong)
@@ -262,8 +256,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState: ValueState[TestClass] = handle.getValueState[TestClass]("testState",
         Encoders.product[TestClass])
@@ -289,8 +282,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
 
       val testState: ValueState[POJOTestClass] = handle.getValueState[POJOTestClass]("testState",
         Encoders.bean(classOf[POJOTestClass]))
@@ -318,8 +310,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
       val store = provider.getStore(0)
       val timestampMs = 10
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
-        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.ProcessingTime(),
         batchTimestampMs = Some(timestampMs))
 
       val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(1))
@@ -340,8 +331,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
       // increment batchProcessingTime, or watermark and ensure expired value is not returned
       val nextBatchHandle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
-        batchTimestampMs = Some(ttlExpirationMs))
+        TimeMode.ProcessingTime(), batchTimestampMs = Some(ttlExpirationMs))
 
       val nextBatchTestState: ValueStateImplWithTTL[String] =
         nextBatchHandle.getValueState[String]("testState", Encoders.STRING, ttlConfig)
@@ -377,8 +367,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
       val batchTimestampMs = 10
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
-        batchTimestampMs = Some(batchTimestampMs))
+        TimeMode.ProcessingTime(), batchTimestampMs = Some(batchTimestampMs))
 
       Seq(null, Duration.ZERO, Duration.ofMinutes(-1)).foreach { ttlDuration =>
         val ttlConfig = TTLConfig(ttlDuration)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
index 5ccc14ab8a77..705226d51332 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
@@ -32,8 +32,7 @@ class TestListStateProcessor
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _listState = getHandle.getListState("testListState", Encoders.STRING)
   }
 
@@ -90,8 +89,7 @@ class ToggleSaveAndEmitProcessor
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _listState = getHandle.getListState("testListState", Encoders.STRING)
     _valueState = getHandle.getValueState("testValueState", Encoders.scalaBoolean)
   }
@@ -141,8 +139,7 @@ class TransformWithListStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestListStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update()) (
@@ -162,8 +159,7 @@ class TransformWithListStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestListStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -183,8 +179,7 @@ class TransformWithListStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestListStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -204,8 +199,7 @@ class TransformWithListStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestListStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -225,8 +219,7 @@ class TransformWithListStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestListStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -246,8 +239,7 @@ class TransformWithListStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestListStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -267,8 +259,7 @@ class TransformWithListStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestListStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update()) (
@@ -320,8 +311,7 @@ class TransformWithListStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x)
         .transformWithState(new ToggleSaveAndEmitProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
index d32b9687d95f..0eafb16a5350 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
@@ -32,8 +32,7 @@ class TestMapStateProcessor
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _mapState = getHandle.getMapState("sessionState", Encoders.STRING, Encoders.STRING)
   }
 
@@ -95,8 +94,7 @@ class TransformWithMapStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestMapStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
 
@@ -122,8 +120,7 @@ class TransformWithMapStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestMapStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -147,8 +144,7 @@ class TransformWithMapStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestMapStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -171,8 +167,7 @@ class TransformWithMapStateSuite extends StreamTest
       val result = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new TestMapStateProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Append())
       testStream(result, OutputMode.Append())(
         // Test exists()
@@ -226,8 +221,7 @@ class TransformWithMapStateSuite extends StreamTest
     val result = inputData.toDS()
       .groupByKey(x => x.key)
       .transformWithState(new TestMapStateProcessor(),
-        TimeoutMode.NoTimeouts(),
-        TTLMode.NoTTL(),
+        TimeMode.None(),
         OutputMode.Append())
 
     val df = result.toDF()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
index 106f228ba78b..54cff6fc44c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
@@ -38,8 +38,7 @@ abstract class StatefulProcessorWithInitialStateTestClass[V]
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _valState = getHandle.getValueState[Double]("testValueInit", Encoders.scalaDouble)
     _listState = getHandle.getListState[Double]("testListInit", Encoders.scalaDouble)
     _mapState = getHandle.getMapState[Double, Int](
@@ -171,8 +170,7 @@ class StatefulProcessorWithInitialStateProcTimerClass
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode) : Unit = {
+      timeMode: TimeMode) : Unit = {
     _countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong)
     _timerState = getHandle.getValueState[Long]("timerState", Encoders.scalaLong)
   }
@@ -215,8 +213,7 @@ class StatefulProcessorWithInitialStateEventTimerClass
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _maxEventTimeState = getHandle.getValueState[Long]("maxEventTimeState",
       Encoders.scalaLong)
     _timerState = getHandle.getValueState[Long]("timerState", Encoders.scalaLong)
@@ -293,7 +290,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
           InputRowForInitialState("init_2", 100.0, List(100.0), Map(100.0 -> 
1)))
           .toDS().groupByKey(x => x.key).mapValues(x => x)
       val query = kvDataSet.transformWithState(new InitialStateInMemoryTestClass(),
-        TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(), initStateDf)
+        TimeMode.None(), OutputMode.Append(), initStateDf)
 
       testStream(query, OutputMode.Update())(
         // non-exist key test
@@ -371,7 +368,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
       val query = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new AccumulateStatefulProcessorWithInitState(),
-          TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(), initStateDf
+          TimeMode.None(), OutputMode.Append(), initStateDf
         )
       testStream(query, OutputMode.Update())(
         AddData(inputData, InitInputRow("init_1", "add", 50.0)),
@@ -391,8 +388,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
     val result = inputData.toDS()
       .groupByKey(x => x.key)
       .transformWithState(new AccumulateStatefulProcessorWithInitState(),
-        TimeoutMode.NoTimeouts(),
-        TTLMode.NoTTL(),
+        TimeMode.None(),
         OutputMode.Append(),
         createInitialDfForTest)
 
@@ -410,8 +406,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
       val query = inputData.toDS()
         .groupByKey(x => x.key)
         .transformWithState(new AccumulateStatefulProcessorWithInitState(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Append(),
           initDf)
 
@@ -443,8 +438,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
       val result = inputData.toDS().groupByKey(x => x)
         .transformWithState(
           new StatefulProcessorWithInitialStateProcTimerClass(),
-          TimeoutMode.ProcessingTime(),
-          TTLMode.NoTTL(),
+          TimeMode.ProcessingTime(),
           OutputMode.Update(),
           initDf)
 
@@ -488,8 +482,7 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
       val result = eventTimeDf(inputData.toDS())
         .transformWithState(
           new StatefulProcessorWithInitialStateEventTimerClass(),
-          TimeoutMode.EventTime(),
-          TTLMode.NoTTL(),
+          TimeMode.EventTime(),
           OutputMode.Update(),
           initDf)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 735c53bf3c91..7dec14d3e435 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -40,8 +40,7 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong)
   }
 
@@ -104,9 +103,8 @@ class RunningCountStatefulProcessorWithProcTimeTimerUpdates
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode) : Unit = {
-    super.init(outputMode, timeoutMode, ttlMode)
+      timeMode: TimeMode) : Unit = {
+    super.init(outputMode, timeMode)
     _timerState = getHandle.getValueState[Long]("timerState", Encoders.scalaLong)
   }
 
@@ -196,8 +194,7 @@ class MaxEventTimeStatefulProcessor
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _maxEventTimeState = getHandle.getValueState[Long]("maxEventTimeState",
       Encoders.scalaLong)
     _timerState = getHandle.getValueState[Long]("timerState", Encoders.scalaLong)
@@ -242,8 +239,7 @@ class RunningCountMostRecentStatefulProcessor
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong)
     _mostRecent = getHandle.getValueState[String]("mostRecent", Encoders.STRING)
   }
@@ -273,8 +269,7 @@ class MostRecentStatefulProcessorWithDeletion
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     getHandle.deleteIfExists("countState")
     _mostRecent = getHandle.getValueState[String]("mostRecent", Encoders.STRING)
   }
@@ -327,8 +322,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       val result = inputData.toDS()
         .groupByKey(x => x)
         .transformWithState(new RunningCountStatefulProcessorWithError(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -349,8 +343,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       val result = inputData.toDS()
         .groupByKey(x => x)
         .transformWithState(new RunningCountStatefulProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -380,8 +373,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       val result = inputData.toDS()
         .groupByKey(x => x)
         .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(),
-          TimeoutMode.ProcessingTime(),
-          TTLMode.NoTTL(),
+          TimeMode.ProcessingTime(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -424,8 +416,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         .groupByKey(x => x)
         .transformWithState(
           new RunningCountStatefulProcessorWithProcTimeTimerUpdates(),
-          TimeoutMode.ProcessingTime(),
-          TTLMode.NoTTL(),
+          TimeMode.ProcessingTime(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -461,8 +452,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         .groupByKey(x => x)
         .transformWithState(
           new RunningCountStatefulProcessorWithMultipleTimers(),
-          TimeoutMode.ProcessingTime(),
-          TTLMode.NoTTL(),
+          TimeMode.ProcessingTime(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -497,8 +487,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         .groupByKey(_._1)
         .transformWithState(
           new MaxEventTimeStatefulProcessor(),
-          TimeoutMode.EventTime(),
-          TTLMode.NoTTL(),
+          TimeMode.EventTime(),
           OutputMode.Update())
 
     testStream(result, OutputMode.Update())(
@@ -539,8 +528,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     val result = inputData.toDS()
       .groupByKey(x => x)
       .transformWithState(new RunningCountStatefulProcessor(),
-        TimeoutMode.NoTimeouts(),
-        TTLMode.NoTTL(),
+        TimeMode.None(),
         OutputMode.Append())
 
     val df = result.toDF()
@@ -558,15 +546,13 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         val stream1 = inputData.toDS()
           .groupByKey(x => x._1)
           .transformWithState(new RunningCountMostRecentStatefulProcessor(),
-            TimeoutMode.NoTimeouts(),
-            TTLMode.NoTTL(),
+            TimeMode.None(),
             OutputMode.Update())
 
         val stream2 = inputData.toDS()
           .groupByKey(x => x._1)
           .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
-            TimeoutMode.NoTimeouts(),
-            TTLMode.NoTTL(),
+            TimeMode.None(),
             OutputMode.Update())
 
         testStream(stream1, OutputMode.Update())(
@@ -598,8 +584,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         .union(inputData2.toDS())
         .groupByKey(x => x)
         .transformWithState(new RunningCountStatefulProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -632,8 +617,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         .union(inputData3.toDS())
         .groupByKey(x => x)
         .transformWithState(new RunningCountStatefulProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -666,8 +650,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         .union(inputData2.toDS().map(_.toString))
         .groupByKey(x => x)
         .transformWithState(new RunningCountStatefulProcessor(),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.NoTTL(),
+          TimeMode.None(),
           OutputMode.Update())
 
       testStream(result, OutputMode.Update())(
@@ -697,8 +680,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       .select("value").as[String]
       .groupByKey(x => x)
       .transformWithState(new RunningCountStatefulProcessor(),
-        TimeoutMode.NoTimeouts(),
-        TTLMode.NoTTL(),
+        TimeMode.None(),
         OutputMode.Update())
   }
 
@@ -790,8 +772,7 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest {
     val result = inputData.toDS()
       .groupByKey(x => x)
       .transformWithState(new RunningCountStatefulProcessor(),
-        TimeoutMode.NoTimeouts(),
-        TTLMode.NoTTL(),
+        TimeMode.None(),
         OutputMode.Update())
 
     testStream(result, OutputMode.Update())(
@@ -810,7 +791,7 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest {
     val result = inputData.toDS()
       .groupByKey(x => x.key)
       .transformWithState(new AccumulateStatefulProcessorWithInitState(),
-        TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(), initDf
+        TimeMode.None(), OutputMode.Append(), initDf
       )
     testStream(result, OutputMode.Update())(
       AddData(inputData, InitInputRow("a", "add", -1.0)),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
index 759d535c18a3..e6dd0ace766a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
@@ -98,8 +98,7 @@ class ValueStateTTLProcessor(ttlConfig: TTLConfig)
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _valueState = getHandle
       .getValueState("valueState", Encoders.scalaInt, ttlConfig)
       .asInstanceOf[ValueStateImplWithTTL[Int]]
@@ -135,8 +134,7 @@ case class MultipleValueStatesTTLProcessor(
 
   override def init(
       outputMode: OutputMode,
-      timeoutMode: TimeoutMode,
-      ttlMode: TTLMode): Unit = {
+      timeMode: TimeMode): Unit = {
     _valueStateWithTTL = getHandle
       .getValueState("valueState", Encoders.scalaInt, ttlConfig)
       .asInstanceOf[ValueStateImplWithTTL[Int]]
@@ -191,8 +189,7 @@ class TransformWithValueStateTTLSuite
           .groupByKey(x => x.key)
           .transformWithState(
             new ValueStateTTLProcessor(ttlConfig),
-            TimeoutMode.NoTimeouts(),
-            TTLMode.ProcessingTimeTTL(),
+            TimeMode.ProcessingTime(),
             OutputMode.Append())
 
         val clock = new StreamManualClock
@@ -258,8 +255,7 @@ class TransformWithValueStateTTLSuite
         .groupByKey(x => x.key)
         .transformWithState(
           new ValueStateTTLProcessor(ttlConfig),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.ProcessingTimeTTL(),
+          TimeMode.ProcessingTime(),
           OutputMode.Append())
 
       val clock = new StreamManualClock
@@ -321,8 +317,7 @@ class TransformWithValueStateTTLSuite
         .groupByKey(x => x.key)
         .transformWithState(
           new ValueStateTTLProcessor(ttlConfig),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.ProcessingTimeTTL(),
+          TimeMode.ProcessingTime(),
           OutputMode.Append())
 
       val clock = new StreamManualClock
@@ -375,8 +370,7 @@ class TransformWithValueStateTTLSuite
         .groupByKey(x => x.key)
         .transformWithState(
           MultipleValueStatesTTLProcessor(ttlKey, noTtlKey, ttlConfig),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.ProcessingTimeTTL(),
+          TimeMode.ProcessingTime(),
           OutputMode.Append())
 
       val clock = new StreamManualClock
@@ -430,8 +424,7 @@ class TransformWithValueStateTTLSuite
         .groupByKey(x => x.key)
         .transformWithState(
           new ValueStateTTLProcessor(ttlConfig),
-          TimeoutMode.NoTimeouts(),
-          TTLMode.ProcessingTimeTTL(),
+          TimeMode.ProcessingTime(),
           OutputMode.Append())
 
       val clock = new StreamManualClock
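
For reference, a minimal sketch of the unified API after this change, modeled on
the RunningCountStatefulProcessor pattern exercised in the suites above. The
processor name SimpleCountProcessor and the streaming Dataset inputData are
illustrative placeholders, not part of this patch:

    import org.apache.spark.sql.{Dataset, Encoders}
    import org.apache.spark.sql.streaming._

    // Illustrative processor: keeps a running count per grouping key.
    class SimpleCountProcessor extends StatefulProcessor[String, String, (String, String)] {
      @transient private var _countState: ValueState[Long] = _

      // A single TimeMode argument now covers timers and state TTL,
      // replacing the former (TimeoutMode, TTLMode) pair.
      override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
        _countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong)
      }

      override def handleInputRows(
          key: String,
          inputRows: Iterator[String],
          timerValues: TimerValues,
          expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
        // Accumulate the running count for this key and emit it.
        val count = _countState.getOption().getOrElse(0L) + inputRows.size
        _countState.update(count)
        Iterator((key, count.toString))
      }
    }

    // Illustrative usage (assumes a streaming Dataset[String] named inputData
    // and the session's implicits in scope):
    val result = inputData
      .groupByKey(x => x)
      .transformWithState(new SimpleCountProcessor(), TimeMode.None(), OutputMode.Update())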


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
