This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e778ce689dcb [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families
e778ce689dcb is described below

commit e778ce689dcbe5e75ce5781a03cf9d8466910cd2
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Tue Mar 12 14:27:50 2024 +0900

    [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families
    
    ### What changes were proposed in this pull request?
    Add additional validations and NERF changes for the RocksDB state store provider and the use of column families.
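
    As an illustrative sketch (not part of this patch), assuming a hypothetical `RocksDB` instance `db` opened with `useColumnFamilies = true` inside a ScalaTest suite, the new validations surface as follows:
    ```scala
    // Empty names, names with leading/trailing whitespace, and the reserved
    // "default" name are rejected at creation time with
    // STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME.
    Seq("default", "", " ", " test ").foreach { name =>
      intercept[SparkUnsupportedOperationException] {
        db.createColFamilyIfAbsent(name)
      }
    }

    // Operating on a column family that was never created (the name
    // "absent_family" here is hypothetical) fails with
    // STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY.
    intercept[SparkUnsupportedOperationException] {
      db.get("a".getBytes, "absent_family")
    }
    ```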
    
    ### Why are the changes needed?
    Improve error handling by migrating these errors to NERF (Spark's error-class framework).
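
    For context, a NERF error is a typed exception bound to a named error class whose message template lives in `error-classes.json` rather than in code. A minimal sketch of the pattern, mirroring the new class added in `StateStoreErrors.scala` below:
    ```scala
    import org.apache.spark.SparkUnsupportedOperationException

    // Each NERF error is a dedicated exception type that binds a named error
    // class to typed message parameters; callers construct it via a factory
    // in StateStoreErrors instead of throwing bare RuntimeExceptions.
    class StateStoreUnsupportedOperationOnMissingColumnFamily(
        operationType: String,
        colFamilyName: String) extends SparkUnsupportedOperationException(
      errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY",
      messageParameters = Map(
        "operationType" -> operationType,
        "colFamilyName" -> colFamilyName))
    ```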
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added new unit tests
    StateStoreSuite
    ```
    ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: shuffle-boss-36-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-33-1 (daemon=true), ForkJoinPool.commonPool-worker-2 (daemon=true) =====
    [info] Run completed in 2 minutes, 57 seconds.
    [info] Total number of tests run: 151
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 151, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    RocksDBSuite
    ```
    [info] Run completed in 4 minutes, 54 seconds.
    [info] Total number of tests run: 188
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 188, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45360 from anishshri-db/task/SPARK-47250.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  10 +-
 docs/sql-error-conditions.md                       |  10 +-
 .../state/HDFSBackedStateStoreProvider.scala       |  33 ++++--
 .../sql/execution/streaming/state/RocksDB.scala    |  92 +++++++++++----
 .../state/RocksDBStateStoreProvider.scala          |   4 +-
 .../streaming/state/StateStoreErrors.scala         |  49 ++++----
 .../execution/streaming/state/RocksDBSuite.scala   | 124 +++++++++++++++++++--
 .../streaming/state/StateStoreSuite.scala          |  63 +++++++++++
 .../streaming/state/ValueStateSuite.scala          |   2 +-
 9 files changed, 318 insertions(+), 69 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 3d130fdce301..99fbc585f981 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3371,9 +3371,9 @@
     ],
     "sqlState" : "0A000"
   },
-  "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY" : {
+  "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME" : {
     "message" : [
-      "Failed to remove default column family with reserved 
name=<colFamilyName>."
+      "Failed to perform column family operation=<operationName> with invalid 
name=<colFamilyName>. Column family name cannot be empty or include 
leading/trailing spaces or use the reserved keyword=default"
     ],
     "sqlState" : "42802"
   },
@@ -3396,6 +3396,12 @@
     ],
     "sqlState" : "XXKST"
   },
+  "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY" : {
+    "message" : [
+      "State store operation=<operationType> not supported on missing column 
family=<colFamilyName>."
+    ],
+    "sqlState" : "42802"
+  },
   "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : {
     "message" : [
       "Static partition column <staticName> is also specified in the column 
list."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 2cddb6a94c14..b6b159f277c0 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2105,11 +2105,11 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists
 
 Star (*) is not allowed in a select list when GROUP BY an ordinal position is used.
 
-### STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY
+### STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME
 
 [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
 
-Failed to remove default column family with reserved name=`<colFamilyName>`.
+Failed to perform column family operation=`<operationName>` with invalid name=`<colFamilyName>`. Column family name cannot be empty or include leading/trailing spaces or use the reserved keyword=default
 
 ### STATE_STORE_HANDLE_NOT_INITIALIZED
 
@@ -2130,6 +2130,12 @@ Store does not support multiple values per key
 
 `<operationType>` operation not supported with `<entity>`
 
+### STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+State store operation=`<operationType>` not supported on missing column family=`<colFamilyName>`.
+
 ### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST
 
 [SQLSTATE: 42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 01e2e7f26083..edb95615d588 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -73,6 +73,8 @@ import org.apache.spark.util.ArrayImplicits._
  */
 private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging {
 
+  private val providerName = "HDFSBackedStateStoreProvider"
+
   class HDFSBackedReadStateStore(val version: Long, map: HDFSBackedStateStoreMap)
     extends ReadStateStore {
 
@@ -124,14 +126,25 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
         numColsPrefixKey: Int,
         valueSchema: StructType,
         useMultipleValuesPerKey: Boolean = false): Unit = {
-      throw StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider")
+      throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
+    }
+
+    // Multiple col families are not supported with HDFSBackedStateStoreProvider. Throw an exception
+    // if the user tries to use a non-default col family.
+    private def assertUseOfDefaultColFamily(colFamilyName: String): Unit = {
+      if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+
+        throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
+      }
     }
 
     override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = {
+      assertUseOfDefaultColFamily(colFamilyName)
       mapToUpdate.get(key)
     }
 
     override def put(key: UnsafeRow, value: UnsafeRow, colFamilyName: String): Unit = {
+      assertUseOfDefaultColFamily(colFamilyName)
       require(value != null, "Cannot put a null value")
       verify(state == UPDATING, "Cannot put after already committed or aborted")
       val keyCopy = key.copy()
@@ -141,6 +154,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     }
 
     override def remove(key: UnsafeRow, colFamilyName: String): Unit = {
+      assertUseOfDefaultColFamily(colFamilyName)
       verify(state == UPDATING, "Cannot remove after already committed or aborted")
       val prevValue = mapToUpdate.remove(key)
       if (prevValue != null) {
@@ -179,10 +193,14 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
      * Get an iterator of all the store data.
      * This can be called only after committing all the updates made in the current thread.
      */
-    override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = mapToUpdate.iterator()
+    override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = {
+      assertUseOfDefaultColFamily(colFamilyName)
+      mapToUpdate.iterator()
+    }
 
     override def prefixScan(prefixKey: UnsafeRow, colFamilyName: String):
       Iterator[UnsafeRowPair] = {
+      assertUseOfDefaultColFamily(colFamilyName)
       mapToUpdate.prefixScan(prefixKey)
     }
 
@@ -211,18 +229,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     }
 
     override def removeColFamilyIfExists(colFamilyName: String): Unit = {
-      throw StateStoreErrors.removingColumnFamiliesNotSupported(
-        "HDFSBackedStateStoreProvider")
+      throw StateStoreErrors.removingColumnFamiliesNotSupported(providerName)
     }
 
     override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = {
-      throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", "HDFSStateStore")
+      throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName)
     }
 
     override def merge(key: UnsafeRow,
         value: UnsafeRow,
         colFamilyName: String): Unit = {
-      throw StateStoreErrors.unsupportedOperationException("merge", "HDFSStateStore")
+      throw StateStoreErrors.unsupportedOperationException("merge", providerName)
     }
   }
 
@@ -280,11 +297,11 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
 
     // TODO: add support for multiple col families with HDFSBackedStateStoreProvider
     if (useColumnFamilies) {
-      throw StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider")
+      throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
     }
 
     if (useMultipleValuesPerKey) {
-      throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", "HDFSStateStore")
+      throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName)
     }
 
     require((keySchema.length == 0 && numColsPrefixKey == 0) ||
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 41cab78df195..4437cc5583d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -34,7 +34,7 @@ import org.rocksdb.{RocksDB => NativeRocksDB, _}
 import org.rocksdb.CompressionType._
 import org.rocksdb.TickerType._
 
-import org.apache.spark.{SparkUnsupportedOperationException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -242,26 +242,75 @@ class RocksDB(
     loadedVersion = endVersion
   }
 
+  /**
+   * Function to check if the column family exists in the state store instance.
+   * @param colFamilyName - name of the column family
+   * @return - true if the column family exists, false otherwise
+   */
   private def checkColFamilyExists(colFamilyName: String): Boolean = {
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private val multColFamiliesDisabledStr = "multiple column families disabled in " +
+    "RocksDBStateStoreProvider"
+
+  /**
+   * Function to verify invariants for column family based operations such as get, put, remove etc.
+   * @param operationName - name of the store operation
+   * @param colFamilyName - name of the column family
+   */
+  private def verifyColFamilyOperations(
+      operationName: String,
+      colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      // if the state store instance does not support multiple column families, throw an exception
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          multColFamiliesDisabledStr)
+      }
+
+      // if the column family name is empty or contains leading/trailing whitespaces, throw an
+      // exception
+      if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) {
+        throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
+      }
+
+      // if the column family does not exist, throw an exception
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }
 
   /**
-   * Create RocksDB column family, if not created already
+   * Function to verify invariants for column family creation or deletion operations.
+   * @param operationName - name of the store operation
+   * @param colFamilyName - name of the column family
    */
-  def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+  private def verifyColFamilyCreationOrDeletion(
+      operationName: String,
+      colFamilyName: String): Unit = {
+    // if the state store instance does not support multiple column families, throw an exception
+    if (!useColumnFamilies) {
+      throw StateStoreErrors.unsupportedOperationException(operationName,
+        multColFamiliesDisabledStr)
+    }
+
+    // if the column family name is empty or contains leading/trailing whitespaces
+    // or using the reserved "default" column family, throw an exception
+    if (colFamilyName.isEmpty
+      || colFamilyName.trim != colFamilyName
+      || colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+      throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
     }
+  }
 
+  /**
+   * Create RocksDB column family, if not created already
+   */
+  def createColFamilyIfAbsent(colFamilyName: String): Unit = {
+    verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName)
     if (!checkColFamilyExists(colFamilyName)) {
       assert(db != null)
       val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes, columnFamilyOptions)
@@ -274,10 +323,7 @@ class RocksDB(
    * Remove RocksDB column family, if exists
    */
   def removeColFamilyIfExists(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw StateStoreErrors.cannotRemoveDefaultColumnFamily(colFamilyName)
-    }
-
+    verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName)
     if (checkColFamilyExists(colFamilyName)) {
       assert(db != null)
       val handle = colFamilyNameToHandleMap(colFamilyName)
@@ -293,7 +339,7 @@ class RocksDB(
   def get(
       key: Array[Byte],
       colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] = {
-    verifyColFamilyExists(colFamilyName)
+    verifyColFamilyOperations("get", colFamilyName)
     db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
   }
 
@@ -305,7 +351,7 @@ class RocksDB(
       key: Array[Byte],
       value: Array[Byte],
       colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    verifyColFamilyExists(colFamilyName)
+    verifyColFamilyOperations("put", colFamilyName)
     if (conf.trackTotalNumberOfRows) {
       val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
       if (oldValue == null) {
@@ -337,10 +383,10 @@ class RocksDB(
       value: Array[Byte],
       colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
     if (!useColumnFamilies) {
-      throw new RuntimeException("Merge operation uses changelog checkpointing v2 which" +
-        " requires column families to be enabled.")
+      throw StateStoreErrors.unsupportedOperationException("merge",
+        multColFamiliesDisabledStr)
     }
-    verifyColFamilyExists(colFamilyName)
+    verifyColFamilyOperations("merge", colFamilyName)
 
     if (conf.trackTotalNumberOfRows) {
       val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
@@ -360,7 +406,7 @@ class RocksDB(
   def remove(
       key: Array[Byte],
       colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    verifyColFamilyExists(colFamilyName)
+    verifyColFamilyOperations("remove", colFamilyName)
     if (conf.trackTotalNumberOfRows) {
       val value = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
       if (value != null) {
@@ -380,7 +426,7 @@ class RocksDB(
    */
   def iterator(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
     Iterator[ByteArrayPair] = {
-    verifyColFamilyExists(colFamilyName)
+    verifyColFamilyOperations("iterator", colFamilyName)
 
     val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
     logInfo(s"Getting iterator from version $loadedVersion")
@@ -409,7 +455,7 @@ class RocksDB(
   }
 
   private def countKeys(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Long = {
-    verifyColFamilyExists(colFamilyName)
+    verifyColFamilyOperations("countKeys", colFamilyName)
     val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
 
     try {
@@ -431,7 +477,7 @@ class RocksDB(
 
   def prefixScan(prefix: Array[Byte], colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
     Iterator[ByteArrayPair] = {
-    verifyColFamilyExists(colFamilyName)
+    verifyColFamilyOperations("prefixScan", colFamilyName)
     val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
     iter.seek(prefix)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 7374abdbde98..721d8aa03079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -101,14 +101,14 @@ private[sql] class RocksDBStateStoreProvider
 
     override def merge(key: UnsafeRow, value: UnsafeRow,
         colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-      verify(state == UPDATING, "Cannot put after already committed or aborted")
+      verify(state == UPDATING, "Cannot merge after already committed or aborted")
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val keyEncoder = kvEncoder._1
       val valueEncoder = kvEncoder._2
       verify(valueEncoder.supportsMultipleValuesPerKey, "Merge operation requires an encoder" +
         " which supports multiple values for a single key")
       verify(key != null, "Key cannot be null")
-      require(value != null, "Cannot put a null value")
+      require(value != null, "Cannot merge a null value")
       rocksDB.merge(keyEncoder.encodeKey(key), valueEncoder.encodeValue(value), colFamilyName)
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 6f4c3d4c9675..8a0276557f8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -32,25 +32,30 @@ object StateStoreErrors {
     )
   }
 
+  def unsupportedOperationOnMissingColumnFamily(operationName: String, colFamilyName: String):
+    StateStoreUnsupportedOperationOnMissingColumnFamily = {
+    new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName, colFamilyName)
+  }
+
   def multipleColumnFamiliesNotSupported(stateStoreProvider: String):
     StateStoreMultipleColumnFamiliesNotSupportedException = {
-      new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider)
-    }
+    new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider)
+  }
 
   def removingColumnFamiliesNotSupported(stateStoreProvider: String):
     StateStoreRemovingColumnFamiliesNotSupportedException = {
-        new StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
-    }
+    new StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
+  }
 
-  def cannotRemoveDefaultColumnFamily(colFamilyName: String):
-    StateStoreCannotRemoveDefaultColumnFamily = {
-        new StateStoreCannotRemoveDefaultColumnFamily(colFamilyName)
-    }
+  def cannotUseColumnFamilyWithInvalidName(operationName: String, colFamilyName: String):
+    StateStoreCannotUseColumnFamilyWithInvalidName = {
+      new StateStoreCannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
+  }
 
   def unsupportedOperationException(operationName: String, entity: String):
     StateStoreUnsupportedOperationException = {
-      new StateStoreUnsupportedOperationException(operationName, entity)
-    }
+    new StateStoreUnsupportedOperationException(operationName, entity)
+  }
 
   def requireNonNullStateValue(value: Any, stateName: String): Unit = {
     SparkException.require(value != null,
@@ -68,23 +73,25 @@ object StateStoreErrors {
 class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String)
   extends SparkUnsupportedOperationException(
     errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
-    messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
-  )
+    messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
+
 class StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider: String)
   extends SparkUnsupportedOperationException(
     errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES",
-    messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
-  )
+    messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
 
-class StateStoreCannotRemoveDefaultColumnFamily(colFamilyName: String)
+class StateStoreCannotUseColumnFamilyWithInvalidName(operationName: String, colFamilyName: String)
   extends SparkUnsupportedOperationException(
-    errorClass = "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY",
-    messageParameters = Map("colFamilyName" -> colFamilyName)
-  )
-
+    errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME",
+    messageParameters = Map("operationName" -> operationName, "colFamilyName" -> colFamilyName))
 
 class StateStoreUnsupportedOperationException(operationType: String, entity: String)
   extends SparkUnsupportedOperationException(
     errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
-    messageParameters = Map("operationType" -> operationType, "entity" -> entity)
-  )
+    messageParameters = Map("operationType" -> operationType, "entity" -> entity))
+
+class StateStoreUnsupportedOperationOnMissingColumnFamily(
+    operationType: String,
+    colFamilyName: String) extends SparkUnsupportedOperationException(
+  errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY",
+  messageParameters = Map("operationType" -> operationType, "colFamilyName" -> colFamilyName))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 0bc3828318da..a7d4ab362340 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -536,6 +536,110 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "    ", " default", " default ").foreach { 
colFamilyName =>
+        val ex = intercept[SparkUnsupportedOperationException] {
+          db.createColFamilyIfAbsent(colFamilyName)
+        }
+
+        if (!colFamiliesEnabled) {
+          checkError(
+            ex,
+            errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+            parameters = Map(
+              "operationType" -> "create_col_family",
+              "entity" -> "multiple column families disabled in 
RocksDBStateStoreProvider"
+            ),
+            matchPVals = true
+          )
+        } else {
+          checkError(
+            ex,
+            errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME",
+            parameters = Map(
+              "operationName" -> "create_col_family",
+              "colFamilyName" -> colFamilyName
+            ),
+            matchPVals = true
+          )
+        }
+      }
+    }
+  }
+
+  private def verifyStoreOperationUnsupported(
+      operationName: String,
+      colFamiliesEnabled: Boolean,
+      colFamilyName: String)
+      (testFn: => Unit): Unit = {
+    val ex = intercept[SparkUnsupportedOperationException] {
+      testFn
+    }
+
+    if (!colFamiliesEnabled) {
+      checkError(
+        ex,
+        errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+        parameters = Map(
+          "operationType" -> operationName,
+          "entity" -> "multiple column families disabled in 
RocksDBStateStoreProvider"
+        ),
+        matchPVals = true
+      )
+    } else {
+      checkError(
+        ex,
+        errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY",
+        parameters = Map(
+          "operationType" -> operationName,
+          "colFamilyName" -> colFamilyName
+        ),
+        matchPVals = true
+      )
+    }
+  }
+
+  testWithColumnFamilies(s"RocksDB: operations on absent column family",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      db.load(0)
+      val colFamilyName = "test"
+      verifyStoreOperationUnsupported("put", colFamiliesEnabled, 
colFamilyName) {
+        db.put("a", "1", colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("remove", colFamiliesEnabled, 
colFamilyName) {
+        db.remove("a", colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("get", colFamiliesEnabled, 
colFamilyName) {
+        db.get("a", colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("iterator", colFamiliesEnabled, 
colFamilyName) {
+        db.iterator(colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("merge", colFamiliesEnabled, 
colFamilyName) {
+        db.merge("a", "1", colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("prefixScan", colFamiliesEnabled, 
colFamilyName) {
+        db.prefixScan("a", colFamilyName)
+      }
+    }
+  }
+
   testWithColumnFamilies(s"RocksDB: get, put, iterator, commit, load " +
     s"with multiple column families",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
@@ -545,13 +649,6 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     val colFamily2: String = "xyz"
 
     val conf = RocksDBConf().copy()
-    withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
-      val ex = intercept[Exception] {
-        db.createColFamilyIfAbsent("default")
-      }
-      ex.getCause.isInstanceOf[UnsupportedOperationException]
-    }
-
     withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
       db.createColFamilyIfAbsent(colFamily1)
       db.createColFamilyIfAbsent(colFamily2)
@@ -572,7 +669,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
 
     withDB(remoteDir, conf = conf, version = 0, useColumnFamilies = true) { db =>
-      val ex = intercept[Exception] {
+      val ex = intercept[SparkUnsupportedOperationException] {
         // version 0 can be loaded again
         assert(toStr(db.get("a", colFamily1)) === null)
         assert(iterator(db, colFamily1).isEmpty)
@@ -581,8 +678,15 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         assert(toStr(db.get("a", colFamily2)) === null)
         assert(iterator(db, colFamily2).isEmpty)
       }
-      assert(ex.isInstanceOf[RuntimeException])
-      assert(ex.getMessage.contains("does not exist"))
+      checkError(
+        ex,
+        errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY",
+        parameters = Map(
+          "operationType" -> "get",
+          "colFamilyName" -> colFamily1
+        ),
+        matchPVals = true
+      )
     }
 
     withDB(remoteDir, conf = conf, version = 1, useColumnFamilies = true) { db =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index a8c7fc05f21e..64b3e75ea976 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -134,6 +134,69 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  private def verifyStoreOperationUnsupported(operationName: String)(testFn: => Unit): Unit = {
+    if (operationName != "merge") {
+      val ex = intercept[SparkUnsupportedOperationException] {
+        testFn
+      }
+      checkError(
+        ex,
+        errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
+        parameters = Map(
+          "stateStoreProvider" -> "HDFSBackedStateStoreProvider"
+        ),
+        matchPVals = true
+      )
+    } else {
+      val ex = intercept[SparkUnsupportedOperationException] {
+        testFn
+      }
+      checkError(
+        ex,
+        errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+        parameters = Map(
+          "operationType" -> operationName,
+          "entity" -> "HDFSBackedStateStoreProvider"
+        ),
+        matchPVals = true
+      )
+
+    }
+  }
+
+  test("get, put, remove etc operations on non-default col family should 
fail") {
+    tryWithProviderResource(newStoreProvider(opId = Random.nextInt(), partition = 0,
+      minDeltasForSnapshot = 5)) { provider =>
+      val store = provider.getStore(0)
+      val keyRow = dataToKeyRow("a", 0)
+      val valueRow = dataToValueRow(1)
+      val colFamilyName = "test"
+      verifyStoreOperationUnsupported("put") {
+        store.put(keyRow, valueRow, colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("remove") {
+        store.remove(keyRow, colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("get") {
+        store.get(keyRow, colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("merge") {
+        store.merge(keyRow, valueRow, colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("iterator") {
+        store.iterator(colFamilyName)
+      }
+
+      verifyStoreOperationUnsupported("prefixScan") {
+        store.prefixScan(keyRow, colFamilyName)
+      }
+    }
+  }
+
   test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 
1") {
     tryWithProviderResource(newStoreProvider(opId = Random.nextInt(), partition = 0,
       numOfVersToRetainInMemory = 1)) { provider =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 71462cb4b643..40e31239895c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -217,7 +217,7 @@ class ValueStateSuite extends SharedSparkSession
       ex,
       errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
       parameters = Map(
-        "stateStoreProvider" -> "HDFSStateStoreProvider"
+        "stateStoreProvider" -> "HDFSBackedStateStoreProvider"
       ),
       matchPVals = true
     )

