This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 19ac1fc13646 [SPARK-47406][SQL] Handle TIMESTAMP and DATETIME in MYSQLDialect 19ac1fc13646 is described below commit 19ac1fc13646e982ef76718b5e7a0f0e5147794e Author: Kent Yao <y...@apache.org> AuthorDate: Fri Mar 15 16:38:08 2024 +0800 [SPARK-47406][SQL] Handle TIMESTAMP and DATETIME in MYSQLDialect ### What changes were proposed in this pull request? In MySQL, TIMESTAMP and DATETIME are different. The former is a TIMESTAMP WITH LOCAL TIME ZONE and the latter is a TIMESTAMP WITHOUT TIME ZONE. Following [SPARK-47375](https://issues.apache.org/jira/browse/SPARK-47375), MySql TIMESTAMP goes directly to TimestampType, DATETIME's mapping is decided by preferTimestampNTZ. ### Why are the changes needed? align the guidelines for jdbc timestamps ### Does this PR introduce _any_ user-facing change? yes, migration guide provided ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? Closes #45530 from yaooqinn/SPARK-47406. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Kent Yao <y...@apache.org> --- .../spark/sql/jdbc/MySQLIntegrationSuite.scala | 14 +++++++ docs/sql-migration-guide.md | 1 + .../sql/execution/datasources/jdbc/JdbcUtils.scala | 10 +++-- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 7 ++++ .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 45 +++++++++++++--------- 5 files changed, 55 insertions(+), 22 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index 48b94cf28a63..b1d239337aa0 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal import java.sql.{Connection, Date, Timestamp} +import java.time.LocalDateTime import java.util.Properties import org.apache.spark.sql.Row @@ -134,6 +135,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { } } + test("SPARK-47406: MySQL datetime types with preferTimestampNTZ") { + withDefaultTimeZone(UTC) { + val df = sqlContext.read.option("preferTimestampNTZ", true) + .jdbc(jdbcUrl, "dates", new Properties) + checkAnswer(df, Row( + Date.valueOf("1991-11-09"), + LocalDateTime.of(1970, 1, 1, 13, 31, 24), + LocalDateTime.of(1996, 1, 1, 1, 23, 45), + Timestamp.valueOf("2009-02-13 23:31:30"), + Date.valueOf("2001-01-01"))) + } + } + test("String types") { val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties) val rows = df.collect() diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 28fa19c351fc..27b62a6bd792 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -41,6 +41,7 @@ license: | - Since Spark 4.0, the SQL 
config `spark.sql.legacy.allowZeroIndexInFormatString` is deprecated. Consider to change `strfmt` of the `format_string` function to use 1-based indexes. The first argument must be referenced by "1$", the second by "2$", etc. - Since Spark 4.0, the function `to_csv` no longer supports input with the data type `STRUCT`, `ARRAY`, `MAP`, `VARIANT` and `BINARY` (because the `CSV specification` does not have standards for these data types and cannot be read back using `from_csv`), Spark will throw `DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE` exception. - Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` will not convert Postgres TIMESTAMP WITH TIME ZONE and TIME WITH TIME ZONE data types to TimestampNTZType, which is available in Spark 3.5. +- Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` will not convert MySQL TIMESTAMP to TimestampNTZType, which is available in Spark 3.5. MySQL DATETIME is not affected. ## Upgrading from Spark SQL 3.4 to 3.5 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index b037d862fa1a..393f09b6075e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -167,6 +167,10 @@ object JdbcUtils extends Logging with SQLConfHelper { throw QueryExecutionErrors.cannotGetJdbcTypeError(dt)) } + def getTimestampType(isTimestampNTZ: Boolean): DataType = { + if (isTimestampNTZ) TimestampNTZType else TimestampType + } + /** * Maps a JDBC type to a Catalyst type. This function is called only when * the JdbcDialect class corresponding to your database driver returns null. 
@@ -211,10 +215,8 @@ object JdbcUtils extends Logging with SQLConfHelper { case java.sql.Types.SMALLINT => IntegerType case java.sql.Types.SQLXML => StringType case java.sql.Types.STRUCT => StringType - case java.sql.Types.TIME if isTimestampNTZ => TimestampNTZType - case java.sql.Types.TIME => TimestampType - case java.sql.Types.TIMESTAMP if isTimestampNTZ => TimestampNTZType - case java.sql.Types.TIMESTAMP => TimestampType + case java.sql.Types.TIME => getTimestampType(isTimestampNTZ) + case java.sql.Types.TIMESTAMP => getTimestampType(isTimestampNTZ) case java.sql.Types.TINYINT => IntegerType case java.sql.Types.VARBINARY => BinaryType case java.sql.Types.VARCHAR if conf.charVarcharAsString => StringType diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 7c5e476d9786..7d2812d48cae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -781,6 +781,13 @@ abstract class JdbcDialect extends Serializable with Logging { def getFullyQualifiedQuotedTableName(ident: Identifier): String = { (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".") } + + /** + * Return TimestampType/TimestampNTZType based on the metadata. 
+ */ + protected final def getTimestampType(md: Metadata): DataType = { + JdbcUtils.getTimestampType(md.getBoolean("isTimestampNTZ")) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 7c63477d4584..42b1a3a2854e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} -import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType} +import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType, TimestampType} private case object MySQLDialect extends JdbcDialect with SQLConfHelper { @@ -92,23 +92,32 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { override def getCatalystType( sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { - if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) { - // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as - // byte arrays instead of longs. 
- md.putLong("binarylong", 1) - Option(LongType) - } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) { - Option(BooleanType) - } else if ("TINYTEXT".equalsIgnoreCase(typeName)) { - // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason - Some(StringType) - } else if (sqlType == Types.VARCHAR && typeName.equals("JSON")) { - // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1. - // Explicitly converts it into StringType here. - Some(StringType) - } else if (sqlType == Types.TINYINT && typeName.equals("TINYINT")) { - Some(ByteType) - } else None + sqlType match { + case Types.VARBINARY if "BIT".equalsIgnoreCase(typeName) && size != 1 => + // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as + // byte arrays instead of longs. + md.putLong("binarylong", 1) + Some(LongType) + case Types.BIT if "TINYINT".equalsIgnoreCase(typeName) => + Some(BooleanType) + case Types.VARCHAR if "TINYTEXT".equalsIgnoreCase(typeName) => + // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason + Some(StringType) + case Types.VARCHAR if "JSON".equalsIgnoreCase(typeName) => + // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1. + // Explicitly converts it into StringType here. 
+ Some(StringType) + case Types.TINYINT if "TINYINT".equalsIgnoreCase(typeName) => + Some(ByteType) + case Types.TIMESTAMP if "DATETIME".equalsIgnoreCase(typeName) => + // scalastyle:off line.size.limit + // In MYSQL, DATETIME is TIMESTAMP WITHOUT TIME ZONE + // https://github.com/mysql/mysql-connector-j/blob/8.3.0/src/main/core-api/java/com/mysql/cj/MysqlType.java#L251 + // scalastyle:on line.size.limit + Some(getTimestampType(md.build())) + case Types.TIMESTAMP => Some(TimestampType) + case _ => None + } } override def quoteIdentifier(colName: String): String = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org