This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a48365dd98c9  [SPARK-48387][SQL] Postgres: Map TimestampType to TIMESTAMP WITH TIME ZONE
a48365dd98c9 is described below

commit a48365dd98c9e52b5648d1cc0af203a7290cb1dc
Author: Kent Yao <y...@apache.org>
AuthorDate: Thu May 23 10:27:16 2024 +0800

[SPARK-48387][SQL] Postgres: Map TimestampType to TIMESTAMP WITH TIME ZONE

### What changes were proposed in this pull request?

Currently, both TimestampType and TimestampNTZType are mapped to TIMESTAMP WITHOUT TIME ZONE for writing, while the two are differentiated for reading. In this PR, we map TimestampType to TIMESTAMP WITH TIME ZONE so that TimestampType and TimestampNTZType are also differentiated when writing against Postgres.

### Why are the changes needed?

Mapping TimestampType to TIMESTAMP WITHOUT TIME ZONE is incorrect and ambiguous with TimestampNTZType.

### Does this PR introduce _any_ user-facing change?

Yes, a migration guide entry and a legacy configuration are provided.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46701 from yaooqinn/SPARK-48387.

Authored-by: Kent Yao <y...@apache.org>
Signed-off-by: Kent Yao <y...@apache.org>
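[Editor's illustration, not part of the commit] A minimal sketch of the write-side effect described above, assuming an active `SparkSession` bound to `spark` and a reachable Postgres instance; the `jdbcUrl` value and the target table names are placeholders:

```scala
// Illustrative only: connection URL and table names are placeholders.
import java.util.Properties

val jdbcUrl = "jdbc:postgresql://localhost:5432/testdb?user=test&password=test"
val props = new Properties()

// One TimestampType column (interpreted in the session time zone).
val df = spark.sql("SELECT TIMESTAMP '2018-11-17 13:33:33' AS col0")

// Spark 4.0 default: the column is created as TIMESTAMP WITH TIME ZONE in Postgres.
df.write.jdbc(jdbcUrl, "ts_tz_default", props)

// Legacy mapping (Spark 3.5 behavior): the column is created as TIMESTAMP,
// i.e. TIMESTAMP WITHOUT TIME ZONE.
spark.conf.set("spark.sql.legacy.postgres.datetimeMapping.enabled", "true")
df.write.jdbc(jdbcUrl, "ts_tz_legacy", props)
spark.conf.unset("spark.sql.legacy.postgres.datetimeMapping.enabled")
```

Only TimestampType is affected; TimestampNTZType continues to map to TIMESTAMP WITHOUT TIME ZONE in both modes.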
---
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala | 46 ++++++++++++++++++++++
 docs/sql-data-sources-jdbc.md                     |  4 +-
 docs/sql-migration-guide.md                       |  3 +-
 .../org/apache/spark/sql/internal/SQLConf.scala   | 14 +++++++
 .../apache/spark/sql/jdbc/PostgresDialect.scala   |  6 ++-
 5 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index dd6f1bfd3b3f..5ad4f15216b7 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest

@@ -583,4 +584,49 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       assert(cause.getSQLState === "22003")
     }
   }
+
+  test("SPARK-48387: Timestamp write as timestamp with time zone") {
+    val df = spark.sql("select TIMESTAMP '2018-11-17 13:33:33' as col0")
+    // write timestamps for preparation
+    withSQLConf(SQLConf.LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED.key -> "false") {
+      // write timestamp as timestamp with time zone
+      df.write.jdbc(jdbcUrl, "ts_with_timezone_copy_false", new Properties)
+    }
+    withSQLConf(SQLConf.LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED.key -> "true") {
+      // write timestamp as timestamp without time zone
+      df.write.jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+    }
+
+    // read timestamps for test
+    withSQLConf(SQLConf.LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED.key -> "true") {
+      val df1 = spark.read.option("preferTimestampNTZ", false)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_false", new Properties)
+      checkAnswer(df1, Row(Timestamp.valueOf("2018-11-17 13:33:33")))
+      val df2 = spark.read.option("preferTimestampNTZ", true)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_false", new Properties)
+      checkAnswer(df2, Row(LocalDateTime.of(2018, 11, 17, 13, 33, 33)))
+
+      val df3 = spark.read.option("preferTimestampNTZ", false)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+      checkAnswer(df3, Row(Timestamp.valueOf("2018-11-17 13:33:33")))
+      val df4 = spark.read.option("preferTimestampNTZ", true)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+      checkAnswer(df4, Row(LocalDateTime.of(2018, 11, 17, 13, 33, 33)))
+    }
+    withSQLConf(SQLConf.LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED.key -> "false") {
+      Seq("true", "false").foreach { prefer =>
+        val prop = new Properties
+        prop.setProperty("preferTimestampNTZ", prefer)
+        val dfCopy = spark.read.jdbc(jdbcUrl, "ts_with_timezone_copy_false", prop)
+        checkAnswer(dfCopy, Row(Timestamp.valueOf("2018-11-17 13:33:33")))
+      }
+
+      val df5 = spark.read.option("preferTimestampNTZ", false)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+      checkAnswer(df5, Row(Timestamp.valueOf("2018-11-17 13:33:33")))
+      val df6 = spark.read.option("preferTimestampNTZ", true)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+      checkAnswer(df6, Row(LocalDateTime.of(2018, 11, 17, 13, 33, 33)))
+    }
+  }
 }
diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index 54a8506bff51..371dc0595071 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -1074,8 +1074,8 @@ the [PostgreSQL JDBC Driver](https://mvnrepository.com/artifact/org.postgresql/p
     </tr>
     <tr>
       <td>TimestampType</td>
-      <td>timestamp</td>
-      <td></td>
+      <td>timestamp with time zone</td>
+      <td>Before Spark 4.0, it was mapped as timestamp. Please refer to the migration guide for more information.</td>
     </tr>
     <tr>
       <td>TimestampNTZType</td>
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index e668a9f9ef75..8f6a41556986 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -41,7 +41,8 @@ license: |
   - `spark.sql.avro.datetimeRebaseModeInRead` instead of `spark.sql.legacy.avro.datetimeRebaseModeInRead`
 - Since Spark 4.0, the default value of `spark.sql.orc.compression.codec` is changed from `snappy` to `zstd`. To restore the previous behavior, set `spark.sql.orc.compression.codec` to `snappy`.
 - Since Spark 4.0, the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` is deprecated. Consider to change `strfmt` of the `format_string` function to use 1-based indexes. The first argument must be referenced by "1$", the second by "2$", etc.
-- Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` will not convert Postgres TIMESTAMP WITH TIME ZONE and TIME WITH TIME ZONE data types to TimestampNTZType, which is available in Spark 3.5.
+- Since Spark 4.0, the Postgres JDBC datasource reads TIMESTAMP WITH TIME ZONE as TimestampType regardless of the JDBC read option `preferTimestampNTZ`, while in Spark 3.5 and earlier it was read as TimestampNTZType when `preferTimestampNTZ=true`. To restore the previous behavior, set `spark.sql.legacy.postgres.datetimeMapping.enabled` to `true`.
+- Since Spark 4.0, the Postgres JDBC datasource writes TimestampType as TIMESTAMP WITH TIME ZONE, while in Spark 3.5 and earlier it was written as TIMESTAMP, a.k.a. TIMESTAMP WITHOUT TIME ZONE. To restore the previous behavior, set `spark.sql.legacy.postgres.datetimeMapping.enabled` to `true`.
 - Since Spark 4.0, MySQL JDBC datasource will read TIMESTAMP as TimestampType regardless of the JDBC read option `preferTimestampNTZ`, while in 3.5 and previous, TimestampNTZType when `preferTimestampNTZ=true`. To restore the previous behavior, set `spark.sql.legacy.mysql.timestampNTZMapping.enabled` to `true`, MySQL DATETIME is not affected.
 - Since Spark 4.0, MySQL JDBC datasource will read SMALLINT as ShortType, while in Spark 3.5 and previous, it was read as IntegerType. MEDIUMINT UNSIGNED is read as IntegerType, while in Spark 3.5 and previous, it was read as LongType. To restore the previous behavior, you can cast the column to the old type.
 - Since Spark 4.0, MySQL JDBC datasource will read FLOAT as FloatType, while in Spark 3.5 and previous, it was read as DoubleType. To restore the previous behavior, you can cast the column to the old type.
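[Editor's illustration, not part of the commit] A minimal read-side sketch of the migration guide entry above, assuming the same placeholder `jdbcUrl` and a hypothetical Postgres table `events` that has a `timestamptz` column:

```scala
// Illustrative only: `jdbcUrl` and the `events` table are placeholders.
import java.util.Properties

val jdbcUrl = "jdbc:postgresql://localhost:5432/testdb?user=test&password=test"

// Spark 4.0 default: timestamptz is read as TimestampType, even with preferTimestampNTZ=true.
val df = spark.read
  .option("preferTimestampNTZ", "true")
  .jdbc(jdbcUrl, "events", new Properties)
df.printSchema()  // expected: the timestamptz column shows up as `timestamp`

// Legacy mapping (Spark 3.5 behavior): with the flag set, preferTimestampNTZ=true
// converts timestamptz to TimestampNTZType instead.
spark.conf.set("spark.sql.legacy.postgres.datetimeMapping.enabled", "true")
val dfLegacy = spark.read
  .option("preferTimestampNTZ", "true")
  .jdbc(jdbcUrl, "events", new Properties)
dfLegacy.printSchema()  // expected: the same column shows up as `timestamp_ntz`
spark.conf.unset("spark.sql.legacy.postgres.datetimeMapping.enabled")
```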
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 545b0a610cdf..06e0c6eda589 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4265,6 +4265,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED =
+    buildConf("spark.sql.legacy.postgres.datetimeMapping.enabled")
+      .internal()
+      .doc("When true, TimestampType maps to TIMESTAMP WITHOUT TIME ZONE in PostgreSQL for " +
+        "writing; otherwise, TIMESTAMP WITH TIME ZONE. When true, TIMESTAMP WITH TIME ZONE " +
+        "can be converted to TimestampNTZType when JDBC read option preferTimestampNTZ is " +
+        "true; otherwise, converted to TimestampType regardless of preferTimestampNTZ.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled")
     .doc("When true, enable filter pushdown to CSV datasource.")
     .version("3.0.0")
@@ -5410,6 +5421,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def legacyDB2BooleanMappingEnabled: Boolean = getConf(LEGACY_DB2_BOOLEAN_MAPPING_ENABLED)

+  def legacyPostgresDatetimeMappingEnabled: Boolean =
+    getConf(LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED)
+
   override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
     LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index f3fb115c7057..93052a0c37b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -61,8 +61,8 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
       // money type seems to be broken but one workaround is to handle it as string.
       // See SPARK-34333 and https://github.com/pgjdbc/pgjdbc/issues/100
       Some(StringType)
-    case Types.TIMESTAMP
-      if "timestamptz".equalsIgnoreCase(typeName) =>
+    case Types.TIMESTAMP if "timestamptz".equalsIgnoreCase(typeName) &&
+      !conf.legacyPostgresDatetimeMappingEnabled =>
       // timestamptz represents timestamp with time zone, currently it maps to Types.TIMESTAMP.
       // We need to change to Types.TIMESTAMP_WITH_TIMEZONE if the upstream changes.
       Some(TimestampType)
@@ -149,6 +149,8 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
     case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
     case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
     case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
+    case TimestampType if !conf.legacyPostgresDatetimeMappingEnabled =>
+      Some(JdbcType("TIMESTAMP WITH TIME ZONE", Types.TIMESTAMP))
     case t: DecimalType => Some(
       JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
     case ArrayType(et, _) if et.isInstanceOf[AtomicType] || et.isInstanceOf[ArrayType] =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org