This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new b8c0fb9c760 [SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source b8c0fb9c760 is described below commit b8c0fb9c7605f7ab51fbb0e900f9334f4e748218 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Fri Feb 3 10:06:26 2023 -0800 [SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source ### What changes were proposed in this pull request? Similar to https://github.com/apache/spark/pull/39777 and https://github.com/apache/spark/pull/39812, this PR proposes to use `spark.sql.inferTimestampNTZInDataSources.enabled` to control the behavior of timestamp type inference on JDBC data sources. ### Why are the changes needed? Unify the TimestampNTZ type inference behavior across data sources. In JDBC/JSON/CSV data sources, a column can be Timestamp type or TimestampNTZ type. We need a lightweight configuration to control the behavior. ### Does this PR introduce _any_ user-facing change? No, TimestampNTZ is not released yet. ### How was this patch tested? Unit tests Closes #39868 from gengliangwang/jdbcNTZ. 
Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 4760a8bd845292f7d6d6a35320bd80082a76c7c5) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../org/apache/spark/sql/internal/SQLConf.scala | 9 ++--- .../execution/datasources/jdbc/JDBCOptions.scala | 7 +++- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 43 ++++++++++++++++------ 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1cc3b61b834..363e763be4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3520,11 +3520,10 @@ object SQLConf { val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES = buildConf("spark.sql.inferTimestampNTZInDataSources.enabled") - .doc("When true, the TimestampNTZ type is the prior choice of the schema inference " + - "over built-in data sources. Otherwise, the inference result will be TimestampLTZ for " + - "backward compatibility. As a result, for JSON/CSV files and partition directories " + - "written with TimestampNTZ columns, the inference results will still be of TimestampLTZ " + - "types.") + .doc("For the schema inference of JSON/CSV/JDBC data sources and partition directories, " + + "this config determines whether to choose the TimestampNTZ type if a column can be " + + "either TimestampNTZ or TimestampLTZ type. If set to true, the inference result of " + + "the column will be TimestampNTZ type. 
Otherwise, the result will be TimestampLTZ type.") .version("3.4.0") .booleanConf .createWithDefault(false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index e725de95335..888951cf9a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -26,6 +26,7 @@ import org.apache.spark.SparkFiles import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.SQLConf /** * Options for the JDBC data source. @@ -232,7 +233,11 @@ class JDBCOptions( val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " ").getOrElse("") // Infers timestamp values as TimestampNTZ type when reading data. 
- val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, "false").toBoolean + val inferTimestampNTZType = + parameters + .get(JDBC_INFER_TIMESTAMP_NTZ) + .map(_.toBoolean) + .getOrElse(SQLConf.get.inferTimestampNTZInDataSources) } class JdbcOptionsInWrite( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 3e317dc9547..3b3b1bfdb60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -23,6 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime} import java.util.{Calendar, GregorianCalendar, Properties, TimeZone} import scala.collection.JavaConverters._ +import scala.util.Random import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ @@ -1935,13 +1936,26 @@ class JDBCSuite extends QueryTest with SharedSparkSession { .option("url", urlWithUserAndPass) .option("dbtable", tableName).save() - val res = spark.read.format("jdbc") - .option("inferTimestampNTZType", "true") + val readDf = spark.read.format("jdbc") .option("url", urlWithUserAndPass) .option("dbtable", tableName) - .load() - checkAnswer(res, Seq(Row(null))) + Seq(true, false).foreach { inferTimestampNTZ => + val tsType = if (inferTimestampNTZ) { + TimestampNTZType + } else { + TimestampType + } + val res = readDf.option("inferTimestampNTZType", inferTimestampNTZ).load() + checkAnswer(res, Seq(Row(null))) + assert(res.schema.fields.head.dataType == tsType) + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) { + val res2 = readDf.load() + checkAnswer(res2, Seq(Row(null))) + assert(res2.schema.fields.head.dataType == tsType) + } + } + } test("SPARK-39339: TimestampNTZType with different local time zones") { @@ -1961,16 +1975,23 @@ class JDBCSuite extends QueryTest with SharedSparkSession { .option("url", urlWithUserAndPass) 
.option("dbtable", tableName) .save() - - DateTimeTestUtils.outstandingZoneIds.foreach { zoneId => - DateTimeTestUtils.withDefaultTimeZone(zoneId) { - val res = spark.read.format("jdbc") - .option("inferTimestampNTZType", "true") + val zoneId = DateTimeTestUtils.outstandingZoneIds( + Random.nextInt(DateTimeTestUtils.outstandingZoneIds.length)) + DateTimeTestUtils.withDefaultTimeZone(zoneId) { + // Infer TimestmapNTZ column with data source option + val res = spark.read.format("jdbc") + .option("inferTimestampNTZType", "true") + .option("url", urlWithUserAndPass) + .option("dbtable", tableName) + .load() + checkAnswer(res, df) + + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") { + val res2 = spark.read.format("jdbc") .option("url", urlWithUserAndPass) .option("dbtable", tableName) .load() - - checkAnswer(res, df) + checkAnswer(res2, df) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org