This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 00e63d63f9af [SPARK-46864][SS] Onboard Arbitrary StateV2 onto New Error Class Framework 00e63d63f9af is described below commit 00e63d63f9af6ef186e14159ddbe8bb8d1c8690b Author: Eric Marnadi <eric.marn...@databricks.com> AuthorDate: Fri Feb 2 05:38:15 2024 +0900 [SPARK-46864][SS] Onboard Arbitrary StateV2 onto New Error Class Framework ### What changes were proposed in this pull request? This PR proposes to apply error class framework to the new data source, State API V2. ### Why are the changes needed? Error class framework is a standard to represent all exceptions in Spark. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Refactored unit tests to check that the right error class was being thrown in certain situations ### Was this patch authored or co-authored using generative AI tooling? No Closes #44883 from ericm-db/state-v2-error-class. Lead-authored-by: Eric Marnadi <eric.marn...@databricks.com> Co-authored-by: ericm-db <132308037+ericm...@users.noreply.github.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-classes.json | 29 +++++++++++ ...r-conditions-unsupported-feature-error-class.md | 4 ++ docs/sql-error-conditions.md | 24 ++++++++++ .../sql/execution/streaming/ValueStateImpl.scala | 5 +- .../state/HDFSBackedStateStoreProvider.scala | 3 +- .../streaming/state/StateStoreChangelog.scala | 16 +++---- .../streaming/state/StateStoreErrors.scala | 56 ++++++++++++++++++++++ .../streaming/state/MemoryStateStore.scala | 2 +- .../execution/streaming/state/RocksDBSuite.scala | 37 +++++++++++++- .../streaming/state/ValueStateSuite.scala | 43 +++++++++++++++-- 10 files changed, 199 insertions(+), 20 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 8e47490f5a61..baefb05a7070 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1656,6 +1656,12 @@ ], "sqlState" : "XX000" }, + "INTERNAL_ERROR_TWS" : { + "message" : [ + "<message>" + ], + "sqlState" : "XX000" + }, "INTERVAL_ARITHMETIC_OVERFLOW" : { "message" : [ "<message>.<alternative>" @@ -3235,6 +3241,18 @@ ], "sqlState" : "0A000" }, + "STATE_STORE_MULTIPLE_VALUES_PER_KEY" : { + "message" : [ + "Store does not support multiple values per key" + ], + "sqlState" : "42802" + }, + "STATE_STORE_UNSUPPORTED_OPERATION" : { + "message" : [ + "<operationType> operation not supported with <entity>" + ], + "sqlState" : "XXKST" + }, "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : { "message" : [ "Static partition column <staticName> is also specified in the column list." @@ -3388,6 +3406,12 @@ ], "sqlState" : "428EK" }, + "TWS_VALUE_SHOULD_NOT_BE_NULL" : { + "message" : [ + "New value should be non-null for <typeOfState>" + ], + "sqlState" : "22004" + }, "UDTF_ALIAS_NUMBER_MISMATCH" : { "message" : [ "The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF.", @@ -3921,6 +3945,11 @@ "<variableName> is a VARIABLE and cannot be updated using the SET statement. Use SET VARIABLE <variableName> = ... instead." ] }, + "STATE_STORE_MULTIPLE_COLUMN_FAMILIES" : { + "message" : [ + "Creating multiple column families with <stateStoreProvider> is not supported." + ] + }, "TABLE_OPERATION" : { "message" : [ "Table <tableName> does not support <operation>. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by \"spark.sql.catalog\"." diff --git a/docs/sql-error-conditions-unsupported-feature-error-class.md b/docs/sql-error-conditions-unsupported-feature-error-class.md index d90d2b2a109f..1b12c4bfc1b3 100644 --- a/docs/sql-error-conditions-unsupported-feature-error-class.md +++ b/docs/sql-error-conditions-unsupported-feature-error-class.md @@ -190,6 +190,10 @@ set PROPERTIES and DBPROPERTIES at the same time. `<variableName>` is a VARIABLE and cannot be updated using the SET statement. Use SET VARIABLE `<variableName>` = ... instead. +## STATE_STORE_MULTIPLE_COLUMN_FAMILIES + +Creating multiple column families with `<stateStoreProvider>` is not supported. + ## TABLE_OPERATION Table `<tableName>` does not support `<operation>`. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog". diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index efd035d332a7..3a2c4d261352 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -998,6 +998,12 @@ For more details see [INTERNAL_ERROR_METADATA_CATALOG](sql-error-conditions-inte `<message>` +### INTERNAL_ERROR_TWS + +[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error) + +`<message>` + ### INTERVAL_ARITHMETIC_OVERFLOW [SQLSTATE: 22015](sql-error-conditions-sqlstates.html#class-22-data-exception) @@ -2019,6 +2025,18 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists Star (*) is not allowed in a select list when GROUP BY an ordinal position is used. +### STATE_STORE_MULTIPLE_VALUES_PER_KEY + +[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Store does not support multiple values per key + +### STATE_STORE_UNSUPPORTED_OPERATION + +[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error) + +`<operationType>` operation not supported with `<entity>` + ### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST [SQLSTATE: 42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) @@ -2157,6 +2175,12 @@ Choose a different name, drop or replace the existing view, or add the IF NOT E CREATE TEMPORARY VIEW or the corresponding Dataset APIs only accept single-part view names, but got: `<actualName>`. +### TWS_VALUE_SHOULD_NOT_BE_NULL + +[SQLSTATE: 22004](sql-error-conditions-sqlstates.html#class-22-data-exception) + +New value should be non-null for `<typeOfState>` + ### UDTF_ALIAS_NUMBER_MISMATCH [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala index 5a1b6d01baa3..d82ce5ba1125 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.streaming.state.StateStore +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors} import org.apache.spark.sql.streaming.ValueState import org.apache.spark.sql.types._ @@ -47,8 +47,7 @@ class ValueStateImpl[S]( private def encodeKey(): UnsafeRow = { val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption if (!keyOption.isDefined) { - throw new UnsupportedOperationException("Implicit key not found for operation on" + - s"stateName=$stateName") + throw StateStoreErrors.implicitKeyNotFound(stateName) } val toRow = keyExprEnc.createSerializer() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index b895b975770a..842c4004820c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -256,8 +256,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with // TODO: add support for multiple col families with HDFSBackedStateStoreProvider if (useColumnFamilies) { - throw new UnsupportedOperationException("Multiple column families are not supported with " + - s"HDFSBackedStateStoreProvider") + throw StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider") } require((keySchema.length == 0 && numColsPrefixKey == 0) || diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala index 9e5201123025..d4a1c3fc63c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala @@ -137,8 +137,8 @@ class StateStoreChangelogWriterV1( } override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = { - throw new UnsupportedOperationException("Operation not supported with state " + - "changelog writer v1") + throw StateStoreErrors.unsupportedOperationException( + operationName = "Put", entity = "changelog writer v1") } override def delete(key: Array[Byte]): Unit = { @@ -151,8 +151,8 @@ class StateStoreChangelogWriterV1( } override def delete(key: Array[Byte], colFamilyName: String): Unit = { - throw new UnsupportedOperationException("Operation not supported with state " + - "changelog writer v1") + throw StateStoreErrors.unsupportedOperationException( + operationName = "Delete", entity = "changelog writer v1") } override def commit(): Unit = { @@ -189,8 +189,8 @@ class StateStoreChangelogWriterV2( extends StateStoreChangelogWriter(fm, file, compressionCodec) { override def put(key: Array[Byte], value: Array[Byte]): Unit = { - throw new UnsupportedOperationException("Operation not supported with state " + - "changelog writer v2") + throw StateStoreErrors.unsupportedOperationException( + operationName = "Put", entity = "changelog writer v2") } override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = { @@ -206,8 +206,8 @@ class StateStoreChangelogWriterV2( } override def delete(key: Array[Byte]): Unit = { - throw new UnsupportedOperationException("Operation not supported with state " + - "changelog writer v2") + throw StateStoreErrors.unsupportedOperationException( + operationName = "Delete", entity = "changelog writer v2") } override def delete(key: Array[Byte], colFamilyName: String): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala new file mode 100644 index 000000000000..665dafc6f66a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} + +/** + * Object for grouping error messages from (most) exceptions thrown from State API V2 + * + * ERROR_CLASS has a prefix of "STATE_STORE_" to indicate where the error is from + */ +object StateStoreErrors { + def implicitKeyNotFound(stateName: String): SparkException = { + SparkException.internalError( + msg = s"Implicit key not found in state store for stateName=$stateName", + category = "TWS" + ) + } + + def multipleColumnFamiliesNotSupported(stateStoreProvider: String): + StateStoreMultipleColumnFamiliesNotSupportedException = { + new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider) + } + + def unsupportedOperationException(operationName: String, entity: String): + StateStoreUnsupportedOperationException = { + new StateStoreUnsupportedOperationException(operationName, entity) + } +} + +class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String) + extends SparkUnsupportedOperationException( + errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", + messageParameters = Map("stateStoreProvider" -> stateStoreProvider) + ) + +class StateStoreUnsupportedOperationException(operationType: String, entity: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION", + messageParameters = Map("operationType" -> operationType, "entity" -> entity) + ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala index 691504b8099f..5229865122be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala @@ -30,7 +30,7 @@ class MemoryStateStore extends StateStore() { } override def createColFamilyIfAbsent(colFamilyName: String): Unit = { - throw new UnsupportedOperationException("Creating multiple column families is not supported") + throw StateStoreErrors.multipleColumnFamiliesNotSupported("MemoryStateStoreProvider") } override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = map.get(key) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 99141abd2e3a..6a4ad10d9a7f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -29,7 +29,7 @@ import org.rocksdb.CompressionType import org.scalactic.source.Position import org.scalatest.Tag -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager} import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream} @@ -689,6 +689,41 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + testWithChangelogCheckpointingEnabled("RocksDB: Unsupported Operations" + + " with Changelog Checkpointing") { + val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") + val fileManager = new RocksDBFileManager( + dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) + val changelogWriter = fileManager.getChangeLogWriter(1) + + val ex1 = intercept[SparkUnsupportedOperationException] { + changelogWriter.put("a", "1", "testColFamily") + } + + checkError( + ex1, + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION", + parameters = Map( + "operationType" -> "Put", + "entity" -> "changelog writer v1" + ), + matchPVals = true + ) + val ex2 = intercept[SparkUnsupportedOperationException] { + changelogWriter.delete("a", "testColFamily") + } + + checkError( + ex2, + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION", + parameters = Map( + "operationType" -> "Delete", + "entity" -> "changelog writer v1" + ), + matchPVals = true + ) + } + testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") { val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") val fileManager = new RocksDBFileManager( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala index 49a5fff131ae..c069046eed40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala @@ -24,6 +24,7 @@ import scala.util.Random import org.apache.hadoop.conf.Configuration import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkException import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl} @@ -91,14 +92,22 @@ class ValueStateSuite extends SharedSparkSession val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]) + val stateName = "testState" val testState: ValueState[Long] = handle.getValueState[Long]("testState") assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty) val ex = intercept[Exception] { testState.update(123) } - assert(ex.isInstanceOf[UnsupportedOperationException]) - assert(ex.getMessage.contains("Implicit key not found")) + assert(ex.isInstanceOf[SparkException]) + checkError( + ex.asInstanceOf[SparkException], + errorClass = "INTERNAL_ERROR_TWS", + parameters = Map( + "message" -> s"Implicit key not found in state store for stateName=$stateName" + ), + matchPVals = true + ) ImplicitGroupingKeyTracker.setImplicitKey("test_key") assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined) testState.update(123) @@ -110,9 +119,14 @@ class ValueStateSuite extends SharedSparkSession val ex1 = intercept[Exception] { testState.update(123) } - - assert(ex1.isInstanceOf[UnsupportedOperationException]) - assert(ex1.getMessage.contains("Implicit key not found")) + checkError( + ex.asInstanceOf[SparkException], + errorClass = "INTERNAL_ERROR_TWS", + parameters = Map( + "message" -> s"Implicit key not found in state store for stateName=$stateName" + ), + matchPVals = true + ) } } @@ -184,4 +198,23 @@ class ValueStateSuite extends SharedSparkSession assert(testState2.get() === null) } } + + test("colFamily with HDFSBackedStateStoreProvider should fail") { + val storeId = StateStoreId(newDir(), Random.nextInt(), 0) + val provider = new HDFSBackedStateStoreProvider() + val storeConf = new StateStoreConf(new SQLConf()) + val ex = intercept[StateStoreMultipleColumnFamiliesNotSupportedException] { + provider.init( + storeId, keySchema, valueSchema, 0, useColumnFamilies = true, + storeConf, new Configuration) + } + checkError( + ex, + errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", + parameters = Map( + "stateStoreProvider" -> "HDFSStateStoreProvider" + ), + matchPVals = true + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org