This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 824d67052c75 [SPARK-47537][SQL] Fix error data type mapping on MySQL Connector/J
824d67052c75 is described below

commit 824d67052c7542594fe98405b5062593d90233ee
Author: Kent Yao <y...@apache.org>
AuthorDate: Sun Mar 24 23:04:44 2024 -0700

    [SPARK-47537][SQL] Fix error data type mapping on MySQL Connector/J
    
    ### What changes were proposed in this pull request?
    
    This PR fixes:
    - BIT(n>1) is wrongly mapped to boolean instead of long for MySQL Connector/J, because we only have a case branch for the MariaDB Connector/J (see the sketch below).
    - The MySQL Docker integration tests were using the MariaDB Connector/J, not the MySQL Connector/J.
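    
    The two connectors surface BIT(n>1) columns through different JDBC metadata: MySQL Connector/J reports java.sql.Types.BIT with a precision greater than 1, while the MariaDB Connector/J reports Types.VARBINARY with type name "BIT" (see the MySQLDialect change below). A minimal sketch that makes the difference visible, assuming a local MySQL instance with placeholder root/rootpass credentials and a hypothetical table bit_tbl holding a BIT(8) column b:
    
        import java.sql.DriverManager
        import scala.util.Using
    
        object BitMetadataProbe {
          def main(args: Array[String]): Unit = {
            // Placeholder URL; append &disableMariaDbDriver to force MySQL Connector/J
            // when both drivers are on the classpath.
            val url = "jdbc:mysql://127.0.0.1:3306/mysql?user=root&password=rootpass"
            Using.resource(DriverManager.getConnection(url)) { conn =>
              val rs = conn.prepareStatement("SELECT b FROM bit_tbl").executeQuery()
              val md = rs.getMetaData
              // Expected (hedged): MySQL Connector/J -> Types.BIT with precision > 1;
              // MariaDB Connector/J -> Types.VARBINARY with type name "BIT".
              println((md.getColumnType(1), md.getColumnTypeName(1), md.getPrecision(1)))
            }
          }
        }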
    
    ### Why are the changes needed?
    
    Bugfix
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #45689 from yaooqinn/SPARK-47537.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/jdbc/MySQLDatabaseOnDocker.scala     |  4 +-
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     | 45 ++++++++++++++++++----
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 29 +++++++++++---
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |  5 +++
 4 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala
index 87b13a06d965..568eb5f10973 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala
@@ -26,6 +26,6 @@ class MySQLDatabaseOnDocker extends DatabaseOnDocker {
   override val jdbcPort: Int = 3306
 
   override def getJdbcUrl(ip: String, port: Int): String =
-    s"jdbc:mysql://$ip:$port/" +
-      
s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false"
+    
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true"
 +
+      s"&useSSL=false&disableMariaDbDriver"
 }
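
With both drivers on the integration-test classpath, the MariaDB Connector/J also accepts jdbc:mysql: URLs, so the new URL appends disableMariaDbDriver to make it step aside and let MySQL Connector/J own the connection. A minimal standalone sketch of the same check the new SPARK-47537 test performs (host, port and credentials are placeholders for a locally running MySQL container):

    import java.sql.DriverManager
    import scala.util.Using

    object DriverSelectionProbe {
      def main(args: Array[String]): Unit = {
        val url = "jdbc:mysql://127.0.0.1:3306/mysql?user=root&password=rootpass" +
          "&allowPublicKeyRetrieval=true&useSSL=false&disableMariaDbDriver"
        Using.resource(DriverManager.getConnection(url)) { conn =>
          // With disableMariaDbDriver in place, the connection should come from
          // MySQL Connector/J rather than the MariaDB driver.
          assert(conn.getClass.getName == "com.mysql.cj.jdbc.ConnectionImpl")
        }
      }
    }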
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index 921e63acf7e1..09eb99c25227 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -22,9 +22,10 @@ import java.sql.{Connection, Date, Timestamp}
 import java.time.LocalDateTime
 import java.util.Properties
 
+import scala.util.Using
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StructType}
 import org.apache.spark.tags.DockerTest
 
 /**
@@ -85,6 +86,16 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
       .executeUpdate()
   }
 
+  def testConnection(): Unit = {
+    Using.resource(getConnection()) { conn =>
+      assert(conn.getClass.getName === "com.mysql.cj.jdbc.ConnectionImpl")
+    }
+  }
+
+  test("SPARK-47537: ensure use the right jdbc driver") {
+    testConnection()
+  }
+
   test("Basic test") {
     val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
     val rows = df.collect()
@@ -246,13 +257,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     checkAnswer(df, Row(true, true, true))
     df.write.mode("append").jdbc(jdbcUrl, "bools", new Properties)
     checkAnswer(df, Seq(Row(true, true, true), Row(true, true, true)))
-    val mb = new MetadataBuilder()
-      .putBoolean("isTimestampNTZ", false)
-      .putLong("scale", 0)
-    assert(df.schema === new StructType()
-      .add("b1", BooleanType, nullable = true, mb.putBoolean("isSigned", 
true).build())
-      .add("b2", BooleanType, nullable = true, mb.putBoolean("isSigned", 
false).build())
-      .add("b3", BooleanType, nullable = true, mb.putBoolean("isSigned", 
true).build()))
   }
 
   test("SPARK-47515: Save TimestampNTZType as DATETIME in MySQL") {
@@ -272,3 +276,28 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     checkAnswer(df, Row(1.23f, 4.56f, 7.89d, 1.23d, 4.56d, 7.89d))
   }
 }
+
+
+/**
+ * To run this test suite for a specific version (e.g., mysql:8.3.0):
+ * {{{
+ *   ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0
+ *     ./build/sbt -Pdocker-integration-tests
+ *     "docker-integration-tests/testOnly 
*MySQLOverMariaConnectorIntegrationSuite"
+ * }}}
+ */
+@DockerTest
+class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {
+
+  override val db = new MySQLDatabaseOnDocker {
+    override def getJdbcUrl(ip: String, port: Int): String =
+      s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" +
+        s"&useSSL=false"
+  }
+
+  override def testConnection(): Unit = {
+    Using.resource(getConnection()) { conn =>
+      assert(conn.getClass.getName === "org.mariadb.jdbc.MariaDbConnection")
+    }
+  }
+}
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 5a25509fd003..4997d335fda6 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -72,12 +72,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
 
   private var mySQLVersion = -1
 
-  override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder()
-    .putLong("scale", 0)
-    .putBoolean("isTimestampNTZ", false)
-    .putBoolean("isSigned", true)
-    .build()
-
   override def tablePreparation(connection: Connection): Unit = {
     mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
     connection.prepareStatement(
@@ -162,3 +156,26 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
     }
   }
 }
+
+/**
+ * To run this test suite for a specific version (e.g., mysql:8.3.0):
+ * {{{
+ *   ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0
+ *     ./build/sbt -Pdocker-integration-tests
+ *     "docker-integration-tests/testOnly 
*MySQLOverMariaConnectorIntegrationSuite"
+ * }}}
+ */
+@DockerTest
+class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {
+  override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", true)
+    .build()
+
+  override val db = new MySQLDatabaseOnDocker {
+    override def getJdbcUrl(ip: String, port: Int): String =
+      s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" +
+        s"&useSSL=false"
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5cd49961554b..2735abfe9c39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -94,10 +94,15 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
       case Types.VARBINARY if "BIT".equalsIgnoreCase(typeName) && size != 1 =>
+        // MariaDB connector behaviour
         // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
         // byte arrays instead of longs.
         md.putLong("binarylong", 1)
         Some(LongType)
+      case Types.BIT if size > 1 =>
+        // MySQL connector behaviour
+        md.putLong("binarylong", 1)
+        Some(LongType)
       case Types.BIT if "TINYINT".equalsIgnoreCase(typeName) =>
         Some(BooleanType)
       case Types.VARCHAR if "TINYTEXT".equalsIgnoreCase(typeName) =>
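
With the extra Types.BIT branch above, a BIT(n>1) column read through MySQL Connector/J is now mapped to LongType (carrying the "binarylong" metadata flag), matching the existing MariaDB-connector path, instead of being collapsed to a boolean. A minimal usage sketch, assuming a hypothetical table bit_tbl with a BIT(8) column b and placeholder connection settings:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.LongType

    object BitMappingCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("bit-mapping-check").master("local[*]").getOrCreate()
        // Placeholder URL and table; the MySQL Connector/J jar is assumed on the classpath.
        val df = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://127.0.0.1:3306/mysql?user=root&password=rootpass")
          .option("dbtable", "bit_tbl")
          .load()
        // After SPARK-47537, BIT(8) arrives as a long rather than a boolean.
        assert(df.schema("b").dataType == LongType)
        spark.stop()
      }
    }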

