This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 9ad7b75784da [SPARK-47537][SQL][3.5] Fix error data type mapping on MySQL Connector/J 9ad7b75784da is described below commit 9ad7b75784daa48bf20dd00ae3288c718272fd69 Author: Kent Yao <y...@apache.org> AuthorDate: Mon Mar 25 08:50:00 2024 -0700 [SPARK-47537][SQL][3.5] Fix error data type mapping on MySQL Connector/J ### What changes were proposed in this pull request? This PR fixes: - BIT(n>1) is wrongly mapped to boolean instead of long for MySQL Connector/J. This is because we only have a case branch for Maria Connector/J. - MySQL Docker Integration Tests were using Maria Connector/J, not MySQL Connector/J ### Why are the changes needed? Bugfix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #45690 from yaooqinn/SPARK-47537-B. Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/jdbc/MySQLIntegrationSuite.scala | 47 +++++++++++++++++++++- .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 38 +++++++++++++---- .../spark/sql/jdbc/v2/MySQLNamespaceSuite.scala | 4 +- .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 5 +++ 4 files changed, 84 insertions(+), 10 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index dcf4225d522d..68d88fbc552a 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -43,7 +43,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { override val usesIpc = false override val jdbcPort: Int = 3306 override def 
getJdbcUrl(ip: String, port: Int): String = - s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass" + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&disableMariaDbDriver" } override def dataPreparation(conn: Connection): Unit = { @@ -75,6 +75,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { "'jumps', 'over', 'the', 'lazy', 'dog', '{\"status\": \"merrily\"}')").executeUpdate() } + def testConnection(): Unit = { + val conn = getConnection() + try { + assert(conn.getClass.getName === "com.mysql.cj.jdbc.ConnectionImpl") + } finally { + conn.close() + } + } + + test("SPARK-47537: ensure use the right jdbc driver") { + testConnection() + } + test("Basic test") { val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties) val rows = df.collect() @@ -200,3 +213,35 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { assert(sql("select x, y from queryOption").collect.toSet == expectedResult) } } + +/** + * To run this test suite for a specific version (e.g., mysql:8.3.0): + * {{{ + * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0 + * ./build/sbt -Pdocker-integration-tests + * "docker-integration-tests/testOnly *MySQLOverMariaConnectorIntegrationSuite" + * }}} + */ +@DockerTest +class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite { + + override val db = new DatabaseOnDocker { + override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:8.0.31") + override val env = Map( + "MYSQL_ROOT_PASSWORD" -> "rootpass" + ) + override val usesIpc = false + override val jdbcPort: Int = 3306 + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass" + } + + override def testConnection(): Unit = { + val conn = getConnection() + try { + assert(conn.getClass.getName === "org.mariadb.jdbc.MariaDbConnection") + } finally { + conn.close() + } + } +} diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala index 719b858b87b6..f6f264804e7d 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala @@ -68,8 +68,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest override val jdbcPort: Int = 3306 override def getJdbcUrl(ip: String, port: Int): String = - s"jdbc:mysql://$ip:$port/" + - s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false" + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" + + "&useSSL=false&disableMariaDbDriver" } override def sparkConf: SparkConf = super.sparkConf @@ -83,11 +83,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest private var mySQLVersion = -1 - override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder() - .putLong("scale", 0) - .putBoolean("isSigned", true) - .build() - override def tablePreparation(connection: Connection): Unit = { mySQLVersion = connection.getMetaData.getDatabaseMajorVersion connection.prepareStatement( @@ -172,3 +167,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest } } } + +/** + * To run this test suite for a specific version (e.g., mysql:8.3.0): + * {{{ + * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0 + * ./build/sbt -Pdocker-integration-tests + * "docker-integration-tests/testOnly *MySQLOverMariaConnectorIntegrationSuite" + * }}} + */ +@DockerTest +class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite { + override def defaultMetadata(dataType: DataType = StringType): Metadata = new 
MetadataBuilder() + .putLong("scale", 0) + .putBoolean("isSigned", true) + .build() + + override val db = new DatabaseOnDocker { + override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:8.0.31") + override val env = Map( + "MYSQL_ROOT_PASSWORD" -> "rootpass" + ) + override val usesIpc = false + override val jdbcPort: Int = 3306 + + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" + + "&useSSL=false" + } +} diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala index d58146fecdf4..8b889f8509f5 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala @@ -45,8 +45,8 @@ class MySQLNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespac override val jdbcPort: Int = 3306 override def getJdbcUrl(ip: String, port: Int): String = - s"jdbc:mysql://$ip:$port/" + - s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false" + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" + + "&useSSL=false&disableMariaDbDriver" } val map = new CaseInsensitiveStringMap( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index bf27b748e24e..ef2be1c9c5c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -89,10 +89,15 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { override def getCatalystType( sqlType: Int, typeName: String, size: 
Int, md: MetadataBuilder): Option[DataType] = { if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) { + // MariaDB connector behaviour // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as // byte arrays instead of longs. md.putLong("binarylong", 1) Option(LongType) + } else if (sqlType == Types.BIT && size > 1) { + // MySQL connector behaviour + md.putLong("binarylong", 1) + Option(LongType) } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) { Option(BooleanType) } else if ("TINYTEXT".equalsIgnoreCase(typeName)) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org