This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 824d67052c75 [SPARK-47537][SQL] Fix error data type mapping on MySQL Connector/J 824d67052c75 is described below commit 824d67052c7542594fe98405b5062593d90233ee Author: Kent Yao <y...@apache.org> AuthorDate: Sun Mar 24 23:04:44 2024 -0700 [SPARK-47537][SQL] Fix error data type mapping on MySQL Connector/J ### What changes were proposed in this pull request? This PR fixes: - BIT(n>1) is wrongly mapped to boolean instead of long for MySQL Connector/J. This is because we only have a case branch for MariaDB Connector/J. - MySQL Docker Integration Tests were using MariaDB Connector/J, not MySQL Connector/J ### Why are the changes needed? Bugfix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #45689 from yaooqinn/SPARK-47537. Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/jdbc/MySQLDatabaseOnDocker.scala | 4 +- .../spark/sql/jdbc/MySQLIntegrationSuite.scala | 45 ++++++++++++++++++---- .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 29 +++++++++++--- .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 5 +++ 4 files changed, 67 insertions(+), 16 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala index 87b13a06d965..568eb5f10973 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala @@ -26,6 +26,6 @@ class MySQLDatabaseOnDocker extends DatabaseOnDocker { override val jdbcPort: Int = 3306 override def getJdbcUrl(ip: String, port: Int): String = - 
s"jdbc:mysql://$ip:$port/" + - s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false" + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" + + s"&useSSL=false&disableMariaDbDriver" } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index 921e63acf7e1..09eb99c25227 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -22,9 +22,10 @@ import java.sql.{Connection, Date, Timestamp} import java.time.LocalDateTime import java.util.Properties +import scala.util.Using + import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ -import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StructType} import org.apache.spark.tags.DockerTest /** @@ -85,6 +86,16 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { .executeUpdate() } + def testConnection(): Unit = { + Using.resource(getConnection()) { conn => + assert(conn.getClass.getName === "com.mysql.cj.jdbc.ConnectionImpl") + } + } + + test("SPARK-47537: ensure use the right jdbc driver") { + testConnection() + } + test("Basic test") { val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties) val rows = df.collect() @@ -246,13 +257,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { checkAnswer(df, Row(true, true, true)) df.write.mode("append").jdbc(jdbcUrl, "bools", new Properties) checkAnswer(df, Seq(Row(true, true, true), Row(true, true, true))) - val mb = new MetadataBuilder() - .putBoolean("isTimestampNTZ", false) - .putLong("scale", 0) - assert(df.schema === new StructType() - .add("b1", BooleanType, nullable = true, 
mb.putBoolean("isSigned", true).build()) - .add("b2", BooleanType, nullable = true, mb.putBoolean("isSigned", false).build()) - .add("b3", BooleanType, nullable = true, mb.putBoolean("isSigned", true).build())) } test("SPARK-47515: Save TimestampNTZType as DATETIME in MySQL") { @@ -272,3 +276,28 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { checkAnswer(df, Row(1.23f, 4.56f, 7.89d, 1.23d, 4.56d, 7.89d)) } } + + +/** + * To run this test suite for a specific version (e.g., mysql:8.3.0): + * {{{ + * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0 + * ./build/sbt -Pdocker-integration-tests + * "docker-integration-tests/testOnly *MySQLOverMariaConnectorIntegrationSuite" + * }}} + */ +@DockerTest +class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite { + + override val db = new MySQLDatabaseOnDocker { + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" + + s"&useSSL=false" + } + + override def testConnection(): Unit = { + Using.resource(getConnection()) { conn => + assert(conn.getClass.getName === "org.mariadb.jdbc.MariaDbConnection") + } + } +} diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala index 5a25509fd003..4997d335fda6 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala @@ -72,12 +72,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest private var mySQLVersion = -1 - override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder() - .putLong("scale", 0) - .putBoolean("isTimestampNTZ", 
false) - .putBoolean("isSigned", true) - .build() - override def tablePreparation(connection: Connection): Unit = { mySQLVersion = connection.getMetaData.getDatabaseMajorVersion connection.prepareStatement( @@ -162,3 +156,26 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest } } } + +/** + * To run this test suite for a specific version (e.g., mysql:8.3.0): + * {{{ + * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0 + * ./build/sbt -Pdocker-integration-tests + * "docker-integration-tests/testOnly *MySQLOverMariaConnectorIntegrationSuite" + * }}} + */ +@DockerTest +class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite { + override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder() + .putLong("scale", 0) + .putBoolean("isTimestampNTZ", false) + .putBoolean("isSigned", true) + .build() + + override val db = new MySQLDatabaseOnDocker { + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" + + s"&useSSL=false" + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 5cd49961554b..2735abfe9c39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -94,10 +94,15 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper { sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { sqlType match { case Types.VARBINARY if "BIT".equalsIgnoreCase(typeName) && size != 1 => + // MariaDB connector behaviour // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as // byte arrays instead of longs. 
md.putLong("binarylong", 1) Some(LongType) + case Types.BIT if size > 1 => + // MySQL connector behaviour + md.putLong("binarylong", 1) + Some(LongType) case Types.BIT if "TINYINT".equalsIgnoreCase(typeName) => Some(BooleanType) case Types.VARCHAR if "TINYTEXT".equalsIgnoreCase(typeName) => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org