This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 973b3d56aee [SPARK-42221][SQL] Introduce a new conf for TimestampNTZ schema inference in JSON/CSV
973b3d56aee is described below

commit 973b3d56aeeb4d6f6223b1abe04e27eefa8e4208
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Mon Jan 30 13:31:15 2023 -0800

    [SPARK-42221][SQL] Introduce a new conf for TimestampNTZ schema inference in JSON/CSV
    
    ### What changes were proposed in this pull request?
    
    The TimestampNTZ schema inference over data sources is not consistent in the current code (most data sources infer Timestamp LTZ by default for backward compatibility):
    * CSV & JSON: depend on `spark.sql.timestampType` to determine the result
    * ORC: depends on whether type metadata was written. If not, the column is inferred as Timestamp LTZ
    * Parquet: infers timestamp columns with the annotation isAdjustedToUTC = false as Timestamp NTZ. There is a configuration `spark.sql.parquet.timestampNTZ.enabled` to determine whether to support NTZ; when it is false, users can't write Timestamp NTZ columns to Parquet files.
    * Avro: the [Local timestamp](https://avro.apache.org/docs/1.10.2/spec.html#Local+timestamp+%28microsecond+precision%29) type is a new logical type, so there is no backward compatibility issue and no configuration to control the inference.
    
    Since we are going to release Timestamp NTZ in Spark 3.4.0, I propose using a new configuration `spark.sql.inferTimestampNTZInDataSources.enabled` for TimestampNTZ schema inference. The flag is false by default for backward compatibility. When it is true, if a column can be either TimestampNTZ or TimestampLTZ, the inferred result will be TimestampNTZ. This PR converts the JSON/CSV data sources; if the proposal looks good to others, I will continue with the other data sources.
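
    As an illustration, here is a minimal sketch of the intended usage (the file path and sample data are hypothetical, not part of this PR):
    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Opt in to TimestampNTZ inference; the flag defaults to false.
    spark.conf.set("spark.sql.inferTimestampNTZInDataSources.enabled", "true")

    // A field without a time-zone component, e.g. "2023-01-30 13:31:15",
    // can now be inferred as TimestampNTZType instead of TimestampType (LTZ).
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/tmp/ntz-example.csv")  // hypothetical path
    df.printSchema()
    ```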
    
    ### Why are the changes needed?
    
    * The TimestampNTZ schema inference over data sources is not consistent in the current code.
    * The configuration `spark.sql.timestampType` is heavyweight: it changes the default timestamp type of DDL and SQL functions. If a user only wants to read back newly written TimestampNTZ data without breaking existing workloads, a lightweight flag is a better fit.
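
    For example (a sketch assuming an existing `spark` session; not part of this PR):
    ```scala
    // `spark.sql.timestampType` changes the default timestamp type globally,
    // e.g. for SQL TIMESTAMP literals and DDL:
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    spark.sql("SELECT TIMESTAMP'2023-01-30 13:31:15'").printSchema()  // TimestampNTZType

    // The new flag only affects schema inference in data sources, leaving the
    // DDL/SQL default (TIMESTAMP_LTZ) untouched:
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")
    spark.conf.set("spark.sql.inferTimestampNTZInDataSources.enabled", "true")
    ```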
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, TimestampNTZ is not released yet.
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #39777 from gengliangwang/ntzOptions.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala   |  6 +++++-
 .../org/apache/spark/sql/catalyst/json/JsonInferSchema.scala |  6 +++++-
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala   | 12 ++++++++++++
 .../apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala  |  3 +++
 .../spark/sql/execution/datasources/csv/CSVSuite.scala       | 10 +++++-----
 .../spark/sql/execution/datasources/json/JsonSuite.scala     | 10 +++++-----
 6 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index bdfa4ac3f0f..57e683abc13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -203,7 +203,11 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     // time-zone component and can be parsed with the timestamp formatter.
     // Otherwise, it is likely to be a timestamp with timezone.
    if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-      SQLConf.get.timestampType
+      if (SQLConf.get.inferTimestampNTZInDataSources) {
+        TimestampNTZType
+      } else {
+        TimestampType
+      }
     } else {
       tryParseTimestamp(field)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 5385afe8c93..f5721d7aa8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -152,7 +152,11 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
           decimalTry.get
         } else if (options.inferTimestamp &&
            timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-          SQLConf.get.timestampType
+          if (SQLConf.get.inferTimestampNTZInDataSources) {
+            TimestampNTZType
+          } else {
+            TimestampType
+          }
         } else if (options.inferTimestamp &&
             timestampFormatter.parseOptional(field).isDefined) {
           TimestampType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 79035285f2f..925769a91bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3507,6 +3507,16 @@ object SQLConf {
       .checkValues(TimestampTypes.values.map(_.toString))
       .createWithDefault(TimestampTypes.TIMESTAMP_LTZ.toString)
 
+  val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES =
+    buildConf("spark.sql.inferTimestampNTZInDataSources.enabled")
+      .doc("When true, the TimestampNTZ type is the prior choice of the schema 
inference " +
+        "over built-in data sources. Otherwise, the inference result will be 
TimestampLTZ for " +
+        "backward compatibility. As a result, for JSON/CSV files written with 
TimestampNTZ " +
+        "columns, the inference results will still be of TimestampLTZ types.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val DATETIME_JAVA8API_ENABLED = buildConf("spark.sql.datetime.java8API.enabled")
     .doc("If the configuration property is set to true, java.time.Instant and " +
       "java.time.LocalDate classes of Java 8 API are used as external types for " +
@@ -4795,6 +4805,8 @@ class SQLConf extends Serializable with Logging {
       TimestampNTZType
   }
 
+  def inferTimestampNTZInDataSources: Boolean = getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)
+
   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
 
   def serializerNestedSchemaPruningEnabled: Boolean =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index 8cae2400e0c..fc508d9d09c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -252,6 +252,9 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
       assert(inferSchema.inferField(DateType, "2003/02/05") == StringType)
     }
+    withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+      assert(inferSchema.inferField(DateType, "2003/02/05") == StringType)
+    }
    assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == StringType)
     assert(inferSchema.inferField(TimestampType, "2018_12_03") == StringType)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b458a0e1b08..69b41b66a0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1047,7 +1047,7 @@ abstract class CSVSuite
         .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
         .save(path.getAbsolutePath)
 
-      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
         val res = spark.read
           .format("csv")
           .option("inferSchema", "true")
@@ -1070,7 +1070,7 @@ abstract class CSVSuite
         .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
         .save(path.getAbsolutePath)
 
-      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "false") {
         val res = spark.read
           .format("csv")
           .option("inferSchema", "true")
@@ -1117,15 +1117,15 @@ abstract class CSVSuite
         SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
         SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
 
-      for (timestampType <- timestampTypes) {
-        withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
+      Seq(true, false).foreach { inferTimestampNTZ =>
+        withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
           val res = spark.read
             .format("csv")
             .option("inferSchema", "true")
             .option("header", "true")
             .load(path.getAbsolutePath)
 
-          if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+          if (inferTimestampNTZ) {
             checkAnswer(res, exp)
           } else {
             checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index a4b7df9af42..af8d4bd58d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2770,7 +2770,7 @@ abstract class JsonSuite
         .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
         .json(path.getAbsolutePath)
 
-      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
         val res = spark.read
           .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
           .option("inferTimestamp", "true")
@@ -2792,7 +2792,7 @@ abstract class JsonSuite
         .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
         .json(path.getAbsolutePath)
 
-      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "false") {
         val res = spark.read
           .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
           .option("inferTimestamp", "true")
@@ -2835,11 +2835,11 @@ abstract class JsonSuite
         SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
         SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
 
-      for (timestampType <- timestampTypes) {
-        withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
+      Seq(true, false).foreach { inferTimestampNTZ =>
+        withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
          val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath)
 
-          if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+          if (inferTimestampNTZ) {
             checkAnswer(res, exp)
           } else {
             checkAnswer(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
