This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 7e3ddc1e582 [SPARK-45433][SQL] Fix CSV/JSON schema inference when timestamps do not match specified timestampFormat 7e3ddc1e582 is described below commit 7e3ddc1e582a6e4fa96bab608c4c2bbc2c93b449 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Wed Oct 11 19:33:23 2023 +0300 [SPARK-45433][SQL] Fix CSV/JSON schema inference when timestamps do not match specified timestampFormat ### What changes were proposed in this pull request? This PR fixes an error in CSV/JSON schema inference that was reported when timestamps did not match the specified timestampFormat. ```scala //eg val csv = spark.read.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss") .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS()) csv.show() //error Caused by: java.time.format.DateTimeParseException: Text '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 19 ``` This bug only happened when a partition had one row. The data type should be `StringType`, not `TimestampType`, because the value does not match `timestampFormat`. Using CSV as an example: in `CSVInferSchema::tryParseTimestampNTZ`, we first use `timestampNTZFormatter.parseWithoutTimeZoneOptional` for inference, returning `TimestampType`; if the same partition had another row, it would use `tryParseTimestamp` to parse that row with the user-defined `timestampFormat`, then find it can't be converted to a timestamp with `timestampFormat`, and finally return `StringType`. But when there is only one row, we use `timestampNTZFormatter.parseWithoutTimeZoneOptional` to parse normally timestamp not r [...] ### Why are the changes needed? Fix a schema inference bug. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new test. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43243 from Hisoka-X/SPARK-45433-inference-mismatch-timestamp-one-row. 
Authored-by: Jia Fan <fanjiaemi...@qq.com> Signed-off-by: Max Gekk <max.g...@gmail.com> (cherry picked from commit eae5c0e1efce83c2bb08754784db070be285285a) Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 9 ++++++--- .../org/apache/spark/sql/catalyst/json/JsonInferSchema.scala | 8 +++++--- .../apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 10 ++++++++++ .../apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala | 8 ++++++++ 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 51586a0065e..ec01b56f9eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @@ -202,8 +202,11 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // We can only parse the value as TimestampNTZType if it does not have zone-offset or // time-zone component and can be parsed with the timestamp formatter. // Otherwise, it is likely to be a timestamp with timezone. 
- if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - SQLConf.get.timestampType + val timestampType = SQLConf.get.timestampType + if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY || + timestampType == TimestampNTZType) && + timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { + timestampType } else { tryParseTimestamp(field) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 5385afe8c93..4123c5290b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -148,11 +148,13 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { val bigDecimal = decimalParser(field) DecimalType(bigDecimal.precision, bigDecimal.scale) } + val timestampType = SQLConf.get.timestampType if (options.prefersDecimal && decimalTry.isDefined) { decimalTry.get - } else if (options.inferTimestamp && + } else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy == + LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) && timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - SQLConf.get.timestampType + timestampType } else if (options.inferTimestamp && timestampFormatter.parseOptional(field).isDefined) { TimestampType diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index acedf7998c2..fb91200557a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -263,4 +263,14 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { inferSchema = new CSVInferSchema(options) assert(inferSchema.inferField(DateType, "2012_12_12") == DateType) } + + test("SPARK-45433: inferring the schema when timestamps do not match specified timestampFormat" + + " with only one row") { + val options = new CSVOptions( + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), + columnPruning = false, + defaultTimeZoneId = "UTC") + val inferSchema = new CSVInferSchema(options) + assert(inferSchema.inferField(NullType, "2884-06-24T02:45:51.138") == StringType) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index 8290b38e339..81a4858dce8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -112,4 +112,12 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { checkType(Map("inferTimestamp" -> "true"), json, TimestampType) checkType(Map("inferTimestamp" -> "false"), json, StringType) } + + test("SPARK-45433: inferring the schema when timestamps do not match specified timestampFormat" + + " with only one row") { + checkType( + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "inferTimestamp" -> "true"), + """{"a": "2884-06-24T02:45:51.138"}""", + StringType) + } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org