This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e778ce689dcb [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families e778ce689dcb is described below commit e778ce689dcbe5e75ce5781a03cf9d8466910cd2 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Tue Mar 12 14:27:50 2024 +0900 [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families ### What changes were proposed in this pull request? Add additional validations and NERF changes for RocksDB state provider and use of column families. ### Why are the changes needed? Improve error handling and migrate errors to NERF. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added new unit tests. StateStoreSuite: ``` ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: shuffle-boss-36-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-33-1 (daemon=true), ForkJoinPool.commonPool-worker-2 (daemon=true) ===== [info] Run completed in 2 minutes, 57 seconds. [info] Total number of tests run: 151 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 151, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` RocksDBSuite: ``` [info] Run completed in 4 minutes, 54 seconds. [info] Total number of tests run: 188 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 188, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #45360 from anishshri-db/task/SPARK-47250. 
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-classes.json | 10 +- docs/sql-error-conditions.md | 10 +- .../state/HDFSBackedStateStoreProvider.scala | 33 ++++-- .../sql/execution/streaming/state/RocksDB.scala | 92 +++++++++++---- .../state/RocksDBStateStoreProvider.scala | 4 +- .../streaming/state/StateStoreErrors.scala | 49 ++++---- .../execution/streaming/state/RocksDBSuite.scala | 124 +++++++++++++++++++-- .../streaming/state/StateStoreSuite.scala | 63 +++++++++++ .../streaming/state/ValueStateSuite.scala | 2 +- 9 files changed, 318 insertions(+), 69 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 3d130fdce301..99fbc585f981 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3371,9 +3371,9 @@ ], "sqlState" : "0A000" }, - "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY" : { + "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME" : { "message" : [ - "Failed to remove default column family with reserved name=<colFamilyName>." + "Failed to perform column family operation=<operationName> with invalid name=<colFamilyName>. Column family name cannot be empty or include leading/trailing spaces or use the reserved keyword=default" ], "sqlState" : "42802" }, @@ -3396,6 +3396,12 @@ ], "sqlState" : "XXKST" }, + "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY" : { + "message" : [ + "State store operation=<operationType> not supported on missing column family=<colFamilyName>." + ], + "sqlState" : "42802" + }, "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : { "message" : [ "Static partition column <staticName> is also specified in the column list." 
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 2cddb6a94c14..b6b159f277c0 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -2105,11 +2105,11 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists Star (*) is not allowed in a select list when GROUP BY an ordinal position is used. -### STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY +### STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) -Failed to remove default column family with reserved name=`<colFamilyName>`. +Failed to perform column family operation=`<operationName>` with invalid name=`<colFamilyName>`. Column family name cannot be empty or include leading/trailing spaces or use the reserved keyword=default ### STATE_STORE_HANDLE_NOT_INITIALIZED @@ -2130,6 +2130,12 @@ Store does not support multiple values per key `<operationType>` operation not supported with `<entity>` +### STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY + +[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +State store operation=`<operationType>` not supported on missing column family=`<colFamilyName>`. 
+ ### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST [SQLSTATE: 42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 01e2e7f26083..edb95615d588 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -73,6 +73,8 @@ import org.apache.spark.util.ArrayImplicits._ */ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging { + private val providerName = "HDFSBackedStateStoreProvider" + class HDFSBackedReadStateStore(val version: Long, map: HDFSBackedStateStoreMap) extends ReadStateStore { @@ -124,14 +126,25 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with numColsPrefixKey: Int, valueSchema: StructType, useMultipleValuesPerKey: Boolean = false): Unit = { - throw StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider") + throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName) + } + + // Multiple col families are not supported with HDFSBackedStateStoreProvider. Throw an exception + // if the user tries to use a non-default col family. 
+ private def assertUseOfDefaultColFamily(colFamilyName: String): Unit = { + if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) { + + throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName) + } } override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = { + assertUseOfDefaultColFamily(colFamilyName) mapToUpdate.get(key) } override def put(key: UnsafeRow, value: UnsafeRow, colFamilyName: String): Unit = { + assertUseOfDefaultColFamily(colFamilyName) require(value != null, "Cannot put a null value") verify(state == UPDATING, "Cannot put after already committed or aborted") val keyCopy = key.copy() @@ -141,6 +154,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } override def remove(key: UnsafeRow, colFamilyName: String): Unit = { + assertUseOfDefaultColFamily(colFamilyName) verify(state == UPDATING, "Cannot remove after already committed or aborted") val prevValue = mapToUpdate.remove(key) if (prevValue != null) { @@ -179,10 +193,14 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with * Get an iterator of all the store data. * This can be called only after committing all the updates made in the current thread. 
*/ - override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = mapToUpdate.iterator() + override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = { + assertUseOfDefaultColFamily(colFamilyName) + mapToUpdate.iterator() + } override def prefixScan(prefixKey: UnsafeRow, colFamilyName: String): Iterator[UnsafeRowPair] = { + assertUseOfDefaultColFamily(colFamilyName) mapToUpdate.prefixScan(prefixKey) } @@ -211,18 +229,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } override def removeColFamilyIfExists(colFamilyName: String): Unit = { - throw StateStoreErrors.removingColumnFamiliesNotSupported( - "HDFSBackedStateStoreProvider") + throw StateStoreErrors.removingColumnFamiliesNotSupported(providerName) } override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = { - throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", "HDFSStateStore") + throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName) } override def merge(key: UnsafeRow, value: UnsafeRow, colFamilyName: String): Unit = { - throw StateStoreErrors.unsupportedOperationException("merge", "HDFSStateStore") + throw StateStoreErrors.unsupportedOperationException("merge", providerName) } } @@ -280,11 +297,11 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with // TODO: add support for multiple col families with HDFSBackedStateStoreProvider if (useColumnFamilies) { - throw StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider") + throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName) } if (useMultipleValuesPerKey) { - throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", "HDFSStateStore") + throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName) } require((keySchema.length == 0 && numColsPrefixKey == 0) || diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 41cab78df195..4437cc5583d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -34,7 +34,7 @@ import org.rocksdb.{RocksDB => NativeRocksDB, _} import org.rocksdb.CompressionType._ import org.rocksdb.TickerType._ -import org.apache.spark.{SparkUnsupportedOperationException, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors @@ -242,26 +242,75 @@ class RocksDB( loadedVersion = endVersion } + /** + * Function to check if the column family exists in the state store instance. + * @param colFamilyName - name of the column family + * @return - true if the column family exists, false otherwise + */ private def checkColFamilyExists(colFamilyName: String): Boolean = { colFamilyNameToHandleMap.contains(colFamilyName) } - private def verifyColFamilyExists(colFamilyName: String): Unit = { - if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) { - throw new RuntimeException(s"Column family with name=$colFamilyName does not exist") + private val multColFamiliesDisabledStr = "multiple column families disabled in " + + "RocksDBStateStoreProvider" + + /** + * Function to verify invariants for column family based operations such as get, put, remove etc. 
+ * @param operationName - name of the store operation + * @param colFamilyName - name of the column family + */ + private def verifyColFamilyOperations( + operationName: String, + colFamilyName: String): Unit = { + if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) { + // if the state store instance does not support multiple column families, throw an exception + if (!useColumnFamilies) { + throw StateStoreErrors.unsupportedOperationException(operationName, + multColFamiliesDisabledStr) + } + + // if the column family name is empty or contains leading/trailing whitespaces, throw an + // exception + if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) { + throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName) + } + + // if the column family does not exist, throw an exception + if (!checkColFamilyExists(colFamilyName)) { + throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName, + colFamilyName) + } } } /** - * Create RocksDB column family, if not created already + * Function to verify invariants for column family creation or deletion operations. 
+ * @param operationName - name of the store operation + * @param colFamilyName - name of the column family */ - def createColFamilyIfAbsent(colFamilyName: String): Unit = { - if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) { - throw new SparkUnsupportedOperationException( - errorClass = "_LEGACY_ERROR_TEMP_3197", - messageParameters = Map("colFamilyName" -> colFamilyName).toMap) + private def verifyColFamilyCreationOrDeletion( + operationName: String, + colFamilyName: String): Unit = { + // if the state store instance does not support multiple column families, throw an exception + if (!useColumnFamilies) { + throw StateStoreErrors.unsupportedOperationException(operationName, + multColFamiliesDisabledStr) + } + + // if the column family name is empty or contains leading/trailing whitespaces + // or using the reserved "default" column family, throw an exception + if (colFamilyName.isEmpty + || colFamilyName.trim != colFamilyName + || colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) { + throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName) } + } + /** + * Create RocksDB column family, if not created already + */ + def createColFamilyIfAbsent(colFamilyName: String): Unit = { + verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName) if (!checkColFamilyExists(colFamilyName)) { assert(db != null) val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes, columnFamilyOptions) @@ -274,10 +323,7 @@ class RocksDB( * Remove RocksDB column family, if exists */ def removeColFamilyIfExists(colFamilyName: String): Unit = { - if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) { - throw StateStoreErrors.cannotRemoveDefaultColumnFamily(colFamilyName) - } - + verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName) if (checkColFamilyExists(colFamilyName)) { assert(db != null) val handle = colFamilyNameToHandleMap(colFamilyName) @@ -293,7 +339,7 @@ class RocksDB( def get( key: Array[Byte], 
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] = { - verifyColFamilyExists(colFamilyName) + verifyColFamilyOperations("get", colFamilyName) db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key) } @@ -305,7 +351,7 @@ class RocksDB( key: Array[Byte], value: Array[Byte], colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { - verifyColFamilyExists(colFamilyName) + verifyColFamilyOperations("put", colFamilyName) if (conf.trackTotalNumberOfRows) { val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key) if (oldValue == null) { @@ -337,10 +383,10 @@ class RocksDB( value: Array[Byte], colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { if (!useColumnFamilies) { - throw new RuntimeException("Merge operation uses changelog checkpointing v2 which" + - " requires column families to be enabled.") + throw StateStoreErrors.unsupportedOperationException("merge", + multColFamiliesDisabledStr) } - verifyColFamilyExists(colFamilyName) + verifyColFamilyOperations("merge", colFamilyName) if (conf.trackTotalNumberOfRows) { val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key) @@ -360,7 +406,7 @@ class RocksDB( def remove( key: Array[Byte], colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { - verifyColFamilyExists(colFamilyName) + verifyColFamilyOperations("remove", colFamilyName) if (conf.trackTotalNumberOfRows) { val value = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key) if (value != null) { @@ -380,7 +426,7 @@ class RocksDB( */ def iterator(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Iterator[ByteArrayPair] = { - verifyColFamilyExists(colFamilyName) + verifyColFamilyOperations("iterator", colFamilyName) val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName)) logInfo(s"Getting iterator from version $loadedVersion") @@ -409,7 +455,7 @@ class RocksDB( } private def countKeys(colFamilyName: String = 
StateStore.DEFAULT_COL_FAMILY_NAME): Long = { - verifyColFamilyExists(colFamilyName) + verifyColFamilyOperations("countKeys", colFamilyName) val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName)) try { @@ -431,7 +477,7 @@ class RocksDB( def prefixScan(prefix: Array[Byte], colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Iterator[ByteArrayPair] = { - verifyColFamilyExists(colFamilyName) + verifyColFamilyOperations("prefixScan", colFamilyName) val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName)) iter.seek(prefix) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 7374abdbde98..721d8aa03079 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -101,14 +101,14 @@ private[sql] class RocksDBStateStoreProvider override def merge(key: UnsafeRow, value: UnsafeRow, colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { - verify(state == UPDATING, "Cannot put after already committed or aborted") + verify(state == UPDATING, "Cannot merge after already committed or aborted") val kvEncoder = keyValueEncoderMap.get(colFamilyName) val keyEncoder = kvEncoder._1 val valueEncoder = kvEncoder._2 verify(valueEncoder.supportsMultipleValuesPerKey, "Merge operation requires an encoder" + " which supports multiple values for a single key") verify(key != null, "Key cannot be null") - require(value != null, "Cannot put a null value") + require(value != null, "Cannot merge a null value") rocksDB.merge(keyEncoder.encodeKey(key), valueEncoder.encodeValue(value), colFamilyName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 6f4c3d4c9675..8a0276557f8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -32,25 +32,30 @@ object StateStoreErrors { ) } + def unsupportedOperationOnMissingColumnFamily(operationName: String, colFamilyName: String): + StateStoreUnsupportedOperationOnMissingColumnFamily = { + new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName, colFamilyName) + } + def multipleColumnFamiliesNotSupported(stateStoreProvider: String): StateStoreMultipleColumnFamiliesNotSupportedException = { - new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider) - } + new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider) + } def removingColumnFamiliesNotSupported(stateStoreProvider: String): StateStoreRemovingColumnFamiliesNotSupportedException = { - new StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider) - } + new StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider) + } - def cannotRemoveDefaultColumnFamily(colFamilyName: String): - StateStoreCannotRemoveDefaultColumnFamily = { - new StateStoreCannotRemoveDefaultColumnFamily(colFamilyName) - } + def cannotUseColumnFamilyWithInvalidName(operationName: String, colFamilyName: String): + StateStoreCannotUseColumnFamilyWithInvalidName = { + new StateStoreCannotUseColumnFamilyWithInvalidName(operationName, colFamilyName) + } def unsupportedOperationException(operationName: String, entity: String): StateStoreUnsupportedOperationException = { - new StateStoreUnsupportedOperationException(operationName, entity) - } + new StateStoreUnsupportedOperationException(operationName, entity) + } def requireNonNullStateValue(value: Any, stateName: String): Unit = { SparkException.require(value != null, 
@@ -68,23 +73,25 @@ object StateStoreErrors { class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String) extends SparkUnsupportedOperationException( errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", - messageParameters = Map("stateStoreProvider" -> stateStoreProvider) - ) + messageParameters = Map("stateStoreProvider" -> stateStoreProvider)) + class StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider: String) extends SparkUnsupportedOperationException( errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES", - messageParameters = Map("stateStoreProvider" -> stateStoreProvider) - ) + messageParameters = Map("stateStoreProvider" -> stateStoreProvider)) -class StateStoreCannotRemoveDefaultColumnFamily(colFamilyName: String) +class StateStoreCannotUseColumnFamilyWithInvalidName(operationName: String, colFamilyName: String) extends SparkUnsupportedOperationException( - errorClass = "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY", - messageParameters = Map("colFamilyName" -> colFamilyName) - ) - + errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME", + messageParameters = Map("operationName" -> operationName, "colFamilyName" -> colFamilyName)) class StateStoreUnsupportedOperationException(operationType: String, entity: String) extends SparkUnsupportedOperationException( errorClass = "STATE_STORE_UNSUPPORTED_OPERATION", - messageParameters = Map("operationType" -> operationType, "entity" -> entity) - ) + messageParameters = Map("operationType" -> operationType, "entity" -> entity)) + +class StateStoreUnsupportedOperationOnMissingColumnFamily( + operationType: String, + colFamilyName: String) extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY", + messageParameters = Map("operationType" -> operationType, "colFamilyName" -> colFamilyName)) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 0bc3828318da..a7d4ab362340 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -536,6 +536,110 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + testWithColumnFamilies(s"RocksDB: column family creation with invalid names", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + val remoteDir = Utils.createTempDir().toString + new File(remoteDir).delete() // to make sure that the directory gets created + + val conf = RocksDBConf().copy() + withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db => + Seq("default", "", " ", " ", " default", " default ").foreach { colFamilyName => + val ex = intercept[SparkUnsupportedOperationException] { + db.createColFamilyIfAbsent(colFamilyName) + } + + if (!colFamiliesEnabled) { + checkError( + ex, + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION", + parameters = Map( + "operationType" -> "create_col_family", + "entity" -> "multiple column families disabled in RocksDBStateStoreProvider" + ), + matchPVals = true + ) + } else { + checkError( + ex, + errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME", + parameters = Map( + "operationName" -> "create_col_family", + "colFamilyName" -> colFamilyName + ), + matchPVals = true + ) + } + } + } + } + + private def verifyStoreOperationUnsupported( + operationName: String, + colFamiliesEnabled: Boolean, + colFamilyName: String) + (testFn: => Unit): Unit = { + val ex = intercept[SparkUnsupportedOperationException] { + testFn + } + + if (!colFamiliesEnabled) { + checkError( + ex, + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION", + parameters = Map( + 
"operationType" -> operationName, + "entity" -> "multiple column families disabled in RocksDBStateStoreProvider" + ), + matchPVals = true + ) + } else { + checkError( + ex, + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY", + parameters = Map( + "operationType" -> operationName, + "colFamilyName" -> colFamilyName + ), + matchPVals = true + ) + } + } + + testWithColumnFamilies(s"RocksDB: operations on absent column family", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + val remoteDir = Utils.createTempDir().toString + new File(remoteDir).delete() // to make sure that the directory gets created + + val conf = RocksDBConf().copy() + withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db => + db.load(0) + val colFamilyName = "test" + verifyStoreOperationUnsupported("put", colFamiliesEnabled, colFamilyName) { + db.put("a", "1", colFamilyName) + } + + verifyStoreOperationUnsupported("remove", colFamiliesEnabled, colFamilyName) { + db.remove("a", colFamilyName) + } + + verifyStoreOperationUnsupported("get", colFamiliesEnabled, colFamilyName) { + db.get("a", colFamilyName) + } + + verifyStoreOperationUnsupported("iterator", colFamiliesEnabled, colFamilyName) { + db.iterator(colFamilyName) + } + + verifyStoreOperationUnsupported("merge", colFamiliesEnabled, colFamilyName) { + db.merge("a", "1", colFamilyName) + } + + verifyStoreOperationUnsupported("prefixScan", colFamiliesEnabled, colFamilyName) { + db.prefixScan("a", colFamilyName) + } + } + } + testWithColumnFamilies(s"RocksDB: get, put, iterator, commit, load " + s"with multiple column families", TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => @@ -545,13 +649,6 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared val colFamily2: String = "xyz" val conf = RocksDBConf().copy() - withDB(remoteDir, conf = conf, useColumnFamilies = true) { db => - val ex = intercept[Exception] { - 
db.createColFamilyIfAbsent("default") - } - ex.getCause.isInstanceOf[UnsupportedOperationException] - } - withDB(remoteDir, conf = conf, useColumnFamilies = true) { db => db.createColFamilyIfAbsent(colFamily1) db.createColFamilyIfAbsent(colFamily2) @@ -572,7 +669,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } withDB(remoteDir, conf = conf, version = 0, useColumnFamilies = true) { db => - val ex = intercept[Exception] { + val ex = intercept[SparkUnsupportedOperationException] { // version 0 can be loaded again assert(toStr(db.get("a", colFamily1)) === null) assert(iterator(db, colFamily1).isEmpty) @@ -581,8 +678,15 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared assert(toStr(db.get("a", colFamily2)) === null) assert(iterator(db, colFamily2).isEmpty) } - assert(ex.isInstanceOf[RuntimeException]) - assert(ex.getMessage.contains("does not exist")) + checkError( + ex, + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY", + parameters = Map( + "operationType" -> "get", + "colFamilyName" -> colFamily1 + ), + matchPVals = true + ) } withDB(remoteDir, conf = conf, version = 1, useColumnFamilies = true) { db => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index a8c7fc05f21e..64b3e75ea976 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -134,6 +134,69 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } + private def verifyStoreOperationUnsupported(operationName: String)(testFn: => Unit): Unit = { + if (operationName != "merge") { + val ex = intercept[SparkUnsupportedOperationException] { + testFn + } + checkError( + ex, + errorClass = 
"UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", + parameters = Map( + "stateStoreProvider" -> "HDFSBackedStateStoreProvider" + ), + matchPVals = true + ) + } else { + val ex = intercept[SparkUnsupportedOperationException] { + testFn + } + checkError( + ex, + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION", + parameters = Map( + "operationType" -> operationName, + "entity" -> "HDFSBackedStateStoreProvider" + ), + matchPVals = true + ) + + } + } + + test("get, put, remove etc operations on non-default col family should fail") { + tryWithProviderResource(newStoreProvider(opId = Random.nextInt(), partition = 0, + minDeltasForSnapshot = 5)) { provider => + val store = provider.getStore(0) + val keyRow = dataToKeyRow("a", 0) + val valueRow = dataToValueRow(1) + val colFamilyName = "test" + verifyStoreOperationUnsupported("put") { + store.put(keyRow, valueRow, colFamilyName) + } + + verifyStoreOperationUnsupported("remove") { + store.remove(keyRow, colFamilyName) + } + + verifyStoreOperationUnsupported("get") { + store.get(keyRow, colFamilyName) + } + + verifyStoreOperationUnsupported("merge") { + store.merge(keyRow, valueRow, colFamilyName) + } + + verifyStoreOperationUnsupported("iterator") { + store.iterator(colFamilyName) + } + + verifyStoreOperationUnsupported("prefixScan") { + store.prefixScan(keyRow, colFamilyName) + } + } + } + test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") { tryWithProviderResource(newStoreProvider(opId = Random.nextInt(), partition = 0, numOfVersToRetainInMemory = 1)) { provider => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala index 71462cb4b643..40e31239895c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala @@ -217,7 +217,7 @@ class ValueStateSuite extends SharedSparkSession ex, errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", parameters = Map( - "stateStoreProvider" -> "HDFSStateStoreProvider" + "stateStoreProvider" -> "HDFSBackedStateStoreProvider" ), matchPVals = true ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org