This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 00e63d63f9af [SPARK-46864][SS] Onboard Arbitrary StateV2 onto New Error Class Framework
00e63d63f9af is described below

commit 00e63d63f9af6ef186e14159ddbe8bb8d1c8690b
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Fri Feb 2 05:38:15 2024 +0900

    [SPARK-46864][SS] Onboard Arbitrary StateV2 onto New Error Class Framework
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to apply the error class framework to the new data source, State API V2.
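    
    As a minimal sketch of the pattern (condensed from the HDFSBackedStateStoreProvider change in the diff below), a free-form exception becomes a typed error backed by an entry in error-classes.json:
    
    ```scala
    // Before: an ad-hoc exception whose message can only be string-matched.
    throw new UnsupportedOperationException(
      "Multiple column families are not supported with HDFSBackedStateStoreProvider")
    
    // After: a typed exception carrying an error class and message parameters,
    // so callers and tests can match on the class rather than on message text.
    throw StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider")
    ```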
    
    ### Why are the changes needed?
    
    The error class framework is the standard way to represent all exceptions in Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Refactored unit tests to check that the right error class is thrown in the relevant situations.
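    
    As an illustrative sketch of the assertion style (mirroring the updated suites in the diff below; `checkError` is the existing error-class test helper):
    
    ```scala
    // Intercept the typed exception and assert on its error class and
    // message parameters instead of matching raw message strings.
    val ex = intercept[SparkUnsupportedOperationException] {
      changelogWriter.put("a", "1", "testColFamily")
    }
    checkError(
      ex,
      errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
      parameters = Map("operationType" -> "Put", "entity" -> "changelog writer v1"))
    ```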
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44883 from ericm-db/state-v2-error-class.
    
    Lead-authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Co-authored-by: ericm-db <132308037+ericm...@users.noreply.github.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    | 29 +++++++++++
 ...r-conditions-unsupported-feature-error-class.md |  4 ++
 docs/sql-error-conditions.md                       | 24 ++++++++++
 .../sql/execution/streaming/ValueStateImpl.scala   |  5 +-
 .../state/HDFSBackedStateStoreProvider.scala       |  3 +-
 .../streaming/state/StateStoreChangelog.scala      | 16 +++----
 .../streaming/state/StateStoreErrors.scala         | 56 ++++++++++++++++++++++
 .../streaming/state/MemoryStateStore.scala         |  2 +-
 .../execution/streaming/state/RocksDBSuite.scala   | 37 +++++++++++++-
 .../streaming/state/ValueStateSuite.scala          | 43 +++++++++++++++--
 10 files changed, 199 insertions(+), 20 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 8e47490f5a61..baefb05a7070 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1656,6 +1656,12 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_TWS" : {
+    "message" : [
+      "<message>"
+    ],
+    "sqlState" : "XX000"
+  },
   "INTERVAL_ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<message>.<alternative>"
@@ -3235,6 +3241,18 @@
     ],
     "sqlState" : "0A000"
   },
+  "STATE_STORE_MULTIPLE_VALUES_PER_KEY" : {
+    "message" : [
+      "Store does not support multiple values per key"
+    ],
+    "sqlState" : "42802"
+  },
+  "STATE_STORE_UNSUPPORTED_OPERATION" : {
+    "message" : [
+      "<operationType> operation not supported with <entity>"
+    ],
+    "sqlState" : "XXKST"
+  },
   "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : {
     "message" : [
       "Static partition column <staticName> is also specified in the column 
list."
@@ -3388,6 +3406,12 @@
     ],
     "sqlState" : "428EK"
   },
+  "TWS_VALUE_SHOULD_NOT_BE_NULL" : {
+    "message" : [
+      "New value should be non-null for <typeOfState>"
+    ],
+    "sqlState" : "22004"
+  },
   "UDTF_ALIAS_NUMBER_MISMATCH" : {
     "message" : [
       "The number of aliases supplied in the AS clause does not match the 
number of columns output by the UDTF.",
@@ -3921,6 +3945,11 @@
           "<variableName> is a VARIABLE and cannot be updated using the SET 
statement. Use SET VARIABLE <variableName> = ... instead."
         ]
       },
+      "STATE_STORE_MULTIPLE_COLUMN_FAMILIES" : {
+        "message" : [
+          "Creating multiple column families with <stateStoreProvider> is not 
supported."
+        ]
+      },
       "TABLE_OPERATION" : {
         "message" : [
           "Table <tableName> does not support <operation>. Please check the 
current catalog and namespace to make sure the qualified table name is 
expected, and also check the catalog implementation which is configured by 
\"spark.sql.catalog\"."
diff --git a/docs/sql-error-conditions-unsupported-feature-error-class.md b/docs/sql-error-conditions-unsupported-feature-error-class.md
index d90d2b2a109f..1b12c4bfc1b3 100644
--- a/docs/sql-error-conditions-unsupported-feature-error-class.md
+++ b/docs/sql-error-conditions-unsupported-feature-error-class.md
@@ -190,6 +190,10 @@ set PROPERTIES and DBPROPERTIES at the same time.
 
`<variableName>` is a VARIABLE and cannot be updated using the SET statement. Use SET VARIABLE `<variableName>` = ... instead.
 
+## STATE_STORE_MULTIPLE_COLUMN_FAMILIES
+
+Creating multiple column families with `<stateStoreProvider>` is not supported.
+
 ## TABLE_OPERATION
 
Table `<tableName>` does not support `<operation>`. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index efd035d332a7..3a2c4d261352 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -998,6 +998,12 @@ For more details see [INTERNAL_ERROR_METADATA_CATALOG](sql-error-conditions-inte
 
 `<message>`
 
+### INTERNAL_ERROR_TWS
+
+[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+`<message>`
+
 ### INTERVAL_ARITHMETIC_OVERFLOW
 
 [SQLSTATE: 22015](sql-error-conditions-sqlstates.html#class-22-data-exception)
@@ -2019,6 +2025,18 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists
 
Star (*) is not allowed in a select list when GROUP BY an ordinal position is used.
 
+### STATE_STORE_MULTIPLE_VALUES_PER_KEY
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Store does not support multiple values per key
+
+### STATE_STORE_UNSUPPORTED_OPERATION
+
+[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+`<operationType>` operation not supported with `<entity>`
+
 ### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST
 
[SQLSTATE: 42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -2157,6 +2175,12 @@ Choose a different name, drop or replace the existing view,  or add the IF NOT E
 
CREATE TEMPORARY VIEW or the corresponding Dataset APIs only accept single-part view names, but got: `<actualName>`.
 
+### TWS_VALUE_SHOULD_NOT_BE_NULL
+
+[SQLSTATE: 22004](sql-error-conditions-sqlstates.html#class-22-data-exception)
+
+New value should be non-null for `<typeOfState>`
+
 ### UDTF_ALIAS_NUMBER_MISMATCH
 
[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
index 5a1b6d01baa3..d82ce5ba1125 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors}
 import org.apache.spark.sql.streaming.ValueState
 import org.apache.spark.sql.types._
 
@@ -47,8 +47,7 @@ class ValueStateImpl[S](
   private def encodeKey(): UnsafeRow = {
     val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
     if (!keyOption.isDefined) {
-      throw new UnsupportedOperationException("Implicit key not found for operation on" +
-        s"stateName=$stateName")
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
     }
 
     val toRow = keyExprEnc.createSerializer()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index b895b975770a..842c4004820c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -256,8 +256,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
 
    // TODO: add support for multiple col families with HDFSBackedStateStoreProvider
     if (useColumnFamilies) {
-      throw new UnsupportedOperationException("Multiple column families are not supported with " +
-        s"HDFSBackedStateStoreProvider")
+      throw StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider")
     }
 
     require((keySchema.length == 0 && numColsPrefixKey == 0) ||
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 9e5201123025..d4a1c3fc63c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -137,8 +137,8 @@ class StateStoreChangelogWriterV1(
   }
 
  override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = {
-    throw new UnsupportedOperationException("Operation not supported with state " +
-      "changelog writer v1")
+    throw StateStoreErrors.unsupportedOperationException(
+      operationName = "Put", entity = "changelog writer v1")
   }
 
   override def delete(key: Array[Byte]): Unit = {
@@ -151,8 +151,8 @@ class StateStoreChangelogWriterV1(
   }
 
   override def delete(key: Array[Byte], colFamilyName: String): Unit = {
-    throw new UnsupportedOperationException("Operation not supported with state " +
-      "changelog writer v1")
+    throw StateStoreErrors.unsupportedOperationException(
+      operationName = "Delete", entity = "changelog writer v1")
   }
 
   override def commit(): Unit = {
@@ -189,8 +189,8 @@ class StateStoreChangelogWriterV2(
   extends StateStoreChangelogWriter(fm, file, compressionCodec) {
 
   override def put(key: Array[Byte], value: Array[Byte]): Unit = {
-    throw new UnsupportedOperationException("Operation not supported with state " +
-      "changelog writer v2")
+    throw StateStoreErrors.unsupportedOperationException(
+      operationName = "Put", entity = "changelog writer v2")
   }
 
  override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = {
@@ -206,8 +206,8 @@ class StateStoreChangelogWriterV2(
   }
 
   override def delete(key: Array[Byte]): Unit = {
-    throw new UnsupportedOperationException("Operation not supported with state " +
-      "changelog writer v2")
+    throw StateStoreErrors.unsupportedOperationException(
+      operationName = "Delete", entity = "changelog writer v2")
   }
 
   override def delete(key: Array[Byte], colFamilyName: String): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
new file mode 100644
index 000000000000..665dafc6f66a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
+
+/**
+ * Object for grouping error messages from (most) exceptions thrown from State API V2
+ *
+ * ERROR_CLASS has a prefix of "STATE_STORE_" to indicate where the error is from
+ */
+object StateStoreErrors {
+  def implicitKeyNotFound(stateName: String): SparkException = {
+    SparkException.internalError(
+      msg = s"Implicit key not found in state store for stateName=$stateName",
+      category = "TWS"
+    )
+  }
+
+  def multipleColumnFamiliesNotSupported(stateStoreProvider: String):
+    StateStoreMultipleColumnFamiliesNotSupportedException = {
+      new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider)
+    }
+
+  def unsupportedOperationException(operationName: String, entity: String):
+    StateStoreUnsupportedOperationException = {
+      new StateStoreUnsupportedOperationException(operationName, entity)
+    }
+}
+
+class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
+    messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
+  )
+
+class StateStoreUnsupportedOperationException(operationType: String, entity: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+    messageParameters = Map("operationType" -> operationType, "entity" -> entity)
+  )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
index 691504b8099f..5229865122be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
@@ -30,7 +30,7 @@ class MemoryStateStore extends StateStore() {
   }
 
   override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    throw new UnsupportedOperationException("Creating multiple column families is not supported")
+    throw StateStoreErrors.multipleColumnFamiliesNotSupported("MemoryStateStoreProvider")
   }
 
  override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = map.get(key)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 99141abd2e3a..6a4ad10d9a7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -29,7 +29,7 @@ import org.rocksdb.CompressionType
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager}
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
@@ -689,6 +689,41 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("RocksDB: Unsupported Operations" +
+    " with Changelog Checkpointing") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1)
+
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      changelogWriter.put("a", "1", "testColFamily")
+    }
+
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+      parameters = Map(
+        "operationType" -> "Put",
+        "entity" -> "changelog writer v1"
+      ),
+      matchPVals = true
+    )
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      changelogWriter.delete("a", "testColFamily")
+    }
+
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+      parameters = Map(
+        "operationType" -> "Delete",
+        "entity" -> "changelog writer v1"
+      ),
+      matchPVals = true
+    )
+  }
+
   testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write 
changelog") {
     val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
     val fileManager = new RocksDBFileManager(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 49a5fff131ae..c069046eed40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -24,6 +24,7 @@ import scala.util.Random
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl}
@@ -91,14 +92,22 @@ class ValueStateSuite extends SharedSparkSession
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
 
+      val stateName = "testState"
       val testState: ValueState[Long] = handle.getValueState[Long]("testState")
       assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty)
       val ex = intercept[Exception] {
         testState.update(123)
       }
 
-      assert(ex.isInstanceOf[UnsupportedOperationException])
-      assert(ex.getMessage.contains("Implicit key not found"))
+      assert(ex.isInstanceOf[SparkException])
+      checkError(
+        ex.asInstanceOf[SparkException],
+        errorClass = "INTERNAL_ERROR_TWS",
+        parameters = Map(
+          "message" -> s"Implicit key not found in state store for 
stateName=$stateName"
+        ),
+        matchPVals = true
+      )
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
       assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined)
       testState.update(123)
@@ -110,9 +119,14 @@ class ValueStateSuite extends SharedSparkSession
       val ex1 = intercept[Exception] {
         testState.update(123)
       }
-
-      assert(ex1.isInstanceOf[UnsupportedOperationException])
-      assert(ex1.getMessage.contains("Implicit key not found"))
+      checkError(
+        ex1.asInstanceOf[SparkException],
+        errorClass = "INTERNAL_ERROR_TWS",
+        parameters = Map(
+          "message" -> s"Implicit key not found in state store for stateName=$stateName"
+        ),
+        matchPVals = true
+      )
     }
   }
 
@@ -184,4 +198,23 @@ class ValueStateSuite extends SharedSparkSession
       assert(testState2.get() === null)
     }
   }
+
+  test("colFamily with HDFSBackedStateStoreProvider should fail") {
+    val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+    val provider = new HDFSBackedStateStoreProvider()
+    val storeConf = new StateStoreConf(new SQLConf())
+    val ex = intercept[StateStoreMultipleColumnFamiliesNotSupportedException] {
+      provider.init(
+        storeId, keySchema, valueSchema, 0, useColumnFamilies = true,
+        storeConf, new Configuration)
+    }
+    checkError(
+      ex,
+      errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
+      parameters = Map(
+        "stateStoreProvider" -> "HDFSStateStoreProvider"
+      ),
+      matchPVals = true
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
