This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a9304455025  [SPARK-39731][SQL] Fix issue in CSV and JSON data sources when parsing dates in "yyyyMMdd" format with CORRECTED time parser policy
a9304455025 is described below

commit a93044550259fa0ee8897d0576f6eeac8ec73c27
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Wed Jul 27 16:46:10 2022 +0800

[SPARK-39731][SQL] Fix issue in CSV and JSON data sources when parsing dates in "yyyyMMdd" format with CORRECTED time parser policy

### What changes were proposed in this pull request?

This PR fixes a correctness issue when reading a CSV or a JSON file with dates in "yyyyMMdd" format:

```
name,mydate
1,2020011
2,20201203
```

or

```
{"date": "2020011"}
{"date": "20201203"}
```

Prior to https://github.com/apache/spark/pull/32959, reading this CSV file would return:

```
+----+--------------+
|name|mydate        |
+----+--------------+
|1   |null          |
|2   |2020-12-03    |
+----+--------------+
```

After the patch, however, the invalid date is parsed because `DateTimeUtils.stringToDate` is much more lenient and treats `2020011` as a full year:

```
+----+--------------+
|name|mydate        |
+----+--------------+
|1   |+2020011-01-01|
|2   |2020-12-03    |
+----+--------------+
```

A similar result would be observed in JSON.

This PR addresses the correctness issue by introducing a new configuration option `enableDateTimeParsingFallback`, which allows enabling or disabling the backward compatible parsing. Currently, by default we fall back to the backward compatible behavior only if the parser policy is legacy or no custom pattern was set (this is defined in `UnivocityParser` and `JacksonParser` for CSV and JSON respectively).

### Why are the changes needed?

Fixes a correctness issue in Spark 3.4.

### Does this PR introduce _any_ user-facing change?

To avoid correctness issues when reading CSV or JSON files with a custom pattern, a new configuration option `enableDateTimeParsingFallback` has been added to control whether the code falls back to the backward compatible behavior of parsing dates and timestamps in the CSV and JSON data sources.
- If the config is enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
- If the config is enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
- Otherwise, depending on the parser policy and a custom pattern, the value will be parsed as null.

### How was this patch tested?

I added unit tests for CSV and JSON to verify the fix and the config option.
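To make the user-facing change concrete, here is a minimal usage sketch. It is not part of the patch; it assumes a running `SparkSession` named `spark` and the sample CSV above saved at a hypothetical path `/tmp/dates.csv`:

```scala
import org.apache.spark.sql.types.{DateType, IntegerType, StructType}

// Schema for the sample file: header "name,mydate", rows "1,2020011" and "2,20201203".
val schema = new StructType()
  .add("name", IntegerType)
  .add("mydate", DateType)

// With spark.sql.legacy.timeParserPolicy=CORRECTED and a custom dateFormat,
// the lenient fallback is now off by default, so the invalid date "2020011"
// is read as null instead of "+2020011-01-01".
val corrected = spark.read
  .schema(schema)
  .option("header", "true")
  .option("dateFormat", "yyyyMMdd")
  .csv("/tmp/dates.csv")

// Opting back into the lenient Spark 1.x/2.0 parsing is now explicit:
val lenient = spark.read
  .schema(schema)
  .option("header", "true")
  .option("dateFormat", "yyyyMMdd")
  .option("enableDateTimeParsingFallback", "true")
  .csv("/tmp/dates.csv")
```

The default keeps old pipelines working (fallback stays on under the legacy policy or when no custom pattern is set) while making CORRECTED semantics strict for custom patterns.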
Closes #37147 from sadikovi/fix-csv-date-inference.

Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 docs/sql-data-sources-csv.md                        |  6 ++
 docs/sql-data-sources-json.md                       |  6 ++
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala  | 11 ++++
 .../spark/sql/catalyst/csv/UnivocityParser.scala    | 38 ++++++++---
 .../spark/sql/catalyst/json/JSONOptions.scala       | 11 ++++
 .../spark/sql/catalyst/json/JacksonParser.scala     | 24 ++++++-
 .../sql/catalyst/csv/UnivocityParserSuite.scala     | 19 +++++-
 .../sql/execution/datasources/csv/CSVSuite.scala    | 75 +++++++++++++++++++++-
 .../sql/execution/datasources/json/JsonSuite.scala  | 75 +++++++++++++++++++++-
 9 files changed, 248 insertions(+), 17 deletions(-)

diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 8384f8332a6..7b538528219 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -174,6 +174,12 @@ Data source options of CSV can be set via:
     <td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
     <td>read/write</td>
   </tr>
+  <tr>
+    <td><code>enableDateTimeParsingFallback</code></td>
+    <td>Enabled if the time parser policy is legacy or no custom date or timestamp pattern was provided</td>
+    <td>Allows to fall back to the backward compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns.</td>
+    <td>read</td>
+  </tr>
   <tr>
     <td><code>maxColumns</code></td>
     <td>20480</td>
diff --git a/docs/sql-data-sources-json.md b/docs/sql-data-sources-json.md
index 27d89875623..500cd65b58b 100644
--- a/docs/sql-data-sources-json.md
+++ b/docs/sql-data-sources-json.md
@@ -202,6 +202,12 @@ Data source options of JSON can be set via:
     <td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
     <td>read/write</td>
   </tr>
+  <tr>
+    <td><code>enableDateTimeParsingFallback</code></td>
+    <td>Enabled if the time parser policy is legacy or no custom date or timestamp pattern was provided</td>
+    <td>Allows to fall back to the backward compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns.</td>
+    <td>read</td>
+  </tr>
   <tr>
     <td><code>multiLine</code></td>
     <td><code>false</code></td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index a033e3a3a8d..27806ea1c40 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -190,6 +190,17 @@ class CSVOptions(
   val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
     s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
 
+  // SPARK-39731: Enables the backward compatible parsing behavior.
+  // Generally, this config should be set to false to avoid producing potentially incorrect results
+  // which is the current default (see UnivocityParser).
+  //
+  // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
+  // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
+  // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown and
+  // the value will be parsed as null.
+  val enableDateTimeParsingFallback: Option[Boolean] =
+    parameters.get("enableDateTimeParsingFallback").map(_.toBoolean)
+
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
   val maxColumns = getInt("maxColumns", 20480)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 0237b6c454d..a6b4d7ea667 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -120,6 +120,20 @@ class UnivocityParser(
     new NoopFilters
   }
 
+  // Flags to signal if we need to fall back to the backward compatible behavior of parsing
+  // dates and timestamps.
+  // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions.
+  private val enableParsingFallbackForTimestampType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.timestampFormatInRead.isEmpty
+    }
+  private val enableParsingFallbackForDateType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.dateFormatInRead.isEmpty
+    }
+
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
     val currentContent = tokenizer.getContext.currentParsedContent()
@@ -205,7 +219,10 @@ class UnivocityParser(
       } catch {
         case NonFatal(e) =>
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
+          // compatibility if enabled.
+          if (!enableParsingFallbackForTimestampType) {
+            throw e
+          }
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
           DateTimeUtils.stringToDate(str).getOrElse(throw e)
       }
@@ -217,16 +234,17 @@ class UnivocityParser(
         timestampFormatter.parse(datum)
       } catch {
         case NonFatal(e) =>
-          // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
-          val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-          DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
-            // There may be date type entries in timestamp column due to schema inference
-            if (options.inferDate) {
-              daysToMicros(dateFormatter.parse(datum), options.zoneId)
-            } else {
-              throw(e)
+          // There may be date type entries in timestamp column due to schema inference
+          if (options.inferDate) {
+            daysToMicros(dateFormatter.parse(datum), options.zoneId)
+          } else {
+            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility if enabled.
+            if (!enableParsingFallbackForDateType) {
+              throw e
             }
+            val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
+            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e))
           }
       }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 5f90dbc49c9..66fd22894f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -111,6 +111,17 @@ private[sql] class JSONOptions(
   val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
     s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
 
+  // SPARK-39731: Enables the backward compatible parsing behavior.
+  // Generally, this config should be set to false to avoid producing potentially incorrect results
+  // which is the current default (see JacksonParser).
+  //
+  // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
+  // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
+  // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown and
+  // the value will be parsed as null.
+  val enableDateTimeParsingFallback: Option[Boolean] =
+    parameters.get("enableDateTimeParsingFallback").map(_.toBoolean)
+
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 7004d2a8f16..06133d44c13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -78,6 +78,20 @@ class JacksonParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
 
+  // Flags to signal if we need to fall back to the backward compatible behavior of parsing
+  // dates and timestamps.
+  // For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions.
+  private val enableParsingFallbackForTimestampType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.timestampFormatInRead.isEmpty
+    }
+  private val enableParsingFallbackForDateType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.dateFormatInRead.isEmpty
+    }
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -257,7 +271,10 @@ class JacksonParser(
       } catch {
         case NonFatal(e) =>
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
+          // compatibility if enabled.
+          if (!enableParsingFallbackForTimestampType) {
+            throw e
+          }
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
           DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
       }
@@ -280,7 +297,10 @@ class JacksonParser(
       } catch {
         case NonFatal(e) =>
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
+          // compatibility if enabled.
+          if (!enableParsingFallbackForDateType) {
+            throw e
+          }
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
           DateTimeUtils.stringToDate(str).getOrElse {
             // In Spark 1.5.0, we store the data as number of days since epoch in string.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 2589376bc3d..381ec57fcd1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -355,9 +355,22 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     val options = new CSVOptions(Map.empty[String, String], false, "UTC")
     check(new UnivocityParser(StructType(Seq.empty), options))
 
-    val optionsWithPattern = new CSVOptions(
-      Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
-    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
+    def optionsWithPattern(enableFallback: Boolean): CSVOptions = new CSVOptions(
+      Map(
+        "timestampFormat" -> "invalid",
+        "dateFormat" -> "invalid",
+        "enableDateTimeParsingFallback" -> s"$enableFallback"),
+      false,
+      "UTC")
+
+    // With fallback enabled, we are still able to parse dates and timestamps.
+    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(true)))
+
+    // With legacy parser disabled, parsing results in error.
+    val err = intercept[IllegalArgumentException] {
+      check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(false)))
+    }
+    assert(err.getMessage.contains("Illegal pattern character: n"))
   }
 
   test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 758f5430608..0e571810390 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.logging.log4j.Level
 
-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
@@ -2837,7 +2837,80 @@ abstract class CSVSuite
       )
       assert(results.collect().toSeq.map(_.toSeq) == expected)
     }
+  }
+
+  test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
+    withTempPath { path =>
+      Seq(
+        "1,2020011,2020011",
+        "2,20201203,20201203").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("id", IntegerType)
+        .add("date", DateType)
+        .add("ts", TimestampType)
+      val output = spark.read
+        .schema(schema)
+        .option("dateFormat", "yyyyMMdd")
+        .option("timestampFormat", "yyyyMMdd")
+        .csv(path.getAbsolutePath)
+
+      def check(mode: String, res: Seq[Row]): Unit = {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
+          checkAnswer(output, res)
+        }
+      }
+
+      check(
+        "legacy",
+        Seq(
+          Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
+          Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      check(
+        "corrected",
+        Seq(
+          Row(1, null, null),
+          Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      val err = intercept[SparkException] {
+        check("exception", Nil)
+      }.getCause
+      assert(err.isInstanceOf[SparkUpgradeException])
+    }
+  }
+
+  test("SPARK-39731: Handle date and timestamp parsing fallback") {
+    withTempPath { path =>
+      Seq("2020-01-01,2020-01-01").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+
+      def output(enableFallback: Boolean): DataFrame = spark.read
+        .schema(schema)
+        .option("dateFormat", "invalid")
+        .option("timestampFormat", "invalid")
+        .option("enableDateTimeParsingFallback", enableFallback)
+        .csv(path.getAbsolutePath)
+
+      checkAnswer(
+        output(enableFallback = true),
+        Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+      )
+
+      checkAnswer(
+        output(enableFallback = false),
+        Seq(Row(null, null))
+      )
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3fe9c58c957..1ecaf748f5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
@@ -3249,6 +3249,79 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
+    withTempPath { path =>
+      Seq(
+        """{"date": "2020011", "ts": "2020011"}""",
+        """{"date": "20201203", "ts": "20201203"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+      val output = spark.read
+        .schema(schema)
+        .option("dateFormat", "yyyyMMdd")
+        .option("timestampFormat", "yyyyMMdd")
+        .json(path.getAbsolutePath)
+
+      def check(mode: String, res: Seq[Row]): Unit = {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
+          checkAnswer(output, res)
+        }
+      }
+
+      check(
+        "legacy",
+        Seq(
+          Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
+          Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      check(
+        "corrected",
+        Seq(
+          Row(null, null),
+          Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      val err = intercept[SparkException] {
+        check("exception", Nil)
+      }.getCause
+      assert(err.isInstanceOf[SparkUpgradeException])
+    }
+  }
+
+  test("SPARK-39731: Handle date and timestamp parsing fallback") {
+    withTempPath { path =>
+      Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+
+      def output(enableFallback: Boolean): DataFrame = spark.read
+        .schema(schema)
+        .option("dateFormat", "invalid")
+        .option("timestampFormat", "invalid")
+        .option("enableDateTimeParsingFallback", enableFallback)
+        .json(path.getAbsolutePath)
+
+      checkAnswer(
+        output(enableFallback = true),
+        Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+      )
+
+      checkAnswer(
+        output(enableFallback = false),
+        Seq(Row(null, null))
+      )
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
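For readers skimming the parser changes above: the default computed by `UnivocityParser` and `JacksonParser` reduces to the rule sketched below. This is an illustrative paraphrase, not code from the patch; the function and parameter names are hypothetical.

```scala
// Illustrative paraphrase of the new fallback default (see the parser diffs above).
// `userSetting` mirrors the optional "enableDateTimeParsingFallback" data source option.
def fallbackEnabled(
    userSetting: Option[Boolean],  // explicit option, if the user set one
    policyIsLegacy: Boolean,       // spark.sql.legacy.timeParserPolicy == LEGACY
    customPatternSet: Boolean): Boolean = {
  // An explicit option always wins; otherwise fall back only under the legacy
  // policy or when no custom date/timestamp pattern was supplied.
  userSetting.getOrElse(policyIsLegacy || !customPatternSet)
}

// CORRECTED policy + custom "yyyyMMdd" pattern => strict, no lenient fallback.
assert(!fallbackEnabled(None, policyIsLegacy = false, customPatternSet = true))
// No custom pattern => lenient fallback retained for backward compatibility.
assert(fallbackEnabled(None, policyIsLegacy = false, customPatternSet = false))
```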
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org