This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2957bc9fa1f0 [SPARK-46608][SQL] Restore backward compatibility of `JdbcDialect.classifyException` 2957bc9fa1f0 is described below commit 2957bc9fa1f0d49f6fcfbf5ebcd43df1a76504b6 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Mon Jan 8 17:01:50 2024 +0300 [SPARK-46608][SQL] Restore backward compatibility of `JdbcDialect.classifyException` ### What changes were proposed in this pull request? In the PR, I propose to restore `classifyException()` of `JdbcDialect` to its state before the commit https://github.com/apache/spark/commit/14a933bbe2eb1c71988f475036735f07b2e1fa6a, and to extend the `classifyException()` overload that takes the error class with a `description` parameter: ```scala def classifyException( e: Throwable, errorClass: String, messageParameters: Map[String, String], description: String): AnalysisException ``` The `description` parameter has the same meaning as `message` in the old version of `classifyException()`, which is deprecated. Also, the old implementation of `classifyException()` has been restored in the JDBC dialects: MySQL, PostgreSQL and so on. ### Why are the changes needed? To restore compatibility with existing JDBC dialects. ### Does this PR introduce _any_ user-facing change? No, this PR restores the behaviour prior to https://github.com/apache/spark/pull/44358. ### How was this patch tested? By running the affected test suite: ``` $ build/sbt "core/testOnly *SparkThrowableSuite" ``` and the modified test suites: ``` $ build/sbt "test:testOnly *JDBCV2Suite" $ build/sbt "test:testOnly *JDBCTableCatalogSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44449 from MaxGekk/restore-jdbc-classifyException. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../src/main/resources/error/error-classes.json | 5 ++ .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 6 +-- ...sql-error-conditions-failed-jdbc-error-class.md | 4 ++ .../sql/execution/datasources/jdbc/JdbcUtils.scala | 6 ++- .../execution/datasources/v2/jdbc/JDBCTable.scala | 6 ++- .../datasources/v2/jdbc/JDBCTableCatalog.scala | 33 ++++++++----- .../org/apache/spark/sql/jdbc/DB2Dialect.scala | 16 ++----- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 33 ++++++------- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 22 ++++++++- .../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 16 ++----- .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 28 ++++++----- .../apache/spark/sql/jdbc/PostgresDialect.scala | 55 ++++++++++------------ .../v2/jdbc/JDBCTableCatalogSuite.scala | 29 +++++------- .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 6 +-- 14 files changed, 143 insertions(+), 122 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 1812e9d76450..c7f8f59a7679 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1185,6 +1185,11 @@ "message" : [ "Check that the table <tableName> exists." 
] + }, + "UNCLASSIFIED" : { + "message" : [ + "<message>" + ] } }, "sqlState" : "HV000" diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index b93106b0ce78..d1d247967b4b 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -224,7 +224,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") } - assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE") + assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED") testCreateTableWithProperty(s"$catalogName.new_table") } } @@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"CREATE index i1 ON $catalogName.new_table (col1)") }, errorClass = "INDEX_ALREADY_EXISTS", - parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") + parameters = Map("indexName" -> "i1", "tableName" -> "new_table") ) sql(s"DROP index i1 ON $catalogName.new_table") @@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"DROP index i1 ON $catalogName.new_table") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") + parameters = Map("indexName" -> "i1", "tableName" -> "new_table") ) } } diff --git a/docs/sql-error-conditions-failed-jdbc-error-class.md b/docs/sql-error-conditions-failed-jdbc-error-class.md index 575441e3f347..1740ad49e7f3 100644 --- a/docs/sql-error-conditions-failed-jdbc-error-class.md +++ b/docs/sql-error-conditions-failed-jdbc-error-class.md @@ -77,4 +77,8 @@ Rename the table `<oldName>` to `<newName>`. 
Check that the table `<tableName>` exists. +## UNCLASSIFIED + +`<message>` + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index e7835514a384..467c489a50fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1183,12 +1183,14 @@ object JdbcUtils extends Logging with SQLConfHelper { def classifyException[T]( errorClass: String, messageParameters: Map[String, String], - dialect: JdbcDialect)(f: => T): T = { + dialect: JdbcDialect, + description: String)(f: => T): T = { try { f } catch { case e: SparkThrowable with Throwable => throw e - case e: Throwable => throw dialect.classifyException(e, errorClass, messageParameters) + case e: Throwable => + throw dialect.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 120a68075a8f..6828bb0f0c4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -69,7 +69,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt "url" -> jdbcOptions.getRedactUrl(), "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), - dialect = JdbcDialects.get(jdbcOptions.url)) { + dialect = JdbcDialects.get(jdbcOptions.url), + description = s"Failed to create index $indexName in ${name()}") { JdbcUtils.createIndex( conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions) } @@ -90,7 +91,8 @@ case class JDBCTable(ident: Identifier, schema: 
StructType, jdbcOptions: JDBCOpt "url" -> jdbcOptions.getRedactUrl(), "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), - dialect = JdbcDialects.get(jdbcOptions.url)) { + dialect = JdbcDialects.get(jdbcOptions.url), + description = s"Failed to drop index $indexName in ${name()}") { JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 2eb61b3d01fc..a5ef9bd5e6b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -73,7 +73,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(namespace.toSeq)), - dialect) { + dialect, + description = s"Failed get tables from: ${namespace.mkString(".")}") { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } new Iterator[Identifier] { @@ -92,7 +93,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect) { + dialect, + description = s"Failed table existence check: $ident") { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } } @@ -118,7 +120,8 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.getRedactUrl(), "oldName" -> toSQLId(oldIdent), "newName" -> toSQLId(newIdent)), - dialect) { + dialect, + description = s"Failed table renaming from $oldIdent to $newIdent") { JdbcUtils.renameTable(conn, oldIdent, newIdent, options) } } @@ -183,7 +186,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect) { + dialect, + description 
= s"Failed table creation: $ident") { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } } @@ -199,7 +203,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect) { + dialect, + description = s"Failed table altering: $ident") { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } loadTable(ident) @@ -214,7 +219,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(namespace.toSeq)), - dialect) { + dialect, + description = s"Failed namespace exists: ${namespace.mkString}") { JdbcUtils.schemaExists(conn, options, db) } } @@ -226,7 +232,8 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.classifyException( errorClass = "FAILED_JDBC.LIST_NAMESPACES", messageParameters = Map("url" -> options.getRedactUrl()), - dialect) { + dialect, + description = s"Failed list namespaces") { JdbcUtils.listSchemas(conn, options) } } @@ -279,7 +286,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect) { + dialect, + description = s"Failed create name space: $db") { JdbcUtils.createSchema(conn, options, db, comment) } } @@ -303,7 +311,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect) { + dialect, + description = s"Failed create comment on name space: $db") { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } } @@ -319,7 +328,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect) { + dialect, + description = s"Failed remove comment on name space: $db") { JdbcUtils.removeSchemaComment(conn, options, db) } } @@ -346,7 +356,8 @@ class JDBCTableCatalog extends TableCatalog 
messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect) { + dialect, + description = s"Failed drop name space: $db") { JdbcUtils.dropSchema(conn, options, db, cascade) true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 4f81ee031d22..d5a132c7dd48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -144,22 +144,16 @@ private object DB2Dialect extends JdbcDialect { s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''" } - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate - case "42893" => - throw NonEmptyNamespaceException( - namespace = messageParameters.get("namespace").toArray, - details = sqlException.getMessage, - cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case "42893" => throw NonEmptyNamespaceException( + namespace = Array.empty, details = message, cause = Some(e)) + case _ => super.classifyException(message, e) } - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index c3b4092c8e37..fd20e495b10f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -28,7 +28,8 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils 
import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.util.quoteNameParts import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.catalog.index.TableIndex @@ -194,10 +195,7 @@ private[sql] object H2Dialect extends JdbcDialect { (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".") } - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case exception: SQLException => // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html @@ -208,16 +206,15 @@ private[sql] object H2Dialect extends JdbcDialect { val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r val name = regex.findFirstMatchIn(e.getMessage).get.group(1) val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name) - throw new TableAlreadyExistsException( - errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", messageParameters = Map("relationName" -> quotedName), cause = Some(e)) // TABLE_OR_VIEW_NOT_FOUND_1 case 42102 => - val relationName = messageParameters.getOrElse("tableName", "") + val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message)) throw new NoSuchTableException( errorClass = "TABLE_OR_VIEW_NOT_FOUND", - messageParameters = Map("relationName" -> relationName), + 
messageParameters = Map("relationName" -> quotedName), cause = Some(e)) // SCHEMA_NOT_FOUND_1 case 90079 => @@ -227,21 +224,25 @@ private[sql] object H2Dialect extends JdbcDialect { throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND", messageParameters = Map("schemaName" -> quotedName)) // INDEX_ALREADY_EXISTS_1 - case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") + case 42111 => + // The message is: Failed to create index indexName in tableName + val regex = "(?s)Failed to create index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) throw new IndexAlreadyExistsException( indexName = indexName, tableName = tableName, cause = Some(e)) // INDEX_NOT_FOUND_1 - case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") + case 42112 => + // The message is: Failed to drop index indexName in tableName + val regex = "(?s)Failed to drop index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case _ => // do nothing } case _ => // do nothing } - super.classifyException(e, errorClass, messageParameters) + super.classifyException(message, e) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 87d3e2d97eba..888ef4a20be3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -633,13 +633,31 @@ abstract class JdbcDialect extends Serializable 
with Logging { * @param e The dialect specific exception. * @param errorClass The error class assigned in the case of an unclassified `e` * @param messageParameters The message parameters of `errorClass` + * @param description The error description * @return `AnalysisException` or its sub-class. */ def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { - new AnalysisException(errorClass, messageParameters, cause = Some(e)) + messageParameters: Map[String, String], + description: String): AnalysisException = { + classifyException(description, e) + } + + /** + * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. + * @param message The error message to be placed to the returned exception. + * @param e The dialect specific exception. + * @return `AnalysisException` or its sub-class. + */ + @deprecated("Please override the classifyException method with an error class", "4.0.0") + def classifyException(message: String, e: Throwable): AnalysisException = { + new AnalysisException( + errorClass = "FAILED_JDBC.UNCLASSIFIED", + messageParameters = Map( + "url" -> "jdbc:", + "message" -> message), + cause = Some(e)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index f63a1abdce65..9776cff3f7c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -190,21 +190,15 @@ private object MsSqlServerDialect extends JdbcDialect { if (limit > 0) s"TOP ($limit)" else "" } - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case sqlException: SQLException => 
sqlException.getErrorCode match { - case 3729 => - throw NonEmptyNamespaceException( - namespace = messageParameters.get("namespace").toArray, - details = sqlException.getMessage, - cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case 3729 => throw NonEmptyNamespaceException( + namespace = Array.empty, details = message, cause = Some(e)) + case _ => super.classifyException(message, e) } - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index af50a8e3e359..dd74c93bc2e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -270,26 +270,28 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { indexMap.values.toArray } - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { // ER_DUP_KEYNAME - case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") - throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e)) - case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") + case 1061 => + // The message is: Failed to create index indexName in tableName + val regex = "(?s)Failed to create index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) + 
throw new IndexAlreadyExistsException( + indexName = indexName, tableName = tableName, cause = Some(e)) + case 1091 => + // The message is: Failed to drop index indexName in tableName + val regex = "(?s)Failed to drop index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 4637a96039b8..901e66e5efcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -225,47 +225,42 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { s"DROP INDEX ${quoteIdentifier(indexName)}" } - // Message pattern defined by postgres specification - private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r - - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.postgresql.org/docs/14/errcodes-appendix.html case "42P07" => - if (errorClass == "FAILED_JDBC.CREATE_INDEX") { - throw new IndexAlreadyExistsException( - indexName = messageParameters("indexName"), - tableName = messageParameters("tableName"), - cause = Some(e)) - } else if (errorClass == 
"FAILED_JDBC.RENAME_TABLE") { - val newTable = messageParameters("newName") - throw QueryCompilationErrors.tableAlreadyExistsError(newTable) - } else { - val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) - if (tblRegexp.nonEmpty) { - throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) - } else { - super.classifyException(e, errorClass, messageParameters) - } + // Message patterns defined at caller sides of spark + val indexRegex = "(?s)Failed to create index (.*) in (.*)".r + val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r + // Message pattern defined by postgres specification + val pgRegex = """(?:.*)relation "(.*)" already exists""".r + + message match { + case indexRegex(index, table) => + throw new IndexAlreadyExistsException( + indexName = index, tableName = table, cause = Some(e)) + case renameRegex(_, newTable) => + throw QueryCompilationErrors.tableAlreadyExistsError(newTable) + case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => + val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1) + throw QueryCompilationErrors.tableAlreadyExistsError(tableName) + case _ => super.classifyException(message, e) } - case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") + case "42704" => + // The message is: Failed to drop index indexName in tableName + val regex = "(?s)Failed to drop index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case "2BP01" => throw NonEmptyNamespaceException( - namespace = messageParameters.get("namespace").toArray, - details = sqlException.getMessage, - cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + namespace = Array.empty, 
details = message, cause = Some(e)) + case _ => super.classifyException(message, e) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index fa35c0f33ced..5408d434fced 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -161,7 +161,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withTable("h2.test.new_table") { sql("CREATE TABLE h2.test.new_table(i INT, j STRING)") - val e = intercept[AnalysisException] { + val e = intercept[TableAlreadyExistsException] { sql("CREATE TABLE h2.test.new_table(i INT, j STRING)") } checkErrorTableAlreadyExists(e, "`test`.`new_table`") @@ -554,22 +554,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("CREATE TABLE with table property") { withTable("h2.test.new_table") { - val sqlText = "CREATE TABLE h2.test.new_table(i INT, j STRING)" + - " TBLPROPERTIES('ENGINE'='tableEngineName')" checkError( - exception = intercept[AnalysisException] { sql(sqlText) }, - errorClass = "FAILED_JDBC.CREATE_TABLE", + exception = intercept[AnalysisException] { + sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" + + " TBLPROPERTIES('ENGINE'='tableEngineName')") + }, + errorClass = "FAILED_JDBC.UNCLASSIFIED", parameters = Map( - "url" -> url, - "tableName" -> "`test`.`new_table`")) - withSQLConf(SQLConf.SQL_STRING_REDACTION_PATTERN.key -> ".*password=.*") { - checkError( - exception = intercept[AnalysisException] { sql(sqlText) }, - errorClass = 
"FAILED_JDBC.CREATE_TABLE", - parameters = Map( - "url" -> "*********(redacted)", - "tableName" -> "`test`.`new_table`")) - } + "url" -> "jdbc:", + "message" -> "Failed table creation: test.new_table")) } } @@ -585,10 +578,10 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException]{ sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") }, - errorClass = "FAILED_JDBC.CREATE_TABLE", + errorClass = "FAILED_JDBC.UNCLASSIFIED", parameters = Map( - "url" -> url, - "tableName" -> "`test`.`new_table`")) + "url" -> "jdbc:", + "message" -> "Failed table creation: test.new_table")) } test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 5e04fca92f4b..0a66680edd63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -2924,8 +2924,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel }, errorClass = "INDEX_ALREADY_EXISTS", parameters = Map( - "indexName" -> "`people_index`", - "tableName" -> "`test`.`people`" + "indexName" -> "people_index", + "tableName" -> "test.people" ) ) assert(jdbcTable.indexExists("people_index")) @@ -2941,7 +2941,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel sql(s"DROP INDEX people_index ON TABLE h2.test.people") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`") + parameters = Map("indexName" -> "people_index", "tableName" -> "test.people") ) assert(jdbcTable.indexExists("people_index") == false) val indexes3 = jdbcTable.listIndexes() --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org