This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 59777222e72 [SPARK-45888][SS] Apply error class framework to State (Metadata) Data Source 59777222e72 is described below commit 59777222e726c63cbd9077a2c76f762e06f6a5b3 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Wed Dec 6 22:38:40 2023 +0900 [SPARK-45888][SS] Apply error class framework to State (Metadata) Data Source ### What changes were proposed in this pull request? This PR proposes to apply error class framework to the new data source, State (Metadata) Data Source. ### Why are the changes needed? Error class framework is a standard to represent all exceptions in Spark. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Modified UT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44025 from HeartSaVioR/SPARK-45888. Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- common/utils/src/main/resources/error/README.md | 1 + .../src/main/resources/error/error-classes.json | 75 ++++++++++ ...itions-stds-invalid-option-value-error-class.md | 40 ++++++ docs/sql-error-conditions.md | 60 ++++++++ .../datasources/v2/state/StateDataSource.scala | 33 +++-- .../v2/state/StateDataSourceErrors.scala | 160 +++++++++++++++++++++ .../datasources/v2/state/StateScanBuilder.scala | 3 +- .../datasources/v2/state/StateTable.scala | 9 +- .../StreamStreamJoinStatePartitionReader.scala | 2 +- .../v2/state/metadata/StateMetadataSource.scala | 4 +- .../v2/state/StateDataSourceReadSuite.scala | 33 +++-- .../state/OperatorStateMetadataSuite.scala | 6 +- 12 files changed, 389 insertions(+), 37 deletions(-) diff --git a/common/utils/src/main/resources/error/README.md b/common/utils/src/main/resources/error/README.md index 556a634e992..b062c773907 100644 --- a/common/utils/src/main/resources/error/README.md +++ b/common/utils/src/main/resources/error/README.md @@ -636,6 +636,7 @@ The following SQLSTATEs are collated from: |42613 |42 |Syntax Error or Access Rule Violation |613 |Clauses are mutually exclusive. |DB2 |N |DB2 | |42614 |42 |Syntax Error or Access Rule Violation |614 |A duplicate keyword or clause is invalid. |DB2 |N |DB2 | |42615 |42 |Syntax Error or Access Rule Violation |615 |An invalid alternative was detected. |DB2 |N |DB2 | +|42616 |42 |Syntax Error or Access Rule Violation |616 |Invalid options specified |DB2 |N |DB2 | |42617 |42 |Syntax Error or Access Rule Violation |617 |The statement string is blank or empty. |DB2 |N |DB2 | |42618 |42 |Syntax Error or Access Rule Violation |618 |A variable is not allowed. |DB2 |N |DB2 | |42620 |42 |Syntax Error or Access Rule Violation |620 |Read-only SCROLL was specified with the UPDATE clause. |DB2 |N |DB2 | diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index e54d346e1bc..7a672fa5e55 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3066,6 +3066,81 @@ ], "sqlState" : "42713" }, + "STDS_COMMITTED_BATCH_UNAVAILABLE" : { + "message" : [ + "No committed batch found, checkpoint location: <checkpointLocation>. Ensure that the query has run and committed any microbatch before stopping." + ], + "sqlState" : "KD006" + }, + "STDS_CONFLICT_OPTIONS" : { + "message" : [ + "The options <options> cannot be specified together. Please specify the one." + ], + "sqlState" : "42613" + }, + "STDS_FAILED_TO_READ_STATE_SCHEMA" : { + "message" : [ + "Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: <sourceOptions>.", + "Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists." + ], + "sqlState" : "42K03" + }, + "STDS_INTERNAL_ERROR" : { + "message" : [ + "Internal error: <message>", + "Please, report this bug to the corresponding communities or vendors, and provide the full stack trace." + ], + "sqlState" : "XXKST" + }, + "STDS_INVALID_OPTION_VALUE" : { + "message" : [ + "Invalid value for source option '<optionName>':" + ], + "subClass" : { + "IS_EMPTY" : { + "message" : [ + "cannot be empty." + ] + }, + "IS_NEGATIVE" : { + "message" : [ + "cannot be negative." + ] + }, + "WITH_MESSAGE" : { + "message" : [ + "<message>" + ] + } + }, + "sqlState" : "42616" + }, + "STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE" : { + "message" : [ + "The state does not have any partition. Please double check that the query points to the valid state. options: <sourceOptions>" + ], + "sqlState" : "KD006" + }, + "STDS_OFFSET_LOG_UNAVAILABLE" : { + "message" : [ + "The offset log for <batchId> does not exist, checkpoint location: <checkpointLocation>.", + "Please specify the batch ID which is available for querying - you can query the available batch IDs via using state metadata data source." + ], + "sqlState" : "KD006" + }, + "STDS_OFFSET_METADATA_LOG_UNAVAILABLE" : { + "message" : [ + "Metadata is not available for offset log for <batchId>, checkpoint location: <checkpointLocation>.", + "The checkpoint seems to be only run with older Spark version(s). Run the streaming query with the recent Spark version, so that Spark constructs the state metadata." + ], + "sqlState" : "KD006" + }, + "STDS_REQUIRED_OPTION_UNSPECIFIED" : { + "message" : [ + "'<optionName>' must be specified." + ], + "sqlState" : "42601" + }, "STREAM_FAILED" : { "message" : [ "Query [id = <id>, runId = <runId>] terminated with exception: <message>" diff --git a/docs/sql-error-conditions-stds-invalid-option-value-error-class.md b/docs/sql-error-conditions-stds-invalid-option-value-error-class.md new file mode 100644 index 00000000000..ec0f15ed9f7 --- /dev/null +++ b/docs/sql-error-conditions-stds-invalid-option-value-error-class.md @@ -0,0 +1,40 @@ +--- +layout: global +title: STDS_INVALID_OPTION_VALUE error class +displayTitle: STDS_INVALID_OPTION_VALUE error class +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +[SQLSTATE: 42616](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Invalid value for source option '`<optionName>`': + +This error class has the following derived error classes: + +## IS_EMPTY + +cannot be empty. + +## IS_NEGATIVE + +cannot be negative. + +## WITH_MESSAGE + +`<message>` + + diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index c9990d3856c..d97e2ceef4c 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -1930,6 +1930,66 @@ Star (*) is not allowed in a select list when GROUP BY an ordinal position is us Static partition column `<staticName>` is also specified in the column list. +### STDS_COMMITTED_BATCH_UNAVAILABLE + +SQLSTATE: KD006 + +No committed batch found, checkpoint location: `<checkpointLocation>`. Ensure that the query has run and committed any microbatch before stopping. + +### STDS_CONFLICT_OPTIONS + +[SQLSTATE: 42613](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +The options `<options>` cannot be specified together. Please specify the one. + +### STDS_FAILED_TO_READ_STATE_SCHEMA + +[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: `<sourceOptions>`. +Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. + +### STDS_INTERNAL_ERROR + +[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error) + +Internal error: `<message>` +Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. + +### [STDS_INVALID_OPTION_VALUE](sql-error-conditions-stds-invalid-option-value-error-class.html) + +[SQLSTATE: 42616](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Invalid value for source option '`<optionName>`': + +For more details see [STDS_INVALID_OPTION_VALUE](sql-error-conditions-stds-invalid-option-value-error-class.html) + +### STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE + +SQLSTATE: KD006 + +The state does not have any partition. Please double check that the query points to the valid state. options: `<sourceOptions>` + +### STDS_OFFSET_LOG_UNAVAILABLE + +SQLSTATE: KD006 + +The offset log for `<batchId>` does not exist, checkpoint location: `<checkpointLocation>`. +Please specify the batch ID which is available for querying - you can query the available batch IDs via using state metadata data source. + +### STDS_OFFSET_METADATA_LOG_UNAVAILABLE + +SQLSTATE: KD006 + +Metadata is not available for offset log for `<batchId>`, checkpoint location: `<checkpointLocation>`. +The checkpoint seems to be only run with older Spark version(s). Run the streaming query with the recent Spark version, so that Spark constructs the state metadata. + +### STDS_REQUIRED_OPTION_UNSPECIFIED + +[SQLSTATE: 42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +'`<optionName>`' must be specified. + ### STREAM_FAILED [SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala index 55173a7e887..1192accaabe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala @@ -85,8 +85,7 @@ class StateDataSource extends TableProvider with DataSourceRegister { .add("value", valueSchema) } catch { case NonFatal(e) => - throw new IllegalArgumentException("Failed to read the state schema. Either the file " + - s"does not exist, or the file is corrupted. options: $sourceOptions", e) + throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e) } } @@ -96,8 +95,7 @@ class StateDataSource extends TableProvider with DataSourceRegister { offsetLog.get(batchId) match { case Some(value) => val metadata = value.metadata.getOrElse( - throw new IllegalStateException(s"Metadata is not available for offset log for " + - s"$batchId, checkpoint location $checkpointLocation") + throw StateDataSourceErrors.offsetMetadataLogUnavailable(batchId, checkpointLocation) ) val clonedRuntimeConf = new RuntimeConfig(session.sessionState.conf.clone()) @@ -105,8 +103,7 @@ class StateDataSource extends TableProvider with DataSourceRegister { StateStoreConf(clonedRuntimeConf.sqlConf) case _ => - throw new IllegalStateException(s"The offset log for $batchId does not exist, " + - s"checkpoint location $checkpointLocation") + throw StateDataSourceErrors.offsetLogUnavailable(batchId, checkpointLocation) } } @@ -120,6 +117,11 @@ case class StateSourceOptions( storeName: String, joinSide: JoinSideValues) { def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE) + + override def toString: String = { + s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " + + s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide)" + } } object StateSourceOptions extends DataSourceOptions { @@ -146,7 +148,7 @@ object StateSourceOptions extends DataSourceOptions { hadoopConf: Configuration, options: CaseInsensitiveStringMap): StateSourceOptions = { val checkpointLocation = Option(options.get(PATH)).orElse { - throw new IllegalArgumentException(s"'$PATH' must be specified.") + throw StateDataSourceErrors.requiredOptionUnspecified(PATH) }.get val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, checkpointLocation) @@ -156,14 +158,14 @@ object StateSourceOptions extends DataSourceOptions { }.get if (batchId < 0) { - throw new IllegalArgumentException(s"'$BATCH_ID' cannot be negative.") + throw StateDataSourceErrors.invalidOptionValueIsNegative(BATCH_ID) } val operatorId = Option(options.get(OPERATOR_ID)).map(_.toInt) .orElse(Some(0)).get if (operatorId < 0) { - throw new IllegalArgumentException(s"'$OPERATOR_ID' cannot be negative.") + throw StateDataSourceErrors.invalidOptionValueIsNegative(OPERATOR_ID) } val storeName = Option(options.get(STORE_NAME)) @@ -171,7 +173,7 @@ object StateSourceOptions extends DataSourceOptions { .getOrElse(StateStoreId.DEFAULT_STORE_NAME) if (storeName.isEmpty) { - throw new IllegalArgumentException(s"'$STORE_NAME' cannot be an empty string.") + throw StateDataSourceErrors.invalidOptionValueIsEmpty(STORE_NAME) } val joinSide = try { @@ -179,14 +181,12 @@ object StateSourceOptions extends DataSourceOptions { .map(JoinSideValues.withName).getOrElse(JoinSideValues.none) } catch { case _: NoSuchElementException => - // convert to IllegalArgumentException - throw new IllegalArgumentException(s"Incorrect value of the option " + - s"'$JOIN_SIDE'. Valid values are ${JoinSideValues.values.mkString(",")}") + throw StateDataSourceErrors.invalidOptionValue(JOIN_SIDE, + s"Valid values are ${JoinSideValues.values.mkString(",")}") } if (joinSide != JoinSideValues.none && storeName != StateStoreId.DEFAULT_STORE_NAME) { - throw new IllegalArgumentException(s"The options '$JOIN_SIDE' and " + - s"'$STORE_NAME' cannot be specified together. Please specify either one.") + throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME)) } StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName, joinSide) @@ -205,8 +205,7 @@ object StateSourceOptions extends DataSourceOptions { new Path(checkpointLocation, DIR_NAME_COMMITS).toString) commitLog.getLatest() match { case Some((lastId, _)) => lastId - case None => throw new IllegalStateException("No committed batch found, " + - s"checkpoint location: $checkpointLocation") + case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala new file mode 100644 index 00000000000..fe81d65c926 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.state + +import org.apache.spark.SparkRuntimeException + +/** + * Object for grouping error messages from (most) exceptions thrown from State Data Source. + * State Metadata Data Source may (re/co)use this object. + * + * ERROR_CLASS has a prefix of "STDS_" representing STateDataSource. + */ +object StateDataSourceErrors { + def internalError(message: String): StateDataSourceException = { + new StateDataSourceInternalError(message) + } + + def invalidOptionValue(optionName: String, message: String): StateDataSourceException = { + new StateDataSourceInvalidOptionValue(optionName, message) + } + + def invalidOptionValueIsNegative(optionName: String): StateDataSourceException = { + new StateDataSourceInvalidOptionValueIsNegative(optionName) + } + + def invalidOptionValueIsEmpty(optionName: String): StateDataSourceException = { + new StateDataSourceInvalidOptionValueIsEmpty(optionName) + } + + def requiredOptionUnspecified(missingOptionName: String): StateDataSourceException = { + new StateDataSourceUnspecifiedRequiredOption(missingOptionName) + } + + def offsetLogUnavailable( + batchId: Long, + checkpointLocation: String): StateDataSourceException = { + new StateDataSourceOffsetLogUnavailable(batchId, checkpointLocation) + } + + def offsetMetadataLogUnavailable( + batchId: Long, + checkpointLocation: String): StateDataSourceException = { + new StateDataSourceOffsetMetadataLogUnavailable(batchId, checkpointLocation) + } + + def failedToReadStateSchema( + sourceOptions: StateSourceOptions, + cause: Throwable): StateDataSourceException = { + new StateDataSourceReadStateSchemaFailure(sourceOptions, cause) + } + + def conflictOptions(options: Seq[String]): StateDataSourceException = { + new StateDataSourceConflictOptions(options) + } + + def committedBatchUnavailable(checkpointLocation: String): StateDataSourceException = { + new StataDataSourceCommittedBatchUnavailable(checkpointLocation) + } + + def noPartitionDiscoveredInStateStore( + sourceOptions: StateSourceOptions): StateDataSourceException = { + new StateDataSourceNoPartitionDiscoveredInStateStore(sourceOptions) + } +} + +abstract class StateDataSourceException( + errorClass: String, + messageParameters: Map[String, String], + cause: Throwable) + extends SparkRuntimeException( + errorClass, + messageParameters, + cause) + +class StateDataSourceInternalError(message: String, cause: Throwable = null) + extends StateDataSourceException( + "STDS_INTERNAL_ERROR", + Map("message" -> message), + cause) + +class StateDataSourceInvalidOptionValue(optionName: String, message: String) + extends StateDataSourceException( + "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE", + Map("optionName" -> optionName, "message" -> message), + cause = null) + +class StateDataSourceInvalidOptionValueIsNegative(optionName: String) + extends StateDataSourceException( + "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", + Map("optionName" -> optionName), + cause = null) + +class StateDataSourceInvalidOptionValueIsEmpty(optionName: String) + extends StateDataSourceException( + "STDS_INVALID_OPTION_VALUE.IS_EMPTY", + Map("optionName" -> optionName), + cause = null) + +class StateDataSourceUnspecifiedRequiredOption( + missingOptionName: String) + extends StateDataSourceException( + "STDS_REQUIRED_OPTION_UNSPECIFIED", + Map("optionName" -> missingOptionName), + cause = null) + +class StateDataSourceOffsetLogUnavailable( + batchId: Long, + checkpointLocation: String) + extends StateDataSourceException( + "STDS_OFFSET_LOG_UNAVAILABLE", + Map("batchId" -> batchId.toString, "checkpointLocation" -> checkpointLocation), + cause = null) + +class StateDataSourceOffsetMetadataLogUnavailable( + batchId: Long, + checkpointLocation: String) + extends StateDataSourceException( + "STDS_OFFSET_METADATA_LOG_UNAVAILABLE", + Map("batchId" -> batchId.toString, "checkpointLocation" -> checkpointLocation), + cause = null) + +class StateDataSourceReadStateSchemaFailure( + sourceOptions: StateSourceOptions, + cause: Throwable) + extends StateDataSourceException( + "STDS_FAILED_TO_READ_STATE_SCHEMA", + Map("sourceOptions" -> sourceOptions.toString), + cause) + +class StateDataSourceConflictOptions(options: Seq[String]) + extends StateDataSourceException( + "STDS_CONFLICT_OPTIONS", + Map("options" -> options.map(x => s"'$x'").mkString("[", ", ", "]")), + cause = null) + +class StataDataSourceCommittedBatchUnavailable(checkpointLocation: String) + extends StateDataSourceException( + "STDS_COMMITTED_BATCH_UNAVAILABLE", + Map("checkpointLocation" -> checkpointLocation), + cause = null) + +class StateDataSourceNoPartitionDiscoveredInStateStore(sourceOptions: StateSourceOptions) + extends StateDataSourceException( + "STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE", + Map("sourceOptions" -> sourceOptions.toString), + cause = null) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala index 214f5f97330..0d69bf708e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala @@ -67,8 +67,7 @@ class StateScan( }) if (partitions.headOption.isEmpty) { - throw new IllegalArgumentException("The state does not have any partition. Please double " + - s"check that the query points to the valid state. options: $sourceOptions") + throw StateDataSourceErrors.noPartitionDiscoveredInStateStore(sourceOptions) } else { // just a dummy query id because we are actually not running streaming query val queryId = UUID.randomUUID() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala index 8b4e2737744..96c1c01cede 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala @@ -41,10 +41,11 @@ class StateTable( import StateTable._ if (!isValidSchema(schema)) { - throw new IllegalStateException(s"Invalid schema is provided. Provided schema: $schema for " + - s"checkpoint location: ${sourceOptions.stateCheckpointLocation} , operatorId: " + - s"${sourceOptions.operatorId} , storeName: ${sourceOptions.storeName}, " + - s"joinSide: ${sourceOptions.joinSide}") + throw StateDataSourceErrors.internalError( + s"Invalid schema is provided. Provided schema: $schema for " + + s"checkpoint location: ${sourceOptions.stateCheckpointLocation} , operatorId: " + + s"${sourceOptions.operatorId} , storeName: ${sourceOptions.storeName}, " + + s"joinSide: ${sourceOptions.joinSide}") } override def name(): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala index 1a3d42aa066..26492f8790c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala @@ -67,7 +67,7 @@ class StreamStreamJoinStatePartitionReader( case JoinSideValues.left => LeftSide case JoinSideValues.right => RightSide case JoinSideValues.none => - throw new IllegalStateException("Unexpected join side for stream-stream read!") + throw StateDataSourceErrors.internalError("Unexpected join side for stream-stream read!") } /* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala index ca123a9e501..4f88eccb748 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, TableProvider} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} +import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.PATH import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataReader, OperatorStateMetadataV1} import org.apache.spark.sql.sources.DataSourceRegister @@ -95,8 +96,7 @@ class StateMetadataTable extends Table with SupportsRead with SupportsMetadataCo override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { () => { if (!options.containsKey("path")) { - throw new IllegalArgumentException("Checkpoint path is not specified for" + - " state metadata data source.") + throw StateDataSourceErrors.requiredOptionUnspecified(PATH) } new StateMetadataScan(options.get("path")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala index bfc9ad2fe0f..86c3ab70af6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala @@ -49,7 +49,7 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase { CheckLastBatch((6, 0), (7, 1), (8, 0)) ) - intercept[IllegalArgumentException] { + intercept[StateDataSourceReadStateSchemaFailure] { spark.read.format("statestore").load(tempDir.getAbsolutePath) } } @@ -67,7 +67,7 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase { offsetLog.purgeAfter(0) commitLog.purgeAfter(-1) - intercept[IllegalStateException] { + intercept[StataDataSourceCommittedBatchUnavailable] { spark.read.format("statestore").load(tempDir.getAbsolutePath) } } @@ -98,67 +98,79 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase { rewriteStateSchemaFileToDummy() - intercept[IllegalArgumentException] { + intercept[StateDataSourceReadStateSchemaFailure] { spark.read.format("statestore").load(tempDir.getAbsolutePath) } } } test("ERROR: path is not specified") { - intercept[IllegalArgumentException] { + val exc = intercept[StateDataSourceUnspecifiedRequiredOption] { spark.read.format("statestore").load() } + checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601", + Map("optionName" -> StateSourceOptions.PATH)) } test("ERROR: operator ID specified to negative") { withTempDir { tempDir => - intercept[IllegalArgumentException] { + val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] { spark.read.format("statestore") .option(StateSourceOptions.OPERATOR_ID, -1) // trick to bypass getting the last committed batch before validating operator ID .option(StateSourceOptions.BATCH_ID, 0) .load(tempDir.getAbsolutePath) } + checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616", + Map("optionName" -> StateSourceOptions.OPERATOR_ID)) } } test("ERROR: batch ID specified to negative") { withTempDir { tempDir => - intercept[IllegalArgumentException] { + val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] { spark.read.format("statestore") .option(StateSourceOptions.BATCH_ID, -1) .load(tempDir.getAbsolutePath) } + checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616", + Map("optionName" -> StateSourceOptions.BATCH_ID)) } } test("ERROR: store name is empty") { withTempDir { tempDir => - intercept[IllegalArgumentException] { + val exc = intercept[StateDataSourceInvalidOptionValueIsEmpty] { spark.read.format("statestore") .option(StateSourceOptions.STORE_NAME, "") // trick to bypass getting the last committed batch before validating operator ID .option(StateSourceOptions.BATCH_ID, 0) .load(tempDir.getAbsolutePath) } + checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_EMPTY", "42616", + Map("optionName" -> StateSourceOptions.STORE_NAME)) } } test("ERROR: invalid value for joinSide option") { withTempDir { tempDir => - intercept[IllegalArgumentException] { + val exc = intercept[StateDataSourceInvalidOptionValue] { spark.read.format("statestore") .option(StateSourceOptions.JOIN_SIDE, "both") // trick to bypass getting the last committed batch before validating operator ID .option(StateSourceOptions.BATCH_ID, 0) .load(tempDir.getAbsolutePath) } + checkError(exc, "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE", "42616", + Map( + "optionName" -> StateSourceOptions.JOIN_SIDE, + "message" -> "Valid values are left,right,none")) } } test("ERROR: both options `joinSide` and `storeName` are specified") { withTempDir { tempDir => - intercept[IllegalArgumentException] { + val exc = intercept[StateDataSourceConflictOptions] { spark.read.format("statestore") .option(StateSourceOptions.JOIN_SIDE, "right") .option(StateSourceOptions.STORE_NAME, "right-keyToNumValues") @@ -166,6 +178,9 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase { .option(StateSourceOptions.BATCH_ID, 0) .load(tempDir.getAbsolutePath) } + checkError(exc, "STDS_CONFLICT_OPTIONS", "42613", + Map("options" -> + s"['${StateSourceOptions.JOIN_SIDE}', '${StateSourceOptions.STORE_NAME}']")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala index 340187fa495..9115af456eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state import org.apache.hadoop.fs.Path import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.execution.datasources.v2.state.{StateDataSourceUnspecifiedRequiredOption, StateSourceOptions} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{OutputMode, StreamTest} @@ -208,9 +209,10 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { } test("State metadata data source handle missing argument") { - val e = intercept[IllegalArgumentException] { + val exc = intercept[StateDataSourceUnspecifiedRequiredOption] { spark.read.format("state-metadata").load().collect() } - assert(e.getMessage == "Checkpoint path is not specified for state metadata data source.") + checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601", + Map("optionName" -> StateSourceOptions.PATH)) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org