This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 19ac1fc13646 [SPARK-47406][SQL] Handle TIMESTAMP and DATETIME in MYSQLDialect
19ac1fc13646 is described below

commit 19ac1fc13646e982ef76718b5e7a0f0e5147794e
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri Mar 15 16:38:08 2024 +0800

    [SPARK-47406][SQL] Handle TIMESTAMP and DATETIME in MYSQLDialect
    
    ### What changes were proposed in this pull request?
    
    In MySQL, TIMESTAMP and DATETIME are different: the former is a TIMESTAMP WITH LOCAL TIME ZONE, while the latter is a TIMESTAMP WITHOUT TIME ZONE.
    
    Following [SPARK-47375](https://issues.apache.org/jira/browse/SPARK-47375), MySQL TIMESTAMP now maps directly to TimestampType, while the mapping of DATETIME is decided by the `preferTimestampNTZ` option.
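
    As a minimal sketch of the resulting behavior (simplified from the `MySQLDialect` change below, not the exact dialect code; `mapMySqlDatetime` is a hypothetical helper name, and `isTimestampNTZ` stands for the user's `preferTimestampNTZ` choice):

    ```scala
    import java.sql.Types
    import org.apache.spark.sql.types.{DataType, TimestampNTZType, TimestampType}

    // Simplified sketch of the new MySQL datetime mapping.
    def mapMySqlDatetime(
        sqlType: Int, typeName: String, isTimestampNTZ: Boolean): Option[DataType] =
      sqlType match {
        // DATETIME has no time zone, so preferTimestampNTZ decides its mapping.
        case Types.TIMESTAMP if typeName.equalsIgnoreCase("DATETIME") =>
          Some(if (isTimestampNTZ) TimestampNTZType else TimestampType)
        // MySQL TIMESTAMP is time-zone-aware, so it always maps to TimestampType.
        case Types.TIMESTAMP => Some(TimestampType)
        case _ => None
      }
    ```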
    
    ### Why are the changes needed?
    
    To align with the guidelines for JDBC timestamp mapping established in [SPARK-47375](https://issues.apache.org/jira/browse/SPARK-47375).

    ### Does this PR introduce _any_ user-facing change?
    
    Yes, a migration guide entry is provided: with `preferTimestampNTZ=true`, MySQL TIMESTAMP is no longer converted to TimestampNTZType, while DATETIME is unaffected.
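
    For example, assuming a running `spark` session and a hypothetical MySQL endpoint and table (the URL and table name below are illustrative only), a read like this now yields TimestampType for TIMESTAMP columns and TimestampNTZType only for DATETIME columns:

    ```scala
    import java.util.Properties

    // Hypothetical URL and table name, for illustration only.
    val df = spark.read
      .option("preferTimestampNTZ", "true")
      .jdbc("jdbc:mysql://localhost:3306/test", "dates", new Properties)
    df.printSchema()
    // Since Spark 4.0: TIMESTAMP columns -> timestamp,
    //                  DATETIME columns -> timestamp_ntz.
    ```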
    
    ### How was this patch tested?
    
    New tests in `MySQLIntegrationSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #45530 from yaooqinn/SPARK-47406.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     | 14 +++++++
 docs/sql-migration-guide.md                        |  1 +
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 10 +++--
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  7 ++++
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 45 +++++++++++++---------
 5 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index 48b94cf28a63..b1d239337aa0 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
+import java.time.LocalDateTime
 import java.util.Properties
 
 import org.apache.spark.sql.Row
@@ -134,6 +135,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     }
   }
 
+  test("SPARK-47406: MySQL datetime types with preferTimestampNTZ") {
+    withDefaultTimeZone(UTC) {
+      val df = sqlContext.read.option("preferTimestampNTZ", true)
+        .jdbc(jdbcUrl, "dates", new Properties)
+      checkAnswer(df, Row(
+        Date.valueOf("1991-11-09"),
+        LocalDateTime.of(1970, 1, 1, 13, 31, 24),
+        LocalDateTime.of(1996, 1, 1, 1, 23, 45),
+        Timestamp.valueOf("2009-02-13 23:31:30"),
+        Date.valueOf("2001-01-01")))
+    }
+  }
+
   test("String types") {
     val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
     val rows = df.collect()
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 28fa19c351fc..27b62a6bd792 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -41,6 +41,7 @@ license: |
 - Since Spark 4.0, the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` is deprecated. Consider to change `strfmt` of the `format_string` function to use 1-based indexes. The first argument must be referenced by "1$", the second by "2$", etc.
 - Since Spark 4.0, the function `to_csv` no longer supports input with the data type `STRUCT`, `ARRAY`, `MAP`, `VARIANT` and `BINARY` (because the `CSV specification` does not have standards for these data types and cannot be read back using `from_csv`), Spark will throw `DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE` exception.
 - Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` will not convert Postgres TIMESTAMP WITH TIME ZONE and TIME WITH TIME ZONE data types to TimestampNTZType, which is available in Spark 3.5.
+- Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` will not convert MySQL TIMESTAMP to TimestampNTZType, which is available in Spark 3.5. MySQL DATETIME is not affected.
 
 ## Upgrading from Spark SQL 3.4 to 3.5
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b037d862fa1a..393f09b6075e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -167,6 +167,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
       throw QueryExecutionErrors.cannotGetJdbcTypeError(dt))
   }
 
+  def getTimestampType(isTimestampNTZ: Boolean): DataType = {
+    if (isTimestampNTZ) TimestampNTZType else TimestampType
+  }
+
   /**
    * Maps a JDBC type to a Catalyst type.  This function is called only when
    * the JdbcDialect class corresponding to your database driver returns null.
@@ -211,10 +215,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
     case java.sql.Types.SMALLINT => IntegerType
     case java.sql.Types.SQLXML => StringType
     case java.sql.Types.STRUCT => StringType
-    case java.sql.Types.TIME if isTimestampNTZ => TimestampNTZType
-    case java.sql.Types.TIME => TimestampType
-    case java.sql.Types.TIMESTAMP if isTimestampNTZ => TimestampNTZType
-    case java.sql.Types.TIMESTAMP => TimestampType
+    case java.sql.Types.TIME => getTimestampType(isTimestampNTZ)
+    case java.sql.Types.TIMESTAMP => getTimestampType(isTimestampNTZ)
     case java.sql.Types.TINYINT => IntegerType
     case java.sql.Types.VARBINARY => BinaryType
     case java.sql.Types.VARCHAR if conf.charVarcharAsString => StringType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 7c5e476d9786..7d2812d48cae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -781,6 +781,13 @@ abstract class JdbcDialect extends Serializable with Logging {
   def getFullyQualifiedQuotedTableName(ident: Identifier): String = {
     (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
   }
+
+  /**
+   * Return TimestampType/TimestampNTZType based on the metadata.
+   */
+  protected final def getTimestampType(md: Metadata): DataType = {
+    JdbcUtils.getTimestampType(md.getBoolean("isTimestampNTZ"))
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 7c63477d4584..42b1a3a2854e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType, TimestampType}
 
 private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
@@ -92,23 +92,32 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
-      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
-      // byte arrays instead of longs.
-      md.putLong("binarylong", 1)
-      Option(LongType)
-    } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
-      Option(BooleanType)
-    } else if ("TINYTEXT".equalsIgnoreCase(typeName)) {
-      // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason
-      Some(StringType)
-    } else if (sqlType == Types.VARCHAR && typeName.equals("JSON")) {
-      // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1.
-      // Explicitly converts it into StringType here.
-      Some(StringType)
-    } else if (sqlType == Types.TINYINT && typeName.equals("TINYINT")) {
-      Some(ByteType)
-    } else None
+    sqlType match {
+      case Types.VARBINARY if "BIT".equalsIgnoreCase(typeName) && size != 1 =>
+        // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
+        // byte arrays instead of longs.
+        md.putLong("binarylong", 1)
+        Some(LongType)
+      case Types.BIT if "TINYINT".equalsIgnoreCase(typeName) =>
+        Some(BooleanType)
+      case Types.VARCHAR if "TINYTEXT".equalsIgnoreCase(typeName) =>
+        // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason
+        Some(StringType)
+      case Types.VARCHAR if "JSON".equalsIgnoreCase(typeName) =>
+        // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1.
+        // Explicitly converts it into StringType here.
+        Some(StringType)
+      case Types.TINYINT if "TINYINT".equalsIgnoreCase(typeName) =>
+        Some(ByteType)
+      case Types.TIMESTAMP if "DATETIME".equalsIgnoreCase(typeName) =>
+        // scalastyle:off line.size.limit
+        // In MYSQL, DATETIME is TIMESTAMP WITHOUT TIME ZONE
+        // https://github.com/mysql/mysql-connector-j/blob/8.3.0/src/main/core-api/java/com/mysql/cj/MysqlType.java#L251
+        // scalastyle:on line.size.limit
+        Some(getTimestampType(md.build()))
+      case Types.TIMESTAMP => Some(TimestampType)
+      case _ => None
+    }
   }
 
   override def quoteIdentifier(colName: String): String = {

