This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new c19bf01b5208 [SPARK-46769][SQL] Refine timestamp related schema inference c19bf01b5208 is described below commit c19bf01b5208bb3aad0e6264b64597e0809b1efe Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Sat Jan 20 20:57:09 2024 +0800 [SPARK-46769][SQL] Refine timestamp related schema inference ### What changes were proposed in this pull request? This is a refinement of https://github.com/apache/spark/pull/43243 . This PR enforces one thing: we only infer TIMESTAMP NTZ type using NTZ parser, and only infer LTZ type using LTZ parser. This consistency is important to avoid nondeterministic behaviors. ### Why are the changes needed? Avoid non-deterministic behaviors. After https://github.com/apache/spark/pull/43243 , we can still have inconsistency if the LEGACY mode is enabled. ### Does this PR introduce any user-facing change? Yes for the legacy parser. Now it's more likely to infer string type instead of inferring timestamp type "by luck". ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes https://github.com/apache/spark/pull/44789 Closes #44800 from cloud-fan/infer. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit e4e40762ca41931646b8f201028b1f2298252d96) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 18 +++++----- .../spark/sql/catalyst/json/JsonInferSchema.scala | 32 +++++++++++++---- .../sql/execution/datasources/csv/CSVSuite.scala | 42 +++++++++++----------- 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index ec01b56f9eb7..2c27da3cf6e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set( "yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", "yyyy") + private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType + /** * Similar to the JSON schema inference * 1. 
Infer type of each row @@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } private def tryParseTimestampNTZ(field: String): DataType = { - // We can only parse the value as TimestampNTZType if it does not have zone-offset or - // time-zone component and can be parsed with the timestamp formatter. - // Otherwise, it is likely to be a timestamp with timezone. - val timestampType = SQLConf.get.timestampType - if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY || - timestampType == TimestampNTZType) && - timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - timestampType + // For text-based format, it's ambiguous to infer a timestamp string without timezone, as it can + // be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support of NTZ, here + // we only try to infer NTZ if the config is set to use NTZ by default. + if (isDefaultNTZ && + timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { + TimestampNTZType } else { tryParseTimestamp(field) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 4123c5290b6a..f6d32f39f64e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { @@ -53,6 +54,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends 
Serializable { isParsing = true, forTimestampNTZ = true) + private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType + private val legacyMode = SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY + private def handleJsonErrorsByParseMode(parseMode: ParseMode, columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = { parseMode match { @@ -148,16 +152,30 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { val bigDecimal = decimalParser(field) DecimalType(bigDecimal.precision, bigDecimal.scale) } - val timestampType = SQLConf.get.timestampType if (options.prefersDecimal && decimalTry.isDefined) { decimalTry.get - } else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy == - LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) && + } else if (options.inferTimestamp) { + // For text-based format, it's ambiguous to infer a timestamp string without timezone, as + // it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support + // of NTZ, here we only try to infer NTZ if the config is set to use NTZ by default. + if (isDefaultNTZ && timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - timestampType - } else if (options.inferTimestamp && - timestampFormatter.parseOptional(field).isDefined) { - TimestampType + TimestampNTZType + } else if (timestampFormatter.parseOptional(field).isDefined) { + TimestampType + } else if (legacyMode) { + val utf8Value = UTF8String.fromString(field) + // There was a mistake that we use TIMESTAMP NTZ parser to infer LTZ type with legacy + // mode. The mistake makes it easier to infer TIMESTAMP LTZ type and we have to keep + // this behavior now. See SPARK-46769 for more details. 
+ if (SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, false).isDefined) { + TimestampType + } else { + StringType + } + } else { + StringType + } } else { StringType } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 3bd45ca0dcdb..78266acfd7de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1105,10 +1105,12 @@ abstract class CSVSuite test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") { withTempPath { path => - val exp = spark.sql(""" - select timestamp_ntz'2020-12-12 12:12:12' as col0 union all - select timestamp_ntz'2020-12-12 12:12:12' as col0 - """) + val exp = spark.sql( + """ + |select * + |from values (timestamp_ntz'2020-12-12 12:12:12'), (timestamp_ntz'2020-12-12 12:12:12') + |as t(col0) + |""".stripMargin) exp.write.format("csv").option("header", "true").save(path.getAbsolutePath) @@ -1126,6 +1128,15 @@ abstract class CSVSuite if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { checkAnswer(res, exp) + } else if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { + // When legacy parser is enabled, we can't parse the NTZ string to LTZ, and eventually + // infer string type. 
+ val expected = spark.read + .format("csv") + .option("inferSchema", "false") + .option("header", "true") + .load(path.getAbsolutePath) + checkAnswer(res, expected) } else { checkAnswer( res, @@ -2862,13 +2873,12 @@ abstract class CSVSuite test("SPARK-40474: Infer schema for columns with a mix of dates and timestamp") { withTempPath { path => - Seq( - "1765-03-28", + val input = Seq( "1423-11-12T23:41:00", + "1765-03-28", "2016-01-28T20:00:00" - ).toDF() - .repartition(1) - .write.text(path.getAbsolutePath) + ).toDF().repartition(1) + input.write.text(path.getAbsolutePath) if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { val options = Map( @@ -2879,12 +2889,7 @@ abstract class CSVSuite .format("csv") .options(options) .load(path.getAbsolutePath) - val expected = Seq( - Row(Timestamp.valueOf("1765-03-28 00:00:00.0")), - Row(Timestamp.valueOf("1423-11-12 23:41:00.0")), - Row(Timestamp.valueOf("2016-01-28 20:00:00.0")) - ) - checkAnswer(df, expected) + checkAnswer(df, input) } else { // When timestampFormat is specified, infer and parse the column as strings val options1 = Map( @@ -2895,12 +2900,7 @@ abstract class CSVSuite .format("csv") .options(options1) .load(path.getAbsolutePath) - val expected1 = Seq( - Row("1765-03-28"), - Row("1423-11-12T23:41:00"), - Row("2016-01-28T20:00:00") - ) - checkAnswer(df1, expected1) + checkAnswer(df1, input) // When timestampFormat is not specified, infer and parse the column as // timestamp type if possible --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org