This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ff38378d7e42 [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider

ff38378d7e42 is described below

commit ff38378d7e425c3e810e6556a3916f4594f2aec0
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Tue Mar 26 13:48:40 2024 +0900

[SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider

### What changes were proposed in this pull request?
Add support for a range scan based key state encoder for use with the state store provider.

### Why are the changes needed?
These changes allow range scans over fixed-size initial key columns, especially with the RocksDB state store provider.

Earlier we had tried to use the existing key state encoder to encapsulate state for the ordering columns using BIG_ENDIAN encoding. However, that model does not work when variable-sized non-ordering columns are part of the grouping key: in that case the variable-length portion gets encoded upfront in the `UnsafeRow`, which breaks the range scan/sorting functionality.

In ord [...]
- create the state store instance or any column family with a key schema, the `RangeKeyScanStateEncoder` encoder type, and the number of ordering columns
- with the new encoder, users can perform a range scan in sorted order using the ordering columns
- with the new encoder, users can perform a prefix scan using the ordering columns
- the number of ordering columns must be > 0 and <= num_key_schema_cols, which is a more lenient requirement than for `PrefixKeyScanStateEncoder` (whose prefix columns must be > 0 and < num_key_schema_cols)
- only fixed-size (primitive type) ordering columns can be used with this encoder

Internally, we convert the passed `UnsafeRow` into a new one containing the BIG_ENDIAN encoded byte arrays that are eventually written out to RocksDB.

Note that the existing `NoPrefixKeyStateEncoder` and `PrefixKeyScanStateEncoder` will also continue to be supported.
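
The BIG_ENDIAN encoding described above can be illustrated with a small, self-contained Scala sketch. It is not the code from this commit: `RangeScanEncodingSketch`, `validateNumOrderingCols`, `encodeInt`, `decodeInt` and `compareBytes` are hypothetical names, and only a single nullable `Int` ordering column is modeled. It mirrors the per-column layout used by `encodePrefixKeyForRangeScan` in the diff below (one marker byte, `0x01` for null and `0x00` otherwise, followed by the value in BIG_ENDIAN) and compares keys with an unsigned byte-wise comparison, which is assumed here to stand in for RocksDB's default comparator. It also assumes non-negative values: in two's complement, -1 encodes as `0xFFFFFFFF` and would sort after every positive value under unsigned byte comparison.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical sketch (not Spark's API): one nullable Int ordering column,
// encoded as a 1-byte null marker followed by the value in BIG_ENDIAN.
// Assumes non-negative values; negative Ints would sort after positive ones.
object RangeScanEncodingSketch {
  private val NotNullMarker: Byte = 0x00
  private val NullMarker: Byte = 0x01

  // Mirrors the rule from the commit message: 0 < numOrderingCols <= numKeyCols.
  def validateNumOrderingCols(numOrderingCols: Int, numKeyCols: Int): Unit =
    require(numOrderingCols > 0 && numOrderingCols <= numKeyCols,
      s"Incorrect number of ordering columns=$numOrderingCols")

  // Null and non-null values both occupy 1 + 4 bytes, so every encoded
  // column has the same width and byte-wise comparison stays aligned.
  def encodeInt(value: Option[Int]): Array[Byte] = {
    val buf = ByteBuffer.allocate(1 + java.lang.Integer.BYTES).order(ByteOrder.BIG_ENDIAN)
    value match {
      case Some(v) => buf.put(NotNullMarker).putInt(v)
      case None => buf.put(NullMarker) // the 4 value bytes stay zero
    }
    buf.array()
  }

  // Reads the marker byte first; only reads the value when it is not null.
  def decodeInt(bytes: Array[Byte]): Option[Int] = {
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
    if (buf.get() == NullMarker) None else Some(buf.getInt())
  }

  // Unsigned lexicographic comparison of two byte arrays.
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val cmp = (a(i) & 0xff) - (b(i) & 0xff)
      if (cmp != 0) return cmp
      i += 1
    }
    a.length - b.length
  }

  def main(args: Array[String]): Unit = {
    validateNumOrderingCols(numOrderingCols = 1, numKeyCols = 2)
    val keys: Seq[Option[Int]] = Seq(Some(300), None, Some(2), Some(70000), Some(1))
    val sorted = keys.sortWith((x, y) => compareBytes(encodeInt(x), encodeInt(y)) < 0)
    println(sorted.mkString(", ")) // Some(1), Some(2), Some(300), Some(70000), None
  }
}
```

Running `main` prints the keys in numeric order with the null key last, which matches the behavior described in the `RangeKeyScanStateEncoder` scaladoc in the diff (entries whose first ordering column is null appear last in the iterator).
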
The user can decide which encoder best fits the needs of their store/column family schema/use case.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests

```
...
[info] - rocksdb range scan validation - variable sized columns - with colFamiliesEnabled=true (without changelog checkpointing) (1 millisecond)
[info] - rocksdb range scan validation - variable sized columns - with colFamiliesEnabled=false (with changelog checkpointing) (1 millisecond)
[info] - rocksdb range scan validation - variable sized columns - with colFamiliesEnabled=false (without changelog checkpointing) (1 millisecond)
[info] - rocksdb range scan - fixed size non-ordering columns - with colFamiliesEnabled=true (with changelog checkpointing) (782 milliseconds)
[info] - rocksdb range scan - fixed size non-ordering columns - with colFamiliesEnabled=true (without changelog checkpointing) (125 milliseconds)
[info] - rocksdb range scan - fixed size non-ordering columns - with colFamiliesEnabled=false (with changelog checkpointing) (177 milliseconds)
[info] - rocksdb range scan - fixed size non-ordering columns - with colFamiliesEnabled=false (without changelog checkpointing) (89 milliseconds)
[info] - rocksdb range scan - variable size non-ordering columns - with colFamiliesEnabled=true (with changelog checkpointing) (192 milliseconds)
[info] - rocksdb range scan - variable size non-ordering columns - with colFamiliesEnabled=true (without changelog checkpointing) (107 milliseconds)
[info] - rocksdb range scan - variable size non-ordering columns - with colFamiliesEnabled=false (with changelog checkpointing) (185 milliseconds)
[info] - rocksdb range scan - variable size non-ordering columns - with colFamiliesEnabled=false (without changelog checkpointing) (96 milliseconds)
[info] - rocksdb range scan - ordering cols and key schema cols are same - with colFamiliesEnabled=true (with changelog checkpointing) (195 milliseconds)
[info] - rocksdb range scan - ordering cols and key schema cols are same - with colFamiliesEnabled=true (without changelog checkpointing) (106 milliseconds)
[info] - rocksdb range scan - ordering cols and key schema cols are same - with colFamiliesEnabled=false (with changelog checkpointing) (161 milliseconds)
[info] - rocksdb range scan - ordering cols and key schema cols are same - with colFamiliesEnabled=false (without changelog checkpointing) (88 milliseconds)
[info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=true (with changelog checkpointing) (169 milliseconds)
[info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=true (without changelog checkpointing) (105 milliseconds)
[info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=false (with changelog checkpointing) (185 milliseconds)
[info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=false (without changelog checkpointing) (94 milliseconds)
16:24:18.401 WARN org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBStateStoreSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 5 seconds, 247 milliseconds.
[info] Total number of tests run: 24
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 24, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 22 s, completed Mar 15, 2024, 4:24:18 PM
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45503 from anishshri-db/task/SPARK-47372.

Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json | 24 +
 docs/sql-error-conditions.md | 24 +
 .../v2/state/StatePartitionReader.scala | 16 +-
 .../streaming/FlatMapGroupsWithStateExec.scala | 4 +-
 .../sql/execution/streaming/ListStateImpl.scala | 6 +-
 .../sql/execution/streaming/MapStateImpl.scala | 6 +-
 .../sql/execution/streaming/TimerStateImpl.scala | 14 +-
 .../streaming/TransformWithStateExec.scala | 4 +-
 .../sql/execution/streaming/ValueStateImpl.scala | 6 +-
 .../state/HDFSBackedStateStoreProvider.scala | 47 +-
 .../streaming/state/RocksDBStateEncoder.scala | 281 ++++++++++-
 .../state/RocksDBStateStoreProvider.scala | 14 +-
 .../sql/execution/streaming/state/StateStore.scala | 40 +-
 .../streaming/state/StateStoreErrors.scala | 42 +-
 .../execution/streaming/state/StateStoreRDD.scala | 11 +-
 .../state/SymmetricHashJoinStateManager.scala | 4 +-
 .../sql/execution/streaming/state/package.scala | 15 +-
 .../execution/streaming/statefulOperators.scala | 12 +-
 .../sql/execution/streaming/streamingLimits.scala | 4 +-
 .../StateStoreBasicOperationsBenchmark.scala | 10 +-
 ...ngSortWithSessionWindowStateIteratorSuite.scala | 6 +-
 .../streaming/state/MemoryStateStore.scala | 2 +-
 .../streaming/state/RocksDBStateStoreSuite.scala | 541 ++++++++++++++++++++-
 .../streaming/state/StateStoreRDDSuite.scala | 27 +-
 .../streaming/state/StateStoreSuite.scala | 111 ++++-
 .../StreamingSessionWindowStateManagerSuite.scala | 4 +-
 .../streaming/state/ValueStateSuite.scala | 10 +-
 .../apache/spark/sql/streaming/StreamSuite.scala | 4 +-
 28 files changed, 1127 insertions(+), 162 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 99aeca33dfb4..717d5e6631ec 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3561,6 +3561,24 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN" : {
+    "message" : [
+      "Incorrect number of ordering columns=<numOrderingCols> for range scan encoder. Ordering columns cannot be zero or greater than num of schema columns."
+    ],
+    "sqlState" : "42802"
+  },
+  "STATE_STORE_INCORRECT_NUM_PREFIX_COLS_FOR_PREFIX_SCAN" : {
+    "message" : [
+      "Incorrect number of prefix columns=<numPrefixCols> for prefix scan encoder. Prefix columns cannot be zero or greater than or equal to num of schema columns."
+    ],
+    "sqlState" : "42802"
+  },
+  "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED" : {
+    "message" : [
+      "Null type ordering column with name=<fieldName> at index=<index> is not supported for range scan encoder."
+    ],
+    "sqlState" : "42802"
+  },
   "STATE_STORE_UNSUPPORTED_OPERATION" : {
     "message" : [
       "<operationType> operation not supported with <entity>"
@@ -3573,6 +3591,12 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED" : {
+    "message" : [
+      "Variable size ordering column with name=<fieldName> at index=<index> is not supported for range scan encoder."
+    ],
+    "sqlState" : "42802"
+  },
   "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : {
     "message" : [
       "Static partition column <staticName> is also specified in the column list."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 359e120c9cd0..b05a8d1ff61e 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2193,6 +2193,24 @@ Failed to perform column family operation=`<operationName>` with invalid name=`<
 The handle has not been initialized for this StatefulProcessor.
 Please only use the StatefulProcessor within the transformWithState operator.

+### STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Incorrect number of ordering columns=`<numOrderingCols>` for range scan encoder. Ordering columns cannot be zero or greater than num of schema columns.
+
+### STATE_STORE_INCORRECT_NUM_PREFIX_COLS_FOR_PREFIX_SCAN
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Incorrect number of prefix columns=`<numPrefixCols>` for prefix scan encoder. Prefix columns cannot be zero or greater than or equal to num of schema columns.
+
+### STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Null type ordering column with name=`<fieldName>` at index=`<index>` is not supported for range scan encoder.
+
 ### STATE_STORE_UNSUPPORTED_OPERATION

 [SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
@@ -2205,6 +2223,12 @@ Please only use the StatefulProcessor within the transformWithState operator.

 State store operation=`<operationType>` not supported on missing column family=`<colFamilyName>`.

+### STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Variable size ordering column with name=`<fieldName>` at index=`<index>` is not supported for range scan encoder.
+
 ### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST

 [SQLSTATE: 42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
index e4e28d5f8ac3..bbfe3a3f373e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StateStoreConf, StateStoreId, StateStoreProvider, StateStoreProviderId}
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, ReadStateStore, StateStoreConf, StateStoreId, StateStoreProvider, StateStoreProviderId}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration

@@ -77,9 +77,19 @@ class StatePartitionReader(
       stateStoreMetadata.head.numColsPrefixKey
     }

+    // TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support for this will be
+    // added in the future along with state metadata changes.
+    // Filed JIRA here: https://issues.apache.org/jira/browse/SPARK-47524
+    val keyStateEncoderType = if (numColsPrefixKey > 0) {
+      PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+    } else {
+      NoPrefixKeyStateEncoderSpec(keySchema)
+    }
+
     StateStoreProvider.createAndInit(
-      stateStoreProviderId, keySchema, valueSchema, numColsPrefixKey,
-      useColumnFamilies = false, storeConf, hadoopConf.value, useMultipleValuesPerKey = false)
+      stateStoreProviderId, keySchema, valueSchema, keyStateEncoderType,
+      useColumnFamilies = false, storeConf, hadoopConf.value,
+      useMultipleValuesPerKey = false)
   }

   private lazy val store: ReadStateStore = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 26a2ee71e7b6..01b16b63fa27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -226,7 +226,7 @@ trait FlatMapGroupsWithStateExecBase
           storeProviderId,
           groupingAttributes.toStructType,
           stateManager.stateSchema,
-          numColsPrefixKey = 0,
+          NoPrefixKeyStateEncoderSpec(groupingAttributes.toStructType),
           stateInfo.get.storeVersion,
           useColumnFamilies = false,
           storeConf, hadoopConfBroadcast.value.value)
@@ -238,7 +238,7 @@ trait FlatMapGroupsWithStateExecBase
         getStateInfo,
         groupingAttributes.toStructType,
         stateManager.stateSchema,
-        numColsPrefixKey = 0,
+        NoPrefixKeyStateEncoderSpec(groupingAttributes.toStructType),
         session.sqlContext.sessionState,
         Some(session.sqlContext.streams.stateStoreCoordinator)
       ) { case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
index d0be62293d05..662bef5716ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
@@ -20,7 +20,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA}
-import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors}
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors}
 import org.apache.spark.sql.streaming.ListState

 /**
@@ -44,8 +44,8 @@ class ListStateImpl[S](

   private val stateTypesEncoder = StateTypesEncoder(keySerializer, valEncoder, stateName)

-  store.createColFamilyIfAbsent(stateName, KEY_ROW_SCHEMA, numColsPrefixKey = 0,
-    VALUE_ROW_SCHEMA, useMultipleValuesPerKey = true)
+  store.createColFamilyIfAbsent(stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA,
+    NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), useMultipleValuesPerKey = true)

   /** Whether state exists or not. */
   override def exists(): Boolean = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
index 91f6be4ddfd1..d2ccd0a77807 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors, UnsafeRowPair}
+import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair}
 import org.apache.spark.sql.streaming.MapState
 import org.apache.spark.sql.types.{BinaryType, StructType}

@@ -40,8 +40,8 @@ class MapStateImpl[K, V](
   private val stateTypesEncoder = new CompositeKeyStateEncoder(
     keySerializer, userKeyEnc, valEncoder, schemaForCompositeKeyRow, stateName)

-  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, numColsPrefixKey = 1,
-    schemaForValueRow)
+  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, schemaForValueRow,
+    PrefixKeyScanStateEncoderSpec(schemaForCompositeKeyRow, 1))

   /** Whether state exists or not. */
   override def exists(): Boolean = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
index d8b5cb7ef073..6166374d25e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
@@ -85,16 +85,14 @@ class TimerStateImpl(
   }

   val keyToTsCFName = timerCFName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
-  store.createColFamilyIfAbsent(keyToTsCFName,
-    schemaForKeyRow, numColsPrefixKey = 1,
-    schemaForValueRow, useMultipleValuesPerKey = false,
-    isInternal = true)
+  store.createColFamilyIfAbsent(keyToTsCFName, schemaForKeyRow,
+    schemaForValueRow, PrefixKeyScanStateEncoderSpec(schemaForKeyRow, 1),
+    useMultipleValuesPerKey = false, isInternal = true)

   val tsToKeyCFName = timerCFName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
-  store.createColFamilyIfAbsent(tsToKeyCFName,
-    keySchemaForSecIndex, numColsPrefixKey = 0,
-    schemaForValueRow, useMultipleValuesPerKey = false,
-    isInternal = true)
+  store.createColFamilyIfAbsent(tsToKeyCFName, keySchemaForSecIndex,
+    schemaForValueRow, NoPrefixKeyStateEncoderSpec(keySchemaForSecIndex),
+    useMultipleValuesPerKey = false, isInternal = true)

   private def getGroupingKey(cfName: String): Any = {
     val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index 500da5492f88..39365e92185a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -276,7 +276,7 @@ case class TransformWithStateExec(
         getStateInfo,
         schemaForKeyRow,
        schemaForValueRow,
-        numColsPrefixKey = 0,
+        NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
         session.sqlContext.sessionState,
         Some(session.sqlContext.streams.stateStoreCoordinator),
         useColumnFamilies = true,
@@ -308,7 +308,7 @@ case class TransformWithStateExec(
       providerId,
       schemaForKeyRow,
       schemaForValueRow,
-      numColsPrefixKey = 0,
+      NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
       useColumnFamilies = true,
       storeConf = storeConf,
       hadoopConf = broadcastedHadoopConf.value,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
index 5d2b9881c78d..08876ca3032e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA}
-import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore}
 import org.apache.spark.sql.streaming.ValueState

 /**
@@ -43,8 +43,8 @@ class ValueStateImpl[S](

   private val stateTypesEncoder = StateTypesEncoder(keySerializer, valEncoder, stateName)

-  store.createColFamilyIfAbsent(stateName, KEY_ROW_SCHEMA, numColsPrefixKey = 0,
-    VALUE_ROW_SCHEMA)
+  store.createColFamilyIfAbsent(stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA,
+    NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA))

   /** Function to check if state exists. Returns true if present and false otherwise */
   override def exists(): Boolean = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 42d10f6c1bd5..5fbc69ac6093 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -123,8 +123,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     override def createColFamilyIfAbsent(
         colFamilyName: String,
         keySchema: StructType,
-        numColsPrefixKey: Int,
         valueSchema: StructType,
+        keyStateEncoderSpec: KeyStateEncoderSpec,
         useMultipleValuesPerKey: Boolean = false,
         isInternal: Boolean = false): Unit = {
       throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
@@ -280,11 +280,39 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     }
   }

+  // Run bunch of validations specific to HDFSBackedStateStoreProvider
+  private def runValidation(
+      useColumnFamilies: Boolean,
+      useMultipleValuesPerKey: Boolean,
+      keyStateEncoderSpec: KeyStateEncoderSpec): Unit = {
+    // TODO: add support for multiple col families with HDFSBackedStateStoreProvider
+    if (useColumnFamilies) {
+      throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
+    }
+
+    if (useMultipleValuesPerKey) {
+      throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName)
+    }
+
+    if (keyStateEncoderSpec.isInstanceOf[RangeKeyScanStateEncoderSpec]) {
+      throw StateStoreErrors.unsupportedOperationException("Range scan", providerName)
+    }
+  }
+
+  private def getNumColsPrefixKey(keyStateEncoderSpec: KeyStateEncoderSpec): Int = {
+    keyStateEncoderSpec match {
+      case NoPrefixKeyStateEncoderSpec(_) => 0
+      case PrefixKeyScanStateEncoderSpec(_, numColsPrefixKey) => numColsPrefixKey
+      case _ => throw StateStoreErrors.unsupportedOperationException("Invalid key state encoder",
+        providerName)
+    }
+  }
+
   override def init(
       stateStoreId: StateStoreId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
@@ -296,19 +324,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     this.hadoopConf = hadoopConf
     this.numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory

-    // TODO: add support for multiple col families with HDFSBackedStateStoreProvider
-    if (useColumnFamilies) {
-      throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
-    }
-
-    if (useMultipleValuesPerKey) {
-      throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName)
-    }
+    // run a bunch of validation checks for this state store provider
+    runValidation(useColumnFamilies, useMultipleValuesPerKey, keyStateEncoderSpec)

-    require((keySchema.length == 0 && numColsPrefixKey == 0) ||
-      (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " +
-      "greater than the number of columns for prefix key!")
-    this.numColsPrefixKey = numColsPrefixKey
+    this.numColsPrefixKey = getNumColsPrefixKey(keyStateEncoderSpec)

     fm.mkdirs(baseDir)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index 8f58bccd948b..f342853514d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -17,16 +17,18 @@

 package org.apache.spark.sql.execution.streaming.state

+import java.nio.{ByteBuffer, ByteOrder}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform

 sealed trait RocksDBKeyStateEncoder {
   def supportPrefixKeyScan: Boolean
   def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte]
-  def extractPrefixKey(key: UnsafeRow): UnsafeRow
   def encodeKey(row: UnsafeRow): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
 }
@@ -39,13 +41,21 @@ sealed trait RocksDBValueStateEncoder {
 }

 object RocksDBStateEncoder {
-  def getKeyEncoder(
-      keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+  def getKeyEncoder(keyStateEncoderSpec: KeyStateEncoderSpec): RocksDBKeyStateEncoder = {
+    // Return the key state encoder based on the requested type
+    keyStateEncoderSpec match {
+      case NoPrefixKeyStateEncoderSpec(keySchema) =>
+        new NoPrefixKeyStateEncoder(keySchema)
+
+      case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>
+        new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
+
+      case RangeKeyScanStateEncoderSpec(keySchema, numOrderingCols) =>
+        new RangeKeyScanStateEncoder(keySchema, numOrderingCols)
+
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported key state encoder spec: " +
+          s"$keyStateEncoderSpec")
     }
   }

@@ -110,9 +120,6 @@ class PrefixKeyScanStateEncoder(

   import RocksDBStateEncoder._

-  require(keySchema.length > numColsPrefixKey, "The number of columns in the key must be " +
-    "greater than the number of columns for prefix key!")
-
   private val prefixKeyFieldsWithIdx: Seq[(StructField, Int)] = {
     keySchema.zipWithIndex.take(numColsPrefixKey)
   }
@@ -176,7 +183,7 @@ class PrefixKeyScanStateEncoder(
     restoreKeyProjection(joinedRowOnKey.withLeft(prefixKeyDecoded).withRight(remainingKeyDecoded))
   }

-  override def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
     prefixKeyProjection(key)
   }

@@ -192,6 +199,250 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }

+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ *
+ * To encode a row for range scan, we first project the first numOrderingCols needed
+ * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's fields in BIG_ENDIAN
+ * to allow for scanning keys in sorted order using the byte-wise comparison method that
+ * RocksDB uses.
+ * Then, for the rest of the fields, we project those into another UnsafeRow.
+ * We then effectively join these two UnsafeRows together, and finally take those bytes
+ * to get the resulting row.
+ * We cannot support variable sized fields given the UnsafeRow format which stores variable
+ * sized fields as offset and length pointers to the actual values, thereby changing the required
+ * ordering.
+ * Note that we also support "null" values being passed for these fixed size fields. We prepend
+ * a single byte to indicate whether the column value is null or not. We cannot change the
+ * nullability on the UnsafeRow itself as the expected ordering would change if non-first
+ * columns are marked as null. If the first col is null, those entries will appear last in
+ * the iterator. If non-first columns are null, ordering based on the previous columns will
+ * still be honored. For rows with null column values, ordering for subsequent columns
+ * will also be maintained within those set of rows.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of columns to be used for range scan
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      // NullType is technically fixed size, but not supported for ordering
+      if (field.dataType == NullType) {
+        throw StateStoreErrors.nullTypeOrderingColsNotSupported(field.name, idx.toString)
+      } else {
+        throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+      }
+    }
+  }
+
+  private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.drop(numOrderingCols)
+  }
+
+  private val rangeScanKeyProjection: UnsafeProjection = {
+    val refs = rangeScanKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val remainingKeyProjection: UnsafeProjection = {
+    val refs = remainingKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val restoreKeyProjection: UnsafeProjection = UnsafeProjection.create(keySchema)
+
+  // Reusable objects
+  private val joinedRowOnKey = new JoinedRow()
+
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+    rangeScanKeyProjection(key)
+  }
+
+  // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
+  // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
+  private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = row.get(idx, field.dataType)
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // Note that we cannot allocate a smaller buffer here even if the value is null
+      // because the effective byte array is considered variable size and needs to have
+      // the same size across all rows for the ordering to work as expected.
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {
+        writer.write(idx, bbuf.array())
+      } else {
+        field.dataType match {
+          case BooleanType =>
+          case ByteType =>
+            bbuf.put(value.asInstanceOf[Byte])
+            writer.write(idx, bbuf.array())
+
+          // for other multi-byte types, we need to convert to big-endian
+          case ShortType =>
+            bbuf.putShort(value.asInstanceOf[Short])
+            writer.write(idx, bbuf.array())
+
+          case IntegerType =>
+            bbuf.putInt(value.asInstanceOf[Int])
+            writer.write(idx, bbuf.array())
+
+          case LongType =>
+            bbuf.putLong(value.asInstanceOf[Long])
+            writer.write(idx, bbuf.array())
+
+          case FloatType =>
+            bbuf.putFloat(value.asInstanceOf[Float])
+            writer.write(idx, bbuf.array())
+
+          case DoubleType =>
+            bbuf.putDouble(value.asInstanceOf[Double])
+            writer.write(idx, bbuf.array())
+        }
+      }
+    }
+    writer.getRow()
+  }
+
+  // Rewrite the unsafe row by converting back from BIG_ENDIAN byte arrays to the
+  // original data types.
+ // For decode, we extract the byte array from the UnsafeRow, and then read the first byte + // to determine if the value is null or not. If the value is null, we set the ordinal on + // the UnsafeRow to null. If the value is not null, we read the rest of the bytes to get the + // actual value. + private def decodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = { + val writer = new UnsafeRowWriter(numOrderingCols) + writer.resetRowWriter() + rangeScanKeyFieldsWithIdx.foreach { case (field, idx) => + val value = row.getBinary(idx) + val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]]) + bbuf.order(ByteOrder.BIG_ENDIAN) + val isNullCol = bbuf.get() + if (isNullCol == 0x01.toByte) { + // set the column to null and skip reading the remaining (zero padding) bytes + writer.setNullAt(idx) + } else { + field.dataType match { + // mirror the encoder: 0x01 is true, 0x00 is false + case BooleanType => + writer.write(idx, bbuf.get() == 0x01.toByte) + + case ByteType => + writer.write(idx, bbuf.get) + + case ShortType => + writer.write(idx, bbuf.getShort) + + case IntegerType => + writer.write(idx, bbuf.getInt) + + case LongType => + writer.write(idx, bbuf.getLong) + + case FloatType => + writer.write(idx, bbuf.getFloat) + + case DoubleType => + writer.write(idx, bbuf.getDouble) + } + } + } + writer.getRow() + } + + override def encodeKey(row: UnsafeRow): Array[Byte] = { + val prefixKey = extractPrefixKey(row) + val rangeScanKeyEncoded = encodeUnsafeRow(encodePrefixKeyForRangeScan(prefixKey)) + + val result = if (numOrderingCols < keySchema.length) { + val remainingEncoded = encodeUnsafeRow(remainingKeyProjection(row)) + val encodedBytes = new Array[Byte](rangeScanKeyEncoded.length + remainingEncoded.length + 4) + Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, rangeScanKeyEncoded.length) + Platform.copyMemory(rangeScanKeyEncoded, Platform.BYTE_ARRAY_OFFSET, + encodedBytes, Platform.BYTE_ARRAY_OFFSET + 4, rangeScanKeyEncoded.length) + // NOTE: We don't put the length of 
remainingEncoded as we can calculate later + // on deserialization.
+ Platform.copyMemory(remainingEncoded, Platform.BYTE_ARRAY_OFFSET, + encodedBytes, Platform.BYTE_ARRAY_OFFSET + 4 + rangeScanKeyEncoded.length, + remainingEncoded.length) + encodedBytes + } else { + // if the num of ordering cols is same as num of key schema cols, we don't need to + // encode the remaining key as it's empty. + val encodedBytes = new Array[Byte](rangeScanKeyEncoded.length + 4) + Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, rangeScanKeyEncoded.length) + Platform.copyMemory(rangeScanKeyEncoded, Platform.BYTE_ARRAY_OFFSET, + encodedBytes, Platform.BYTE_ARRAY_OFFSET + 4, rangeScanKeyEncoded.length) + encodedBytes + } + result + } + + override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = { + val prefixKeyEncodedLen = Platform.getInt(keyBytes, Platform.BYTE_ARRAY_OFFSET) + val prefixKeyEncoded = new Array[Byte](prefixKeyEncodedLen) + Platform.copyMemory(keyBytes, Platform.BYTE_ARRAY_OFFSET + 4, prefixKeyEncoded, + Platform.BYTE_ARRAY_OFFSET, prefixKeyEncodedLen) + + val prefixKeyDecodedForRangeScan = decodeToUnsafeRow(prefixKeyEncoded, + numFields = numOrderingCols) + val prefixKeyDecoded = decodePrefixKeyForRangeScan(prefixKeyDecodedForRangeScan) + + if (numOrderingCols < keySchema.length) { + // Here we calculate the remainingKeyEncodedLen leveraging the length of keyBytes + val remainingKeyEncodedLen = keyBytes.length - 4 - prefixKeyEncodedLen + + val remainingKeyEncoded = new Array[Byte](remainingKeyEncodedLen) + Platform.copyMemory(keyBytes, Platform.BYTE_ARRAY_OFFSET + 4 + + prefixKeyEncodedLen, remainingKeyEncoded, Platform.BYTE_ARRAY_OFFSET, + remainingKeyEncodedLen) + + val remainingKeyDecoded = decodeToUnsafeRow(remainingKeyEncoded, + numFields = keySchema.length - numOrderingCols) + + restoreKeyProjection(joinedRowOnKey.withLeft(prefixKeyDecoded).withRight(remainingKeyDecoded)) + } else { + // if the number of ordering cols is same as the number of key schema cols, we only + // return the prefix key decoded unsafe row. 
+ prefixKeyDecoded + } + } + + override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = { + val rangeScanKeyEncoded = encodeUnsafeRow(encodePrefixKeyForRangeScan(prefixKey)) + val prefix = new Array[Byte](rangeScanKeyEncoded.length + 4) + Platform.putInt(prefix, Platform.BYTE_ARRAY_OFFSET, rangeScanKeyEncoded.length) + Platform.copyMemory(rangeScanKeyEncoded, Platform.BYTE_ARRAY_OFFSET, prefix, + Platform.BYTE_ARRAY_OFFSET + 4, rangeScanKeyEncoded.length) + prefix + } + + override def supportPrefixKeyScan: Boolean = true +} + /** * RocksDB Key Encoder for UnsafeRow that does not support prefix key scan. * @@ -225,10 +476,6 @@ class NoPrefixKeyStateEncoder(keySchema: StructType) override def supportPrefixKeyScan: Boolean = false - override def extractPrefixKey(key: UnsafeRow): UnsafeRow = { - throw new IllegalStateException("This encoder doesn't support prefix key!") - } - override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = { throw new IllegalStateException("This encoder doesn't support prefix key!") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 89471f6af535..c8537f2a6a5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -51,8 +51,8 @@ private[sql] class RocksDBStateStoreProvider override def createColFamilyIfAbsent( colFamilyName: String, keySchema: StructType, - numColsPrefixKey: Int, valueSchema: StructType, + keyStateEncoderSpec: KeyStateEncoderSpec, useMultipleValuesPerKey: Boolean = false, isInternal: Boolean = false): Unit = { verify(colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME, @@ -60,7 +60,7 @@ private[sql] class RocksDBStateStoreProvider verify(useColumnFamilies, "Column 
families are not supported in this store") rocksDB.createColFamilyIfAbsent(colFamilyName, isInternal) keyValueEncoderMap.putIfAbsent(colFamilyName, - (RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey), + (RocksDBStateEncoder.getKeyEncoder(keyStateEncoderSpec), RocksDBStateEncoder.getValueEncoder(valueSchema, useMultipleValuesPerKey))) } @@ -263,7 +263,7 @@ private[sql] class RocksDBStateStoreProvider stateStoreId: StateStoreId, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, useColumnFamilies: Boolean, storeConf: StateStoreConf, hadoopConf: Configuration, @@ -275,19 +275,13 @@ private[sql] class RocksDBStateStoreProvider this.hadoopConf = hadoopConf this.useColumnFamilies = useColumnFamilies - require((keySchema.length == 0 && numColsPrefixKey == 0) || - (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " + - "greater than the number of columns for prefix key!") - if (useMultipleValuesPerKey) { - require(numColsPrefixKey == 0, "Both multiple values per key, and prefix key are not " + - "supported simultaneously.") require(useColumnFamilies, "Multiple values per key support requires column families to be" + " enabled in RocksDBStateStore.") } keyValueEncoderMap.putIfAbsent(StateStore.DEFAULT_COL_FAMILY_NAME, - (RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey), + (RocksDBStateEncoder.getKeyEncoder(keyStateEncoderSpec), RocksDBStateEncoder.getValueEncoder(valueSchema, useMultipleValuesPerKey))) rocksDB // lazy initialization diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 9247c9fe41b6..dd97aa5b9afc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ 
-128,8 +128,8 @@ trait StateStore extends ReadStateStore { def createColFamilyIfAbsent( colFamilyName: String, keySchema: StructType, - numColsPrefixKey: Int, valueSchema: StructType, + keyStateEncoderSpec: KeyStateEncoderSpec, useMultipleValuesPerKey: Boolean = false, isInternal: Boolean = false): Unit @@ -289,6 +289,26 @@ class InvalidUnsafeRowException(error: String) "among restart. For the first case, you can try to restart the application without " + s"checkpoint or use the legacy Spark version to process the streaming state.\n$error", null) +sealed trait KeyStateEncoderSpec + +case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends KeyStateEncoderSpec + +case class PrefixKeyScanStateEncoderSpec( + keySchema: StructType, + numColsPrefixKey: Int) extends KeyStateEncoderSpec { + if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) { + throw StateStoreErrors.incorrectNumOrderingColsForPrefixScan(numColsPrefixKey.toString) + } +} + +case class RangeKeyScanStateEncoderSpec( + keySchema: StructType, + numColsPrefixKey: Int) extends KeyStateEncoderSpec { + if (numColsPrefixKey == 0 || numColsPrefixKey > keySchema.length) { + throw StateStoreErrors.incorrectNumOrderingColsForRangeScan(numColsPrefixKey.toString) + } +} + /** * Trait representing a provider that provide [[StateStore]] instances representing * versions of state data. 
@@ -330,7 +350,7 @@ trait StateStoreProvider { stateStoreId: StateStoreId, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, useColumnFamilies: Boolean, storeConfs: StateStoreConf, hadoopConf: Configuration, @@ -385,13 +405,13 @@ object StateStoreProvider { providerId: StateStoreProviderId, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, useColumnFamilies: Boolean, storeConf: StateStoreConf, hadoopConf: Configuration, useMultipleValuesPerKey: Boolean): StateStoreProvider = { val provider = create(storeConf.providerClass) - provider.init(providerId.storeId, keySchema, valueSchema, numColsPrefixKey, + provider.init(providerId.storeId, keySchema, valueSchema, keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey) provider } @@ -581,7 +601,7 @@ object StateStore extends Logging { storeProviderId: StateStoreProviderId, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, version: Long, useColumnFamilies: Boolean, storeConf: StateStoreConf, @@ -591,7 +611,7 @@ object StateStore extends Logging { throw QueryExecutionErrors.unexpectedStateStoreVersion(version) } val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema, - numColsPrefixKey, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey) + keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey) storeProvider.getReadStore(version) } @@ -600,7 +620,7 @@ object StateStore extends Logging { storeProviderId: StateStoreProviderId, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, version: Long, useColumnFamilies: Boolean, storeConf: StateStoreConf, @@ -610,7 +630,7 @@ object StateStore extends Logging { throw 
QueryExecutionErrors.unexpectedStateStoreVersion(version) } val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema, - numColsPrefixKey, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey) + keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey) storeProvider.getStore(version) } @@ -618,7 +638,7 @@ object StateStore extends Logging { storeProviderId: StateStoreProviderId, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, useColumnFamilies: Boolean, storeConf: StateStoreConf, hadoopConf: Configuration, @@ -655,7 +675,7 @@ object StateStore extends Logging { loadedProviders.getOrElseUpdate( storeProviderId, StateStoreProvider.createAndInit( - storeProviderId, keySchema, valueSchema, numColsPrefixKey, + storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 7d1ec7f03237..a8d4c06bc83c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -76,9 +76,29 @@ object StateStoreErrors { messageParameters = Map("stateName" -> stateName)) } + def incorrectNumOrderingColsForPrefixScan(numPrefixCols: String): + StateStoreIncorrectNumOrderingColsForPrefixScan = { + new StateStoreIncorrectNumOrderingColsForPrefixScan(numPrefixCols) + } + + def incorrectNumOrderingColsForRangeScan(numOrderingCols: String): + StateStoreIncorrectNumOrderingColsForRangeScan = { + new StateStoreIncorrectNumOrderingColsForRangeScan(numOrderingCols) + } + + def nullTypeOrderingColsNotSupported(fieldName: String, index: String): + 
StateStoreNullTypeOrderingColsNotSupported = { + new StateStoreNullTypeOrderingColsNotSupported(fieldName, index) + } + + def variableSizeOrderingColsNotSupported(fieldName: String, index: String): + StateStoreVariableSizeOrderingColsNotSupported = { + new StateStoreVariableSizeOrderingColsNotSupported(fieldName, index) + } + def cannotCreateColumnFamilyWithReservedChars(colFamilyName: String): StateStoreCannotCreateColumnFamilyWithReservedChars = { - new StateStoreCannotCreateColumnFamilyWithReservedChars(colFamilyName) + new StateStoreCannotCreateColumnFamilyWithReservedChars(colFamilyName) } def cannotPerformOperationWithInvalidTimeoutMode( @@ -142,3 +162,23 @@ class StateStoreUnsupportedOperationOnMissingColumnFamily( colFamilyName: String) extends SparkUnsupportedOperationException( errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY", messageParameters = Map("operationType" -> operationType, "colFamilyName" -> colFamilyName)) + +class StateStoreIncorrectNumOrderingColsForPrefixScan(numPrefixCols: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_INCORRECT_NUM_PREFIX_COLS_FOR_PREFIX_SCAN", + messageParameters = Map("numPrefixCols" -> numPrefixCols)) + +class StateStoreIncorrectNumOrderingColsForRangeScan(numOrderingCols: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN", + messageParameters = Map("numOrderingCols" -> numOrderingCols)) + +class StateStoreVariableSizeOrderingColsNotSupported(fieldName: String, index: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED", + messageParameters = Map("fieldName" -> fieldName, "index" -> index)) + +class StateStoreNullTypeOrderingColsNotSupported(fieldName: String, index: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED", + messageParameters = 
Map("fieldName" -> fieldName, "index" -> index)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala index 1af51c49eaa5..133b4ab1cce3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala @@ -74,7 +74,7 @@ class ReadStateStoreRDD[T: ClassTag, U: ClassTag]( storeVersion: Long, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, sessionState: SessionState, @transient private val storeCoordinator: Option[StateStoreCoordinatorRef], useColumnFamilies: Boolean = false, @@ -89,7 +89,7 @@ class ReadStateStoreRDD[T: ClassTag, U: ClassTag]( val inputIter = dataRDD.iterator(partition, ctxt) val store = StateStore.getReadOnly( - storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion, + storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, storeVersion, useColumnFamilies, storeConf, hadoopConfBroadcast.value.value) storeReadFunction(store, inputIter) } @@ -109,7 +109,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag]( storeVersion: Long, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, sessionState: SessionState, @transient private val storeCoordinator: Option[StateStoreCoordinatorRef], useColumnFamilies: Boolean = false, @@ -125,8 +125,9 @@ class StateStoreRDD[T: ClassTag, U: ClassTag]( val inputIter = dataRDD.iterator(partition, ctxt) val store = StateStore.get( - storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion, - useColumnFamilies, storeConf, hadoopConfBroadcast.value.value, useMultipleValuesPerKey) + storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, storeVersion, + useColumnFamilies, storeConf, 
hadoopConfBroadcast.value.value, + useMultipleValuesPerKey) storeUpdateFunction(store, inputIter) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index b35cf2492666..9802a4dce4e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -480,12 +480,12 @@ class SymmetricHashJoinStateManager( stateInfo.get, partitionId, getStateStoreName(joinSide, stateStoreType)) val store = if (useStateStoreCoordinator) { StateStore.get( - storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0, + storeProviderId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), stateInfo.get.storeVersion, useColumnFamilies = false, storeConf, hadoopConf) } else { // This class will manage the state store provider by itself. 
stateStoreProvider = StateStoreProvider.createAndInit( - storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0, + storeProviderId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), useColumnFamilies = false, storeConf, hadoopConf, useMultipleValuesPerKey = false) stateStoreProvider.getStore(stateInfo.get.storeVersion) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala index 12cd7b8a127e..44e939424f55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala @@ -35,25 +35,27 @@ package object state { stateInfo: StatefulOperatorStateInfo, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int)( + keyStateEncoderSpec: KeyStateEncoderSpec)( storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = { mapPartitionsWithStateStore( stateInfo, keySchema, valueSchema, - numColsPrefixKey, + keyStateEncoderSpec, sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator))( storeUpdateFunction) } + // Disable scalastyle because the number of parameters exceeds the maximum it enforces + // scalastyle:off /** Map each partition of an RDD along with data in a [[StateStore]].
*/ def mapPartitionsWithStateStore[U: ClassTag]( stateInfo: StatefulOperatorStateInfo, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, sessionState: SessionState, storeCoordinator: Option[StateStoreCoordinatorRef], useColumnFamilies: Boolean = false, @@ -79,20 +81,21 @@ package object state { stateInfo.storeVersion, keySchema, valueSchema, - numColsPrefixKey, + keyStateEncoderSpec, sessionState, storeCoordinator, useColumnFamilies, extraOptions, useMultipleValuesPerKey) } + // scalastyle:on /** Map each partition of an RDD along with data in a [[ReadStateStore]]. */ private[streaming] def mapPartitionsWithReadStateStore[U: ClassTag]( stateInfo: StatefulOperatorStateInfo, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, sessionState: SessionState, storeCoordinator: Option[StateStoreCoordinatorRef], useColumnFamilies: Boolean = false, @@ -117,7 +120,7 @@ package object state { stateInfo.storeVersion, keySchema, valueSchema, - numColsPrefixKey, + keyStateEncoderSpec, sessionState, storeCoordinator, useColumnFamilies, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 1793b72b9039..3bf833816bcc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -431,7 +431,7 @@ case class StateStoreRestoreExec( getStateInfo, keyExpressions.toStructType, stateManager.getStateValueSchema, - numColsPrefixKey = 0, + NoPrefixKeyStateEncoderSpec(keyExpressions.toStructType), session.sessionState, Some(session.streams.stateStoreCoordinator)) { case (store, iter) => val hasInput = iter.hasNext @@ -495,7 +495,7 @@ case class StateStoreSaveExec( getStateInfo, 
keyExpressions.toStructType, stateManager.getStateValueSchema, - numColsPrefixKey = 0, + NoPrefixKeyStateEncoderSpec(keyExpressions.toStructType), session.sessionState, Some(session.streams.stateStoreCoordinator)) { (store, iter) => val numOutputRows = longMetric("numOutputRows") @@ -697,7 +697,8 @@ case class SessionWindowStateStoreRestoreExec( getStateInfo, stateManager.getStateKeySchema, stateManager.getStateValueSchema, - numColsPrefixKey = stateManager.getNumColsForPrefixKey, + PrefixKeyScanStateEncoderSpec(stateManager.getStateKeySchema, + stateManager.getNumColsForPrefixKey), session.sessionState, Some(session.streams.stateStoreCoordinator)) { case (store, iter) => @@ -782,7 +783,8 @@ case class SessionWindowStateStoreSaveExec( getStateInfo, stateManager.getStateKeySchema, stateManager.getStateValueSchema, - numColsPrefixKey = stateManager.getNumColsForPrefixKey, + PrefixKeyScanStateEncoderSpec(stateManager.getStateKeySchema, + stateManager.getNumColsForPrefixKey), session.sessionState, Some(session.streams.stateStoreCoordinator)) { case (store, iter) => @@ -971,7 +973,7 @@ abstract class BaseStreamingDeduplicateExec getStateInfo, keyExpressions.toStructType, schemaForValueRow, - numColsPrefixKey = 0, + NoPrefixKeyStateEncoderSpec(keyExpressions.toStructType), session.sessionState, Some(session.streams.stateStoreCoordinator), extraOptions = extraOptionOnStateStore) { (store, iter) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala index 8bba9b8d33c1..e0e3ee582bef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, SortOrder, 
UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning} import org.apache.spark.sql.execution.{LimitExec, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.streaming.state.StateStoreOps +import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStoreOps} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType} import org.apache.spark.util.{CompletionIterator, NextIterator} @@ -52,7 +52,7 @@ case class StreamingGlobalLimitExec( getStateInfo, keySchema, valueSchema, - numColsPrefixKey = 0, + NoPrefixKeyStateEncoderSpec(keySchema), session.sessionState, Some(session.streams.stateStoreCoordinator)) { (store, iter) => val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala index 3c8e0ca3e52d..a5c393ac0567 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} -import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProvider} +import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, NoPrefixKeyStateEncoderSpec, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProvider} import 
org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StructField, StructType, TimestampType} import org.apache.spark.util.Utils @@ -346,8 +346,8 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { val provider = new HDFSBackedStateStoreProvider() val storeConf = new StateStoreConf(new SQLConf()) provider.init( - storeId, keySchema, valueSchema, 0, useColumnFamilies = false, - storeConf, new Configuration) + storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), + useColumnFamilies = false, storeConf, new Configuration) provider } @@ -361,8 +361,8 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { val storeConf = new StateStoreConf(sqlConf) provider.init( - storeId, keySchema, valueSchema, 0, useColumnFamilies = false, - storeConf, new Configuration) + storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), + useColumnFamilies = false, storeConf, new Configuration) provider } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIteratorSuite.scala index af2cd277b509..fc0c239a5d99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIteratorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIteratorSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId, 
StreamingSessionWindowStateManager} +import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId, StreamingSessionWindowStateManager} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamTest import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} @@ -220,8 +220,8 @@ class MergingSortWithSessionWindowStateIteratorSuite extends StreamTest with Bef val storeProviderId = StateStoreProviderId(stateInfo, 0, StateStoreId.DEFAULT_STORE_NAME) val store = StateStore.get( storeProviderId, manager.getStateKeySchema, manager.getStateValueSchema, - manager.getNumColsForPrefixKey, stateInfo.storeVersion, - useColumnFamilies = false, storeConf, new Configuration) + PrefixKeyScanStateEncoderSpec(manager.getStateKeySchema, manager.getNumColsForPrefixKey), + stateInfo.storeVersion, useColumnFamilies = false, storeConf, new Configuration) try { f(manager, store) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala index 91ffb7a66adc..5af8dc9a726d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala @@ -33,8 +33,8 @@ class MemoryStateStore extends StateStore() { override def createColFamilyIfAbsent( colFamilyName: String, keySchema: StructType, - numColsPrefixKey: Int, valueSchema: StructType, + keyStateEncoderSpec: KeyStateEncoderSpec, useMultipleValuesPerKey: Boolean = false, isInternal: Boolean = false): Unit = { throw StateStoreErrors.multipleColumnFamiliesNotSupported("MemoryStateStoreProvider") diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 1e838ccdb023..0b347e272a48 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -24,10 +24,11 @@ import scala.util.Random
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkUnsupportedOperationException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.LocalSparkSession.withSparkSession
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.internal.SQLConf
@@ -35,6 +36,7 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.ExtendedSQLTest
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils

 @ExtendedSQLTest
@@ -102,7 +104,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid

       // Create state store in a task and get the RocksDBConf from the instantiated RocksDB instance
       val rocksDBConfInTask: RocksDBConf = testRDD.mapPartitionsWithStateStore[RocksDBConf](
-        spark.sqlContext, testStateInfo, testSchema, testSchema, 0) {
+        spark.sqlContext, testStateInfo, testSchema, testSchema,
+        NoPrefixKeyStateEncoderSpec(testSchema)) {
         (store: StateStore, _: Iterator[String]) =>
           // Use reflection to get RocksDB instance
           val dbInstanceMethod =
@@ -158,6 +161,501 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }

+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 0),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, keySchemaWithRangeScan.length + 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithVariableSizeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - null type columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithNullTypeCols: StructType = StructType(
+      Seq(StructField("key1", NullType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithNullTypeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithNullTypeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithNullTypeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - variable size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000),
+        (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - variable size " +
+    s"non-ordering columns with null values in first ordering column",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, true),
+        StructField("key2", IntegerType, true),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (null, 40), (452300L, 1),
+        (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112),
+        (6L, 90118), (9L, 95118), (6L, 87210), (null, 113), (null, 28))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      // verify that the expected null cols are seen
+      val nullRows = store.iterator(cfName).filter { kv =>
+        val keyRow = kv.key
+        keyRow.isNullAt(0)
+      }
+      assert(nullRows.size === 3)
+
+      // filter out the null rows and verify the rest
+      val result: Seq[(Long, Int)] = store.iterator(cfName).filter { kv =>
+        val keyRow = kv.key
+        !keyRow.isNullAt(0)
+      }.map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+
+      val timerTimestampsWithoutNulls = Seq((931L, 10), (452300L, 1),
+        (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112),
+        (6L, 90118), (9L, 95118), (6L, 87210))
+
+      assert(result === timerTimestampsWithoutNulls.sorted)
+
+      // verify that the null cols are seen in the correct order filtering for nulls
+      val nullRowsWithOrder = store.iterator(cfName).filter { kv =>
+        val keyRow = kv.key
+        keyRow.isNullAt(0)
+      }.map { kv =>
+        val keyRow = kv.key
+        keyRow.getInt(1)
+      }.toSeq
+
+      assert(nullRowsWithOrder === Seq(28, 40, 113))
+
+      store.abort()
+
+      val store1 = provider.getStore(0)
+      if (colFamiliesEnabled) {
+        store1.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps1 = Seq((null, 3), (null, 1), (null, 32), (null, 113), (null, 40872),
+        (null, 66))
+      timerTimestamps1.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      // verify that ordering for non-null columns on the right is still maintained
+      val result1: Seq[Int] = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        keyRow.getInt(1)
+      }.toSeq
+
+      assert(result1 === timerTimestamps1.map(_._2).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - variable size " +
+    s"non-ordering columns with null values in second ordering column",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, true),
+        StructField("key2", IntegerType, true),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (40L, null), (452300L, 1),
+        (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112),
+        (6L, 90118), (9L, 95118), (6L, 87210), (113L, null), (100L, null))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      // verify that the expected null cols are seen
+      val nullRows = store.iterator(cfName).filter { kv =>
+        val keyRow = kv.key
+        keyRow.isNullAt(1)
+      }
+      assert(nullRows.size === 3)
+
+      // the ordering based on first col which has non-null values should be preserved
+      val result: Seq[(Long, Int)] = store.iterator(cfName)
+        .map { kv =>
+          val keyRow = kv.key
+          val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+          (key._1, key._2)
+        }.toSeq
+      assert(result.map(_._1) === timerTimestamps.map(_._1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan byte ordering column - variable size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", ByteType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](ByteType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), (0x1F, 1), (0x01, 68),
+        (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 2112),
+        (0x06, 90118), (0x09, 95118), (0x06, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by byte col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result: Seq[(Byte, Int)] = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getByte(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - ordering cols and key schema cols are same",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    // use the same schema as value schema for single col key schema
+    tryWithProviderResource(newStoreProvider(valueSchema,
+      RangeKeyScanStateEncoderSpec(valueSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          valueSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(valueSchema, 1))
+      }
+
+      val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 6, 9, 5)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToValueRow(ts)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        valueRowToData(kv.key)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+
+      // also check for prefix scan
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToValueRow(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          valueRowToData(kv.key)
+        }.toSeq
+        assert(result.size === 1)
+      }
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - with prefix scan",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 1L)
+      timerTimestamps.zipWithIndex.foreach { case (ts, idx) =>
+        (1 to idx + 1).foreach { keyVal =>
+          val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString)
+          val valueRow = dataToValueRow(1)
+          store.put(keyRow, valueRow, cfName)
+          assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        }
+      }
+
+      timerTimestamps.zipWithIndex.foreach { case (ts, idx) =>
+        val prefix = dataToPrefixKeyRowWithRangeScan(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          val key = keyRowWithRangeScanToData(kv.key)
+          key._2
+        }.toSeq
+        assert(result.size === idx + 1)
+      }
+    }
+  }
+
   testWithColumnFamilies("rocksdb key and value schema encoders for column families",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
     val testColFamily = "testState"
@@ -165,7 +663,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     tryWithProviderResource(newStoreProvider(colFamiliesEnabled)) { provider =>
       val store = provider.getStore(0)
       if (colFamiliesEnabled) {
-        store.createColFamilyIfAbsent(testColFamily, keySchema, numColsPrefixKey = 0, valueSchema)
+        store.createColFamilyIfAbsent(testColFamily,
+          keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema))
         val keyRow1 = dataToKeyRow("a", 0)
         val valueRow1 = dataToValueRow(1)
         store.put(keyRow1, valueRow1, colFamilyName = testColFamily)
@@ -222,38 +721,48 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
   }

   def newStoreProvider(storeId: StateStoreId): RocksDBStateStoreProvider = {
-    newStoreProvider(storeId, numColsPrefixKey = 0)
+    newStoreProvider(storeId, NoPrefixKeyStateEncoderSpec(keySchema))
   }

   override def newStoreProvider(storeId: StateStoreId, useColumnFamilies: Boolean):
     RocksDBStateStoreProvider = {
-    newStoreProvider(storeId, numColsPrefixKey = 0, useColumnFamilies = useColumnFamilies)
+    newStoreProvider(storeId, NoPrefixKeyStateEncoderSpec(keySchema),
+      useColumnFamilies = useColumnFamilies)
   }

   override def newStoreProvider(useColumnFamilies: Boolean): RocksDBStateStoreProvider = {
-    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), numColsPrefixKey = 0,
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
+      NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = useColumnFamilies)
   }

   def newStoreProvider(useColumnFamilies: Boolean,
       useMultipleValuesPerKey: Boolean): RocksDBStateStoreProvider = {
-    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), numColsPrefixKey = 0,
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
+      NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = useColumnFamilies,
       useMultipleValuesPerKey = useMultipleValuesPerKey
     )
   }

   def newStoreProvider(storeId: StateStoreId, conf: Configuration): RocksDBStateStoreProvider = {
-    newStoreProvider(storeId, numColsPrefixKey = -1, conf = conf)
+    newStoreProvider(storeId, NoPrefixKeyStateEncoderSpec(keySchema), conf = conf)
   }

-  override def newStoreProvider(numPrefixCols: Int): RocksDBStateStoreProvider = {
-    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), numColsPrefixKey = numPrefixCols)
+  override def newStoreProvider(
+      keySchema: StructType,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
+      useColumnFamilies: Boolean): RocksDBStateStoreProvider = {
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
+      keyStateEncoderSpec = keyStateEncoderSpec,
+      keySchema = keySchema,
+      useColumnFamilies = useColumnFamilies)
   }

   def newStoreProvider(
       storeId: StateStoreId,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
+      keySchema: StructType = keySchema,
       sqlConf: Option[SQLConf] = None,
       conf: Configuration = new Configuration,
       useColumnFamilies: Boolean = false,
@@ -263,12 +772,11 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       storeId,
       keySchema,
       valueSchema,
-      numColsPrefixKey = numColsPrefixKey,
+      keyStateEncoderSpec,
       useColumnFamilies,
       new StateStoreConf(sqlConf.getOrElse(SQLConf.get)),
       conf,
-      useMultipleValuesPerKey
-    )
+      useMultipleValuesPerKey)
     provider
   }

@@ -292,8 +800,9 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
   override def newStoreProvider(
       minDeltasForSnapshot: Int,
       numOfVersToRetainInMemory: Int): RocksDBStateStoreProvider = {
-    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), 0,
-      Some(getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)))
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
+      NoPrefixKeyStateEncoderSpec(keySchema),
+      sqlConf = Some(getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)))
   }

   override def getDefaultSQLConf(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
index f2a555406336..3127c9f60249 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
@@ -62,13 +62,15 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
       val path = Utils.createDirectory(tempDir, Random.nextFloat().toString).toString
       val rdd1 = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("a", 0)))
         .mapPartitionsWithStateStore(spark.sqlContext, operatorStateInfo(path, version = 0),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
       assert(rdd1.collect().toSet === Set(("a", 0) -> 2, ("b", 0) -> 1))

       // Generate next version of stores
       val rdd2 = makeRDD(spark.sparkContext, Seq(("a", 0), ("c", 0)))
         .mapPartitionsWithStateStore(spark.sqlContext, operatorStateInfo(path, version = 1),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
       assert(rdd2.collect().toSet === Set(("a", 0) -> 3, ("b", 0) -> 1, ("c", 0) -> 1))

       // Make sure the previous RDD still has the same data.
@@ -86,7 +88,8 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
         implicit val sqlContext = spark.sqlContext
         makeRDD(spark.sparkContext, Seq(("a", 0))).mapPartitionsWithStateStore(
           sqlContext, operatorStateInfo(path, version = storeVersion),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
       }

       // Generate RDDs and state store data
@@ -136,19 +139,22 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {

       val rddOfGets1 = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("c", 0)))
         .mapPartitionsWithStateStore(spark.sqlContext, operatorStateInfo(path, version = 0),
-          keySchema, valueSchema, numColsPrefixKey = 0)(iteratorOfGets)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(iteratorOfGets)
       assert(rddOfGets1.collect().toSet ===
         Set(("a", 0) -> None, ("b", 0) -> None, ("c", 0) -> None))

       val rddOfPuts = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("a", 0)))
         .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, version = 0),
-          keySchema, valueSchema, numColsPrefixKey = 0)(iteratorOfPuts)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(iteratorOfPuts)
       assert(rddOfPuts.collect().toSet ===
         Set(("a", 0) -> 1, ("a", 0) -> 2, ("b", 0) -> 1))

       val rddOfGets2 = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("c", 0)))
         .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, version = 1),
-          keySchema, valueSchema, numColsPrefixKey = 0)(iteratorOfGets)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(iteratorOfGets)
       assert(rddOfGets2.collect().toSet ===
         Set(("a", 0) -> Some(2), ("b", 0) -> Some(1), ("c", 0) -> None))
     }
@@ -174,7 +180,8 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {

       val rdd = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("a", 0)))
         .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, queryRunId = queryRunId),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
       require(rdd.partitions.length === 2)

       assert(
@@ -202,13 +209,15 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
       val opId = 0
       val rdd1 = makeRDD(spark.sparkContext, Seq(("a", 0), ("b", 0), ("a", 0)))
         .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, version = 0),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
       assert(rdd1.collect().toSet === Set(("a", 0) -> 2, ("b", 0) -> 1))

       // Generate next version of stores
       val rdd2 = makeRDD(spark.sparkContext, Seq(("a", 0), ("c", 0)))
         .mapPartitionsWithStateStore(sqlContext, operatorStateInfo(path, version = 1),
-          keySchema, valueSchema, numColsPrefixKey = 0)(increment)
+          keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema))(increment)
       assert(rdd2.collect().toSet === Set(("a", 0) -> 3, ("b", 0) -> 1, ("c", 0) -> 1))

       // Make sure the previous RDD still has the same data.
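The range scan tests in `RocksDBStateStoreSuite` above insert timestamps in arbitrary order and expect `store.iterator(cfName)` to return them sorted. The commit message attributes this to writing the ordering columns as BIG_ENDIAN byte arrays. The sketch below illustrates why that works. It assumes non-negative `Long` values (the only kind these tests use) and a comparator that orders keys as unsigned bytes, as RocksDB's default bytewise comparator does. `BigEndianOrderingSketch` and its methods are hypothetical helpers for illustration only. They are not the code of Spark's `RangeKeyScanStateEncoder`.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper for illustration; not Spark's RangeKeyScanStateEncoder.
// Assumes non-negative Long ordering values.
object BigEndianOrderingSketch {
  // Serialize a Long into 8 bytes using the given byte order.
  def encode(ts: Long, order: ByteOrder = ByteOrder.BIG_ENDIAN): Array[Byte] = {
    require(ts >= 0, "this sketch only covers non-negative values")
    ByteBuffer.allocate(java.lang.Long.BYTES).order(order).putLong(ts).array()
  }

  // Compare two keys byte by byte as unsigned values, the way a bytewise
  // comparator (such as RocksDB's default one) orders keys.
  def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val cmp = (a(i) & 0xff) - (b(i) & 0xff)
      if (cmp != 0) return cmp
      i += 1
    }
    a.length - b.length
  }

  // Sort values by their encoded bytes, mimicking iteration order over the store.
  def sortByEncodedKey(values: Seq[Long], order: ByteOrder = ByteOrder.BIG_ENDIAN): Seq[Long] =
    values.sortWith((x, y) => compareUnsigned(encode(x, order), encode(y, order)) < 0)

  def main(args: Array[String]): Unit = {
    // Same timestamps as the range scan tests above.
    val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
    // Big-endian bytes sort in numeric order: prints true
    println(sortByEncodedKey(timerTimestamps) == timerTimestamps.sorted)
    // Little-endian bytes do not: 256L (00 01 ...) sorts before 1L (01 00 ...).
    // Prints List(256, 1)
    println(sortByEncodedKey(Seq(1L, 256L), ByteOrder.LITTLE_ENDIAN))
  }
}
```

The little-endian call is the counterexample: with the least significant byte first, bytewise order no longer matches numeric order. The sketch also leaves out negative values. Under an unsigned comparison, their two's-complement sign bit would make them sort after all positive values.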
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 64b3e75ea976..231396aff222 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -62,7 +62,7 @@ class FakeStateStoreProviderWithMaintenanceError extends StateStoreProvider {
       stateStoreId: StateStoreId,
       keySchema: StructType,
       valueSchema: StructType,
-      numColsPrefixKey: Int,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
       useColumnFamilies: Boolean,
       storeConfs: StateStoreConf,
       hadoopConf: Configuration,
@@ -197,6 +197,25 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }

+  test("running with range scan encoder should fail") {
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        keyStateEncoderSpec = RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1),
+        useColumnFamilies = false)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+      parameters = Map(
+        "operationType" -> "Range scan",
+        "entity" -> "HDFSBackedStateStoreProvider"
+      ),
+      matchPVals = true
+    )
+  }
+
   test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") {
     tryWithProviderResource(newStoreProvider(opId = Random.nextInt(), partition = 0,
       numOfVersToRetainInMemory = 1)) { provider =>
@@ -390,7 +409,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]

       def generateStoreVersions(): Unit = {
         for (i <- 1 to 20) {
-          val store = StateStore.get(storeProviderId1, keySchema, valueSchema, numColsPrefixKey = 0,
+          val store = StateStore.get(storeProviderId1, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
             latestStoreVersion, useColumnFamilies = false, storeConf, hadoopConf)
           put(store, "a", 0, i)
           store.commit()
@@ -444,7 +464,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       }

       // Reload the store and verify
-      StateStore.get(storeProviderId1, keySchema, valueSchema, numColsPrefixKey = 0,
+      StateStore.get(storeProviderId1, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
         latestStoreVersion, useColumnFamilies = false, storeConf, hadoopConf)
       assert(StateStore.isLoaded(storeProviderId1))

@@ -456,7 +477,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       }

       // Reload the store and verify
-      StateStore.get(storeProviderId1, keySchema, valueSchema, numColsPrefixKey = 0,
+      StateStore.get(storeProviderId1, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
         latestStoreVersion, useColumnFamilies = false, storeConf, hadoopConf)
       assert(StateStore.isLoaded(storeProviderId1))

@@ -464,7 +486,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       // then this executor should unload inactive instances immediately.
       coordinatorRef
         .reportActiveInstance(storeProviderId1, "other-host", "other-exec", Seq.empty)
-      StateStore.get(storeProviderId2, keySchema, valueSchema, numColsPrefixKey = 0,
+      StateStore.get(storeProviderId2, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
         0, useColumnFamilies = false, storeConf, hadoopConf)
       assert(!StateStore.isLoaded(storeProviderId1))
       assert(StateStore.isLoaded(storeProviderId2))
@@ -500,7 +523,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]

       def generateStoreVersions(): Unit = {
         for (i <- 1 to 20) {
-          val store = StateStore.get(storeProviderId1, keySchema, valueSchema, numColsPrefixKey = 0,
+          val store = StateStore.get(storeProviderId1, keySchema, valueSchema,
+            NoPrefixKeyStateEncoderSpec(keySchema),
             latestStoreVersion, useColumnFamilies = false, storeConf, hadoopConf)
           put(store, "a", 0, i)
           store.commit()
@@ -633,7 +657,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       // Getting the store should not create temp file
       val store0 = shouldNotCreateTempFile {
         StateStore.get(
-          storeId, keySchema, valueSchema, numColsPrefixKey = 0,
+          storeId, keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema),
           version = 0, useColumnFamilies = false, storeConf, hadoopConf)
       }

@@ -650,7 +675,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       // Remove should create a temp file
       val store1 = shouldNotCreateTempFile {
         StateStore.get(
-          storeId, keySchema, valueSchema, numColsPrefixKey = 0,
+          storeId, keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema),
           version = 1, useColumnFamilies = false, storeConf, hadoopConf)
       }
       remove(store1, _._1 == "a")
@@ -665,7 +691,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       // Commit without any updates should create a delta file
       val store2 = shouldNotCreateTempFile {
         StateStore.get(
-          storeId, keySchema, valueSchema, numColsPrefixKey = 0,
+          storeId, keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema),
           version = 2, useColumnFamilies = false, storeConf, hadoopConf)
       }
       store2.commit()
@@ -913,7 +940,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
   def newStoreProvider(
       opId: Long,
       partition: Int,
-      numColsPrefixKey: Int = 0,
+      keyStateEncoderSpec: KeyStateEncoderSpec = NoPrefixKeyStateEncoderSpec(keySchema),
+      keySchema: StructType = keySchema,
       dir: String = newDir(),
       minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
@@ -924,15 +952,19 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       StateStoreId(dir, opId, partition),
       keySchema,
       valueSchema,
-      numColsPrefixKey = numColsPrefixKey,
+      keyStateEncoderSpec,
       useColumnFamilies = false,
       new StateStoreConf(sqlConf),
       hadoopConf)
     provider
   }

-  override def newStoreProvider(numPrefixCols: Int): HDFSBackedStateStoreProvider = {
-    newStoreProvider(opId = Random.nextInt(), partition = 0, numColsPrefixKey = numPrefixCols)
+  override def newStoreProvider(
+      keySchema: StructType,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
+      useColumnFamilies: Boolean): HDFSBackedStateStoreProvider = {
+    newStoreProvider(opId = Random.nextInt(), partition = 0,
+      keyStateEncoderSpec = keyStateEncoderSpec)
   }

   override def newStoreProvider(useColumnFamilies: Boolean): HDFSBackedStateStoreProvider = {
@@ -1041,7 +1073,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
   }

   testWithAllCodec("prefix scan") { colFamiliesEnabled =>
-    tryWithProviderResource(newStoreProvider(numPrefixCols = 1)) { provider =>
+    tryWithProviderResource(newStoreProvider(keySchema, PrefixKeyScanStateEncoderSpec(keySchema, 1),
+      colFamiliesEnabled)) { provider =>
       // Verify state before starting a new set of updates
       assert(getLatestData(provider, useColumnFamilies = false).isEmpty)

@@ -1351,7 +1384,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       // Verify that trying to get incorrect versions throw errors
       var e = intercept[SparkException] {
         StateStore.get(
-          storeId, keySchema, valueSchema, 0, -1, useColumnFamilies = false,
+          storeId, keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema), -1, useColumnFamilies = false,
           storeConf, hadoopConf)
       }
       checkError(
@@ -1364,7 +1398,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]

       e = intercept[SparkException] {
         StateStore.get(
-          storeId, keySchema, valueSchema, 0, 1, useColumnFamilies = false,
+          storeId, keySchema, valueSchema,
+          NoPrefixKeyStateEncoderSpec(keySchema),
+          1, useColumnFamilies = false,
           storeConf, hadoopConf)
       }
       checkError(
@@ -1379,14 +1415,18 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]

       // Increase version of the store and try to get again
       val store0 = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 0, useColumnFamilies = false,
+        storeId, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
+        0, useColumnFamilies = false,
         storeConf, hadoopConf)
       assert(store0.version === 0)
       put(store0, "a", 0, 1)
       store0.commit()

       val store1 = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 1, useColumnFamilies = false,
+        storeId, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
+        1, useColumnFamilies = false,
         storeConf, hadoopConf)
       assert(StateStore.isLoaded(storeId))
       assert(store1.version === 1)
@@ -1394,7 +1434,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]

       // Verify that you can also load older version
       val store0reloaded = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 0, useColumnFamilies = false,
+        storeId, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
+        0, useColumnFamilies = false,
         storeConf, hadoopConf)
       assert(store0reloaded.version === 0)
       assert(rowPairsToDataSet(store0reloaded.iterator()) === Set.empty)
@@ -1404,7 +1446,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       assert(!StateStore.isLoaded(storeId))

       val store1reloaded = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 1, useColumnFamilies = false,
+        storeId, keySchema, valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
+        1, useColumnFamilies = false,
         storeConf, hadoopConf)
       assert(StateStore.isLoaded(storeId))
       assert(store1reloaded.version === 1)
@@ -1500,8 +1544,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       val storeConf = StateStoreConf(sqlConf)

       // get the state store and kick off the maintenance task
-      StateStore.get(storeId, null, null, 0, 0,
-        useColumnFamilies = false, storeConf, sc.hadoopConfiguration)
+      StateStore.get(storeId, null, null,
+        NoPrefixKeyStateEncoderSpec(keySchema), 0, useColumnFamilies = false,
+        storeConf, sc.hadoopConfiguration)

       eventually(timeout(30.seconds)) {
         assert(!StateStore.isMaintenanceRunning)
@@ -1559,7 +1604,10 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
   def newStoreProvider(minDeltasForSnapshot: Int, numOfVersToRetainInMemory: Int): ProviderClass

   /** Return a new provider with setting prefix key */
-  def newStoreProvider(numPrefixCols: Int): ProviderClass
+  def newStoreProvider(
+      keySchema: StructType,
+      keyStateEncoderSpec: KeyStateEncoderSpec,
+      useColumnFamilies: Boolean): ProviderClass

   /** Return a new provider with useColumnFamilies set to true */
   def newStoreProvider(useColumnFamilies: Boolean): ProviderClass
@@ -1646,18 +1694,31 @@ object StateStoreTestsHelper {
     Seq(StructField("key1", StringType, true), StructField("key2", IntegerType, true)))
   val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))

+  val keySchemaWithRangeScan: StructType = StructType(
+    Seq(StructField("key1", LongType, false), StructField("key2", StringType, false)))
+
   val keyProj = UnsafeProjection.create(Array[DataType](StringType, IntegerType))
+  val rangeScanProj = UnsafeProjection.create(Array[DataType](LongType, StringType))
   val prefixKeyProj = UnsafeProjection.create(Array[DataType](StringType))
+  val prefixKeyProjWithRangeScan = UnsafeProjection.create(Array[DataType](LongType))
   val valueProj = UnsafeProjection.create(Array[DataType](IntegerType))

   def dataToPrefixKeyRow(s: String): UnsafeRow = {
     prefixKeyProj.apply(new GenericInternalRow(Array[Any](UTF8String.fromString(s)))).copy()
   }

+  def dataToPrefixKeyRowWithRangeScan(ts: Long): UnsafeRow = {
+    prefixKeyProjWithRangeScan.apply(new GenericInternalRow(Array[Any](ts))).copy()
+  }
+
   def dataToKeyRow(s: String, i: Int): UnsafeRow = {
     keyProj.apply(new GenericInternalRow(Array[Any](UTF8String.fromString(s), i))).copy()
   }

+  def dataToKeyRowWithRangeScan(ts: Long, s: String): UnsafeRow = {
+    rangeScanProj.apply(new GenericInternalRow(Array[Any](ts, UTF8String.fromString(s)))).copy()
+  }
+
   def dataToValueRow(i: Int): UnsafeRow = {
     valueProj.apply(new GenericInternalRow(Array[Any](i))).copy()
   }
@@ -1666,6 +1727,10 @@ object StateStoreTestsHelper {
     (row.getUTF8String(0).toString, row.getInt(1))
   }

+  def keyRowWithRangeScanToData(row: UnsafeRow): (Long, String) = {
+    (row.getLong(0), row.getUTF8String(1).toString)
+  }
+
   def valueRowToData(row: UnsafeRow): Int = {
     row.getInt(0)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManagerSuite.scala
index f758e44e6680..1607d7e699d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManagerSuite.scala
@@ -184,8 +184,8 @@ class StreamingSessionWindowStateManagerSuite extends StreamTest with BeforeAndA
     val storeProviderId = StateStoreProviderId(stateInfo, 0,
StateStoreId.DEFAULT_STORE_NAME) val store = StateStore.get( storeProviderId, manager.getStateKeySchema, manager.getStateValueSchema, - manager.getNumColsForPrefixKey, stateInfo.storeVersion, - useColumnFamilies = false, storeConf, new Configuration) + PrefixKeyScanStateEncoderSpec(manager.getStateKeySchema, manager.getNumColsForPrefixKey), + stateInfo.storeVersion, useColumnFamilies = false, storeConf, new Configuration) try { f(manager, store) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala index e86ac03b70d9..8668b58672c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala @@ -187,8 +187,8 @@ class ValueStateSuite extends StateVariableSuiteBase { val storeConf = new StateStoreConf(new SQLConf()) val ex = intercept[StateStoreMultipleColumnFamiliesNotSupportedException] { provider.init( - storeId, keySchema, valueSchema, 0, useColumnFamilies = true, - storeConf, new Configuration) + storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), + useColumnFamilies = true, storeConf, new Configuration) } checkError( ex, @@ -334,19 +334,19 @@ abstract class StateVariableSuiteBase extends SharedSparkSession protected def newStoreProviderWithStateVariable( useColumnFamilies: Boolean): RocksDBStateStoreProvider = { newStoreProviderWithStateVariable(StateStoreId(newDir(), Random.nextInt(), 0), - numColsPrefixKey = 0, + NoPrefixKeyStateEncoderSpec(schemaForKeyRow), useColumnFamilies = useColumnFamilies) } protected def newStoreProviderWithStateVariable( storeId: StateStoreId, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, sqlConf: SQLConf = SQLConf.get, conf: Configuration = new Configuration, useColumnFamilies: Boolean = false): 
RocksDBStateStoreProvider = { val provider = new RocksDBStateStoreProvider() provider.init( - storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + storeId, schemaForKeyRow, schemaForValueRow, keyStateEncoderSpec, useColumnFamilies, new StateStoreConf(sqlConf), conf, useMultipleValuesPerKey) provider diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index fa08a44dc9e5..32822994c81c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan} import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink} -import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider} +import org.apache.spark.sql.execution.streaming.state.{KeyStateEncoderSpec, StateStore, StateStoreConf, StateStoreId, StateStoreProvider} import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -1415,7 +1415,7 @@ class TestStateStoreProvider extends StateStoreProvider { stateStoreId: StateStoreId, keySchema: StructType, valueSchema: StructType, - numColsPrefixKey: Int, + keyStateEncoderSpec: KeyStateEncoderSpec, useColumnFamilies: Boolean, storeConfs: StateStoreConf, hadoopConf: Configuration, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
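The range-scan test helpers in this diff key state on `keySchemaWithRangeScan`, a fixed-size `LongType` ordering column followed by a variable-length `StringType` column. Below is a minimal, self-contained Scala sketch of the idea described in the commit message: write the fixed-size ordering column as BIG_ENDIAN bytes at the front of the key, so that a store comparing keys byte by byte (as RocksDB's default bytewise comparator does) returns them sorted by that column. This is not the `RangeScanKeyStateEncoder` added by this commit. The object and method names (`RangeScanEncodingSketch`, `encodeKey`, `decodeTs`, `compareUnsigned`) are hypothetical, and flipping the sign bit so that negative longs sort before non-negative ones is an assumption of this sketch, not something shown in this diff.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

// Hypothetical sketch: NOT Spark's RangeScanKeyStateEncoder, only the ordering idea.
object RangeScanEncodingSketch {

  // Encode (ts, s) as an 8-byte BIG_ENDIAN ordering prefix followed by the
  // variable-length UTF-8 bytes of s. Because the prefix has a fixed width,
  // the variable-length tail can never shift the position of the ordering bytes.
  def encodeKey(ts: Long, s: String): Array[Byte] = {
    val tail = s.getBytes(StandardCharsets.UTF_8)
    val buf = ByteBuffer.allocate(8 + tail.length).order(ByteOrder.BIG_ENDIAN)
    // Assumption of this sketch: flip the sign bit so that negative values
    // compare below non-negative ones when bytes are compared as unsigned.
    buf.putLong(ts ^ Long.MinValue)
    buf.put(tail)
    buf.array()
  }

  // Recover the ordering column from the first 8 bytes by undoing the sign-bit flip.
  def decodeTs(key: Array[Byte]): Long =
    ByteBuffer.wrap(key, 0, 8).order(ByteOrder.BIG_ENDIAN).getLong ^ Long.MinValue

  // Lexicographic comparison of unsigned bytes, shorter key first on a common
  // prefix: the order a bytewise comparator imposes on keys.
  def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val diff = (a(i) & 0xff) - (b(i) & 0xff)
      if (diff != 0) return diff
      i += 1
    }
    a.length - b.length
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq((931L, "a"), (-5L, "longer-value"), (0L, "b"), (Long.MaxValue, ""), (-931L, "z"))
    // Sort the encoded keys purely by their bytes, as a byte-ordered store would.
    val scanned = rows
      .map { case (ts, s) => encodeKey(ts, s) }
      .sortWith((x, y) => compareUnsigned(x, y) < 0)
    println(scanned.map(decodeTs).mkString(", "))
  }
}
```

Running `main` prints `-931, -5, 0, 931, 9223372036854775807`: the keys come back in numeric order of the `Long` column even though the trailing strings have different lengths, because the 8-byte prefix always sits at the same offset. Without the sign-bit flip, plain two's-complement BIG_ENDIAN bytes would place negative values after positive ones under an unsigned byte comparison.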