This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 9ad7b75784da [SPARK-47537][SQL][3.5] Fix error data type mapping on MySQL Connector/J
9ad7b75784da is described below

commit 9ad7b75784daa48bf20dd00ae3288c718272fd69
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Mar 25 08:50:00 2024 -0700

    [SPARK-47537][SQL][3.5] Fix error data type mapping on MySQL Connector/J
    
    ### What changes were proposed in this pull request?
    
    This PR fixes:
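    - BIT(n>1) columns were wrongly mapped to BooleanType instead of LongType when
      read through MySQL Connector/J, because MySQLDialect only had a case branch
      for the MariaDB Connector/J behaviour (BIT(n>1) reported as VARBINARY);
      MySQL Connector/J reports it as BIT with size > 1.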
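    - The MySQL Docker integration tests were using MariaDB Connector/J rather
      than MySQL Connector/J. The MySQL suites now append `disableMariaDbDriver`
      to their JDBC URLs, and new `MySQLOverMariaConnectorIntegrationSuite`
      classes keep covering the MariaDB Connector/J path.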
    
    ### Why are the changes needed?
    
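    Bug fix: MySQL BIT(n>1) columns should be read as LongType (bit vector as a
    long) regardless of whether MySQL Connector/J or MariaDB Connector/J is used.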
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
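    New tests:
    - `SPARK-47537: ensure use the right jdbc driver` asserts that the MySQL
      suite's connection is a `com.mysql.cj.jdbc.ConnectionImpl`.
    - `MySQLOverMariaConnectorIntegrationSuite` (JDBC v1 and v2) reruns the MySQL
      integration tests over MariaDB Connector/J; the v1 suite asserts the
      connection is an `org.mariadb.jdbc.MariaDbConnection`.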
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #45690 from yaooqinn/SPARK-47537-B.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     | 47 +++++++++++++++++++++-
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 38 +++++++++++++----
 .../spark/sql/jdbc/v2/MySQLNamespaceSuite.scala    |  4 +-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |  5 +++
 4 files changed, 84 insertions(+), 10 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index dcf4225d522d..68d88fbc552a 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -43,7 +43,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     override val usesIpc = false
     override val jdbcPort: Int = 3306
     override def getJdbcUrl(ip: String, port: Int): String =
-      s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
+      
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&disableMariaDbDriver"
   }
 
   override def dataPreparation(conn: Connection): Unit = {
@@ -75,6 +75,19 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
       "'jumps', 'over', 'the', 'lazy', 'dog', '{\"status\": 
\"merrily\"}')").executeUpdate()
   }
 
+  def testConnection(): Unit = {
+    val conn = getConnection()
+    try {
+      assert(conn.getClass.getName === "com.mysql.cj.jdbc.ConnectionImpl")
+    } finally {
+      conn.close()
+    }
+  }
+
+  test("SPARK-47537: ensure use the right jdbc driver") {
+    testConnection()
+  }
+
   test("Basic test") {
     val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
     val rows = df.collect()
@@ -200,3 +213,35 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
   }
 }
+
+/**
+ * To run this test suite for a specific version (e.g., mysql:8.3.0):
+ * {{{
+ *   ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0
+ *     ./build/sbt -Pdocker-integration-tests
+ *     "docker-integration-tests/testOnly 
*MySQLOverMariaConnectorIntegrationSuite"
+ * }}}
+ */
+@DockerTest
+class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {
+
+  override val db = new DatabaseOnDocker {
+    override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", 
"mysql:8.0.31")
+    override val env = Map(
+      "MYSQL_ROOT_PASSWORD" -> "rootpass"
+    )
+    override val usesIpc = false
+    override val jdbcPort: Int = 3306
+    override def getJdbcUrl(ip: String, port: Int): String =
+      s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
+  }
+
+  override def testConnection(): Unit = {
+    val conn = getConnection()
+    try {
+      assert(conn.getClass.getName === "org.mariadb.jdbc.MariaDbConnection")
+    } finally {
+      conn.close()
+    }
+  }
+}
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 719b858b87b6..f6f264804e7d 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -68,8 +68,8 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
     override val jdbcPort: Int = 3306
 
     override def getJdbcUrl(ip: String, port: Int): String =
-      s"jdbc:mysql://$ip:$port/" +
-        
s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false"
+      
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true"
 +
+        "&useSSL=false&disableMariaDbDriver"
   }
 
   override def sparkConf: SparkConf = super.sparkConf
@@ -83,11 +83,6 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
 
   private var mySQLVersion = -1
 
-  override def defaultMetadata(dataType: DataType = StringType): Metadata = 
new MetadataBuilder()
-    .putLong("scale", 0)
-    .putBoolean("isSigned", true)
-    .build()
-
   override def tablePreparation(connection: Connection): Unit = {
     mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
     connection.prepareStatement(
@@ -172,3 +167,32 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
     }
   }
 }
+
+/**
+ * To run this test suite for a specific version (e.g., mysql:8.3.0):
+ * {{{
+ *   ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0
+ *     ./build/sbt -Pdocker-integration-tests
+ *     "docker-integration-tests/testOnly 
*MySQLOverMariaConnectorIntegrationSuite"
+ * }}}
+ */
+@DockerTest
+class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {
+  override def defaultMetadata(dataType: DataType = StringType): Metadata = 
new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", true)
+    .build()
+
+  override val db = new DatabaseOnDocker {
+    override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", 
"mysql:8.0.31")
+    override val env = Map(
+      "MYSQL_ROOT_PASSWORD" -> "rootpass"
+    )
+    override val usesIpc = false
+    override val jdbcPort: Int = 3306
+
+    override def getJdbcUrl(ip: String, port: Int): String =
+      
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true"
 +
+        "&useSSL=false"
+  }
+}
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
index d58146fecdf4..8b889f8509f5 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
@@ -45,8 +45,8 @@ class MySQLNamespaceSuite extends DockerJDBCIntegrationSuite 
with V2JDBCNamespac
     override val jdbcPort: Int = 3306
 
     override def getJdbcUrl(ip: String, port: Int): String =
-      s"jdbc:mysql://$ip:$port/" +
-        
s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false"
+      
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true"
 +
+        "&useSSL=false&disableMariaDbDriver"
   }
 
   val map = new CaseInsensitiveStringMap(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index bf27b748e24e..ef2be1c9c5c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -89,10 +89,15 @@ private case object MySQLDialect extends JdbcDialect with 
SQLConfHelper {
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = {
     if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
+      // MariaDB connector behaviour
       // This could instead be a BinaryType if we'd rather return bit-vectors 
of up to 64 bits as
       // byte arrays instead of longs.
       md.putLong("binarylong", 1)
       Option(LongType)
+    } else if (sqlType == Types.BIT && size > 1) {
+      // MySQL connector behaviour
+      md.putLong("binarylong", 1)
+      Option(LongType)
     } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
       Option(BooleanType)
     } else if ("TINYTEXT".equalsIgnoreCase(typeName)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to