This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new bdcb79f23da3 [SPARK-48543][SS] Track state row validation failures using explicit error class
bdcb79f23da3 is described below

commit bdcb79f23da3d09469910508426a54a78adcbda6
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Jun 13 16:47:49 2024 +0900

    [SPARK-48543][SS] Track state row validation failures using explicit error class

    ### What changes were proposed in this pull request?
    Track state row validation failures using explicit error class

    ### Why are the changes needed?
    We want to track these exceptions explicitly since they could be indicative of underlying corruptions/data loss scenarios.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Existing unit tests

    ```
    13:06:32.803 INFO org.apache.spark.util.ShutdownHookManager: Deleting directory /Users/anish.shrigondekar/spark/spark/target/tmp/spark-6d90d3f3-0f37-48b8-8506-a8cdee3d25d7
    [info] Run completed in 9 seconds, 861 milliseconds.
    [info] Total number of tests run: 4
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #46885 from anishshri-db/task/SPARK-48543.

Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-conditions.json | 24 ++++++++++++++++++++++ .../sql/execution/streaming/state/StateStore.scala | 18 ++++++---------- .../streaming/state/StateStoreErrors.scala | 21 ++++++++++++++++++- .../streaming/state/StateStoreSuite.scala | 5 +++-- ...reamingStateStoreFormatCompatibilitySuite.scala | 5 +++-- 5 files changed, 56 insertions(+), 17 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 36d8fe1daa37..35dfa7a6c349 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3735,6 +3735,18 @@ ], "sqlState" : "42K06" }, + "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE" : { + "message" : [ + "The streaming query failed to validate written state for key row.", + "The following reasons may cause this:", + "1. An old Spark version wrote the checkpoint that is incompatible with the current one", + "2. Corrupt checkpoint files", + "3. The query changed in an incompatible way between restarts", + "For the first case, use a new checkpoint directory or use the original Spark version", + "to process the streaming state. Retrieved error_message=<errorMsg>" + ], + "sqlState" : "XX000" + }, "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" : { "message" : [ "Provided key schema does not match existing state key schema.", @@ -3769,6 +3781,18 @@ ], "sqlState" : "42802" }, + "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE" : { + "message" : [ + "The streaming query failed to validate written state for value row.", + "The following reasons may cause this:", + "1. An old Spark version wrote the checkpoint that is incompatible with the current one", + "2. Corrupt checkpoint files", + "3. 
The query changed in an incompatible way between restarts", + "For the first case, use a new checkpoint directory or use the original Spark version", + "to process the streaming state. Retrieved error_message=<errorMsg>" + ], + "sqlState" : "XX000" + }, "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE" : { "message" : [ "Provided value schema does not match existing state value schema.", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index b59fe65fb14a..2f9ce2c236f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -279,16 +279,6 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat SQLMetrics.createTimingMetric(sparkContext, desc) } -/** - * An exception thrown when an invalid UnsafeRow is detected in state store. - */ -class InvalidUnsafeRowException(error: String) - extends RuntimeException("The streaming query failed by state format invalidation. " + - "The following reasons may cause this: 1. An old Spark version wrote the checkpoint that is " + - "incompatible with the current one; 2. Broken checkpoint files; 3. The query is changed " + - "among restart. 
For the first case, you can try to restart the application without " + - s"checkpoint or use the legacy Spark version to process the streaming state.\n$error", null) - sealed trait KeyStateEncoderSpec case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends KeyStateEncoderSpec @@ -434,12 +424,16 @@ object StateStoreProvider { conf: StateStoreConf): Unit = { if (conf.formatValidationEnabled) { val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema) - validationError.foreach { error => throw new InvalidUnsafeRowException(error) } + validationError.foreach { error => + throw StateStoreErrors.keyRowFormatValidationFailure(error) + } if (conf.formatValidationCheckValue) { val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema) - validationError.foreach { error => throw new InvalidUnsafeRowException(error) } + validationError.foreach { error => + throw StateStoreErrors.valueRowFormatValidationFailure(error) + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 36be4d9f5bab..205e093e755d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming.state -import org.apache.spark.{SparkException, SparkUnsupportedOperationException} +import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException} /** * Object for grouping error messages from (most) exceptions thrown from State API V2 @@ -39,6 +39,16 @@ object StateStoreErrors { ) } + def keyRowFormatValidationFailure(errorMsg: String): + StateStoreKeyRowFormatValidationFailure = { + new 
StateStoreKeyRowFormatValidationFailure(errorMsg) + } + + def valueRowFormatValidationFailure(errorMsg: String): + StateStoreValueRowFormatValidationFailure = { + new StateStoreValueRowFormatValidationFailure(errorMsg) + } + def unsupportedOperationOnMissingColumnFamily(operationName: String, colFamilyName: String): StateStoreUnsupportedOperationOnMissingColumnFamily = { new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName, colFamilyName) @@ -245,3 +255,12 @@ class StateStoreValueSchemaNotCompatible( "storedValueSchema" -> storedValueSchema, "newValueSchema" -> newValueSchema)) +class StateStoreKeyRowFormatValidationFailure(errorMsg: String) + extends SparkRuntimeException( + errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE", + messageParameters = Map("errorMsg" -> errorMsg)) + +class StateStoreValueRowFormatValidationFailure(errorMsg: String) + extends SparkRuntimeException( + errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE", + messageParameters = Map("errorMsg" -> errorMsg)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 6a6867fbb552..98b2030f1bac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProj import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef +import org.apache.spark.sql.execution.streaming.state.StateStoreValueRowFormatValidationFailure import org.apache.spark.sql.functions.count import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ 
-1606,12 +1607,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] // By default, when there is an invalid pair of value row and value schema, it should throw val keyRow = dataToKeyRow("key", 1) val valueRow = dataToValueRow(2) - val e = intercept[InvalidUnsafeRowException] { + val e = intercept[StateStoreValueRowFormatValidationFailure] { // Here valueRow doesn't match with prefixKeySchema StateStoreProvider.validateStateRowFormat( keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf()) } - assert(e.getMessage.contains("The streaming query failed by state format invalidation")) + assert(e.getMessage.contains("The streaming query failed to validate written state")) // When sqlConf.stateStoreFormatValidationEnabled is set to false and // StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala index 3cd6b397a8b8..8a9d4d42ef2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete import org.apache.spark.sql.execution.streaming.MemoryStream -import org.apache.spark.sql.execution.streaming.state.InvalidUnsafeRowException +import org.apache.spark.sql.execution.streaming.state.{StateStoreKeyRowFormatValidationFailure, StateStoreValueRowFormatValidationFailure} import org.apache.spark.sql.functions._ import org.apache.spark.tags.SlowSQLTest import org.apache.spark.util.Utils @@ -254,7 +254,8 @@ class 
StreamingStateStoreFormatCompatibilitySuite extends StreamTest { private def findStateSchemaException(exc: Throwable): Boolean = { exc match { case _: SparkUnsupportedOperationException => true - case _: InvalidUnsafeRowException => true + case _: StateStoreKeyRowFormatValidationFailure => true + case _: StateStoreValueRowFormatValidationFailure => true case e1 if e1.getCause != null => findStateSchemaException(e1.getCause) case _ => false } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org