This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ebd2b78f87fa [SPARK-46727][SQL] Port `classifyException()` in JDBC dialects on error classes
ebd2b78f87fa is described below

commit ebd2b78f87fa6086c41d5e6bcade5efeefac75d0
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Jan 16 17:47:09 2024 +0300

    [SPARK-46727][SQL] Port `classifyException()` in JDBC dialects on error classes
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to port the existing `classifyException()` method,
    which accepts a description, to the new one with an error class added by
    https://github.com/apache/spark/pull/44358. The modified JDBC dialects are:
    DB2, H2, Oracle, MS SQL Server, MySQL and PostgreSQL.
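
    For reference, a minimal sketch of the signature change (both signatures
    appear in the diff below; the enclosing trait name is only illustrative,
    the real methods live on `JdbcDialect`):
    ```scala
    import org.apache.spark.sql.AnalysisException

    trait ClassifyExceptionSketch {
      // Old, deprecated overload: classification is driven by a free-form description.
      def classifyException(message: String, e: Throwable): AnalysisException

      // New overload: classification is driven by an error class and its message parameters.
      def classifyException(
          e: Throwable,
          errorClass: String,
          messageParameters: Map[String, String],
          description: String): AnalysisException
    }
    ```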
    
    ### Why are the changes needed?
    The old method `classifyException()`, which accepts only a `description`,
    has already been deprecated by ...
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By existing integration tests, and the modified test suite:
    ```
    $ build/sbt "test:testOnly *JDBCV2Suite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44739 from MaxGekk/port-jdbc-classifyException.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  |  4 +-
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala     | 17 +++++--
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      | 34 ++++++-------
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 17 +++++--
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 29 ++++++-----
 .../apache/spark/sql/jdbc/PostgresDialect.scala    | 56 ++++++++++++----------
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    |  6 +--
 7 files changed, 91 insertions(+), 72 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index d1d247967b4b..bae274788212 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
             sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
           },
           errorClass = "INDEX_ALREADY_EXISTS",
-          parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+          parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
         )
 
         sql(s"DROP index i1 ON $catalogName.new_table")
@@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
             sql(s"DROP index i1 ON $catalogName.new_table")
           },
           errorClass = "INDEX_NOT_FOUND",
-          parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+          parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
         )
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index d5a132c7dd48..f745e466ed9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -144,16 +144,23 @@ private object DB2Dialect extends JdbcDialect {
     s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
   }
 
-  override def classifyException(message: String, e: Throwable): AnalysisException = {
+  override def classifyException(
+      e: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String],
+      description: String): AnalysisException = {
     e match {
       case sqlException: SQLException =>
         sqlException.getSQLState match {
           // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
-          case "42893" => throw NonEmptyNamespaceException(
-            namespace = Array.empty, details = message, cause = Some(e))
-          case _ => super.classifyException(message, e)
+          case "42893" =>
+            throw NonEmptyNamespaceException(
+              namespace = messageParameters.get("namespace").toArray,
+              details = sqlException.getMessage,
+              cause = Some(e))
+          case _ => super.classifyException(e, errorClass, messageParameters, description)
         }
-      case _ => super.classifyException(message, e)
+      case _ => super.classifyException(e, errorClass, messageParameters, description)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index ae3a3addf7bf..cd151f790adf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -28,8 +28,7 @@ import scala.util.control.NonFatal
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.util.quoteNameParts
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -195,7 +194,11 @@ private[sql] object H2Dialect extends JdbcDialect {
     (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
   }
 
-  override def classifyException(message: String, e: Throwable): AnalysisException = {
+  override def classifyException(
+      e: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String],
+      description: String): AnalysisException = {
     e match {
       case exception: SQLException =>
        // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
@@ -206,15 +209,16 @@ private[sql] object H2Dialect extends JdbcDialect {
             val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r
             val name = regex.findFirstMatchIn(e.getMessage).get.group(1)
            val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name)
-            throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+            throw new TableAlreadyExistsException(
+              errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
               messageParameters = Map("relationName" -> quotedName),
               cause = Some(e))
           // TABLE_OR_VIEW_NOT_FOUND_1
           case 42102 =>
-            val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message))
+            val relationName = messageParameters.getOrElse("tableName", "")
             throw new NoSuchTableException(
               errorClass = "TABLE_OR_VIEW_NOT_FOUND",
-              messageParameters = Map("relationName" -> quotedName),
+              messageParameters = Map("relationName" -> relationName),
               cause = Some(e))
           // SCHEMA_NOT_FOUND_1
           case 90079 =>
@@ -224,25 +228,21 @@ private[sql] object H2Dialect extends JdbcDialect {
             throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND",
               messageParameters = Map("schemaName" -> quotedName))
           // INDEX_ALREADY_EXISTS_1
-          case 42111 =>
-            // The message is: Failed to create index indexName in tableName
-            val regex = "(?s)Failed to create index (.*) in (.*)".r
-            val indexName = regex.findFirstMatchIn(message).get.group(1)
-            val tableName = regex.findFirstMatchIn(message).get.group(2)
+          case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
+            val indexName = messageParameters("indexName")
+            val tableName = messageParameters("tableName")
             throw new IndexAlreadyExistsException(
               indexName = indexName, tableName = tableName, cause = Some(e))
           // INDEX_NOT_FOUND_1
-          case 42112 =>
-            // The message is: Failed to drop index indexName in tableName
-            val regex = "(?s)Failed to drop index (.*) in (.*)".r
-            val indexName = regex.findFirstMatchIn(message).get.group(1)
-            val tableName = regex.findFirstMatchIn(message).get.group(2)
+          case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+            val indexName = messageParameters("indexName")
+            val tableName = messageParameters("tableName")
             throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
           case _ => // do nothing
         }
       case _ => // do nothing
     }
-    super.classifyException(message, e)
+    super.classifyException(e, errorClass, messageParameters, description)
   }
 
   override def compileExpression(expr: Expression): Option[String] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 9776cff3f7c8..aaee6be24e61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -190,15 +190,22 @@ private object MsSqlServerDialect extends JdbcDialect {
     if (limit > 0) s"TOP ($limit)" else ""
   }
 
-  override def classifyException(message: String, e: Throwable): AnalysisException = {
+  override def classifyException(
+      e: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String],
+      description: String): AnalysisException = {
     e match {
       case sqlException: SQLException =>
         sqlException.getErrorCode match {
-          case 3729 => throw NonEmptyNamespaceException(
-            namespace = Array.empty, details = message, cause = Some(e))
-          case _ => super.classifyException(message, e)
+          case 3729 =>
+            throw NonEmptyNamespaceException(
+              namespace = messageParameters.get("namespace").toArray,
+              details = sqlException.getMessage,
+              cause = Some(e))
+          case _ => super.classifyException(e, errorClass, messageParameters, description)
         }
-      case _ => super.classifyException(message, e)
+      case _ => super.classifyException(e, errorClass, messageParameters, description)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index dd74c93bc2e1..cbed1d1e6384 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -270,28 +270,27 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
     indexMap.values.toArray
   }
 
-  override def classifyException(message: String, e: Throwable): AnalysisException = {
+  override def classifyException(
+      e: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String],
+      description: String): AnalysisException = {
     e match {
       case sqlException: SQLException =>
         sqlException.getErrorCode match {
           // ER_DUP_KEYNAME
-          case 1061 =>
-            // The message is: Failed to create index indexName in tableName
-            val regex = "(?s)Failed to create index (.*) in (.*)".r
-            val indexName = regex.findFirstMatchIn(message).get.group(1)
-            val tableName = regex.findFirstMatchIn(message).get.group(2)
-            throw new IndexAlreadyExistsException(
-              indexName = indexName, tableName = tableName, cause = Some(e))
-          case 1091 =>
-            // The message is: Failed to drop index indexName in tableName
-            val regex = "(?s)Failed to drop index (.*) in (.*)".r
-            val indexName = regex.findFirstMatchIn(message).get.group(1)
-            val tableName = regex.findFirstMatchIn(message).get.group(2)
+          case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
+            val indexName = messageParameters("indexName")
+            val tableName = messageParameters("tableName")
+            throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e))
+          case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+            val indexName = messageParameters("indexName")
+            val tableName = messageParameters("tableName")
             throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
-          case _ => super.classifyException(message, e)
+          case _ => super.classifyException(e, errorClass, messageParameters, description)
         }
       case unsupported: UnsupportedOperationException => throw unsupported
-      case _ => super.classifyException(message, e)
+      case _ => super.classifyException(e, errorClass, messageParameters, description)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 901e66e5efcb..3eb065a5d4f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -225,42 +225,48 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     s"DROP INDEX ${quoteIdentifier(indexName)}"
   }
 
-  override def classifyException(message: String, e: Throwable): AnalysisException = {
+  // Message pattern defined by postgres specification
+  private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r
+
+  override def classifyException(
+      e: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String],
+      description: String): AnalysisException = {
     e match {
       case sqlException: SQLException =>
         sqlException.getSQLState match {
           // https://www.postgresql.org/docs/14/errcodes-appendix.html
           case "42P07" =>
-            // Message patterns defined at caller sides of spark
-            val indexRegex = "(?s)Failed to create index (.*) in (.*)".r
-            val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r
-            // Message pattern defined by postgres specification
-            val pgRegex = """(?:.*)relation "(.*)" already exists""".r
-
-            message match {
-              case indexRegex(index, table) =>
-                throw new IndexAlreadyExistsException(
-                  indexName = index, tableName = table, cause = Some(e))
-              case renameRegex(_, newTable) =>
-                throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
-              case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty =>
-                val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1)
-                throw QueryCompilationErrors.tableAlreadyExistsError(tableName)
-              case _ => super.classifyException(message, e)
+            if (errorClass == "FAILED_JDBC.CREATE_INDEX") {
+              throw new IndexAlreadyExistsException(
+                indexName = messageParameters("indexName"),
+                tableName = messageParameters("tableName"),
+                cause = Some(e))
+            } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") {
+              val newTable = messageParameters("newName")
+              throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
+            } else {
+              val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage)
+              if (tblRegexp.nonEmpty) {
+                throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1))
+              } else {
+                super.classifyException(e, errorClass, messageParameters, description)
+              }
             }
-          case "42704" =>
-            // The message is: Failed to drop index indexName in tableName
-            val regex = "(?s)Failed to drop index (.*) in (.*)".r
-            val indexName = regex.findFirstMatchIn(message).get.group(1)
-            val tableName = regex.findFirstMatchIn(message).get.group(2)
+          case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+            val indexName = messageParameters("indexName")
+            val tableName = messageParameters("tableName")
             throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
           case "2BP01" =>
             throw NonEmptyNamespaceException(
-              namespace = Array.empty, details = message, cause = Some(e))
-          case _ => super.classifyException(message, e)
+              namespace = messageParameters.get("namespace").toArray,
+              details = sqlException.getMessage,
+              cause = Some(e))
+          case _ => super.classifyException(e, errorClass, messageParameters, description)
         }
       case unsupported: UnsupportedOperationException => throw unsupported
-      case _ => super.classifyException(message, e)
+      case _ => super.classifyException(e, errorClass, messageParameters, description)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 05b3787d0ff2..a3990f3cfbb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -2980,8 +2980,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       },
       errorClass = "INDEX_ALREADY_EXISTS",
       parameters = Map(
-        "indexName" -> "people_index",
-        "tableName" -> "test.people"
+        "indexName" -> "`people_index`",
+        "tableName" -> "`test`.`people`"
       )
     )
     assert(jdbcTable.indexExists("people_index"))
@@ -2997,7 +2997,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         sql(s"DROP INDEX people_index ON TABLE h2.test.people")
       },
       errorClass = "INDEX_NOT_FOUND",
-      parameters = Map("indexName" -> "people_index", "tableName" -> "test.people")
+      parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`")
     )
     assert(jdbcTable.indexExists("people_index") == false)
     val indexes3 = jdbcTable.listIndexes()

