This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ff38378d7e42 [SPARK-47372][SS] Add support for range scan based key 
state encoder for use with state store provider
ff38378d7e42 is described below

commit ff38378d7e425c3e810e6556a3916f4594f2aec0
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Tue Mar 26 13:48:40 2024 +0900

    [SPARK-47372][SS] Add support for range scan based key state encoder for 
use with state store provider
    
    ### What changes were proposed in this pull request?
    Add support for a range scan based key state encoder for use with the state 
store provider.
    
    ### Why are the changes needed?
    Changes are needed to allow range scans over fixed size initial columns, 
especially with the RocksDB state store provider. Earlier, we tried to use the 
existing key state encoder to encapsulate state for ordering columns using 
BIG_ENDIAN encoding. However, that model does not work when variable sized 
non-ordering columns are part of the grouping key, since the variable length 
portion gets encoded upfront in the `UnsafeRow`, thereby breaking the range 
scan/sorting functionality. In ord [...]
    - create the state store instance or any column family with a key schema, 
the `RangeKeyScanStateEncoderSpec` encoder type, and the number of ordering 
columns specified (see the usage sketch after this list)
    - with the new encoder, users can perform range scans in sorted order using 
the ordering columns
    - with the new encoder, users can perform prefix scans using the ordering 
columns
    - the number of ordering columns has to be > 0 and <= the number of key 
schema columns, which is a more lenient requirement than for 
`PrefixKeyScanStateEncoderSpec`
    - only fixed size (primitive type) ordering columns can be used with this 
encoder
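    
    As a rough usage sketch (the column family name and schemas below are 
hypothetical, and `store` is assumed to be an already initialized `StateStore` 
with column families enabled), a caller passes the new spec when creating a 
column family:
    
    ```scala
    import org.apache.spark.sql.execution.streaming.state.RangeKeyScanStateEncoderSpec
    import org.apache.spark.sql.types._
    
    // Hypothetical key schema: the leading column ("expiryTs") is the fixed size
    // ordering column used for the range scan; the trailing "key" column is not
    // part of the ordering prefix and may therefore be variable sized.
    val keySchema = new StructType()
      .add("expiryTs", LongType)
      .add("key", StringType)
    val valueSchema = new StructType().add("value", BinaryType)
    
    // Register a column family that uses the range scan encoder with a single
    // ordering column. NoPrefixKeyStateEncoderSpec and PrefixKeyScanStateEncoderSpec
    // remain available for the existing behaviors.
    store.createColFamilyIfAbsent(
      "timersCF",
      keySchema,
      valueSchema,
      RangeKeyScanStateEncoderSpec(keySchema, 1))
    ```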
    
    Internally, we convert the passed `UnsafeRow` into a new one in which the 
ordering columns are replaced with BIG_ENDIAN encoded byte arrays; those bytes 
are eventually written out to RocksDB.
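    
    As a minimal sketch of the per-column encoding described above (a 
simplified, hypothetical helper; the actual encoder writes each ordering column 
through an `UnsafeRowWriter`), a nullable Long ordering column would be encoded 
as a null-marker byte followed by the fixed size value in big-endian order:
    
    ```scala
    import java.nio.{ByteBuffer, ByteOrder}
    
    // Simplified sketch: prepend one byte marking whether the value is null,
    // then write the 8-byte value in BIG_ENDIAN order. Null values keep the
    // remaining bytes zeroed so every encoded column has the same total length.
    def encodeLongOrderingCol(value: java.lang.Long): Array[Byte] = {
      val bbuf = ByteBuffer.allocate(java.lang.Long.BYTES + 1).order(ByteOrder.BIG_ENDIAN)
      if (value == null) {
        bbuf.put(0x01.toByte)   // null marker
      } else {
        bbuf.put(0x00.toByte)   // non-null marker
        bbuf.putLong(value.longValue())
      }
      bbuf.array()
    }
    ```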
    
    Note that the existing encoders, `NoPrefixKeyStateEncoder` and 
`PrefixKeyScanStateEncoder`, will also continue to be supported. Users can 
decide which encoder best fits the needs of their store/column family schema 
and use case.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests
    
    ```
    ...
    [info] - rocksdb range scan validation - variable sized columns - with 
colFamiliesEnabled=true (without changelog checkpointing) (1 millisecond)
    [info] - rocksdb range scan validation - variable sized columns - with 
colFamiliesEnabled=false (with changelog checkpointing) (1 millisecond)
    [info] - rocksdb range scan validation - variable sized columns - with 
colFamiliesEnabled=false (without changelog checkpointing) (1 millisecond)
    [info] - rocksdb range scan - fixed size non-ordering columns - with 
colFamiliesEnabled=true (with changelog checkpointing) (782 milliseconds)
    [info] - rocksdb range scan - fixed size non-ordering columns - with 
colFamiliesEnabled=true (without changelog checkpointing) (125 milliseconds)
    [info] - rocksdb range scan - fixed size non-ordering columns - with 
colFamiliesEnabled=false (with changelog checkpointing) (177 milliseconds)
    [info] - rocksdb range scan - fixed size non-ordering columns - with 
colFamiliesEnabled=false (without changelog checkpointing) (89 milliseconds)
    [info] - rocksdb range scan - variable size non-ordering columns - with 
colFamiliesEnabled=true (with changelog checkpointing) (192 milliseconds)
    [info] - rocksdb range scan - variable size non-ordering columns - with 
colFamiliesEnabled=true (without changelog checkpointing) (107 milliseconds)
    [info] - rocksdb range scan - variable size non-ordering columns - with 
colFamiliesEnabled=false (with changelog checkpointing) (185 milliseconds)
    [info] - rocksdb range scan - variable size non-ordering columns - with 
colFamiliesEnabled=false (without changelog checkpointing) (96 milliseconds)
    [info] - rocksdb range scan - ordering cols and key schema cols are same - 
with colFamiliesEnabled=true (with changelog checkpointing) (195 milliseconds)
    [info] - rocksdb range scan - ordering cols and key schema cols are same - 
with colFamiliesEnabled=true (without changelog checkpointing) (106 
milliseconds)
    [info] - rocksdb range scan - ordering cols and key schema cols are same - 
with colFamiliesEnabled=false (with changelog checkpointing) (161 milliseconds)
    [info] - rocksdb range scan - ordering cols and key schema cols are same - 
with colFamiliesEnabled=false (without changelog checkpointing) (88 
milliseconds)
    [info] - rocksdb range scan - with prefix scan - with 
colFamiliesEnabled=true (with changelog checkpointing) (169 milliseconds)
    [info] - rocksdb range scan - with prefix scan - with 
colFamiliesEnabled=true (without changelog checkpointing) (105 milliseconds)
    [info] - rocksdb range scan - with prefix scan - with 
colFamiliesEnabled=false (with changelog checkpointing) (185 milliseconds)
    [info] - rocksdb range scan - with prefix scan - with 
colFamiliesEnabled=false (without changelog checkpointing) (94 milliseconds)
    16:24:18.401 WARN 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreSuite:
    
    ===== POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.execution.streaming.state.RocksDBStateStoreSuite, threads: 
rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
    [info] Run completed in 5 seconds, 247 milliseconds.
    [info] Total number of tests run: 24
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 24, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    [success] Total time: 22 s, completed Mar 15, 2024, 4:24:18 PM
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45503 from anishshri-db/task/SPARK-47372.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  24 +
 docs/sql-error-conditions.md                       |  24 +
 .../v2/state/StatePartitionReader.scala            |  16 +-
 .../streaming/FlatMapGroupsWithStateExec.scala     |   4 +-
 .../sql/execution/streaming/ListStateImpl.scala    |   6 +-
 .../sql/execution/streaming/MapStateImpl.scala     |   6 +-
 .../sql/execution/streaming/TimerStateImpl.scala   |  14 +-
 .../streaming/TransformWithStateExec.scala         |   4 +-
 .../sql/execution/streaming/ValueStateImpl.scala   |   6 +-
 .../state/HDFSBackedStateStoreProvider.scala       |  47 +-
 .../streaming/state/RocksDBStateEncoder.scala      | 281 ++++++++++-
 .../state/RocksDBStateStoreProvider.scala          |  14 +-
 .../sql/execution/streaming/state/StateStore.scala |  40 +-
 .../streaming/state/StateStoreErrors.scala         |  42 +-
 .../execution/streaming/state/StateStoreRDD.scala  |  11 +-
 .../state/SymmetricHashJoinStateManager.scala      |   4 +-
 .../sql/execution/streaming/state/package.scala    |  15 +-
 .../execution/streaming/statefulOperators.scala    |  12 +-
 .../sql/execution/streaming/streamingLimits.scala  |   4 +-
 .../StateStoreBasicOperationsBenchmark.scala       |  10 +-
 ...ngSortWithSessionWindowStateIteratorSuite.scala |   6 +-
 .../streaming/state/MemoryStateStore.scala         |   2 +-
 .../streaming/state/RocksDBStateStoreSuite.scala   | 541 ++++++++++++++++++++-
 .../streaming/state/StateStoreRDDSuite.scala       |  27 +-
 .../streaming/state/StateStoreSuite.scala          | 111 ++++-
 .../StreamingSessionWindowStateManagerSuite.scala  |   4 +-
 .../streaming/state/ValueStateSuite.scala          |  10 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |   4 +-
 28 files changed, 1127 insertions(+), 162 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 99aeca33dfb4..717d5e6631ec 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3561,6 +3561,24 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN" : {
+    "message" : [
+      "Incorrect number of ordering columns=<numOrderingCols> for range scan 
encoder. Ordering columns cannot be zero or greater than num of schema columns."
+    ],
+    "sqlState" : "42802"
+  },
+  "STATE_STORE_INCORRECT_NUM_PREFIX_COLS_FOR_PREFIX_SCAN" : {
+    "message" : [
+      "Incorrect number of prefix columns=<numPrefixCols> for prefix scan 
encoder. Prefix columns cannot be zero or greater than or equal to num of 
schema columns."
+    ],
+    "sqlState" : "42802"
+  },
+  "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED" : {
+    "message" : [
+      "Null type ordering column with name=<fieldName> at index=<index> is not 
supported for range scan encoder."
+    ],
+    "sqlState" : "42802"
+  },
   "STATE_STORE_UNSUPPORTED_OPERATION" : {
     "message" : [
       "<operationType> operation not supported with <entity>"
@@ -3573,6 +3591,12 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED" : {
+    "message" : [
+      "Variable size ordering column with name=<fieldName> at index=<index> is 
not supported for range scan encoder."
+    ],
+    "sqlState" : "42802"
+  },
   "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : {
     "message" : [
       "Static partition column <staticName> is also specified in the column 
list."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 359e120c9cd0..b05a8d1ff61e 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2193,6 +2193,24 @@ Failed to perform column family 
operation=`<operationName>` with invalid name=`<
 The handle has not been initialized for this StatefulProcessor.
 Please only use the StatefulProcessor within the transformWithState operator.
 
+### STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN
+
+[SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Incorrect number of ordering columns=`<numOrderingCols>` for range scan 
encoder. Ordering columns cannot be zero or greater than num of schema columns.
+
+### STATE_STORE_INCORRECT_NUM_PREFIX_COLS_FOR_PREFIX_SCAN
+
+[SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Incorrect number of prefix columns=`<numPrefixCols>` for prefix scan encoder. 
Prefix columns cannot be zero or greater than or equal to num of schema columns.
+
+### STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED
+
+[SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Null type ordering column with name=`<fieldName>` at index=`<index>` is not 
supported for range scan encoder.
+
 ### STATE_STORE_UNSUPPORTED_OPERATION
 
 [SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
@@ -2205,6 +2223,12 @@ Please only use the StatefulProcessor within the 
transformWithState operator.
 
 State store operation=`<operationType>` not supported on missing column 
family=`<colFamilyName>`.
 
+### STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED
+
+[SQLSTATE: 
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Variable size ordering column with name=`<fieldName>` at index=`<index>` is 
not supported for range scan encoder.
+
 ### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST
 
 [SQLSTATE: 
42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
index e4e28d5f8ac3..bbfe3a3f373e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
@@ -22,7 +22,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
 import 
org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, 
StateStoreConf, StateStoreId, StateStoreProvider, StateStoreProviderId}
+import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec, ReadStateStore, StateStoreConf, StateStoreId, 
StateStoreProvider, StateStoreProviderId}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
@@ -77,9 +77,19 @@ class StatePartitionReader(
       stateStoreMetadata.head.numColsPrefixKey
     }
 
+    // TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support 
for this will be
+    // added in the future along with state metadata changes.
+    // Filed JIRA here: https://issues.apache.org/jira/browse/SPARK-47524
+    val keyStateEncoderType = if (numColsPrefixKey > 0) {
+      PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+    } else {
+      NoPrefixKeyStateEncoderSpec(keySchema)
+    }
+
     StateStoreProvider.createAndInit(
-      stateStoreProviderId, keySchema, valueSchema, numColsPrefixKey,
-      useColumnFamilies = false, storeConf, hadoopConf.value, 
useMultipleValuesPerKey = false)
+      stateStoreProviderId, keySchema, valueSchema, keyStateEncoderType,
+      useColumnFamilies = false, storeConf, hadoopConf.value,
+      useMultipleValuesPerKey = false)
   }
 
   private lazy val store: ReadStateStore = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 26a2ee71e7b6..01b16b63fa27 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -226,7 +226,7 @@ trait FlatMapGroupsWithStateExecBase
             storeProviderId,
             groupingAttributes.toStructType,
             stateManager.stateSchema,
-            numColsPrefixKey = 0,
+            NoPrefixKeyStateEncoderSpec(groupingAttributes.toStructType),
             stateInfo.get.storeVersion,
             useColumnFamilies = false,
             storeConf, hadoopConfBroadcast.value.value)
@@ -238,7 +238,7 @@ trait FlatMapGroupsWithStateExecBase
         getStateInfo,
         groupingAttributes.toStructType,
         stateManager.stateSchema,
-        numColsPrefixKey = 0,
+        NoPrefixKeyStateEncoderSpec(groupingAttributes.toStructType),
         session.sqlContext.sessionState,
         Some(session.sqlContext.streams.stateStoreCoordinator)
       ) { case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
index d0be62293d05..662bef5716ea 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
@@ -20,7 +20,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import 
org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA,
 VALUE_ROW_SCHEMA}
-import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreErrors}
+import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
StateStore, StateStoreErrors}
 import org.apache.spark.sql.streaming.ListState
 
 /**
@@ -44,8 +44,8 @@ class ListStateImpl[S](
 
   private val stateTypesEncoder = StateTypesEncoder(keySerializer, valEncoder, 
stateName)
 
-  store.createColFamilyIfAbsent(stateName, KEY_ROW_SCHEMA, numColsPrefixKey = 
0,
-    VALUE_ROW_SCHEMA, useMultipleValuesPerKey = true)
+  store.createColFamilyIfAbsent(stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA,
+    NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), useMultipleValuesPerKey = 
true)
 
   /** Whether state exists or not. */
    override def exists(): Boolean = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
index 91f6be4ddfd1..d2ccd0a77807 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreErrors, UnsafeRowPair}
+import 
org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoderSpec, 
StateStore, StateStoreErrors, UnsafeRowPair}
 import org.apache.spark.sql.streaming.MapState
 import org.apache.spark.sql.types.{BinaryType, StructType}
 
@@ -40,8 +40,8 @@ class MapStateImpl[K, V](
   private val stateTypesEncoder = new CompositeKeyStateEncoder(
     keySerializer, userKeyEnc, valEncoder, schemaForCompositeKeyRow, stateName)
 
-  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, 
numColsPrefixKey = 1,
-    schemaForValueRow)
+  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, 
schemaForValueRow,
+    PrefixKeyScanStateEncoderSpec(schemaForCompositeKeyRow, 1))
 
   /** Whether state exists or not. */
   override def exists(): Boolean = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
index d8b5cb7ef073..6166374d25e9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
@@ -85,16 +85,14 @@ class TimerStateImpl(
   }
 
   val keyToTsCFName = timerCFName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
-  store.createColFamilyIfAbsent(keyToTsCFName,
-    schemaForKeyRow, numColsPrefixKey = 1,
-    schemaForValueRow, useMultipleValuesPerKey = false,
-    isInternal = true)
+  store.createColFamilyIfAbsent(keyToTsCFName, schemaForKeyRow,
+    schemaForValueRow, PrefixKeyScanStateEncoderSpec(schemaForKeyRow, 1),
+    useMultipleValuesPerKey = false, isInternal = true)
 
   val tsToKeyCFName = timerCFName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
-  store.createColFamilyIfAbsent(tsToKeyCFName,
-    keySchemaForSecIndex, numColsPrefixKey = 0,
-    schemaForValueRow, useMultipleValuesPerKey = false,
-    isInternal = true)
+  store.createColFamilyIfAbsent(tsToKeyCFName, keySchemaForSecIndex,
+    schemaForValueRow, NoPrefixKeyStateEncoderSpec(keySchemaForSecIndex),
+    useMultipleValuesPerKey = false, isInternal = true)
 
   private def getGroupingKey(cfName: String): Any = {
     val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index 500da5492f88..39365e92185a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -276,7 +276,7 @@ case class TransformWithStateExec(
         getStateInfo,
         schemaForKeyRow,
         schemaForValueRow,
-        numColsPrefixKey = 0,
+        NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
         session.sqlContext.sessionState,
         Some(session.sqlContext.streams.stateStoreCoordinator),
         useColumnFamilies = true,
@@ -308,7 +308,7 @@ case class TransformWithStateExec(
             providerId,
             schemaForKeyRow,
             schemaForValueRow,
-            numColsPrefixKey = 0,
+            NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
             useColumnFamilies = true,
             storeConf = storeConf,
             hadoopConf = broadcastedHadoopConf.value,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
index 5d2b9881c78d..08876ca3032e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import 
org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA,
 VALUE_ROW_SCHEMA}
-import org.apache.spark.sql.execution.streaming.state.StateStore
+import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
StateStore}
 import org.apache.spark.sql.streaming.ValueState
 
 /**
@@ -43,8 +43,8 @@ class ValueStateImpl[S](
 
   private val stateTypesEncoder = StateTypesEncoder(keySerializer, valEncoder, 
stateName)
 
-  store.createColFamilyIfAbsent(stateName, KEY_ROW_SCHEMA, numColsPrefixKey = 
0,
-    VALUE_ROW_SCHEMA)
+  store.createColFamilyIfAbsent(stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA,
+    NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA))
 
   /** Function to check if state exists. Returns true if present and false 
otherwise */
   override def exists(): Boolean = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 42d10f6c1bd5..5fbc69ac6093 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -123,8 +123,8 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     override def createColFamilyIfAbsent(
         colFamilyName: String,
         keySchema: StructType,
-        numColsPrefixKey: Int,
         valueSchema: StructType,
+        keyStateEncoderSpec: KeyStateEncoderSpec,
         useMultipleValuesPerKey: Boolean = false,
         isInternal: Boolean = false): Unit = {
       throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
@@ -280,11 +280,39 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     }
   }
 
+  // Run bunch of validations specific to HDFSBackedStateStoreProvider
+  private def runValidation(
+      useColumnFamilies: Boolean,
+      useMultipleValuesPerKey: Boolean,
+      keyStateEncoderSpec: KeyStateEncoderSpec): Unit = {
+    // TODO: add support for multiple col families with 
HDFSBackedStateStoreProvider
+    if (useColumnFamilies) {
+      throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
+    }
+
+    if (useMultipleValuesPerKey) {
+      throw 
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", 
providerName)
+    }
+
+    if (keyStateEncoderSpec.isInstanceOf[RangeKeyScanStateEncoderSpec]) {
+      throw StateStoreErrors.unsupportedOperationException("Range scan", 
providerName)
+    }
+  }
+
+  private def getNumColsPrefixKey(keyStateEncoderSpec: KeyStateEncoderSpec): 
Int = {
+    keyStateEncoderSpec match {
+      case NoPrefixKeyStateEncoderSpec(_) => 0
+      case PrefixKeyScanStateEncoderSpec(_, numColsPrefixKey) => 
numColsPrefixKey
+      case _ => throw StateStoreErrors.unsupportedOperationException("Invalid 
key state encoder",
+        providerName)
+    }
+  }
+
   override def init(
       stateStoreId: StateStoreId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
@@ -296,19 +324,10 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     this.hadoopConf = hadoopConf
     this.numberOfVersionsToRetainInMemory = 
storeConf.maxVersionsToRetainInMemory
 
-    // TODO: add support for multiple col families with 
HDFSBackedStateStoreProvider
-    if (useColumnFamilies) {
-      throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
-    }
-
-    if (useMultipleValuesPerKey) {
-      throw 
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", 
providerName)
-    }
+    // run a bunch of validation checks for this state store provider
+    runValidation(useColumnFamilies, useMultipleValuesPerKey, 
keyStateEncoderSpec)
 
-    require((keySchema.length == 0 && numColsPrefixKey == 0) ||
-      (keySchema.length > numColsPrefixKey), "The number of columns in the key 
must be " +
-      "greater than the number of columns for prefix key!")
-    this.numColsPrefixKey = numColsPrefixKey
+    this.numColsPrefixKey = getNumColsPrefixKey(keyStateEncoderSpec)
 
     fm.mkdirs(baseDir)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index 8f58bccd948b..f342853514d8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -17,16 +17,18 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import java.nio.{ByteBuffer, ByteOrder}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, 
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES,
 STATE_ENCODING_VERSION}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 
 sealed trait RocksDBKeyStateEncoder {
   def supportPrefixKeyScan: Boolean
   def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte]
-  def extractPrefixKey(key: UnsafeRow): UnsafeRow
   def encodeKey(row: UnsafeRow): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
 }
@@ -39,13 +41,21 @@ sealed trait RocksDBValueStateEncoder {
 }
 
 object RocksDBStateEncoder {
-  def getKeyEncoder(
-      keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+  def getKeyEncoder(keyStateEncoderSpec: KeyStateEncoderSpec): 
RocksDBKeyStateEncoder = {
+    // Return the key state encoder based on the requested type
+    keyStateEncoderSpec match {
+      case NoPrefixKeyStateEncoderSpec(keySchema) =>
+        new NoPrefixKeyStateEncoder(keySchema)
+
+      case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>
+        new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
+
+      case RangeKeyScanStateEncoderSpec(keySchema, numOrderingCols) =>
+        new RangeKeyScanStateEncoder(keySchema, numOrderingCols)
+
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported key state encoder 
spec: " +
+          s"$keyStateEncoderSpec")
     }
   }
 
@@ -110,9 +120,6 @@ class PrefixKeyScanStateEncoder(
 
   import RocksDBStateEncoder._
 
-  require(keySchema.length > numColsPrefixKey, "The number of columns in the 
key must be " +
-    "greater than the number of columns for prefix key!")
-
   private val prefixKeyFieldsWithIdx: Seq[(StructField, Int)] = {
     keySchema.zipWithIndex.take(numColsPrefixKey)
   }
@@ -176,7 +183,7 @@ class PrefixKeyScanStateEncoder(
     
restoreKeyProjection(joinedRowOnKey.withLeft(prefixKeyDecoded).withRight(remainingKeyDecoded))
   }
 
-  override def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
     prefixKeyProjection(key)
   }
 
@@ -192,6 +199,250 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size 
fields
+ *
+ * To encode a row for range scan, we first project the first numOrderingCols 
needed
+ * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's 
fields in BIG_ENDIAN
+ * to allow for scanning keys in sorted order using the byte-wise comparison 
method that
+ * RocksDB uses.
+ * Then, for the rest of the fields, we project those into another UnsafeRow.
+ * We then effectively join these two UnsafeRows together, and finally take 
those bytes
+ * to get the resulting row.
+ * We cannot support variable sized fields given the UnsafeRow format which 
stores variable
+ * sized fields as offset and length pointers to the actual values, thereby 
changing the required
+ * ordering.
+ * Note that we also support "null" values being passed for these fixed size 
fields. We prepend
+ * a single byte to indicate whether the column value is null or not. We 
cannot change the
+ * nullability on the UnsafeRow itself as the expected ordering would change 
if non-first
+ * columns are marked as null. If the first col is null, those entries will 
appear last in
+ * the iterator. If non-first columns are null, ordering based on the previous 
columns will
+ * still be honored. For rows with null column values, ordering for subsequent 
columns
+ * will also be maintained within those set of rows.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of columns to be used for range scan
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: 
LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      // NullType is technically fixed size, but not supported for ordering
+      if (field.dataType == NullType) {
+        throw StateStoreErrors.nullTypeOrderingColsNotSupported(field.name, 
idx.toString)
+      } else {
+        throw 
StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+      }
+    }
+  }
+
+  private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.drop(numOrderingCols)
+  }
+
+  private val rangeScanKeyProjection: UnsafeProjection = {
+    val refs = rangeScanKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val remainingKeyProjection: UnsafeProjection = {
+    val refs = remainingKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val restoreKeyProjection: UnsafeProjection = 
UnsafeProjection.create(keySchema)
+
+  // Reusable objects
+  private val joinedRowOnKey = new JoinedRow()
+
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+    rangeScanKeyProjection(key)
+  }
+
+  // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN 
encoding
+  // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating 
whether the value
+  // is null or not. If the value is null, we write the null byte followed by 
a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
+  // the sorting order on iteration.
+  private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = row.get(idx, field.dataType)
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // Note that we cannot allocate a smaller buffer here even if the value 
is null
+      // because the effective byte array is considered variable size and 
needs to have
+      // the same size across all rows for the ordering to work as expected.
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {
+        writer.write(idx, bbuf.array())
+      } else {
+        field.dataType match {
+          case BooleanType =>
+          case ByteType =>
+            bbuf.put(value.asInstanceOf[Byte])
+            writer.write(idx, bbuf.array())
+
+          // for other multi-byte types, we need to convert to big-endian
+          case ShortType =>
+            bbuf.putShort(value.asInstanceOf[Short])
+            writer.write(idx, bbuf.array())
+
+          case IntegerType =>
+            bbuf.putInt(value.asInstanceOf[Int])
+            writer.write(idx, bbuf.array())
+
+          case LongType =>
+            bbuf.putLong(value.asInstanceOf[Long])
+            writer.write(idx, bbuf.array())
+
+          case FloatType =>
+            bbuf.putFloat(value.asInstanceOf[Float])
+            writer.write(idx, bbuf.array())
+
+          case DoubleType =>
+            bbuf.putDouble(value.asInstanceOf[Double])
+            writer.write(idx, bbuf.array())
+        }
+      }
+    }
+    writer.getRow()
+  }
+
+  // Rewrite the unsafe row by converting back from BIG_ENDIAN byte arrays to 
the
+  // original data types.
+  // For decode, we extract the byte array from the UnsafeRow, and then read 
the first byte
+  // to determine if the value is null or not. If the value is null, we set 
the ordinal on
+  // the UnsafeRow to null. If the value is not null, we read the rest of the 
bytes to get the
+  // actual value.
+  private def decodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = row.getBinary(idx)
+      val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]])
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      val isNullCol = bbuf.get()
+      if (isNullCol == 0x01.toByte) {
+        // set the column to null and skip reading the next byte
+        writer.setNullAt(idx)
+      } else {
+        field.dataType match {
+          case BooleanType =>
+          case ByteType =>
+            writer.write(idx, bbuf.get)
+
+          case ShortType =>
+            writer.write(idx, bbuf.getShort)
+
+          case IntegerType =>
+            writer.write(idx, bbuf.getInt)
+
+          case LongType =>
+            writer.write(idx, bbuf.getLong)
+
+          case FloatType =>
+            writer.write(idx, bbuf.getFloat)
+
+          case DoubleType =>
+            writer.write(idx, bbuf.getDouble)
+        }
+      }
+    }
+    writer.getRow()
+  }
+
+  override def encodeKey(row: UnsafeRow): Array[Byte] = {
+    val prefixKey = extractPrefixKey(row)
+    val rangeScanKeyEncoded = 
encodeUnsafeRow(encodePrefixKeyForRangeScan(prefixKey))
+
+    val result = if (numOrderingCols < keySchema.length) {
+      val remainingEncoded = encodeUnsafeRow(remainingKeyProjection(row))
+      val encodedBytes = new Array[Byte](rangeScanKeyEncoded.length + 
remainingEncoded.length + 4)
+      Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, 
rangeScanKeyEncoded.length)
+      Platform.copyMemory(rangeScanKeyEncoded, Platform.BYTE_ARRAY_OFFSET,
+        encodedBytes, Platform.BYTE_ARRAY_OFFSET + 4, 
rangeScanKeyEncoded.length)
+      // NOTE: We don't put the length of remainingEncoded as we can calculate 
later
+      // on deserialization.
+      Platform.copyMemory(remainingEncoded, Platform.BYTE_ARRAY_OFFSET,
+        encodedBytes, Platform.BYTE_ARRAY_OFFSET + 4 + 
rangeScanKeyEncoded.length,
+        remainingEncoded.length)
+      encodedBytes
+    } else {
+      // if the num of ordering cols is same as num of key schema cols, we 
don't need to
+      // encode the remaining key as it's empty.
+      val encodedBytes = new Array[Byte](rangeScanKeyEncoded.length + 4)
+      Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, 
rangeScanKeyEncoded.length)
+      Platform.copyMemory(rangeScanKeyEncoded, Platform.BYTE_ARRAY_OFFSET,
+        encodedBytes, Platform.BYTE_ARRAY_OFFSET + 4, 
rangeScanKeyEncoded.length)
+      encodedBytes
+    }
+    result
+  }
+
+  override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+    val prefixKeyEncodedLen = Platform.getInt(keyBytes, 
Platform.BYTE_ARRAY_OFFSET)
+    val prefixKeyEncoded = new Array[Byte](prefixKeyEncodedLen)
+    Platform.copyMemory(keyBytes, Platform.BYTE_ARRAY_OFFSET + 4, 
prefixKeyEncoded,
+      Platform.BYTE_ARRAY_OFFSET, prefixKeyEncodedLen)
+
+    val prefixKeyDecodedForRangeScan = decodeToUnsafeRow(prefixKeyEncoded,
+      numFields = numOrderingCols)
+    val prefixKeyDecoded = 
decodePrefixKeyForRangeScan(prefixKeyDecodedForRangeScan)
+
+    if (numOrderingCols < keySchema.length) {
+      // Here we calculate the remainingKeyEncodedLen leveraging the length of 
keyBytes
+      val remainingKeyEncodedLen = keyBytes.length - 4 - prefixKeyEncodedLen
+
+      val remainingKeyEncoded = new Array[Byte](remainingKeyEncodedLen)
+      Platform.copyMemory(keyBytes, Platform.BYTE_ARRAY_OFFSET + 4 +
+        prefixKeyEncodedLen, remainingKeyEncoded, Platform.BYTE_ARRAY_OFFSET,
+        remainingKeyEncodedLen)
+
+      val remainingKeyDecoded = decodeToUnsafeRow(remainingKeyEncoded,
+        numFields = keySchema.length - numOrderingCols)
+
+      
restoreKeyProjection(joinedRowOnKey.withLeft(prefixKeyDecoded).withRight(remainingKeyDecoded))
+    } else {
+      // if the number of ordering cols is same as the number of key schema 
cols, we only
+      // return the prefix key decoded unsafe row.
+      prefixKeyDecoded
+    }
+  }
+
+  override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+    val rangeScanKeyEncoded = 
encodeUnsafeRow(encodePrefixKeyForRangeScan(prefixKey))
+    val prefix = new Array[Byte](rangeScanKeyEncoded.length + 4)
+    Platform.putInt(prefix, Platform.BYTE_ARRAY_OFFSET, 
rangeScanKeyEncoded.length)
+    Platform.copyMemory(rangeScanKeyEncoded, Platform.BYTE_ARRAY_OFFSET, 
prefix,
+      Platform.BYTE_ARRAY_OFFSET + 4, rangeScanKeyEncoded.length)
+    prefix
+  }
+
+  override def supportPrefixKeyScan: Boolean = true
+}
+
 /**
  * RocksDB Key Encoder for UnsafeRow that does not support prefix key scan.
  *
@@ -225,10 +476,6 @@ class NoPrefixKeyStateEncoder(keySchema: StructType)
 
   override def supportPrefixKeyScan: Boolean = false
 
-  override def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
-    throw new IllegalStateException("This encoder doesn't support prefix key!")
-  }
-
   override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
     throw new IllegalStateException("This encoder doesn't support prefix key!")
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 89471f6af535..c8537f2a6a5b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -51,8 +51,8 @@ private[sql] class RocksDBStateStoreProvider
     override def createColFamilyIfAbsent(
         colFamilyName: String,
         keySchema: StructType,
-        numColsPrefixKey: Int,
         valueSchema: StructType,
+        keyStateEncoderSpec: KeyStateEncoderSpec,
         useMultipleValuesPerKey: Boolean = false,
         isInternal: Boolean = false): Unit = {
       verify(colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME,
@@ -60,7 +60,7 @@ private[sql] class RocksDBStateStoreProvider
       verify(useColumnFamilies, "Column families are not supported in this 
store")
       rocksDB.createColFamilyIfAbsent(colFamilyName, isInternal)
       keyValueEncoderMap.putIfAbsent(colFamilyName,
-        (RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey),
+        (RocksDBStateEncoder.getKeyEncoder(keyStateEncoderSpec),
          RocksDBStateEncoder.getValueEncoder(valueSchema, 
useMultipleValuesPerKey)))
     }
 
@@ -263,7 +263,7 @@ private[sql] class RocksDBStateStoreProvider
       stateStoreId: StateStoreId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
@@ -275,19 +275,13 @@ private[sql] class RocksDBStateStoreProvider
     this.hadoopConf = hadoopConf
     this.useColumnFamilies = useColumnFamilies
 
-    require((keySchema.length == 0 && numColsPrefixKey == 0) ||
-      (keySchema.length > numColsPrefixKey), "The number of columns in the key 
must be " +
-      "greater than the number of columns for prefix key!")
-
     if (useMultipleValuesPerKey) {
-      require(numColsPrefixKey == 0, "Both multiple values per key, and prefix 
key are not " +
-        "supported simultaneously.")
       require(useColumnFamilies, "Multiple values per key support requires 
column families to be" +
         " enabled in RocksDBStateStore.")
     }
 
     keyValueEncoderMap.putIfAbsent(StateStore.DEFAULT_COL_FAMILY_NAME,
-      (RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey),
+      (RocksDBStateEncoder.getKeyEncoder(keyStateEncoderSpec),
        RocksDBStateEncoder.getValueEncoder(valueSchema, 
useMultipleValuesPerKey)))
 
     rocksDB // lazy initialization
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9247c9fe41b6..dd97aa5b9afc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -128,8 +128,8 @@ trait StateStore extends ReadStateStore {
   def createColFamilyIfAbsent(
       colFamilyName: String,
       keySchema: StructType,
-      numColsPrefixKey: Int,
       valueSchema: StructType,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useMultipleValuesPerKey: Boolean = false,
       isInternal: Boolean = false): Unit
 
@@ -289,6 +289,26 @@ class InvalidUnsafeRowException(error: String)
     "among restart. For the first case, you can try to restart the application 
without " +
     s"checkpoint or use the legacy Spark version to process the streaming 
state.\n$error", null)
 
+sealed trait KeyStateEncoderSpec
+
+case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends 
KeyStateEncoderSpec
+
+case class PrefixKeyScanStateEncoderSpec(
+    keySchema: StructType,
+    numColsPrefixKey: Int) extends KeyStateEncoderSpec {
+  if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
+    throw 
StateStoreErrors.incorrectNumOrderingColsForPrefixScan(numColsPrefixKey.toString)
+  }
+}
+
+case class RangeKeyScanStateEncoderSpec(
+    keySchema: StructType,
+    numColsPrefixKey: Int) extends KeyStateEncoderSpec {
+  if (numColsPrefixKey == 0 || numColsPrefixKey > keySchema.length) {
+    throw 
StateStoreErrors.incorrectNumOrderingColsForRangeScan(numColsPrefixKey.toString)
+  }
+}
+
 /**
  * Trait representing a provider that provide [[StateStore]] instances 
representing
  * versions of state data.
@@ -330,7 +350,7 @@ trait StateStoreProvider {
       stateStoreId: StateStoreId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConfs: StateStoreConf,
       hadoopConf: Configuration,
@@ -385,13 +405,13 @@ object StateStoreProvider {
       providerId: StateStoreProviderId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean): StateStoreProvider = {
     val provider = create(storeConf.providerClass)
-    provider.init(providerId.storeId, keySchema, valueSchema, numColsPrefixKey,
+    provider.init(providerId.storeId, keySchema, valueSchema, 
keyStateEncoderSpec,
       useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey)
     provider
   }
@@ -581,7 +601,7 @@ object StateStore extends Logging {
       storeProviderId: StateStoreProviderId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       version: Long,
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
@@ -591,7 +611,7 @@ object StateStore extends Logging {
       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
     }
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, 
valueSchema,
-      numColsPrefixKey, useColumnFamilies, storeConf, hadoopConf, 
useMultipleValuesPerKey)
+      keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, 
useMultipleValuesPerKey)
     storeProvider.getReadStore(version)
   }
 
@@ -600,7 +620,7 @@ object StateStore extends Logging {
       storeProviderId: StateStoreProviderId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       version: Long,
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
@@ -610,7 +630,7 @@ object StateStore extends Logging {
       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
     }
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, 
valueSchema,
-      numColsPrefixKey, useColumnFamilies, storeConf, hadoopConf, 
useMultipleValuesPerKey)
+      keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, 
useMultipleValuesPerKey)
     storeProvider.getStore(version)
   }
 
@@ -618,7 +638,7 @@ object StateStore extends Logging {
       storeProviderId: StateStoreProviderId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
@@ -655,7 +675,7 @@ object StateStore extends Logging {
         loadedProviders.getOrElseUpdate(
           storeProviderId,
           StateStoreProvider.createAndInit(
-            storeProviderId, keySchema, valueSchema, numColsPrefixKey,
+            storeProviderId, keySchema, valueSchema, keyStateEncoderSpec,
             useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey)
         )
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 7d1ec7f03237..a8d4c06bc83c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -76,9 +76,29 @@ object StateStoreErrors {
       messageParameters = Map("stateName" -> stateName))
   }
 
+  def incorrectNumOrderingColsForPrefixScan(numPrefixCols: String):
+    StateStoreIncorrectNumOrderingColsForPrefixScan = {
+    new StateStoreIncorrectNumOrderingColsForPrefixScan(numPrefixCols)
+  }
+
+  def incorrectNumOrderingColsForRangeScan(numOrderingCols: String):
+    StateStoreIncorrectNumOrderingColsForRangeScan = {
+    new StateStoreIncorrectNumOrderingColsForRangeScan(numOrderingCols)
+  }
+
+  def nullTypeOrderingColsNotSupported(fieldName: String, index: String):
+    StateStoreNullTypeOrderingColsNotSupported = {
+    new StateStoreNullTypeOrderingColsNotSupported(fieldName, index)
+  }
+
+  def variableSizeOrderingColsNotSupported(fieldName: String, index: String):
+    StateStoreVariableSizeOrderingColsNotSupported = {
+    new StateStoreVariableSizeOrderingColsNotSupported(fieldName, index)
+  }
+
   def cannotCreateColumnFamilyWithReservedChars(colFamilyName: String):
     StateStoreCannotCreateColumnFamilyWithReservedChars = {
-      new StateStoreCannotCreateColumnFamilyWithReservedChars(colFamilyName)
+    new StateStoreCannotCreateColumnFamilyWithReservedChars(colFamilyName)
   }
 
   def cannotPerformOperationWithInvalidTimeoutMode(
@@ -142,3 +162,23 @@ class StateStoreUnsupportedOperationOnMissingColumnFamily(
     colFamilyName: String) extends SparkUnsupportedOperationException(
   errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY",
   messageParameters = Map("operationType" -> operationType, "colFamilyName" -> 
colFamilyName))
+
+class StateStoreIncorrectNumOrderingColsForPrefixScan(numPrefixCols: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_INCORRECT_NUM_PREFIX_COLS_FOR_PREFIX_SCAN",
+    messageParameters = Map("numPrefixCols" -> numPrefixCols))
+
+class StateStoreIncorrectNumOrderingColsForRangeScan(numOrderingCols: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+    messageParameters = Map("numOrderingCols" -> numOrderingCols))
+
+class StateStoreVariableSizeOrderingColsNotSupported(fieldName: String, index: 
String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+    messageParameters = Map("fieldName" -> fieldName, "index" -> index))
+
+class StateStoreNullTypeOrderingColsNotSupported(fieldName: String, index: 
String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
+    messageParameters = Map("fieldName" -> fieldName, "index" -> index))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 1af51c49eaa5..133b4ab1cce3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -74,7 +74,7 @@ class ReadStateStoreRDD[T: ClassTag, U: ClassTag](
     storeVersion: Long,
     keySchema: StructType,
     valueSchema: StructType,
-    numColsPrefixKey: Int,
+    keyStateEncoderSpec: KeyStateEncoderSpec,
     sessionState: SessionState,
     @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
     useColumnFamilies: Boolean = false,
@@ -89,7 +89,7 @@ class ReadStateStoreRDD[T: ClassTag, U: ClassTag](
 
     val inputIter = dataRDD.iterator(partition, ctxt)
     val store = StateStore.getReadOnly(
-      storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion,
+      storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, 
storeVersion,
       useColumnFamilies, storeConf, hadoopConfBroadcast.value.value)
     storeReadFunction(store, inputIter)
   }
@@ -109,7 +109,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
     storeVersion: Long,
     keySchema: StructType,
     valueSchema: StructType,
-    numColsPrefixKey: Int,
+    keyStateEncoderSpec: KeyStateEncoderSpec,
     sessionState: SessionState,
     @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
     useColumnFamilies: Boolean = false,
@@ -125,8 +125,9 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
     val inputIter = dataRDD.iterator(partition, ctxt)
     val store = StateStore.get(
-      storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion,
-      useColumnFamilies, storeConf, hadoopConfBroadcast.value.value, 
useMultipleValuesPerKey)
+      storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, 
storeVersion,
+      useColumnFamilies, storeConf, hadoopConfBroadcast.value.value,
+      useMultipleValuesPerKey)
     storeUpdateFunction(store, inputIter)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index b35cf2492666..9802a4dce4e5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -480,12 +480,12 @@ class SymmetricHashJoinStateManager(
         stateInfo.get, partitionId, getStateStoreName(joinSide, 
stateStoreType))
       val store = if (useStateStoreCoordinator) {
         StateStore.get(
-          storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0,
+          storeProviderId, keySchema, valueSchema, 
NoPrefixKeyStateEncoderSpec(keySchema),
           stateInfo.get.storeVersion, useColumnFamilies = false, storeConf, 
hadoopConf)
       } else {
         // This class will manage the state store provider by itself.
         stateStoreProvider = StateStoreProvider.createAndInit(
-          storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0,
+          storeProviderId, keySchema, valueSchema, 
NoPrefixKeyStateEncoderSpec(keySchema),
           useColumnFamilies = false, storeConf, hadoopConf,
           useMultipleValuesPerKey = false)
         stateStoreProvider.getStore(stateInfo.get.storeVersion)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
index 12cd7b8a127e..44e939424f55 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
@@ -35,25 +35,27 @@ package object state {
         stateInfo: StatefulOperatorStateInfo,
         keySchema: StructType,
         valueSchema: StructType,
-        numColsPrefixKey: Int)(
+        keyStateEncoderSpec: KeyStateEncoderSpec)(
         storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): 
StateStoreRDD[T, U] = {
 
       mapPartitionsWithStateStore(
         stateInfo,
         keySchema,
         valueSchema,
-        numColsPrefixKey,
+        keyStateEncoderSpec,
         sqlContext.sessionState,
         Some(sqlContext.streams.stateStoreCoordinator))(
         storeUpdateFunction)
     }
 
+    // Disable scala style because num parameters exceeds the max limit used 
to enforce scala style
+    // scalastyle:off
     /** Map each partition of an RDD along with data in a [[StateStore]]. */
     def mapPartitionsWithStateStore[U: ClassTag](
         stateInfo: StatefulOperatorStateInfo,
         keySchema: StructType,
         valueSchema: StructType,
-        numColsPrefixKey: Int,
+        keyStateEncoderSpec: KeyStateEncoderSpec,
         sessionState: SessionState,
         storeCoordinator: Option[StateStoreCoordinatorRef],
         useColumnFamilies: Boolean = false,
@@ -79,20 +81,21 @@ package object state {
         stateInfo.storeVersion,
         keySchema,
         valueSchema,
-        numColsPrefixKey,
+        keyStateEncoderSpec,
         sessionState,
         storeCoordinator,
         useColumnFamilies,
         extraOptions,
         useMultipleValuesPerKey)
     }
+    // scalastyle:on
 
     /** Map each partition of an RDD along with data in a [[ReadStateStore]]. */
     private[streaming] def mapPartitionsWithReadStateStore[U: ClassTag](
         stateInfo: StatefulOperatorStateInfo,
         keySchema: StructType,
         valueSchema: StructType,
-        numColsPrefixKey: Int,
+        keyStateEncoderSpec: KeyStateEncoderSpec,
         sessionState: SessionState,
         storeCoordinator: Option[StateStoreCoordinatorRef],
         useColumnFamilies: Boolean = false,
@@ -117,7 +120,7 @@ package object state {
         stateInfo.storeVersion,
         keySchema,
         valueSchema,
-        numColsPrefixKey,
+        keyStateEncoderSpec,
         sessionState,
         storeCoordinator,
         useColumnFamilies,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1793b72b9039..3bf833816bcc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -431,7 +431,7 @@ case class StateStoreRestoreExec(
       getStateInfo,
       keyExpressions.toStructType,
       stateManager.getStateValueSchema,
-      numColsPrefixKey = 0,
+      NoPrefixKeyStateEncoderSpec(keyExpressions.toStructType),
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
       val hasInput = iter.hasNext
@@ -495,7 +495,7 @@ case class StateStoreSaveExec(
       getStateInfo,
       keyExpressions.toStructType,
       stateManager.getStateValueSchema,
-      numColsPrefixKey = 0,
+      NoPrefixKeyStateEncoderSpec(keyExpressions.toStructType),
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
         val numOutputRows = longMetric("numOutputRows")
@@ -697,7 +697,8 @@ case class SessionWindowStateStoreRestoreExec(
       getStateInfo,
       stateManager.getStateKeySchema,
       stateManager.getStateValueSchema,
-      numColsPrefixKey = stateManager.getNumColsForPrefixKey,
+      PrefixKeyScanStateEncoderSpec(stateManager.getStateKeySchema,
+        stateManager.getNumColsForPrefixKey),
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
 
@@ -782,7 +783,8 @@ case class SessionWindowStateStoreSaveExec(
       getStateInfo,
       stateManager.getStateKeySchema,
       stateManager.getStateValueSchema,
-      numColsPrefixKey = stateManager.getNumColsForPrefixKey,
+      PrefixKeyScanStateEncoderSpec(stateManager.getStateKeySchema,
+        stateManager.getNumColsForPrefixKey),
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
 
@@ -971,7 +973,7 @@ abstract class BaseStreamingDeduplicateExec
       getStateInfo,
       keyExpressions.toStructType,
       schemaForValueRow,
-      numColsPrefixKey = 0,
+      NoPrefixKeyStateEncoderSpec(keyExpressions.toStructType),
       session.sessionState,
       Some(session.streams.stateStoreCoordinator),
       extraOptions = extraOptionOnStateStore) { (store, iter) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
index 8bba9b8d33c1..e0e3ee582bef 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericInternalRow, SortOrder, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, 
Partitioning}
 import org.apache.spark.sql.execution.{LimitExec, SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.streaming.state.StateStoreOps
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStoreOps}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
 import org.apache.spark.util.{CompletionIterator, NextIterator}
@@ -52,7 +52,7 @@ case class StreamingGlobalLimitExec(
         getStateInfo,
         keySchema,
         valueSchema,
-        numColsPrefixKey = 0,
+        NoPrefixKeyStateEncoderSpec(keySchema),
         session.sessionState,
         Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
       val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null)))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
index 3c8e0ca3e52d..a5c393ac0567 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
+import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, NoPrefixKeyStateEncoderSpec, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType, 
TimestampType}
 import org.apache.spark.util.Utils
@@ -346,8 +346,8 @@ object StateStoreBasicOperationsBenchmark extends 
SqlBasedBenchmark {
     val provider = new HDFSBackedStateStoreProvider()
     val storeConf = new StateStoreConf(new SQLConf())
     provider.init(
-      storeId, keySchema, valueSchema, 0, useColumnFamilies = false,
-      storeConf, new Configuration)
+      storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
+      useColumnFamilies = false, storeConf, new Configuration)
     provider
   }
 
@@ -361,8 +361,8 @@ object StateStoreBasicOperationsBenchmark extends 
SqlBasedBenchmark {
     val storeConf = new StateStoreConf(sqlConf)
 
     provider.init(
-      storeId, keySchema, valueSchema, 0, useColumnFamilies = false,
-      storeConf, new Configuration)
+      storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
+      useColumnFamilies = false, storeConf, new Configuration)
     provider
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIteratorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIteratorSuite.scala
index af2cd277b509..fc0c239a5d99 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIteratorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIteratorSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId, StreamingSessionWindowStateManager}
+import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId, StreamingSessionWindowStateManager}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructType}
@@ -220,8 +220,8 @@ class MergingSortWithSessionWindowStateIteratorSuite 
extends StreamTest with Bef
       val storeProviderId = StateStoreProviderId(stateInfo, 0, 
StateStoreId.DEFAULT_STORE_NAME)
       val store = StateStore.get(
         storeProviderId, manager.getStateKeySchema, manager.getStateValueSchema,
-        manager.getNumColsForPrefixKey, stateInfo.storeVersion,
-        useColumnFamilies = false, storeConf, new Configuration)
+        PrefixKeyScanStateEncoderSpec(manager.getStateKeySchema, manager.getNumColsForPrefixKey),
+        stateInfo.storeVersion, useColumnFamilies = false, storeConf, new Configuration)
 
       try {
         f(manager, store)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
index 91ffb7a66adc..5af8dc9a726d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
@@ -33,8 +33,8 @@ class MemoryStateStore extends StateStore() {
   override def createColFamilyIfAbsent(
       colFamilyName: String,
       keySchema: StructType,
-      numColsPrefixKey: Int,
       valueSchema: StructType,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useMultipleValuesPerKey: Boolean = false,
       isInternal: Boolean = false): Unit = {
     throw 
StateStoreErrors.multipleColumnFamiliesNotSupported("MemoryStateStoreProvider")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 1e838ccdb023..0b347e272a48 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -24,10 +24,11 @@ import scala.util.Random
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkUnsupportedOperationException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.LocalSparkSession.withSparkSession
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.internal.SQLConf
@@ -35,6 +36,7 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.ExtendedSQLTest
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 @ExtendedSQLTest
@@ -102,7 +104,8 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
       // Create state store in a task and get the RocksDBConf from the 
instantiated RocksDB instance
       val rocksDBConfInTask: RocksDBConf = 
testRDD.mapPartitionsWithStateStore[RocksDBConf](
-        spark.sqlContext, testStateInfo, testSchema, testSchema, 0) {
+        spark.sqlContext, testStateInfo, testSchema, testSchema,
+        NoPrefixKeyStateEncoderSpec(testSchema)) {
           (store: StateStore, _: Iterator[String]) =>
             // Use reflection to get RocksDB instance
             val dbInstanceMethod =
@@ -158,6 +161,501 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 0),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 
keySchemaWithRangeScan.length + 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
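+
+  // The two failures above pin down the valid range: the number of ordering columns
+  // must be at least 1 and at most the number of columns in the key schema.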
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", 
StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithVariableSizeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - null type columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    val keySchemaWithNullTypeCols: StructType = StructType(
+      Seq(StructField("key1", NullType, false), StructField("key2", 
StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithNullTypeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithNullTypeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithNullTypeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), 
colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
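+
+  // Why the iterator above comes back sorted: per this patch, the encoder rewrites the
+  // ordering column into a BIG_ENDIAN byte array before writing to RocksDB, so RocksDB's
+  // byte-wise key ordering matches numeric ordering for the non-negative timestamps used
+  // here (e.g. 90L encodes to 0x...005A, which sorts before 931L's 0x...03A3).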
+
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), 
colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - 
variable size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, 
IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 
68), (90L, 2000),
+        (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), 
(9L, 95118), (6L, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - 
variable size " +
+    s"non-ordering columns with null values in first ordering column",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, true),
+        StructField("key2", IntegerType, true),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, 
IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (null, 40), (452300L, 1),
+        (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), 
(35L, 2112),
+        (6L, 90118), (9L, 95118), (6L, 87210), (null, 113), (null, 28))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      // verify that the expected null cols are seen
+      val nullRows = store.iterator(cfName).filter { kv =>
+        val keyRow = kv.key
+        keyRow.isNullAt(0)
+      }
+      assert(nullRows.size === 3)
+
+      // filter out the null rows and verify the rest
+      val result: Seq[(Long, Int)] = store.iterator(cfName).filter { kv =>
+        val keyRow = kv.key
+        !keyRow.isNullAt(0)
+      }.map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+
+      val timerTimestampsWithoutNulls = Seq((931L, 10), (452300L, 1),
+        (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), 
(35L, 2112),
+        (6L, 90118), (9L, 95118), (6L, 87210))
+
+      assert(result === timerTimestampsWithoutNulls.sorted)
+
+      // verify that the null cols are seen in the correct order when filtering for nulls
+      val nullRowsWithOrder = store.iterator(cfName).filter { kv =>
+        val keyRow = kv.key
+        keyRow.isNullAt(0)
+      }.map { kv =>
+        val keyRow = kv.key
+        keyRow.getInt(1)
+      }.toSeq
+
+      assert(nullRowsWithOrder === Seq(28, 40, 113))
+
+      store.abort()
+
+      val store1 = provider.getStore(0)
+      if (colFamiliesEnabled) {
+        store1.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps1 = Seq((null, 3), (null, 1), (null, 32), (null, 
113), (null, 40872),
+        (null, 66))
+      timerTimestamps1.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      // verify that ordering for non-null columns on the right is still maintained
+      val result1: Seq[Int] = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        keyRow.getInt(1)
+      }.toSeq
+
+      assert(result1 === timerTimestamps1.map(_._2).sorted)
+    }
+  }
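+
+  // As the assertions above show, keys whose first ordering column is null are grouped
+  // together and ordered among themselves by the second ordering column.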
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - 
variable size " +
+    s"non-ordering columns with null values in second ordering column",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, true),
+        StructField("key2", IntegerType, true),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, 
IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (40L, null), (452300L, 1),
+        (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), 
(35L, 2112),
+        (6L, 90118), (9L, 95118), (6L, 87210), (113L, null), (100L, null))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      // verify that the expected null cols are seen
+      val nullRows = store.iterator(cfName).filter { kv =>
+        val keyRow = kv.key
+        keyRow.isNullAt(1)
+      }
+      assert(nullRows.size === 3)
+
+      // the ordering based on the first col, which has non-null values, should be preserved
+      val result: Seq[(Long, Int)] = store.iterator(cfName)
+        .map { kv =>
+          val keyRow = kv.key
+          val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+          (key._1, key._2)
+        }.toSeq
+      assert(result.map(_._1) === timerTimestamps.map(_._1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan byte ordering column - variable 
size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", ByteType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](ByteType, 
IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), 
(0x1F, 1), (0x01, 68),
+        (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 
2112),
+        (0x06, 90118), (0x09, 95118), (0x06, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by byte col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result: Seq[(Byte, Int)] = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getByte(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - ordering cols and key schema 
cols are same",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    // reuse the single-column value schema as the key schema
+    tryWithProviderResource(newStoreProvider(valueSchema,
+      RangeKeyScanStateEncoderSpec(valueSchema, 1), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          valueSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(valueSchema, 1))
+      }
+
+      val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 
6, 9, 5)
+      timerTimestamps.foreach { ts =>
+        // the single fixed-size ordering col forms the entire key
+        val keyRow = dataToValueRow(ts)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        valueRowToData(kv.key)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+
+      // also check for prefix scan
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToValueRow(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          valueRowToData(kv.key)
+        }.toSeq
+        assert(result.size === 1)
+      }
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - with prefix scan",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), 
colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 1L)
+      timerTimestamps.zipWithIndex.foreach { case (ts, idx) =>
+        (1 to idx + 1).foreach { keyVal =>
+          val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString)
+          val valueRow = dataToValueRow(1)
+          store.put(keyRow, valueRow, cfName)
+          assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        }
+      }
+
+      timerTimestamps.zipWithIndex.foreach { case (ts, idx) =>
+        val prefix = dataToPrefixKeyRowWithRangeScan(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          val key = keyRowWithRangeScanToData(kv.key)
+          key._2
+        }.toSeq
+        assert(result.size === idx + 1)
+      }
+    }
+  }
+
   testWithColumnFamilies("rocksdb key and value schema encoders for column 
families",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
     val testColFamily = "testState"
@@ -165,7 +663,8 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     tryWithProviderResource(newStoreProvider(colFamiliesEnabled)) { provider =>
       val store = provider.getStore(0)
       if (colFamiliesEnabled) {
-        store.createColFamilyIfAbsent(testColFamily, keySchema, 
numColsPrefixKey = 0, valueSchema)
+        store.createColFamilyIfAbsent(testColFamily,
+          keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema))
         val keyRow1 = dataToKeyRow("a", 0)
         val valueRow1 = dataToValueRow(1)
         store.put(keyRow1, valueRow1, colFamilyName = testColFamily)
@@ -222,38 +721,48 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
   }
 
   def newStoreProvider(storeId: StateStoreId): RocksDBStateStoreProvider = {
-    newStoreProvider(storeId, numColsPrefixKey = 0)
+    newStoreProvider(storeId, NoPrefixKeyStateEncoderSpec(keySchema))
   }
 
   override def newStoreProvider(storeId: StateStoreId, useColumnFamilies: 
Boolean):
     RocksDBStateStoreProvider = {
-    newStoreProvider(storeId, numColsPrefixKey = 0, useColumnFamilies = 
useColumnFamilies)
+    newStoreProvider(storeId, NoPrefixKeyStateEncoderSpec(keySchema),
+    useColumnFamilies = useColumnFamilies)
   }
 
   override def newStoreProvider(useColumnFamilies: Boolean): 
RocksDBStateStoreProvider = {
-    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), 
numColsPrefixKey = 0,
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
+      NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = useColumnFamilies)
   }
 
   def newStoreProvider(useColumnFamilies: Boolean,
       useMultipleValuesPerKey: Boolean): RocksDBStateStoreProvider = {
-    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), 
numColsPrefixKey = 0,
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
+      NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = useColumnFamilies,
       useMultipleValuesPerKey = useMultipleValuesPerKey
     )
   }
 
   def newStoreProvider(storeId: StateStoreId, conf: Configuration): 
RocksDBStateStoreProvider = {
-    newStoreProvider(storeId, numColsPrefixKey = -1, conf = conf)
+    newStoreProvider(storeId, NoPrefixKeyStateEncoderSpec(keySchema), conf = 
conf)
   }
 
-  override def newStoreProvider(numPrefixCols: Int): RocksDBStateStoreProvider = {
-    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), numColsPrefixKey = numPrefixCols)
+  override def newStoreProvider(
+      keySchema: StructType,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
+      useColumnFamilies: Boolean): RocksDBStateStoreProvider = {
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
+      keyStateEncoderSpec = keyStateEncoderSpec,
+      keySchema = keySchema,
+      useColumnFamilies = useColumnFamilies)
   }
 
   def newStoreProvider(
       storeId: StateStoreId,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
+      keySchema: StructType = keySchema,
       sqlConf: Option[SQLConf] = None,
       conf: Configuration = new Configuration,
       useColumnFamilies: Boolean = false,
@@ -263,12 +772,11 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       storeId,
       keySchema,
       valueSchema,
-      numColsPrefixKey = numColsPrefixKey,
+      keyStateEncoderSpec,
       useColumnFamilies,
       new StateStoreConf(sqlConf.getOrElse(SQLConf.get)),
       conf,
-      useMultipleValuesPerKey
-    )
+      useMultipleValuesPerKey)
     provider
   }
 
@@ -292,8 +800,9 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
   override def newStoreProvider(
     minDeltasForSnapshot: Int,
     numOfVersToRetainInMemory: Int): RocksDBStateStoreProvider = {
-    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), 0,
-      Some(getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)))
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
+      NoPrefixKeyStateEncoderSpec(keySchema),
+      sqlConf = Some(getDefaultSQLConf(minDeltasForSnapshot, 
numOfVersToRetainInMemory)))
   }
 
   override def getDefaultSQLConf(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
index f2a555406336..3127c9f60249 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
@@ -62,13 +62,15 @@ class StateStoreRDDSuite extends SparkFunSuite with 
BeforeAndAfter {
       val path = Utils.createDirectory(tempDir, 
Random.nextFloat().toString).toString
       val rdd1 = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("a", 0)))
         .mapPartitionsWithStateStore(spark.sqlContext, operatorStateInfo(path, 
version = 0),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
       assert(rdd1.collect().toSet === Set(("a", 0) -> 2, ("b", 0) -> 1))
 
       // Generate next version of stores
       val rdd2 = makeRDD(spark.sparkContext, Seq(("a", 0), ("c", 0)))
         .mapPartitionsWithStateStore(spark.sqlContext, operatorStateInfo(path, 
version = 1),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
       assert(rdd2.collect().toSet === Set(("a", 0) -> 3, ("b", 0) -> 1, ("c", 
0) -> 1))
 
       // Make sure the previous RDD still has the same data.
@@ -86,7 +88,8 @@ class StateStoreRDDSuite extends SparkFunSuite with 
BeforeAndAfter {
       implicit val sqlContext = spark.sqlContext
       makeRDD(spark.sparkContext, Seq(("a", 0))).mapPartitionsWithStateStore(
         sqlContext, operatorStateInfo(path, version = storeVersion),
-        keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+        keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema))(increment)
     }
 
     // Generate RDDs and state store data
@@ -136,19 +139,22 @@ class StateStoreRDDSuite extends SparkFunSuite with 
BeforeAndAfter {
 
       val rddOfGets1 = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), 
("c", 0)))
         .mapPartitionsWithStateStore(spark.sqlContext, operatorStateInfo(path, 
version = 0),
-          keySchema, valueSchema, numColsPrefixKey = 0)(iteratorOfGets)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(iteratorOfGets)
       assert(rddOfGets1.collect().toSet ===
         Set(("a", 0) -> None, ("b", 0) -> None, ("c", 0) -> None))
 
       val rddOfPuts = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), 
("a", 0)))
         .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, 
version = 0),
-          keySchema, valueSchema, numColsPrefixKey = 0)(iteratorOfPuts)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(iteratorOfPuts)
       assert(rddOfPuts.collect().toSet ===
         Set(("a", 0) -> 1, ("a", 0) -> 2, ("b", 0) -> 1))
 
       val rddOfGets2 = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), 
("c", 0)))
         .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, 
version = 1),
-          keySchema, valueSchema, numColsPrefixKey = 0)(iteratorOfGets)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(iteratorOfGets)
       assert(rddOfGets2.collect().toSet ===
         Set(("a", 0) -> Some(2), ("b", 0) -> Some(1), ("c", 0) -> None))
     }
@@ -174,7 +180,8 @@ class StateStoreRDDSuite extends SparkFunSuite with 
BeforeAndAfter {
 
         val rdd = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("a", 
0)))
           .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, 
queryRunId = queryRunId),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
         require(rdd.partitions.length === 2)
 
         assert(
@@ -202,13 +209,15 @@ class StateStoreRDDSuite extends SparkFunSuite with 
BeforeAndAfter {
         val opId = 0
         val rdd1 = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("a", 
0)))
           .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, 
version = 0),
-            keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+            keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema))(increment)
         assert(rdd1.collect().toSet === Set(("a", 0) -> 2, ("b", 0) -> 1))
 
         // Generate next version of stores
         val rdd2 = makeRDD(spark.sparkContext, Seq(("a", 0), ("c", 0)))
           .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, 
version = 1),
-            keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+            keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema))(increment)
         assert(rdd2.collect().toSet === Set(("a", 0) -> 3, ("b", 0) -> 1, 
("c", 0) -> 1))
 
         // Make sure the previous RDD still has the same data.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 64b3e75ea976..231396aff222 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -62,7 +62,7 @@ class FakeStateStoreProviderWithMaintenanceError extends 
StateStoreProvider {
       stateStoreId: StateStoreId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConfs: StateStoreConf,
       hadoopConf: Configuration,
@@ -197,6 +197,25 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("running with range scan encoder should fail") {
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        keyStateEncoderSpec = 
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1),
+        useColumnFamilies = false)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+      parameters = Map(
+        "operationType" -> "Range scan",
+        "entity" -> "HDFSBackedStateStoreProvider"
+      ),
+      matchPVals = true
+    )
+  }
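+
+  // Range scan support is provider-specific: in this patch only the RocksDB provider
+  // implements it, so HDFSBackedStateStoreProvider reports the unsupported operation here.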
+
   test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 
1") {
     tryWithProviderResource(newStoreProvider(opId = Random.nextInt(), 
partition = 0,
       numOfVersToRetainInMemory = 1)) { provider =>
@@ -390,7 +409,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     def generateStoreVersions(): Unit = {
       for (i <- 1 to 20) {
-        val store = StateStore.get(storeProviderId1, keySchema, valueSchema, 
numColsPrefixKey = 0,
+        val store = StateStore.get(storeProviderId1, keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema),
           latestStoreVersion, useColumnFamilies = false, storeConf, hadoopConf)
         put(store, "a", 0, i)
         store.commit()
@@ -444,7 +464,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
           }
 
           // Reload the store and verify
-          StateStore.get(storeProviderId1, keySchema, valueSchema, 
numColsPrefixKey = 0,
+          StateStore.get(storeProviderId1, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
             latestStoreVersion, useColumnFamilies = false, storeConf, 
hadoopConf)
           assert(StateStore.isLoaded(storeProviderId1))
 
@@ -456,7 +477,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
           }
 
           // Reload the store and verify
-          StateStore.get(storeProviderId1, keySchema, valueSchema, 
numColsPrefixKey = 0,
+          StateStore.get(storeProviderId1, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
             latestStoreVersion, useColumnFamilies = false, storeConf, 
hadoopConf)
           assert(StateStore.isLoaded(storeProviderId1))
 
@@ -464,7 +486,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
           // then this executor should unload inactive instances immediately.
           coordinatorRef
             .reportActiveInstance(storeProviderId1, "other-host", 
"other-exec", Seq.empty)
-          StateStore.get(storeProviderId2, keySchema, valueSchema, 
numColsPrefixKey = 0,
+          StateStore.get(storeProviderId2, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
             0, useColumnFamilies = false, storeConf, hadoopConf)
           assert(!StateStore.isLoaded(storeProviderId1))
           assert(StateStore.isLoaded(storeProviderId2))
@@ -500,7 +523,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     def generateStoreVersions(): Unit = {
       for (i <- 1 to 20) {
-        val store = StateStore.get(storeProviderId1, keySchema, valueSchema, 
numColsPrefixKey = 0,
+        val store = StateStore.get(storeProviderId1, keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema),
           latestStoreVersion, useColumnFamilies = false, storeConf, hadoopConf)
         put(store, "a", 0, i)
         store.commit()
@@ -633,7 +657,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     // Getting the store should not create temp file
     val store0 = shouldNotCreateTempFile {
       StateStore.get(
-        storeId, keySchema, valueSchema, numColsPrefixKey = 0,
+        storeId, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
         version = 0, useColumnFamilies = false, storeConf, hadoopConf)
     }
 
@@ -650,7 +675,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     // Remove should create a temp file
     val store1 = shouldNotCreateTempFile {
       StateStore.get(
-        storeId, keySchema, valueSchema, numColsPrefixKey = 0,
+        storeId, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
         version = 1, useColumnFamilies = false, storeConf, hadoopConf)
     }
     remove(store1, _._1 == "a")
@@ -665,7 +691,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     // Commit without any updates should create a delta file
     val store2 = shouldNotCreateTempFile {
       StateStore.get(
-        storeId, keySchema, valueSchema, numColsPrefixKey = 0,
+        storeId, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
         version = 2, useColumnFamilies = false, storeConf, hadoopConf)
     }
     store2.commit()
@@ -913,7 +940,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
   def newStoreProvider(
       opId: Long,
       partition: Int,
-      numColsPrefixKey: Int = 0,
+      keyStateEncoderSpec: KeyStateEncoderSpec = 
NoPrefixKeyStateEncoderSpec(keySchema),
+      keySchema: StructType = keySchema,
       dir: String = newDir(),
       minDeltasForSnapshot: Int = 
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       numOfVersToRetainInMemory: Int = 
SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
@@ -924,15 +952,19 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       StateStoreId(dir, opId, partition),
       keySchema,
       valueSchema,
-      numColsPrefixKey = numColsPrefixKey,
+      keyStateEncoderSpec,
       useColumnFamilies = false,
       new StateStoreConf(sqlConf),
       hadoopConf)
     provider
   }
 
-  override def newStoreProvider(numPrefixCols: Int): HDFSBackedStateStoreProvider = {
-    newStoreProvider(opId = Random.nextInt(), partition = 0, numColsPrefixKey = numPrefixCols)
+  override def newStoreProvider(
+      keySchema: StructType,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
+      useColumnFamilies: Boolean): HDFSBackedStateStoreProvider = {
+    newStoreProvider(opId = Random.nextInt(), partition = 0,
+      keyStateEncoderSpec = keyStateEncoderSpec)
   }
 
   override def newStoreProvider(useColumnFamilies: Boolean): 
HDFSBackedStateStoreProvider = {
@@ -1041,7 +1073,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
   }
 
   testWithAllCodec("prefix scan") { colFamiliesEnabled =>
-    tryWithProviderResource(newStoreProvider(numPrefixCols = 1)) { provider =>
+    tryWithProviderResource(newStoreProvider(keySchema, 
PrefixKeyScanStateEncoderSpec(keySchema, 1),
+      colFamiliesEnabled)) { provider =>
       // Verify state before starting a new set of updates
       assert(getLatestData(provider, useColumnFamilies = false).isEmpty)
 
@@ -1351,7 +1384,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
           // Verify that trying to get incorrect versions throw errors
           var e = intercept[SparkException] {
             StateStore.get(
-              storeId, keySchema, valueSchema, 0, -1, useColumnFamilies = false,
+              storeId, keySchema, valueSchema,
+                NoPrefixKeyStateEncoderSpec(keySchema), -1, useColumnFamilies = false,
                 storeConf, hadoopConf)
           }
           checkError(
@@ -1364,7 +1398,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
 
           e = intercept[SparkException] {
             StateStore.get(
-              storeId, keySchema, valueSchema, 0, 1, useColumnFamilies = false,
+              storeId, keySchema, valueSchema,
+              NoPrefixKeyStateEncoderSpec(keySchema),
+              1, useColumnFamilies = false,
               storeConf, hadoopConf)
           }
           checkError(
@@ -1379,14 +1415,18 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
 
           // Increase version of the store and try to get again
           val store0 = StateStore.get(
-            storeId, keySchema, valueSchema, 0, 0, useColumnFamilies = false,
+            storeId, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
+            0, useColumnFamilies = false,
             storeConf, hadoopConf)
           assert(store0.version === 0)
           put(store0, "a", 0, 1)
           store0.commit()
 
           val store1 = StateStore.get(
-            storeId, keySchema, valueSchema, 0, 1, useColumnFamilies = false,
+            storeId, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
+            1, useColumnFamilies = false,
             storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
           assert(store1.version === 1)
@@ -1394,7 +1434,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
 
           // Verify that you can also load older version
           val store0reloaded = StateStore.get(
-            storeId, keySchema, valueSchema, 0, 0, useColumnFamilies = false,
+            storeId, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
+            0, useColumnFamilies = false,
             storeConf, hadoopConf)
           assert(store0reloaded.version === 0)
           assert(rowPairsToDataSet(store0reloaded.iterator()) === Set.empty)
@@ -1404,7 +1446,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
           assert(!StateStore.isLoaded(storeId))
 
           val store1reloaded = StateStore.get(
-            storeId, keySchema, valueSchema, 0, 1, useColumnFamilies = false,
+            storeId, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
+            1, useColumnFamilies = false,
             storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
           assert(store1reloaded.version === 1)
@@ -1500,8 +1544,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
           val storeConf = StateStoreConf(sqlConf)
 
           // get the state store and kick off the maintenance task
-          StateStore.get(storeId, null, null, 0, 0,
-            useColumnFamilies = false, storeConf, sc.hadoopConfiguration)
+          StateStore.get(storeId, null, null,
+            NoPrefixKeyStateEncoderSpec(keySchema), 0, useColumnFamilies = false,
+            storeConf, sc.hadoopConfiguration)
 
           eventually(timeout(30.seconds)) {
             assert(!StateStore.isMaintenanceRunning)
@@ -1559,7 +1604,10 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
   def newStoreProvider(minDeltasForSnapshot: Int, numOfVersToRetainInMemory: 
Int): ProviderClass
 
   /** Return a new provider with setting prefix key */
-  def newStoreProvider(numPrefixCols: Int): ProviderClass
+  def newStoreProvider(
+      keySchema: StructType,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
+      useColumnFamilies: Boolean): ProviderClass
 
   /** Return a new provider with useColumnFamilies set to true */
   def newStoreProvider(useColumnFamilies: Boolean): ProviderClass
@@ -1646,18 +1694,31 @@ object StateStoreTestsHelper {
     Seq(StructField("key1", StringType, true), StructField("key2", 
IntegerType, true)))
   val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))
 
+  val keySchemaWithRangeScan: StructType = StructType(
+    Seq(StructField("key1", LongType, false), StructField("key2", StringType, 
false)))
+
   val keyProj = UnsafeProjection.create(Array[DataType](StringType, 
IntegerType))
+  val rangeScanProj = UnsafeProjection.create(Array[DataType](LongType, 
StringType))
   val prefixKeyProj = UnsafeProjection.create(Array[DataType](StringType))
+  val prefixKeyProjWithRangeScan = 
UnsafeProjection.create(Array[DataType](LongType))
   val valueProj = UnsafeProjection.create(Array[DataType](IntegerType))
 
   def dataToPrefixKeyRow(s: String): UnsafeRow = {
     prefixKeyProj.apply(new 
GenericInternalRow(Array[Any](UTF8String.fromString(s)))).copy()
   }
 
+  def dataToPrefixKeyRowWithRangeScan(ts: Long): UnsafeRow = {
+    prefixKeyProjWithRangeScan.apply(new 
GenericInternalRow(Array[Any](ts))).copy()
+  }
+
   def dataToKeyRow(s: String, i: Int): UnsafeRow = {
     keyProj.apply(new GenericInternalRow(Array[Any](UTF8String.fromString(s), 
i))).copy()
   }
 
+  def dataToKeyRowWithRangeScan(ts: Long, s: String): UnsafeRow = {
+    rangeScanProj.apply(new GenericInternalRow(Array[Any](ts, 
UTF8String.fromString(s)))).copy()
+  }
+
   def dataToValueRow(i: Int): UnsafeRow = {
     valueProj.apply(new GenericInternalRow(Array[Any](i))).copy()
   }
@@ -1666,6 +1727,10 @@ object StateStoreTestsHelper {
     (row.getUTF8String(0).toString, row.getInt(1))
   }
 
+  def keyRowWithRangeScanToData(row: UnsafeRow): (Long, String) = {
+    (row.getLong(0), row.getUTF8String(1).toString)
+  }
+
   def valueRowToData(row: UnsafeRow): Int = {
     row.getInt(0)
   }
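
As a quick sanity check of the helpers added above, a minimal round-trip
sketch (42L and "a" are arbitrary sample values):

```
// Build a (Long, String) key row with the new range scan helper, read it back,
// and build the single-column prefix row for the ordering column.
val keyRow = dataToKeyRowWithRangeScan(42L, "a")
val (ts, s) = keyRowWithRangeScanToData(keyRow)
assert(ts == 42L && s == "a")
val prefixRow = dataToPrefixKeyRowWithRangeScan(42L)
assert(prefixRow.getLong(0) == 42L)
```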
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManagerSuite.scala
index f758e44e6680..1607d7e699d5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManagerSuite.scala
@@ -184,8 +184,8 @@ class StreamingSessionWindowStateManagerSuite extends 
StreamTest with BeforeAndA
       val storeProviderId = StateStoreProviderId(stateInfo, 0, 
StateStoreId.DEFAULT_STORE_NAME)
       val store = StateStore.get(
         storeProviderId, manager.getStateKeySchema, 
manager.getStateValueSchema,
-        manager.getNumColsForPrefixKey, stateInfo.storeVersion,
-        useColumnFamilies = false, storeConf, new Configuration)
+        PrefixKeyScanStateEncoderSpec(manager.getStateKeySchema, 
manager.getNumColsForPrefixKey),
+        stateInfo.storeVersion, useColumnFamilies = false, storeConf, new 
Configuration)
 
       try {
         f(manager, store)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index e86ac03b70d9..8668b58672c7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -187,8 +187,8 @@ class ValueStateSuite extends StateVariableSuiteBase {
     val storeConf = new StateStoreConf(new SQLConf())
     val ex = intercept[StateStoreMultipleColumnFamiliesNotSupportedException] {
       provider.init(
-        storeId, keySchema, valueSchema, 0, useColumnFamilies = true,
-        storeConf, new Configuration)
+        storeId, keySchema, valueSchema, 
NoPrefixKeyStateEncoderSpec(keySchema),
+        useColumnFamilies = true, storeConf, new Configuration)
     }
     checkError(
       ex,
@@ -334,19 +334,19 @@ abstract class StateVariableSuiteBase extends 
SharedSparkSession
   protected def newStoreProviderWithStateVariable(
       useColumnFamilies: Boolean): RocksDBStateStoreProvider = {
     newStoreProviderWithStateVariable(StateStoreId(newDir(), Random.nextInt(), 
0),
-      numColsPrefixKey = 0,
+      NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
       useColumnFamilies = useColumnFamilies)
   }
 
   protected def newStoreProviderWithStateVariable(
       storeId: StateStoreId,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       sqlConf: SQLConf = SQLConf.get,
       conf: Configuration = new Configuration,
       useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     provider.init(
-      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = 
numColsPrefixKey,
+      storeId, schemaForKeyRow, schemaForValueRow, keyStateEncoderSpec,
       useColumnFamilies,
       new StateStoreConf(sqlConf), conf, useMultipleValuesPerKey)
     provider
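
A minimal usage sketch of the refactored helper above (illustrative
arguments; newDir() and Random come from the surrounding suite):

```
// Illustrative only: builds a provider for a fresh store id with the no-prefix
// encoder spec used elsewhere in this diff and column families enabled.
val testProvider = newStoreProviderWithStateVariable(
  StateStoreId(newDir(), Random.nextInt(), 0),
  NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
  useColumnFamilies = true)
```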
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index fa08a44dc9e5..32822994c81c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{LocalLimitExec, 
SimpleMode, SparkPlan}
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import 
org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, 
MemorySink}
-import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreConf, StateStoreId, StateStoreProvider}
+import org.apache.spark.sql.execution.streaming.state.{KeyStateEncoderSpec, 
StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1415,7 +1415,7 @@ class TestStateStoreProvider extends StateStoreProvider {
       stateStoreId: StateStoreId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConfs: StateStoreConf,
       hadoopConf: Configuration,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
