This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 973b3d56aee [SPARK-42221][SQL] Introduce a new conf for TimestampNTZ schema inference in JSON/CSV 973b3d56aee is described below commit 973b3d56aeeb4d6f6223b1abe04e27eefa8e4208 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Mon Jan 30 13:31:15 2023 -0800 [SPARK-42221][SQL] Introduce a new conf for TimestampNTZ schema inference in JSON/CSV ### What changes were proposed in this pull request? The TimestampNTZ schema inference over data sources is not consistent in the current code (most of them are for the purpose of backward compatibility to infer as Timestamp LTZ by default): * CSV & JSON: depends on `spark.sql.timestampType` to determine the result * ORC: depends on whether there is metadata written. If not, inferred as Timestamp LTZ * Parquet: infer timestamp column with annotation isAdjustedToUTC = false as Timestamp NTZ. There is a configuration `spark.sql.parquet.timestampNTZ.enabled` to determine whether to support NTZ. When `spark.sql.parquet.timestampNTZ.enabled` is false, users can't write Timestamp NTZ columns to parquet files. * Avro: [Local timestamp](https://avro.apache.org/docs/1.10.2/spec.html#Local+timestamp+%28microsecond+precision%29) type is a new logical type so there is no backward compatibility issue and there is no configuration to control the inference. Since we are going to release Timestamp NTZ in Spark 3.4.0, I propose using a new configuration `spark.sql.inferTimestampNTZInDataSources.enabled` for TimestampNTZ schema inference. The flag is false by default for backward compatibility. When true, if a column can be either TimestampNTZ or TimestampLTZ, the inferred result will be TimestampNTZ. This PR converts JSON/CSV data sources. If the proposal is fine with others, I will continue on the other data sources. ### Why are the changes needed? 
* The TimestampNTZ schema inference over data sources is not consistent in the current code * The configuration `spark.sql.timestampType` is heavy. It changes the DDL/SQL functions' default timestamp type. If a user only wants to read back the newly written TimestampNTZ data without breaking the existing workloads, having a lightweight flag is a good idea. ### Does this PR introduce _any_ user-facing change? No, TimestampNTZ is not released yet. ### How was this patch tested? UTs Closes #39777 from gengliangwang/ntzOptions. Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 6 +++++- .../org/apache/spark/sql/catalyst/json/JsonInferSchema.scala | 6 +++++- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++++++++ .../apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 3 +++ .../spark/sql/execution/datasources/csv/CSVSuite.scala | 10 +++++----- .../spark/sql/execution/datasources/json/JsonSuite.scala | 10 +++++----- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index bdfa4ac3f0f..57e683abc13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -203,7 +203,11 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // time-zone component and can be parsed with the timestamp formatter. // Otherwise, it is likely to be a timestamp with timezone. 
if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - SQLConf.get.timestampType + if (SQLConf.get.inferTimestampNTZInDataSources) { + TimestampNTZType + } else { + TimestampType + } } else { tryParseTimestamp(field) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 5385afe8c93..f5721d7aa8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -152,7 +152,11 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { decimalTry.get } else if (options.inferTimestamp && timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - SQLConf.get.timestampType + if (SQLConf.get.inferTimestampNTZInDataSources) { + TimestampNTZType + } else { + TimestampType + } } else if (options.inferTimestamp && timestampFormatter.parseOptional(field).isDefined) { TimestampType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 79035285f2f..925769a91bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3507,6 +3507,16 @@ object SQLConf { .checkValues(TimestampTypes.values.map(_.toString)) .createWithDefault(TimestampTypes.TIMESTAMP_LTZ.toString) + val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES = + buildConf("spark.sql.inferTimestampNTZInDataSources.enabled") + .doc("When true, the TimestampNTZ type is the prior choice of the schema inference " + + "over built-in data sources. Otherwise, the inference result will be TimestampLTZ for " + + "backward compatibility. 
As a result, for JSON/CSV files written with TimestampNTZ " + + "columns, the inference results will still be of TimestampLTZ types.") + .version("3.4.0") + .booleanConf + .createWithDefault(false) + val DATETIME_JAVA8API_ENABLED = buildConf("spark.sql.datetime.java8API.enabled") .doc("If the configuration property is set to true, java.time.Instant and " + "java.time.LocalDate classes of Java 8 API are used as external types for " + @@ -4795,6 +4805,8 @@ class SQLConf extends Serializable with Logging { TimestampNTZType } + def inferTimestampNTZInDataSources: Boolean = getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES) + def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED) def serializerNestedSchemaPruningEnabled: Boolean = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index 8cae2400e0c..fc508d9d09c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -252,6 +252,9 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") { assert(inferSchema.inferField(DateType, "2003/02/05") == StringType) } + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") { + assert(inferSchema.inferField(DateType, "2003/02/05") == StringType) + } assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == StringType) assert(inferSchema.inferField(TimestampType, "2018_12_03") == StringType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index b458a0e1b08..69b41b66a0c 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1047,7 +1047,7 @@ abstract class CSVSuite .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") .save(path.getAbsolutePath) - withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") { val res = spark.read .format("csv") .option("inferSchema", "true") @@ -1070,7 +1070,7 @@ abstract class CSVSuite .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") .save(path.getAbsolutePath) - withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) { + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "false") { val res = spark.read .format("csv") .option("inferSchema", "true") @@ -1117,15 +1117,15 @@ abstract class CSVSuite SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString, SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) - for (timestampType <- timestampTypes) { - withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) { + Seq(true, false).foreach { inferTimestampNTZ => + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) { val res = spark.read .format("csv") .option("inferSchema", "true") .option("header", "true") .load(path.getAbsolutePath) - if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + if (inferTimestampNTZ) { checkAnswer(res, exp) } else { checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index a4b7df9af42..af8d4bd58d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2770,7 
+2770,7 @@ abstract class JsonSuite .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") .json(path.getAbsolutePath) - withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") { val res = spark.read .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") .option("inferTimestamp", "true") @@ -2792,7 +2792,7 @@ abstract class JsonSuite .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") .json(path.getAbsolutePath) - withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) { + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "false") { val res = spark.read .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") .option("inferTimestamp", "true") @@ -2835,11 +2835,11 @@ abstract class JsonSuite SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString, SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) - for (timestampType <- timestampTypes) { - withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) { + Seq(true, false).foreach { inferTimestampNTZ => + withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) { val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath) - if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + if (inferTimestampNTZ) { checkAnswer(res, exp) } else { checkAnswer( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org