This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 59777222e72 [SPARK-45888][SS] Apply error class framework to State 
(Metadata) Data Source
59777222e72 is described below

commit 59777222e726c63cbd9077a2c76f762e06f6a5b3
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Dec 6 22:38:40 2023 +0900

    [SPARK-45888][SS] Apply error class framework to State (Metadata) Data 
Source
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to apply the error class framework to the new 
State (Metadata) Data Source.
    
    ### Why are the changes needed?
    
    The error class framework is the standard way to represent all exceptions in Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Modified the existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44025 from HeartSaVioR/SPARK-45888.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 common/utils/src/main/resources/error/README.md    |   1 +
 .../src/main/resources/error/error-classes.json    |  75 ++++++++++
 ...itions-stds-invalid-option-value-error-class.md |  40 ++++++
 docs/sql-error-conditions.md                       |  60 ++++++++
 .../datasources/v2/state/StateDataSource.scala     |  33 +++--
 .../v2/state/StateDataSourceErrors.scala           | 160 +++++++++++++++++++++
 .../datasources/v2/state/StateScanBuilder.scala    |   3 +-
 .../datasources/v2/state/StateTable.scala          |   9 +-
 .../StreamStreamJoinStatePartitionReader.scala     |   2 +-
 .../v2/state/metadata/StateMetadataSource.scala    |   4 +-
 .../v2/state/StateDataSourceReadSuite.scala        |  33 +++--
 .../state/OperatorStateMetadataSuite.scala         |   6 +-
 12 files changed, 389 insertions(+), 37 deletions(-)

diff --git a/common/utils/src/main/resources/error/README.md 
b/common/utils/src/main/resources/error/README.md
index 556a634e992..b062c773907 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -636,6 +636,7 @@ The following SQLSTATEs are collated from:
 |42613    |42   |Syntax Error or Access Rule Violation             |613     
|Clauses are mutually exclusive.                             |DB2            |N 
      |DB2                                                                      
   |
 |42614    |42   |Syntax Error or Access Rule Violation             |614     |A 
duplicate keyword or clause is invalid.                   |DB2            |N    
   |DB2                                                                         
|
 |42615    |42   |Syntax Error or Access Rule Violation             |615     
|An invalid alternative was detected.                        |DB2            |N 
      |DB2                                                                      
   |
+|42616    |42   |Syntax Error or Access Rule Violation             |616     
|Invalid options specified                                   |DB2            |N 
      |DB2                                                                      
   |
 |42617    |42   |Syntax Error or Access Rule Violation             |617     
|The statement string is blank or empty.                     |DB2            |N 
      |DB2                                                                      
   |
 |42618    |42   |Syntax Error or Access Rule Violation             |618     |A 
variable is not allowed.                                  |DB2            |N    
   |DB2                                                                         
|
 |42620    |42   |Syntax Error or Access Rule Violation             |620     
|Read-only SCROLL was specified with the UPDATE clause.      |DB2            |N 
      |DB2                                                                      
   |
diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index e54d346e1bc..7a672fa5e55 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3066,6 +3066,81 @@
     ],
     "sqlState" : "42713"
   },
