This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bdcb79f23da3 [SPARK-48543][SS] Track state row validation failures 
using explicit error class
bdcb79f23da3 is described below

commit bdcb79f23da3d09469910508426a54a78adcbda6
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Jun 13 16:47:49 2024 +0900

    [SPARK-48543][SS] Track state row validation failures using explicit error 
class
    
    ### What changes were proposed in this pull request?
    Track state row validation failures using explicit error class
    
    ### Why are the changes needed?
    We want to track these exceptions explicitly since they could be indicative 
of underlying corruptions/data loss scenarios.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ```
    13:06:32.803 INFO org.apache.spark.util.ShutdownHookManager: Deleting 
directory 
/Users/anish.shrigondekar/spark/spark/target/tmp/spark-6d90d3f3-0f37-48b8-8506-a8cdee3d25d7
    [info] Run completed in 9 seconds, 861 milliseconds.
    [info] Total number of tests run: 4
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46885 from anishshri-db/task/SPARK-48543.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-conditions.json | 24 ++++++++++++++++++++++
 .../sql/execution/streaming/state/StateStore.scala | 18 ++++++----------
 .../streaming/state/StateStoreErrors.scala         | 21 ++++++++++++++++++-
 .../streaming/state/StateStoreSuite.scala          |  5 +++--
 ...reamingStateStoreFormatCompatibilitySuite.scala |  5 +++--
 5 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 36d8fe1daa37..35dfa7a6c349 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3735,6 +3735,18 @@
     ],
     "sqlState" : "42K06"
   },
+  "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE" : {
+    "message" : [
+      "The streaming query failed to validate written state for key row.",
+      "The following reasons may cause this:",
+      "1. An old Spark version wrote the checkpoint that is incompatible with 
the current one",
+      "2. Corrupt checkpoint files",
+      "3. The query changed in an incompatible way between restarts",
+      "For the first case, use a new checkpoint directory or use the original 
Spark version",
+      "to process the streaming state. Retrieved error_message=<errorMsg>"
+    ],
+    "sqlState" : "XX000"
+  },
   "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" : {
     "message" : [
       "Provided key schema does not match existing state key schema.",
@@ -3769,6 +3781,18 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE" : {
+    "message" : [
+      "The streaming query failed to validate written state for value row.",
+      "The following reasons may cause this:",
+      "1. An old Spark version wrote the checkpoint that is incompatible with 
the current one",
+      "2. Corrupt checkpoint files",
+      "3. The query changed in an incompatible way between restarts",
+      "For the first case, use a new checkpoint directory or use the original 
Spark version",
+      "to process the streaming state. Retrieved error_message=<errorMsg>"
+    ],
+    "sqlState" : "XX000"
+  },
   "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE" : {
     "message" : [
       "Provided value schema does not match existing state value schema.",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index b59fe65fb14a..2f9ce2c236f4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -279,16 +279,6 @@ case class StateStoreCustomTimingMetric(name: String, 
desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
-/**
- * An exception thrown when an invalid UnsafeRow is detected in state store.
- */
-class InvalidUnsafeRowException(error: String)
-  extends RuntimeException("The streaming query failed by state format 
invalidation. " +
-    "The following reasons may cause this: 1. An old Spark version wrote the 
checkpoint that is " +
-    "incompatible with the current one; 2. Broken checkpoint files; 3. The 
query is changed " +
-    "among restart. For the first case, you can try to restart the application 
without " +
-    s"checkpoint or use the legacy Spark version to process the streaming 
state.\n$error", null)
-
 sealed trait KeyStateEncoderSpec
 
 case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends 
KeyStateEncoderSpec
@@ -434,12 +424,16 @@ object StateStoreProvider {
       conf: StateStoreConf): Unit = {
     if (conf.formatValidationEnabled) {
       val validationError = 
UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
-      validationError.foreach { error => throw new 
InvalidUnsafeRowException(error) }
+      validationError.foreach { error =>
+        throw StateStoreErrors.keyRowFormatValidationFailure(error)
+      }
 
       if (conf.formatValidationCheckValue) {
         val validationError =
           UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, 
valueSchema)
-        validationError.foreach { error => throw new 
InvalidUnsafeRowException(error) }
+        validationError.foreach { error =>
+          throw StateStoreErrors.valueRowFormatValidationFailure(error)
+        }
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 36be4d9f5bab..205e093e755d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkRuntimeException, 
SparkUnsupportedOperationException}
 
 /**
  * Object for grouping error messages from (most) exceptions thrown from State 
API V2
@@ -39,6 +39,16 @@ object StateStoreErrors {
     )
   }
 
+  def keyRowFormatValidationFailure(errorMsg: String):
+    StateStoreKeyRowFormatValidationFailure = {
+    new StateStoreKeyRowFormatValidationFailure(errorMsg)
+  }
+
+  def valueRowFormatValidationFailure(errorMsg: String):
+    StateStoreValueRowFormatValidationFailure = {
+    new StateStoreValueRowFormatValidationFailure(errorMsg)
+  }
+
   def unsupportedOperationOnMissingColumnFamily(operationName: String, 
colFamilyName: String):
     StateStoreUnsupportedOperationOnMissingColumnFamily = {
     new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName, 
colFamilyName)
@@ -245,3 +255,12 @@ class StateStoreValueSchemaNotCompatible(
       "storedValueSchema" -> storedValueSchema,
       "newValueSchema" -> newValueSchema))
 
+class StateStoreKeyRowFormatValidationFailure(errorMsg: String)
+  extends SparkRuntimeException(
+    errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE",
+    messageParameters = Map("errorMsg" -> errorMsg))
+
+class StateStoreValueRowFormatValidationFailure(errorMsg: String)
+  extends SparkRuntimeException(
+    errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
+    messageParameters = Map("errorMsg" -> errorMsg))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 6a6867fbb552..98b2030f1bac 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -41,6 +41,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProj
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming._
 import 
org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef
+import 
org.apache.spark.sql.execution.streaming.state.StateStoreValueRowFormatValidationFailure
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -1606,12 +1607,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
     // By default, when there is an invalid pair of value row and value 
schema, it should throw
     val keyRow = dataToKeyRow("key", 1)
     val valueRow = dataToValueRow(2)
-    val e = intercept[InvalidUnsafeRowException] {
+    val e = intercept[StateStoreValueRowFormatValidationFailure] {
       // Here valueRow doesn't match with prefixKeySchema
       StateStoreProvider.validateStateRowFormat(
         keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())
     }
-    assert(e.getMessage.contains("The streaming query failed by state format 
invalidation"))
+    assert(e.getMessage.contains("The streaming query failed to validate 
written state"))
 
     // When sqlConf.stateStoreFormatValidationEnabled is set to false and
     // StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
index 3cd6b397a8b8..8a9d4d42ef2b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkException, 
SparkUnsupportedOperationException}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.state.InvalidUnsafeRowException
+import 
org.apache.spark.sql.execution.streaming.state.{StateStoreKeyRowFormatValidationFailure,
 StateStoreValueRowFormatValidationFailure}
 import org.apache.spark.sql.functions._
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.Utils
@@ -254,7 +254,8 @@ class StreamingStateStoreFormatCompatibilitySuite extends 
StreamTest {
   private def findStateSchemaException(exc: Throwable): Boolean = {
     exc match {
       case _: SparkUnsupportedOperationException => true
-      case _: InvalidUnsafeRowException => true
+      case _: StateStoreKeyRowFormatValidationFailure => true
+      case _: StateStoreValueRowFormatValidationFailure => true
       case e1 if e1.getCause != null => findStateSchemaException(e1.getCause)
       case _ => false
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to