This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d9f0fccd967b [SPARK-46332][SQL] Migrate `CatalogNotFoundException` to the error class `CATALOG_NOT_FOUND` d9f0fccd967b is described below commit d9f0fccd967b5c8686353d524d2b31e27b7a473b Author: Max Gekk <max.g...@gmail.com> AuthorDate: Fri Dec 8 12:54:20 2023 -0800 [SPARK-46332][SQL] Migrate `CatalogNotFoundException` to the error class `CATALOG_NOT_FOUND` ### What changes were proposed in this pull request? In the PR, I propose to migrate the `CatalogNotFoundException` exception to the new error class `CATALOG_NOT_FOUND`, improve the format of the exception message, and prohibit creation of the exception without the error class. ### Why are the changes needed? This is a part of the migration process onto error classes and the new error framework. The changes improve user experience w/ Spark SQL, and make `CatalogNotFoundException` consistent with other Spark exceptions. ### Does this PR introduce _any_ user-facing change? Yes, if the user's code depends on the error message format of `CatalogNotFoundException`. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "core/testOnly *SparkThrowableSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44259 from MaxGekk/catalog-plugin-not-found. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- R/pkg/tests/fulltests/test_sparkSQL.R | 5 +---- common/utils/src/main/resources/error/error-classes.json | 6 ++++++ .../jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala | 6 +++--- docs/sql-error-conditions.md | 6 ++++++ .../spark/sql/connector/catalog/CatalogNotFoundException.scala | 10 +++++++--- .../org/apache/spark/sql/connector/catalog/Catalogs.scala | 2 +- .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 7 +++++-- .../spark/sql/connector/catalog/CatalogLoadingSuite.java | 7 ++----- .../spark/sql/catalyst/analysis/TableLookupCacheSuite.scala | 6 +++--- .../spark/sql/connector/catalog/LookupCatalogSuite.scala | 5 +++-- .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 6 ++++-- .../sql/execution/command/AlignAssignmentsSuiteBase.scala | 5 +++-- .../spark/sql/execution/command/PlanResolutionSuite.scala | 9 ++++----- 13 files changed, 48 insertions(+), 32 deletions(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index f2bef7a00446..0d96f708a544 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -4103,10 +4103,7 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", { expect_equal(currentCatalog(), "spark_catalog") expect_error(setCurrentCatalog("spark_catalog"), NA) expect_error(setCurrentCatalog("zxwtyswklpf"), - paste0("Error in setCurrentCatalog : ", - "org.apache.spark.sql.connector.catalog.CatalogNotFoundException: ", - "Catalog 'zxwtyswklpf' plugin class not found: ", - "spark.sql.catalog.zxwtyswklpf is not defined")) + "[CATALOG_NOT_FOUND]*`zxwtyswklpf`*") catalogs <- collect(listCatalogs()) }) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 7a672fa5e557..62d10c0d34cb 100644 --- 
a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -407,6 +407,12 @@ ], "sqlState" : "22003" }, + "CATALOG_NOT_FOUND" : { + "message" : [ + "The catalog <catalogName> not found. Consider to set the SQL config <config> to a catalog plugin." + ], + "sqlState" : "42P08" + }, "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" : { "message" : [ "Checkpoint block <rddBlockId> not found!", diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala index cefa63ecd353..d646fad00c07 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala @@ -66,10 +66,10 @@ class CatalogSuite extends RemoteSparkSession with SQLHelper { val catalogs = spark.catalog.listCatalogs().collect() assert(catalogs.length == 1) assert(catalogs.map(_.name) sameElements Array("spark_catalog")) - val message = intercept[SparkException] { + val exception = intercept[SparkException] { spark.catalog.setCurrentCatalog("notExists") - }.getMessage - assert(message.contains("plugin class not found")) + } + assert(exception.getErrorClass == "CATALOG_NOT_FOUND") spark.catalog.setCurrentCatalog("testcat") assert(spark.catalog.currentCatalog().equals("testcat")) val catalogsAfterChange = spark.catalog.listCatalogs().collect() diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index d97e2ceef4c2..82befaae81df 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -335,6 +335,12 @@ The value `<value>` of the type `<sourceType>` cannot be cast to `<targetType>` Fail to assign a value of `<sourceType>` type to the `<targetType>` type column or variable `<columnName>` due to an overflow. Use `try_cast` on the input value to tolerate overflow and return NULL instead. 
+### CATALOG_NOT_FOUND + +[SQLSTATE: 42P08](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +The catalog `<catalogName>` not found. Consider to set the SQL config `<config>` to a catalog plugin. + ### CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND SQLSTATE: 56000 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala index d376b98afa41..4a8910fde4c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala @@ -21,8 +21,12 @@ import org.apache.spark.SparkException import org.apache.spark.annotation.Experimental @Experimental -class CatalogNotFoundException(message: String, cause: Throwable) - extends SparkException(message, cause) { +class CatalogNotFoundException( + errorClass: String, + messageParameters: Map[String, String], + cause: Throwable) + extends SparkException(errorClass, messageParameters, cause) { - def this(message: String) = this(message, null) + def this(errorClass: String, messageParameters: Map[String, String]) = + this(errorClass, messageParameters, null) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala index 5a49883be408..419191f8f9c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala @@ -53,7 +53,7 @@ private[sql] object Catalogs { _pluginClassName } catch { case _: NoSuchElementException => - throw QueryExecutionErrors.catalogPluginClassNotFoundError(name) + throw QueryExecutionErrors.catalogNotFoundError(name) } val loader = 
Utils.getContextOrSparkClassLoader try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 24332479f193..113f995968a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1811,9 +1811,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE cause = null) } - def catalogPluginClassNotFoundError(name: String): Throwable = { + def catalogNotFoundError(name: String): Throwable = { new CatalogNotFoundException( - s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not defined") + errorClass = "CATALOG_NOT_FOUND", + messageParameters = Map( + "catalogName" -> toSQLId(name), + "config" -> toSQLConf(s"spark.sql.catalog.$name"))) } def catalogPluginClassNotImplementedError(name: String, pluginClassName: String): Throwable = { diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java index e6c6a18623b3..238b8ac04e7e 100644 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java @@ -80,11 +80,8 @@ public class CatalogLoadingSuite { SparkException exc = Assertions.assertThrows(CatalogNotFoundException.class, () -> Catalogs.load("missing", conf)); - Assertions.assertTrue( - exc.getMessage().contains("plugin class not found: spark.sql.catalog.missing is not defined"), - "Should complain that implementation is not configured"); - Assertions.assertTrue(exc.getMessage().contains("missing"), - "Should identify the catalog by name"); + Assertions.assertEquals(exc.getErrorClass(), 
"CATALOG_NOT_FOUND"); + Assertions.assertEquals(exc.getMessageParameters().get("catalogName"), "`missing`"); } @Test diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala index 2c4215e70287..189509e31736 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala @@ -29,7 +29,8 @@ import org.scalatest.matchers.must.Matchers import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, InMemoryTable, InMemoryTableCatalog, Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryTable, InMemoryTableCatalog, Table} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ class TableLookupCacheSuite extends AnalysisTest with Matchers { @@ -60,8 +61,7 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers { when(catalogManager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => { invocation.getArgument[String](0) match { case CatalogManager.SESSION_CATALOG_NAME => v2Catalog - case name => - throw new CatalogNotFoundException(s"No such catalog: $name") + case name => throw QueryExecutionErrors.catalogNotFoundError(name) } }) when(catalogManager.v1SessionCatalog).thenReturn(v1Catalog) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala index 
0db758d5147f..49e119b56bc8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -46,7 +47,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside { val manager = mock(classOf[CatalogManager]) when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => { val name = invocation.getArgument[String](0) - catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) + catalogs.getOrElse(name, throw QueryExecutionErrors.catalogNotFoundError(name)) }) when(manager.currentCatalog).thenReturn(sessionCatalog) when(manager.v2SessionCatalog).thenReturn(sessionCatalog) @@ -114,7 +115,7 @@ class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog wit val manager = mock(classOf[CatalogManager]) when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => { val name = invocation.getArgument[String](0) - catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) + catalogs.getOrElse(name, throw QueryExecutionErrors.catalogNotFoundError(name)) }) when(manager.currentCatalog).thenReturn(catalogs("prod")) when(manager.currentNamespace).thenReturn(Array.empty[String]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 2b93e8bd3200..302a8e5d41db 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2763,8 +2763,10 @@ class DataSourceV2SQLSuiteV1Filter exception = intercept[CatalogNotFoundException] { sql("SET CATALOG not_exist_catalog") }, - errorClass = null, - parameters = Map.empty) + errorClass = "CATALOG_NOT_FOUND", + parameters = Map( + "catalogName" -> "`not_exist_catalog`", + "config" -> "\"spark.sql.catalog.not_exist_catalog\"")) } test("SPARK-35973: ShowCatalogs") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala index 2bc747c0abee..ebb719a35a8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala @@ -30,8 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogV2Util, Column, ColumnDefaultValue, Identifier, SupportsRowLevelOperations, TableCapability, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, ColumnDefaultValue, Identifier, SupportsRowLevelOperations, TableCapability, TableCatalog} import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType} @@ -177,7 +178,7 @@ 
abstract class AlignAssignmentsSuiteBase extends AnalysisTest { invocation.getArguments()(0).asInstanceOf[String] match { case "testcat" => v2Catalog case CatalogManager.SESSION_CATALOG_NAME => v2SessionCatalog - case name => throw new CatalogNotFoundException(s"No such catalog: $name") + case name => throw QueryExecutionErrors.catalogNotFoundError(name) } }) when(manager.currentCatalog).thenReturn(v2Catalog) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 69b3285fc7f1..db6c7175c526 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCom import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId import org.apache.spark.sql.connector.FakeV2Provider -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -194,8 +195,7 @@ class PlanResolutionSuite extends AnalysisTest { testCat case 
CatalogManager.SESSION_CATALOG_NAME => v2SessionCatalog - case name => - throw new CatalogNotFoundException(s"No such catalog: $name") + case name => throw QueryExecutionErrors.catalogNotFoundError(name) } }) when(manager.currentCatalog).thenReturn(testCat) @@ -211,8 +211,7 @@ class PlanResolutionSuite extends AnalysisTest { invocation.getArguments()(0).asInstanceOf[String] match { case "testcat" => testCat - case name => - throw new CatalogNotFoundException(s"No such catalog: $name") + case name => throw QueryExecutionErrors.catalogNotFoundError(name) } }) when(manager.currentCatalog).thenReturn(v2SessionCatalog) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org