This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e8bc176e6fd1 [SPARK-47342][SQL] Support TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE e8bc176e6fd1 is described below commit e8bc176e6fd145bab4cde6bf38931a7ad4c7eecd Author: Kent Yao <y...@apache.org> AuthorDate: Tue Mar 12 07:33:24 2024 -0700 [SPARK-47342][SQL] Support TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE ### What changes were proposed in this pull request? This PR Supports TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE when `preferTimestampNTZ` option is set to true by users ### Why are the changes needed? improve DB2 connector ### Does this PR introduce _any_ user-facing change? yes, preferTimestampNTZ works for DB2 TIMESTAMP WITH TIME ZONE ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #45471 from yaooqinn/SPARK-47342. Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala | 14 ++++++++++++++ .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 10 ++++++++-- .../main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala | 2 +- .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 7 +++++++ .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 13 +++++-------- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 9 +++++++-- 6 files changed, 42 insertions(+), 13 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala index cedb33d491fb..14776047cec4 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc import 
java.math.BigDecimal import java.sql.{Connection, Date, Timestamp} +import java.time.LocalDateTime import java.util.Properties import org.scalatest.time.SpanSugar._ @@ -224,4 +225,17 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite { assert(actual === expected) } + + test("SPARK-47342: Support TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE") { + // The test only covers TIMESTAMP WITHOUT TIME ZONE so far, we shall support + // TIMESTAMP WITH TIME ZONE but I haven't figured out how to mock a TSTZ value. + withDefaultTimeZone(UTC) { + val df = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("preferTimestampNTZ", "true") + .option("query", "select ts from dates") + .load() + checkAnswer(df, Row(LocalDateTime.of(2009, 2, 13, 23, 31, 30))) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index a7bbb832a839..27c032471b57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -212,8 +212,7 @@ object JdbcUtils extends Logging with SQLConfHelper { case java.sql.Types.SQLXML => StringType case java.sql.Types.STRUCT => StringType case java.sql.Types.TIME => TimestampType - case java.sql.Types.TIMESTAMP if isTimestampNTZ => TimestampNTZType - case java.sql.Types.TIMESTAMP => TimestampType + case java.sql.Types.TIMESTAMP => getTimestampType(isTimestampNTZ) case java.sql.Types.TINYINT => IntegerType case java.sql.Types.VARBINARY => BinaryType case java.sql.Types.VARCHAR if conf.charVarcharAsString => StringType @@ -229,6 +228,13 @@ object JdbcUtils extends Logging with SQLConfHelper { throw QueryExecutionErrors.unrecognizedSqlTypeError(jdbcType, typeName) } + /** + * Return TimestampNTZType if isTimestampNTZ; otherwise TimestampType. 
+ */ + def getTimestampType(isTimestampNTZ: Boolean): DataType = { + if (isTimestampNTZ) TimestampNTZType else TimestampType + } + /** * Returns the schema if the table already exists in the JDBC database. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 62c31b1c4c5d..ff3e74eae205 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -91,7 +91,7 @@ private object DB2Dialect extends JdbcDialect { typeName match { case "DECFLOAT" => Option(DecimalType(38, 18)) case "XML" => Option(StringType) - case t if (t.startsWith("TIMESTAMP")) => Option(TimestampType) // TIMESTAMP WITH TIMEZONE + case t if t.startsWith("TIMESTAMP") => Option(getTimestampType(md.build())) case _ => None } case _ => None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 6621282647d4..6d67a0d91eae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -744,6 +744,13 @@ abstract class JdbcDialect extends Serializable with Logging { def getFullyQualifiedQuotedTableName(ident: Identifier): String = { (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".") } + + /** + * Return TimestampType/TimestampNTZType based on the metadata. 
+ */ + protected final def getTimestampType(md: Metadata): DataType = { + JdbcUtils.getTimestampType(md.getBoolean("isTimestampNTZ")) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index d19b5ba3e0eb..6852f1f69984 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -64,18 +64,15 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { } else if ("text".equalsIgnoreCase(typeName)) { Some(StringType) // sqlType is Types.VARCHAR } else if (sqlType == Types.ARRAY) { - val scale = md.build().getLong("scale").toInt - val isTimestampNTZ = md.build().getBoolean("isTimestampNTZ") // postgres array type names start with underscore - toCatalystType(typeName.drop(1), size, scale, isTimestampNTZ).map(ArrayType(_)) + toCatalystType(typeName.drop(1), size, md.build()).map(ArrayType(_)) } else None } private def toCatalystType( typeName: String, precision: Int, - scale: Int, - isTimestampNTZ: Boolean): Option[DataType] = typeName match { + metadata: Metadata): Option[DataType] = typeName match { case "bool" => Some(BooleanType) case "bit" => Some(BinaryType) case "int2" => Some(ShortType) @@ -91,10 +88,10 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { "interval" | "pg_snapshot" => Some(StringType) case "bytea" => Some(BinaryType) - case "timestamp" | "timestamptz" | "time" | "timetz" => - Some(if (isTimestampNTZ) TimestampNTZType else TimestampType) + case "timestamp" | "timestamptz" | "time" | "timetz" => Some(getTimestampType(metadata)) case "date" => Some(DateType) - case "numeric" | "decimal" if precision > 0 => Some(DecimalType.bounded(precision, scale)) + case "numeric" | "decimal" if precision > 0 => + Some(DecimalType.bounded(precision, metadata.getLong("scale").toInt)) case "numeric" | "decimal" 
=> // SPARK-26538: handle numeric without explicit precision and scale. Some(DecimalType.SYSTEM_DEFAULT) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index b8ca70e0b175..47d60abd1dd4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -903,6 +903,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession { test("DB2Dialect type mapping") { val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db") + val metadata = new MetadataBuilder().putBoolean("isTimestampNTZ", false) + assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB") assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)") assert(db2Dialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get == "SMALLINT") @@ -912,8 +914,11 @@ class JDBCSuite extends QueryTest with SharedSparkSession { assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "DECFLOAT", 1, null) == Option(DecimalType(38, 18))) assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "XML", 1, null) == Option(StringType)) - assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "TIMESTAMP WITH TIME ZONE", 1, null) == - Option(TimestampType)) + assert(db2Dialect.getCatalystType( + java.sql.Types.OTHER, "TIMESTAMP WITH TIME ZONE", 1, metadata) === Option(TimestampType)) + metadata.putBoolean("isTimestampNTZ", true) + assert(db2Dialect.getCatalystType( + java.sql.Types.OTHER, "TIMESTAMP WITH TIME ZONE", 1, metadata) === Option(TimestampNTZType)) } test("MySQLDialect catalyst type mapping") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org