This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a1fc112677f [SPARK-47871][SQL] Oracle: Map TimestampType to TIMESTAMP WITH LOCAL TIME ZONE
9a1fc112677f is described below

commit 9a1fc112677f98089d946b3bf4f52b33ab0a5c23
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Apr 16 08:35:51 2024 -0700

    [SPARK-47871][SQL] Oracle: Map TimestampType to TIMESTAMP WITH LOCAL TIME ZONE
    
    ### What changes were proposed in this pull request?
    
    This PR maps TimestampType to TIMESTAMP WITH LOCAL TIME ZONE for the Oracle JDBC datasource.
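
    A minimal sketch of the resulting mapping decision (simplified from the `OracleDialect.getJDBCType` change in the diff below; the `-102` vendor type code is an assumption based on Oracle's JDBC driver constants, not taken from this patch):

    ```scala
    // Sketch only: mirrors the new mapping rule, not the full dialect implementation.
    case class JdbcType(databaseTypeDefinition: String, jdbcNullType: Int)

    // Assumed Oracle vendor code for TIMESTAMP WITH LOCAL TIME ZONE.
    val TIMESTAMP_LTZ = -102

    def oracleTimestampType(legacyMappingEnabled: Boolean): JdbcType =
      if (legacyMappingEnabled) {
        JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP) // pre-4.0 behavior
      } else {
        JdbcType("TIMESTAMP WITH LOCAL TIME ZONE", TIMESTAMP_LTZ) // new default
      }
    ```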
    
    ### Why are the changes needed?
    
    We currently map both TimestampType and TimestampNTZType to Oracle's TIMESTAMP, which represents a timestamp without time zone. This is ambiguous.
    
    ### Does this PR introduce _any_ user-facing change?
    
    It does not affect Spark users performing a TimestampType read-write-read round trip, but it might affect how other systems read the values written by Spark.
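
    For reference, a hedged usage sketch of opting back into the old mapping with the new legacy flag (the `spark` session and `df` DataFrame are assumed in scope; `jdbcUrl` and the `events` table are illustrative placeholders):

    ```scala
    // Restore the pre-4.0 mapping (TimestampType -> TIMESTAMP) before writing to Oracle.
    spark.conf.set("spark.sql.legacy.oracle.timestampMapping.enabled", "true")

    df.write.format("jdbc")
      .option("url", jdbcUrl)       // illustrative Oracle JDBC URL
      .option("dbtable", "events")  // illustrative target table
      .save()
    ```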
    
    ### How was this patch tested?
    
    Existing test, run with the new configuration:
    ```
    SPARK-42627: Support ORACLE TIMESTAMP WITH LOCAL TIME ZONE (9 seconds, 536 milliseconds)
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #46080 from yaooqinn/SPARK-47871.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/jdbc/OracleIntegrationSuite.scala    | 39 ++++++++++++----------
 docs/sql-migration-guide.md                        |  1 +
 .../org/apache/spark/sql/internal/SQLConf.scala    | 12 +++++++
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  |  5 ++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  5 ++-
 5 files changed, 43 insertions(+), 19 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 418b86fb6b23..496498e5455b 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -547,23 +547,28 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
   }
 
   test("SPARK-42627: Support ORACLE TIMESTAMP WITH LOCAL TIME ZONE") {
-    val reader = spark.read.format("jdbc")
-      .option("url", jdbcUrl)
-      .option("dbtable", "test_ltz")
-    val df = reader.load()
-    val row1 = df.collect().head.getTimestamp(0)
-    assert(df.count() === 1)
-    assert(row1 === Timestamp.valueOf("2018-11-17 13:33:33"))
-
-    df.write.format("jdbc")
-      .option("url", jdbcUrl)
-      .option("dbtable", "test_ltz")
-      .mode("append")
-      .save()
-
-    val df2 = reader.load()
-    assert(df.count() === 2)
-    assert(df2.collect().forall(_.getTimestamp(0) === row1))
+    Seq("true", "false").foreach { flag =>
+      withSQLConf((SQLConf.LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED.key, flag)) {
+        val df = spark.read.format("jdbc")
+          .option("url", jdbcUrl)
+          .option("dbtable", "test_ltz")
+          .load()
+        val row1 = df.collect().head.getTimestamp(0)
+        assert(df.count() === 1)
+        assert(row1 === Timestamp.valueOf("2018-11-17 13:33:33"))
+
+        df.write.format("jdbc")
+          .option("url", jdbcUrl)
+          .option("dbtable", "test_ltz" + flag)
+          .save()
+
+        val df2 = spark.read.format("jdbc")
+          .option("url", jdbcUrl)
+          .option("dbtable", "test_ltz" + flag)
+          .load()
+        checkAnswer(df2, Row(row1))
+      }
+    }
   }
 
   test("SPARK-47761: Reading ANSI INTERVAL Types") {
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index c7bd0b55840c..3004008b8ec7 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -45,6 +45,7 @@ license: |
 - Since Spark 4.0, MySQL JDBC datasource will read FLOAT as FloatType, while in Spark 3.5 and previous, it was read as DoubleType. To restore the previous behavior, you can cast the column to the old type.
 - Since Spark 4.0, MySQL JDBC datasource will read BIT(n > 1) as BinaryType, while in Spark 3.5 and previous, read as LongType. To restore the previous behavior, set `spark.sql.legacy.mysql.bitArrayMapping.enabled` to `true`.
 - Since Spark 4.0, MySQL JDBC datasource will write ShortType as SMALLINT, while in Spark 3.5 and previous, write as INTEGER. To restore the previous behavior, you can replace the column with IntegerType whenever before writing.
+- Since Spark 4.0, Oracle JDBC datasource will write TimestampType as TIMESTAMP WITH LOCAL TIME ZONE, while in Spark 3.5 and previous, it was written as TIMESTAMP. To restore the previous behavior, set `spark.sql.legacy.oracle.timestampMapping.enabled` to `true`.
 - Since Spark 4.0, The default value for `spark.sql.legacy.ctePrecedencePolicy` has been changed from `EXCEPTION` to `CORRECTED`. Instead of raising an error, inner CTE definitions take precedence over outer definitions.
 - Since Spark 4.0, The default value for `spark.sql.legacy.timeParserPolicy` has been changed from `EXCEPTION` to `CORRECTED`. Instead of raising an `INCONSISTENT_BEHAVIOR_CROSS_VERSION` error, `CANNOT_PARSE_TIMESTAMP` will be raised if ANSI mode is enable. `NULL` will be returned if ANSI mode is disabled. See [Datetime Patterns for Formatting and Parsing](sql-ref-datetime-pattern.html).
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 278d4dc8d302..e5ba1be0f5f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4135,6 +4135,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED =
+    buildConf("spark.sql.legacy.oracle.timestampMapping.enabled")
+      .internal()
+      .doc("When true, TimestampType maps to TIMESTAMP in Oracle; otherwise, " +
+        "TIMESTAMP WITH LOCAL TIME ZONE.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled")
     .doc("When true, enable filter pushdown to CSV datasource.")
     .version("3.0.0")
@@ -5235,6 +5244,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def legacyMySqlBitArrayMappingEnabled: Boolean =
     getConf(LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED)
 
+  def legacyOracleTimestampMappingEnabled: Boolean =
+    getConf(LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED)
+
   override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
     LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 001d47f13b21..26c816294b52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -23,13 +23,14 @@ import java.util.Locale
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.jdbc.OracleDialect._
 import org.apache.spark.sql.types._
 
 
-private case class OracleDialect() extends JdbcDialect {
+private case class OracleDialect() extends JdbcDialect with SQLConfHelper {
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")
 
@@ -120,6 +121,8 @@ private case class OracleDialect() extends JdbcDialect {
     case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.SMALLINT))
     case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.SMALLINT))
     case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
+    case TimestampType if !conf.legacyOracleTimestampMappingEnabled =>
+      Some(JdbcType("TIMESTAMP WITH LOCAL TIME ZONE", TIMESTAMP_LTZ))
     case _ => None
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5e387a3f0791..88bb53cc7488 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1335,7 +1335,10 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     assert(getJdbcType(oracleDialect, StringType) == "VARCHAR2(255)")
     assert(getJdbcType(oracleDialect, BinaryType) == "BLOB")
     assert(getJdbcType(oracleDialect, DateType) == "DATE")
-    assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP")
+    assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP WITH LOCAL TIME ZONE")
+    withSQLConf(SQLConf.LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED.key -> "true") {
+      assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP")
+    }
     assert(getJdbcType(oracleDialect, TimestampNTZType) == "TIMESTAMP")
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