+  "STDS_COMMITTED_BATCH_UNAVAILABLE" : {
+    "message" : [
+      "No committed batch found, checkpoint location: <checkpointLocation>. 
Ensure that the query has run and committed any microbatch before stopping."
+    ],
+    "sqlState" : "KD006"
+  },
+  "STDS_CONFLICT_OPTIONS" : {
+    "message" : [
+      "The options <options> cannot be specified together. Please specify the 
one."
+    ],
+    "sqlState" : "42613"
+  },
+  "STDS_FAILED_TO_READ_STATE_SCHEMA" : {
+    "message" : [
+      "Failed to read the state schema. Either the file does not exist, or the 
file is corrupted. options: <sourceOptions>.",
+      "Rerun the streaming query to construct the state schema, and report to 
the corresponding communities or vendors if the error persists."
+    ],
+    "sqlState" : "42K03"
+  },
+  "STDS_INTERNAL_ERROR" : {
+    "message" : [
+      "Internal error: <message>",
+      "Please, report this bug to the corresponding communities or vendors, 
and provide the full stack trace."
+    ],
+    "sqlState" : "XXKST"
+  },
+  "STDS_INVALID_OPTION_VALUE" : {
+    "message" : [
+      "Invalid value for source option '<optionName>':"
+    ],
+    "subClass" : {
+      "IS_EMPTY" : {
+        "message" : [
+          "cannot be empty."
+        ]
+      },
+      "IS_NEGATIVE" : {
+        "message" : [
+          "cannot be negative."
+        ]
+      },
+      "WITH_MESSAGE" : {
+        "message" : [
+          "<message>"
+        ]
+      }
+    },
+    "sqlState" : "42616"
+  },
+  "STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE" : {
+    "message" : [
+      "The state does not have any partition. Please double check that the 
query points to the valid state. options: <sourceOptions>"
+    ],
+    "sqlState" : "KD006"
+  },
+  "STDS_OFFSET_LOG_UNAVAILABLE" : {
+    "message" : [
+      "The offset log for <batchId> does not exist, checkpoint location: 
<checkpointLocation>.",
+      "Please specify the batch ID which is available for querying - you can 
query the available batch IDs via using state metadata data source."
+    ],
+    "sqlState" : "KD006"
+  },
+  "STDS_OFFSET_METADATA_LOG_UNAVAILABLE" : {
+    "message" : [
+      "Metadata is not available for offset log for <batchId>, checkpoint 
location: <checkpointLocation>.",
+      "The checkpoint seems to be only run with older Spark version(s). Run 
the streaming query with the recent Spark version, so that Spark constructs the 
state metadata."
+    ],
+    "sqlState" : "KD006"
+  },
+  "STDS_REQUIRED_OPTION_UNSPECIFIED" : {
+    "message" : [
+      "'<optionName>' must be specified."
+    ],
+    "sqlState" : "42601"
+  },
   "STREAM_FAILED" : {
     "message" : [
       "Query [id = <id>, runId = <runId>] terminated with exception: <message>"
diff --git a/docs/sql-error-conditions-stds-invalid-option-value-error-class.md 
b/docs/sql-error-conditions-stds-invalid-option-value-error-class.md
new file mode 100644
index 00000000000..ec0f15ed9f7
--- /dev/null
+++ b/docs/sql-error-conditions-stds-invalid-option-value-error-class.md
@@ -0,0 +1,40 @@
+---
+layout: global
+title: STDS_INVALID_OPTION_VALUE error class
+displayTitle: STDS_INVALID_OPTION_VALUE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+[SQLSTATE: 
42616](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Invalid value for source option '`<optionName>`':
+
+This error class has the following derived error classes:
+
+## IS_EMPTY
+
+cannot be empty.
+
+## IS_NEGATIVE
+
+cannot be negative.
+
+## WITH_MESSAGE
+
+`<message>`
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index c9990d3856c..d97e2ceef4c 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1930,6 +1930,66 @@ Star (*) is not allowed in a select list when GROUP BY 
an ordinal position is us
 
 Static partition column `<staticName>` is also specified in the column list.
 
+### STDS_COMMITTED_BATCH_UNAVAILABLE
+
+SQLSTATE: KD006
+
+No committed batch found, checkpoint location: `<checkpointLocation>`. Ensure 
that the query has run and committed any microbatch before stopping.
+
+### STDS_CONFLICT_OPTIONS
+
+[SQLSTATE: 
42613](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The options `<options>` cannot be specified together. Please specify the one.
+
+### STDS_FAILED_TO_READ_STATE_SCHEMA
+
+[SQLSTATE: 
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to read the state schema. Either the file does not exist, or the file 
is corrupted. options: `<sourceOptions>`.
+Rerun the streaming query to construct the state schema, and report to the 
corresponding communities or vendors if the error persists.
+
+### STDS_INTERNAL_ERROR
+
+[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+Internal error: `<message>`
+Please, report this bug to the corresponding communities or vendors, and 
provide the full stack trace.
+
+### 
[STDS_INVALID_OPTION_VALUE](sql-error-conditions-stds-invalid-option-value-error-class.html)
+
+[SQLSTATE: 
42616](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Invalid value for source option '`<optionName>`':
+
+For more details see 
[STDS_INVALID_OPTION_VALUE](sql-error-conditions-stds-invalid-option-value-error-class.html)
+
+### STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE
+
+SQLSTATE: KD006
+
+The state does not have any partition. Please double check that the query 
points to the valid state. options: `<sourceOptions>`
+
+### STDS_OFFSET_LOG_UNAVAILABLE
+
+SQLSTATE: KD006
+
+The offset log for `<batchId>` does not exist, checkpoint location: 
`<checkpointLocation>`.
+Please specify the batch ID which is available for querying - you can query 
the available batch IDs via using state metadata data source.
+
+### STDS_OFFSET_METADATA_LOG_UNAVAILABLE
+
+SQLSTATE: KD006
+
+Metadata is not available for offset log for `<batchId>`, checkpoint location: 
`<checkpointLocation>`.
+The checkpoint seems to be only run with older Spark version(s). Run the 
streaming query with the recent Spark version, so that Spark constructs the 
state metadata.
+
+### STDS_REQUIRED_OPTION_UNSPECIFIED
+
+[SQLSTATE: 
42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+'`<optionName>`' must be specified.
+
 ### STREAM_FAILED
 
 [SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 55173a7e887..1192accaabe 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -85,8 +85,7 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
         .add("value", valueSchema)
     } catch {
       case NonFatal(e) =>
-        throw new IllegalArgumentException("Failed to read the state schema. 
Either the file " +
-          s"does not exist, or the file is corrupted. options: 
$sourceOptions", e)
+        throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e)
     }
   }
 
@@ -96,8 +95,7 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
     offsetLog.get(batchId) match {
       case Some(value) =>
         val metadata = value.metadata.getOrElse(
-          throw new IllegalStateException(s"Metadata is not available for 
offset log for " +
-            s"$batchId, checkpoint location $checkpointLocation")
+          throw StateDataSourceErrors.offsetMetadataLogUnavailable(batchId, 
checkpointLocation)
         )
 
         val clonedRuntimeConf = new 
RuntimeConfig(session.sessionState.conf.clone())
@@ -105,8 +103,7 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
         StateStoreConf(clonedRuntimeConf.sqlConf)
 
       case _ =>
-        throw new IllegalStateException(s"The offset log for $batchId does not 
exist, " +
-          s"checkpoint location $checkpointLocation")
+        throw StateDataSourceErrors.offsetLogUnavailable(batchId, 
checkpointLocation)
     }
   }
 
@@ -120,6 +117,11 @@ case class StateSourceOptions(
     storeName: String,
     joinSide: JoinSideValues) {
   def stateCheckpointLocation: Path = new Path(resolvedCpLocation, 
DIR_NAME_STATE)
+
+  override def toString: String = {
+    s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, 
batchId=$batchId, " +
+      s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide)"
+  }
 }
 
 object StateSourceOptions extends DataSourceOptions {
@@ -146,7 +148,7 @@ object StateSourceOptions extends DataSourceOptions {
       hadoopConf: Configuration,
       options: CaseInsensitiveStringMap): StateSourceOptions = {
     val checkpointLocation = Option(options.get(PATH)).orElse {
-      throw new IllegalArgumentException(s"'$PATH' must be specified.")
+      throw StateDataSourceErrors.requiredOptionUnspecified(PATH)
     }.get
 
     val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, 
checkpointLocation)
@@ -156,14 +158,14 @@ object StateSourceOptions extends DataSourceOptions {
     }.get
 
     if (batchId < 0) {
-      throw new IllegalArgumentException(s"'$BATCH_ID' cannot be negative.")
+      throw StateDataSourceErrors.invalidOptionValueIsNegative(BATCH_ID)
     }
 
     val operatorId = Option(options.get(OPERATOR_ID)).map(_.toInt)
       .orElse(Some(0)).get
 
     if (operatorId < 0) {
-      throw new IllegalArgumentException(s"'$OPERATOR_ID' cannot be negative.")
+      throw StateDataSourceErrors.invalidOptionValueIsNegative(OPERATOR_ID)
     }
 
     val storeName = Option(options.get(STORE_NAME))
@@ -171,7 +173,7 @@ object StateSourceOptions extends DataSourceOptions {
       .getOrElse(StateStoreId.DEFAULT_STORE_NAME)
 
     if (storeName.isEmpty) {
-      throw new IllegalArgumentException(s"'$STORE_NAME' cannot be an empty 
string.")
+      throw StateDataSourceErrors.invalidOptionValueIsEmpty(STORE_NAME)
     }
 
     val joinSide = try {
@@ -179,14 +181,12 @@ object StateSourceOptions extends DataSourceOptions {
         .map(JoinSideValues.withName).getOrElse(JoinSideValues.none)
     } catch {
       case _: NoSuchElementException =>
-        // convert to IllegalArgumentException
-        throw new IllegalArgumentException(s"Incorrect value of the option " +
-          s"'$JOIN_SIDE'. Valid values are 
${JoinSideValues.values.mkString(",")}")
+        throw StateDataSourceErrors.invalidOptionValue(JOIN_SIDE,
+          s"Valid values are ${JoinSideValues.values.mkString(",")}")
     }
 
     if (joinSide != JoinSideValues.none && storeName != 
StateStoreId.DEFAULT_STORE_NAME) {
-      throw new IllegalArgumentException(s"The options '$JOIN_SIDE' and " +
-        s"'$STORE_NAME' cannot be specified together. Please specify either 
one.")
+      throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
     }
 
     StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName, 
joinSide)
@@ -205,8 +205,7 @@ object StateSourceOptions extends DataSourceOptions {
       new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
     commitLog.getLatest() match {
       case Some((lastId, _)) => lastId
-      case None => throw new IllegalStateException("No committed batch found, 
" +
-        s"checkpoint location: $checkpointLocation")
+      case None => throw 
StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
     }
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala
new file mode 100644
index 00000000000..fe81d65c926
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.spark.SparkRuntimeException
+
+/**
+ * Groups the factory methods for (most of) the exceptions thrown from State Data Source.
+ * State Metadata Data Source may reuse this object as well.
+ *
+ * ERROR_CLASS has a prefix of "STDS_" representing STateDataSource.
+ */
+object StateDataSourceErrors {
+  def internalError(message: String): StateDataSourceException = {
+    new StateDataSourceInternalError(message)
+  }
+
+  def invalidOptionValue(optionName: String, message: String): 
StateDataSourceException = {
+    new StateDataSourceInvalidOptionValue(optionName, message)
+  }
+
+  def invalidOptionValueIsNegative(optionName: String): 
StateDataSourceException = {
+    new StateDataSourceInvalidOptionValueIsNegative(optionName)
+  }
+
+  def invalidOptionValueIsEmpty(optionName: String): StateDataSourceException 
= {
+    new StateDataSourceInvalidOptionValueIsEmpty(optionName)
+  }
+
+  def requiredOptionUnspecified(missingOptionName: String): 
StateDataSourceException = {
+    new StateDataSourceUnspecifiedRequiredOption(missingOptionName)
+  }
+
+  def offsetLogUnavailable(
+      batchId: Long,
+      checkpointLocation: String): StateDataSourceException = {
+    new StateDataSourceOffsetLogUnavailable(batchId, checkpointLocation)
+  }
+
+  def offsetMetadataLogUnavailable(
+      batchId: Long,
+      checkpointLocation: String): StateDataSourceException = {
+    new StateDataSourceOffsetMetadataLogUnavailable(batchId, 
checkpointLocation)
+  }
+
+  def failedToReadStateSchema(
+      sourceOptions: StateSourceOptions,
+      cause: Throwable): StateDataSourceException = {
+    new StateDataSourceReadStateSchemaFailure(sourceOptions, cause)
+  }
+
+  def conflictOptions(options: Seq[String]): StateDataSourceException = {
+    new StateDataSourceConflictOptions(options)
+  }
+
+  def committedBatchUnavailable(checkpointLocation: String): 
StateDataSourceException = {
+    new StataDataSourceCommittedBatchUnavailable(checkpointLocation)
+  }
+
+  def noPartitionDiscoveredInStateStore(
+      sourceOptions: StateSourceOptions): StateDataSourceException = {
+    new StateDataSourceNoPartitionDiscoveredInStateStore(sourceOptions)
+  }
+}
+
+abstract class StateDataSourceException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable)
+  extends SparkRuntimeException(
+    errorClass,
+    messageParameters,
+    cause)
+
+class StateDataSourceInternalError(message: String, cause: Throwable = null)
+  extends StateDataSourceException(
+    "STDS_INTERNAL_ERROR",
+    Map("message" -> message),
+    cause)
+
+class StateDataSourceInvalidOptionValue(optionName: String, message: String)
+  extends StateDataSourceException(
+    "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE",
+    Map("optionName" -> optionName, "message" -> message),
+    cause = null)
+
+class StateDataSourceInvalidOptionValueIsNegative(optionName: String)
+  extends StateDataSourceException(
+    "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE",
+    Map("optionName" -> optionName),
+    cause = null)
+
+class StateDataSourceInvalidOptionValueIsEmpty(optionName: String)
+  extends StateDataSourceException(
+    "STDS_INVALID_OPTION_VALUE.IS_EMPTY",
+    Map("optionName" -> optionName),
+    cause = null)
+
+class StateDataSourceUnspecifiedRequiredOption(
+    missingOptionName: String)
+  extends StateDataSourceException(
+    "STDS_REQUIRED_OPTION_UNSPECIFIED",
+    Map("optionName" -> missingOptionName),
+    cause = null)
+
+class StateDataSourceOffsetLogUnavailable(
+    batchId: Long,
+    checkpointLocation: String)
+  extends StateDataSourceException(
+    "STDS_OFFSET_LOG_UNAVAILABLE",
+    Map("batchId" -> batchId.toString, "checkpointLocation" -> 
checkpointLocation),
+    cause = null)
+
+class StateDataSourceOffsetMetadataLogUnavailable(
+    batchId: Long,
+    checkpointLocation: String)
+  extends StateDataSourceException(
+    "STDS_OFFSET_METADATA_LOG_UNAVAILABLE",
+    Map("batchId" -> batchId.toString, "checkpointLocation" -> 
checkpointLocation),
+    cause = null)
+
+class StateDataSourceReadStateSchemaFailure(
+   sourceOptions: StateSourceOptions,
+   cause: Throwable)
+  extends StateDataSourceException(
+    "STDS_FAILED_TO_READ_STATE_SCHEMA",
+    Map("sourceOptions" -> sourceOptions.toString),
+    cause)
+
+class StateDataSourceConflictOptions(options: Seq[String])
+  extends StateDataSourceException(
+    "STDS_CONFLICT_OPTIONS",
+    Map("options" -> options.map(x => s"'$x'").mkString("[", ", ", "]")),
+    cause = null)
+
+class StataDataSourceCommittedBatchUnavailable(checkpointLocation: String)
+  extends StateDataSourceException(
+    "STDS_COMMITTED_BATCH_UNAVAILABLE",
+    Map("checkpointLocation" -> checkpointLocation),
+    cause = null)
+
+class StateDataSourceNoPartitionDiscoveredInStateStore(sourceOptions: 
StateSourceOptions)
+  extends StateDataSourceException(
+    "STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE",
+    Map("sourceOptions" -> sourceOptions.toString),
+    cause = null)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
index 214f5f97330..0d69bf708e9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
@@ -67,8 +67,7 @@ class StateScan(
     })
 
     if (partitions.headOption.isEmpty) {
-      throw new IllegalArgumentException("The state does not have any 
partition. Please double " +
-        s"check that the query points to the valid state. options: 
$sourceOptions")
+      throw 
StateDataSourceErrors.noPartitionDiscoveredInStateStore(sourceOptions)
     } else {
       // just a dummy query id because we are actually not running streaming 
query
       val queryId = UUID.randomUUID()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
index 8b4e2737744..96c1c01cede 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
@@ -41,10 +41,11 @@ class StateTable(
   import StateTable._
 
   if (!isValidSchema(schema)) {
-    throw new IllegalStateException(s"Invalid schema is provided. Provided 
schema: $schema for " +
-      s"checkpoint location: ${sourceOptions.stateCheckpointLocation} , 
operatorId: " +
-      s"${sourceOptions.operatorId} , storeName: ${sourceOptions.storeName}, " 
+
-      s"joinSide: ${sourceOptions.joinSide}")
+    throw StateDataSourceErrors.internalError(
+      s"Invalid schema is provided. Provided schema: $schema for " +
+        s"checkpoint location: ${sourceOptions.stateCheckpointLocation} , 
operatorId: " +
+        s"${sourceOptions.operatorId} , storeName: ${sourceOptions.storeName}, 
" +
+        s"joinSide: ${sourceOptions.joinSide}")
   }
 
   override def name(): String = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
index 1a3d42aa066..26492f8790c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
@@ -67,7 +67,7 @@ class StreamStreamJoinStatePartitionReader(
     case JoinSideValues.left => LeftSide
     case JoinSideValues.right => RightSide
     case JoinSideValues.none =>
-      throw new IllegalStateException("Unexpected join side for stream-stream 
read!")
+      throw StateDataSourceErrors.internalError("Unexpected join side for 
stream-stream read!")
   }
 
   /*
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
index ca123a9e501..4f88eccb748 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, SupportsRead, Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, 
PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
+import 
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.PATH
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, 
OperatorStateMetadataReader, OperatorStateMetadataV1}
 import org.apache.spark.sql.sources.DataSourceRegister
@@ -95,8 +96,7 @@ class StateMetadataTable extends Table with SupportsRead with 
SupportsMetadataCo
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
     () => {
       if (!options.containsKey("path")) {
-        throw new IllegalArgumentException("Checkpoint path is not specified 
for" +
-          " state metadata data source.")
+        throw StateDataSourceErrors.requiredOptionUnspecified(PATH)
       }
       new StateMetadataScan(options.get("path"))
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index bfc9ad2fe0f..86c3ab70af6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -49,7 +49,7 @@ class StateDataSourceNegativeTestSuite extends 
StateDataSourceTestBase {
         CheckLastBatch((6, 0), (7, 1), (8, 0))
       )
 
-      intercept[IllegalArgumentException] {
+      intercept[StateDataSourceReadStateSchemaFailure] {
         spark.read.format("statestore").load(tempDir.getAbsolutePath)
       }
     }
@@ -67,7 +67,7 @@ class StateDataSourceNegativeTestSuite extends 
StateDataSourceTestBase {
       offsetLog.purgeAfter(0)
       commitLog.purgeAfter(-1)
 
-      intercept[IllegalStateException] {
+      intercept[StataDataSourceCommittedBatchUnavailable] {
         spark.read.format("statestore").load(tempDir.getAbsolutePath)
       }
     }
@@ -98,67 +98,79 @@ class StateDataSourceNegativeTestSuite extends 
StateDataSourceTestBase {
 
       rewriteStateSchemaFileToDummy()
 
-      intercept[IllegalArgumentException] {
+      intercept[StateDataSourceReadStateSchemaFailure] {
         spark.read.format("statestore").load(tempDir.getAbsolutePath)
       }
     }
   }
 
   test("ERROR: path is not specified") {
-    intercept[IllegalArgumentException] {
+    val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
       spark.read.format("statestore").load()
     }
+    checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+      Map("optionName" -> StateSourceOptions.PATH))
   }
 
   test("ERROR: operator ID specified to negative") {
     withTempDir { tempDir =>
-      intercept[IllegalArgumentException] {
+      val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
         spark.read.format("statestore")
           .option(StateSourceOptions.OPERATOR_ID, -1)
           // trick to bypass getting the last committed batch before 
validating operator ID
           .option(StateSourceOptions.BATCH_ID, 0)
           .load(tempDir.getAbsolutePath)
       }
+      checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+        Map("optionName" -> StateSourceOptions.OPERATOR_ID))
     }
   }
 
   test("ERROR: batch ID specified to negative") {
     withTempDir { tempDir =>
-      intercept[IllegalArgumentException] {
+      val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
         spark.read.format("statestore")
           .option(StateSourceOptions.BATCH_ID, -1)
           .load(tempDir.getAbsolutePath)
       }
+      checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+        Map("optionName" -> StateSourceOptions.BATCH_ID))
     }
   }
 
   test("ERROR: store name is empty") {
     withTempDir { tempDir =>
-      intercept[IllegalArgumentException] {
+      val exc = intercept[StateDataSourceInvalidOptionValueIsEmpty] {
         spark.read.format("statestore")
           .option(StateSourceOptions.STORE_NAME, "")
           // trick to bypass getting the last committed batch before 
validating operator ID
           .option(StateSourceOptions.BATCH_ID, 0)
           .load(tempDir.getAbsolutePath)
       }
+      checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_EMPTY", "42616",
+        Map("optionName" -> StateSourceOptions.STORE_NAME))
     }
   }
 
   test("ERROR: invalid value for joinSide option") {
     withTempDir { tempDir =>
-      intercept[IllegalArgumentException] {
+      val exc = intercept[StateDataSourceInvalidOptionValue] {
         spark.read.format("statestore")
           .option(StateSourceOptions.JOIN_SIDE, "both")
           // trick to bypass getting the last committed batch before 
validating operator ID
           .option(StateSourceOptions.BATCH_ID, 0)
           .load(tempDir.getAbsolutePath)
       }
+      checkError(exc, "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE", "42616",
+        Map(
+          "optionName" -> StateSourceOptions.JOIN_SIDE,
+          "message" -> "Valid values are left,right,none"))
     }
   }
 
   test("ERROR: both options `joinSide` and `storeName` are specified") {
     withTempDir { tempDir =>
-      intercept[IllegalArgumentException] {
+      val exc = intercept[StateDataSourceConflictOptions] {
         spark.read.format("statestore")
           .option(StateSourceOptions.JOIN_SIDE, "right")
           .option(StateSourceOptions.STORE_NAME, "right-keyToNumValues")
@@ -166,6 +178,9 @@ class StateDataSourceNegativeTestSuite extends 
StateDataSourceTestBase {
           .option(StateSourceOptions.BATCH_ID, 0)
           .load(tempDir.getAbsolutePath)
       }
+      checkError(exc, "STDS_CONFLICT_OPTIONS", "42613",
+        Map("options" ->
+          s"['${StateSourceOptions.JOIN_SIDE}', 
'${StateSourceOptions.STORE_NAME}']"))
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
index 340187fa495..9115af456eb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{Column, Row}
+import 
org.apache.spark.sql.execution.datasources.v2.state.{StateDataSourceUnspecifiedRequiredOption,
 StateSourceOptions}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
@@ -208,9 +209,10 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
   }
 
   test("State metadata data source handle missing argument") {
-    val e = intercept[IllegalArgumentException] {
+    val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
       spark.read.format("state-metadata").load().collect()
     }
-    assert(e.getMessage == "Checkpoint path is not specified for state 
metadata data source.")
+    checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+      Map("optionName" -> StateSourceOptions.PATH))
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to