This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9a1fc112677f [SPARK-47871][SQL] Oracle: Map TimestampType to TIMESTAMP WITH LOCAL TIME ZONE 9a1fc112677f is described below commit 9a1fc112677f98089d946b3bf4f52b33ab0a5c23 Author: Kent Yao <y...@apache.org> AuthorDate: Tue Apr 16 08:35:51 2024 -0700 [SPARK-47871][SQL] Oracle: Map TimestampType to TIMESTAMP WITH LOCAL TIME ZONE ### What changes were proposed in this pull request? This PR maps TimestampType to TIMESTAMP WITH LOCAL TIME ZONE. ### Why are the changes needed? We currently map both TimestampType and TimestampNTZType to Oracle's TIMESTAMP, which represents a timestamp without time zone. This is ambiguous. ### Does this PR introduce _any_ user-facing change? It does not affect Spark users performing a TimestampType read-write-read roundtrip, but it might affect how other systems read the written data. ### How was this patch tested? existing test with new configuration ```java SPARK-42627: Support ORACLE TIMESTAMP WITH LOCAL TIME ZONE (9 seconds, 536 milliseconds) ``` ### Was this patch authored or co-authored using generative AI tooling? no Closes #46080 from yaooqinn/SPARK-47871. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/jdbc/OracleIntegrationSuite.scala | 39 ++++++++++++---------- docs/sql-migration-guide.md | 1 + .../org/apache/spark/sql/internal/SQLConf.scala | 12 +++++++ .../org/apache/spark/sql/jdbc/OracleDialect.scala | 5 ++- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 5 ++- 5 files changed, 43 insertions(+), 19 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala index 418b86fb6b23..496498e5455b 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala @@ -547,23 +547,28 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark } test("SPARK-42627: Support ORACLE TIMESTAMP WITH LOCAL TIME ZONE") { - val reader = spark.read.format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "test_ltz") - val df = reader.load() - val row1 = df.collect().head.getTimestamp(0) - assert(df.count() === 1) - assert(row1 === Timestamp.valueOf("2018-11-17 13:33:33")) - - df.write.format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "test_ltz") - .mode("append") - .save() - - val df2 = reader.load() - assert(df.count() === 2) - assert(df2.collect().forall(_.getTimestamp(0) === row1)) + Seq("true", "false").foreach { flag => + withSQLConf((SQLConf.LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED.key, flag)) { + val df = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "test_ltz") + .load() + val row1 = df.collect().head.getTimestamp(0) + assert(df.count() === 1) + assert(row1 === Timestamp.valueOf("2018-11-17 13:33:33")) + + df.write.format("jdbc") + .option("url", jdbcUrl) + 
.option("dbtable", "test_ltz" + flag) + .save() + + val df2 = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "test_ltz" + flag) + .load() + checkAnswer(df2, Row(row1)) + } + } } test("SPARK-47761: Reading ANSI INTERVAL Types") { diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index c7bd0b55840c..3004008b8ec7 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -45,6 +45,7 @@ license: | - Since Spark 4.0, MySQL JDBC datasource will read FLOAT as FloatType, while in Spark 3.5 and previous, it was read as DoubleType. To restore the previous behavior, you can cast the column to the old type. - Since Spark 4.0, MySQL JDBC datasource will read BIT(n > 1) as BinaryType, while in Spark 3.5 and previous, read as LongType. To restore the previous behavior, set `spark.sql.legacy.mysql.bitArrayMapping.enabled` to `true`. - Since Spark 4.0, MySQL JDBC datasource will write ShortType as SMALLINT, while in Spark 3.5 and previous, write as INTEGER. To restore the previous behavior, you can replace the column with IntegerType whenever before writing. +- Since Spark 4.0, Oracle JDBC datasource will write TimestampType as TIMESTAMP WITH LOCAL TIME ZONE, while in Spark 3.5 and previous, write as TIMESTAMP. To restore the previous behavior, set `spark.sql.legacy.oracle.timestampMapping.enabled` to `true`. - Since Spark 4.0, The default value for `spark.sql.legacy.ctePrecedencePolicy` has been changed from `EXCEPTION` to `CORRECTED`. Instead of raising an error, inner CTE definitions take precedence over outer definitions. - Since Spark 4.0, The default value for `spark.sql.legacy.timeParserPolicy` has been changed from `EXCEPTION` to `CORRECTED`. Instead of raising an `INCONSISTENT_BEHAVIOR_CROSS_VERSION` error, `CANNOT_PARSE_TIMESTAMP` will be raised if ANSI mode is enable. `NULL` will be returned if ANSI mode is disabled. See [Datetime Patterns for Formatting and Parsing](sql-ref-datetime-pattern.html). 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 278d4dc8d302..e5ba1be0f5f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4135,6 +4135,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED = + buildConf("spark.sql.legacy.oracle.timestampMapping.enabled") + .internal() + .doc("When true, TimestampType maps to TIMESTAMP in Oracle; otherwise, " + + "TIMESTAMP WITH LOCAL TIME ZONE.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled") .doc("When true, enable filter pushdown to CSV datasource.") .version("3.0.0") @@ -5235,6 +5244,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def legacyMySqlBitArrayMappingEnabled: Boolean = getConf(LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED) + def legacyOracleTimestampMappingEnabled: Boolean = + getConf(LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED) + override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = { LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index 001d47f13b21..26c816294b52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -23,13 +23,14 @@ import java.util.Locale import scala.util.control.NonFatal import org.apache.spark.SparkUnsupportedOperationException +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.connector.expressions.Expression import 
org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.jdbc.OracleDialect._ import org.apache.spark.sql.types._ -private case class OracleDialect() extends JdbcDialect { +private case class OracleDialect() extends JdbcDialect with SQLConfHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle") @@ -120,6 +121,8 @@ private case class OracleDialect() extends JdbcDialect { case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.SMALLINT)) case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.SMALLINT)) case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR)) + case TimestampType if !conf.legacyOracleTimestampMappingEnabled => + Some(JdbcType("TIMESTAMP WITH LOCAL TIME ZONE", TIMESTAMP_LTZ)) case _ => None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 5e387a3f0791..88bb53cc7488 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1335,7 +1335,10 @@ class JDBCSuite extends QueryTest with SharedSparkSession { assert(getJdbcType(oracleDialect, StringType) == "VARCHAR2(255)") assert(getJdbcType(oracleDialect, BinaryType) == "BLOB") assert(getJdbcType(oracleDialect, DateType) == "DATE") - assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP") + assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP WITH LOCAL TIME ZONE") + withSQLConf(SQLConf.LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED.key -> "true") { + assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP") + } assert(getJdbcType(oracleDialect, TimestampNTZType) == "TIMESTAMP") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org