This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7db0af5  [SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter
7db0af5 is described below

commit 7db0af578585ecaeee9fd23f8189292289b52a97
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Thu Mar 5 15:29:39 2020 +0800

    [SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter

    ### What changes were proposed in this pull request?
    This is follow-up work for #27441. For cases where the new TimestampFormatter returns null but the legacy formatter can return a value, we need to throw an exception instead of silently changing the result. The legacy config is referenced in the error message.

    ### Why are the changes needed?
    Avoid a silent result change for the new behavior in 3.0.

    ### Does this PR introduce any user-facing change?
    Yes, an exception is thrown when we detect that the legacy formatter can parse the string but the new formatter returns null.

    ### How was this patch tested?
    Extended the existing UTs.

    Closes #27537 from xuanyuanking/SPARK-30668-follow.

    Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/SparkException.scala    |  7 +++
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +
 .../catalyst/expressions/datetimeExpressions.scala |  3 +
 .../spark/sql/catalyst/json/JacksonParser.scala    |  2 +
 .../spark/sql/catalyst/util/DateFormatter.scala    | 59 ++++++++++-------
 .../catalyst/util/DateTimeFormatterHelper.scala    | 26 +++++++-
 .../sql/catalyst/util/TimestampFormatter.scala     | 73 +++++++++++++---------
 .../org/apache/spark/sql/internal/SQLConf.scala    | 16 ++++-
 .../expressions/DateExpressionsSuite.scala         | 39 +++++++++---
 .../sql/catalyst/json/JsonInferSchemaSuite.scala   | 16 ++---
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 20 ++++--
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 58 ++++++++++-------
 .../sql/execution/datasources/csv/CSVSuite.scala   | 22 ++++++-
 .../sql/execution/datasources/json/JsonSuite.scala | 22 ++++++-
 14 files changed, 269 insertions(+), 96 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 4ad9a0c..81c087e 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -43,3 +43,10 @@ private[spark] case class SparkUserAppException(exitCode: Int) */ private[spark] case class ExecutorDeadException(message: String) extends SparkException(message) + +/** + * Exception thrown when Spark returns a different result after upgrading to a new version.
+ */ +private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable) + extends SparkException("You may get a different result due to the upgrading of Spark" + + s" $version: $message", cause) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index f829e6b..dd8537b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -23,6 +23,7 @@ import scala.util.control.NonFatal import com.univocity.parsers.csv.CsvParser +import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow} @@ -285,6 +286,7 @@ class UnivocityParser( } } } catch { + case e: SparkUpgradeException => throw e case NonFatal(e) => badRecordException = badRecordException.orElse(Some(e)) row.setNullAt(i) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 767dacf..81815fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -26,6 +26,7 @@ import scala.util.control.NonFatal import org.apache.commons.text.StringEscapeUtils +import org.apache.spark.SparkUpgradeException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ @@ -789,6 +790,7 @@ abstract class ToTimestamp formatter.parse( t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { + case e: SparkUpgradeException => throw e case NonFatal(_) => null } } @@ -802,6 +804,7 @@ abstract class ToTimestamp TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { + case e: SparkUpgradeException => throw e case NonFatal(_) => null } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index da3b501..d0db06c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal import com.fasterxml.jackson.core._ +import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -382,6 +383,7 @@ class JacksonParser( try { row.update(index, fieldConverters(index).apply(parser)) } catch { + case e: SparkUpgradeException => throw e case NonFatal(e) => badRecordException = badRecordException.orElse(Some(e)) parser.skipChildren() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 941c8fc..06ec918 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -23,9 +23,9 @@ import java.util.{Date, Locale} import org.apache.commons.lang3.time.FastDateFormat -import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ sealed trait DateFormatter extends Serializable { def parse(s: String): Int // returns days since epoch @@ -35,16 +35,24 @@ sealed trait DateFormatter extends Serializable { class Iso8601DateFormatter( pattern: String, zoneId: ZoneId, - locale: Locale) extends DateFormatter with DateTimeFormatterHelper { + locale: Locale, + legacyFormat: LegacyDateFormats.LegacyDateFormat) + extends DateFormatter with DateTimeFormatterHelper { @transient private lazy val formatter = getOrCreateFormatter(pattern, locale) + @transient + private lazy val legacyFormatter = DateFormatter.getLegacyFormatter( + pattern, zoneId, locale, legacyFormat) + override def parse(s: String): Int = { val specialDate = convertSpecialDate(s.trim, zoneId) specialDate.getOrElse { - val localDate = LocalDate.parse(s, formatter) - localDateToDays(localDate) + try { + val localDate = LocalDate.parse(s, formatter) + localDateToDays(localDate) + } catch checkDiffResult(s, legacyFormatter.parse) } } @@ -88,33 +96,40 @@ object DateFormatter { val defaultLocale: Locale = Locale.US def defaultPattern(): String = { - if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd" + if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd" } private def getFormatter( - format: Option[String], - zoneId: ZoneId, - locale: Locale = defaultLocale, - legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { - + format: Option[String], + zoneId: ZoneId, + locale: Locale = defaultLocale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { val pattern = format.getOrElse(defaultPattern) - if (SQLConf.get.legacyTimeParserEnabled) { - legacyFormat match { - case FAST_DATE_FORMAT => - new LegacyFastDateFormatter(pattern, locale) - case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT => - new LegacySimpleDateFormatter(pattern, locale) - } + if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { + getLegacyFormatter(pattern, zoneId, locale, legacyFormat) } else { - new Iso8601DateFormatter(pattern, zoneId, locale) + new Iso8601DateFormatter(pattern, zoneId, locale, legacyFormat) + } + } + + def getLegacyFormatter( + pattern: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): DateFormatter = { + legacyFormat match { + case FAST_DATE_FORMAT => + new LegacyFastDateFormatter(pattern, locale) + case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT => + new LegacySimpleDateFormatter(pattern, locale) } } def apply( - format: String, - zoneId: ZoneId, - locale: Locale, - legacyFormat: LegacyDateFormat): DateFormatter = { + format: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): DateFormatter = { getFormatter(Some(format), zoneId, locale, legacyFormat) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala index a7b6309..33aa733 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -19,13 +19,16 @@ package org.apache.spark.sql.catalyst.util import java.time._ import java.time.chrono.IsoChronology -import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle} +import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, DateTimeParseException, ResolverStyle} import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries} import java.util.Locale import com.google.common.cache.CacheBuilder +import org.apache.spark.SparkUpgradeException import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ trait DateTimeFormatterHelper { // Converts the parsed temporal object to ZonedDateTime. It sets time components to zeros @@ -57,6 +60,27 @@ trait DateTimeFormatterHelper { } formatter } + + // When the legacy time parser policy is set to EXCEPTION, check whether we will get different + // results between the legacy parser and the new parser. If the new parser fails but the legacy + // parser works, throw a SparkUpgradeException. Conversely, if the legacy policy is set to + // CORRECTED, the DateTimeParseException is left for the caller side to handle. + protected def checkDiffResult[T]( + s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = { + case e: DateTimeParseException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION => + val res = try { + Some(legacyParseFunc(s)) + } catch { + case _: Throwable => None + } + if (res.nonEmpty) { + throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. 
You can " + + s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " + + s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e) + } else { + throw e + } + } } private object DateTimeFormatterHelper { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index b70a4ed..5c1a161 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -29,7 +29,9 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.types.Decimal sealed trait TimestampFormatter extends Serializable { @@ -52,21 +54,29 @@ sealed trait TimestampFormatter extends Serializable { class Iso8601TimestampFormatter( pattern: String, zoneId: ZoneId, - locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper { + locale: Locale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT) + extends TimestampFormatter with DateTimeFormatterHelper { @transient protected lazy val formatter = getOrCreateFormatter(pattern, locale) + @transient + protected lazy val legacyFormatter = TimestampFormatter.getLegacyFormatter( + pattern, zoneId, locale, legacyFormat) + override def parse(s: String): Long = { val specialDate = convertSpecialTimestamp(s.trim, zoneId) specialDate.getOrElse { - val parsed = formatter.parse(s) - val parsedZoneId = parsed.query(TemporalQueries.zone()) - val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId - val zonedDateTime = toZonedDateTime(parsed, timeZoneId) - val epochSeconds = zonedDateTime.toEpochSecond - val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND) - - Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond) + try { + val parsed = formatter.parse(s) + val parsedZoneId = parsed.query(TemporalQueries.zone()) + val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId + val zonedDateTime = toZonedDateTime(parsed, timeZoneId) + val epochSeconds = zonedDateTime.toEpochSecond + val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND) + + Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond) + } catch checkDiffResult(s, legacyFormatter.parse) } } @@ -186,31 +196,38 @@ object TimestampFormatter { def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss" private def getFormatter( - format: Option[String], - zoneId: ZoneId, - locale: Locale = defaultLocale, - legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = { - + format: Option[String], + zoneId: ZoneId, + locale: Locale = defaultLocale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = { val pattern = format.getOrElse(defaultPattern) - if (SQLConf.get.legacyTimeParserEnabled) { - legacyFormat match { - case FAST_DATE_FORMAT => - new LegacyFastTimestampFormatter(pattern, zoneId, locale) - case SIMPLE_DATE_FORMAT => - new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false) - case LENIENT_SIMPLE_DATE_FORMAT 
=> - new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true) - } + if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { + getLegacyFormatter(pattern, zoneId, locale, legacyFormat) } else { - new Iso8601TimestampFormatter(pattern, zoneId, locale) + new Iso8601TimestampFormatter(pattern, zoneId, locale, legacyFormat) + } + } + + def getLegacyFormatter( + pattern: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): TimestampFormatter = { + legacyFormat match { + case FAST_DATE_FORMAT => + new LegacyFastTimestampFormatter(pattern, zoneId, locale) + case SIMPLE_DATE_FORMAT => + new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false) + case LENIENT_SIMPLE_DATE_FORMAT => + new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true) } } def apply( - format: String, - zoneId: ZoneId, - locale: Locale, - legacyFormat: LegacyDateFormat): TimestampFormatter = { + format: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): TimestampFormatter = { getFormatter(Some(format), zoneId, locale, legacyFormat) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7f55f22..2d17fb9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2352,6 +2352,18 @@ object SQLConf { .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy") + .internal() + .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " + + "dates/timestamps in a locale-sensitive manner, which is the approach before Spark 3.0. " + + "When set to CORRECTED, classes from java.time.* packages are used for the same purpose. 
" + + "The default value is EXCEPTION, RuntimeException is thrown when we will get different " + + "results.") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC = buildConf("spark.sql.legacy.followThreeValuedLogicInArrayExists") .internal() @@ -2743,7 +2755,9 @@ class SQLConf extends Serializable with Logging { def legacyMsSqlServerNumericMappingEnabled: Boolean = getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED) - def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED) + def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = { + LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) + } /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index e43eb59..7fced04 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -23,7 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime, ZoneId, ZoneOffset} import java.util.{Calendar, Locale, TimeZone} import java.util.concurrent.TimeUnit._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkUpgradeException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter} @@ -241,8 +241,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("DateFormat") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { checkEvaluation( DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId), null) @@ -710,8 +710,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("from_unixtime") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val fmt1 = "yyyy-MM-dd HH:mm:ss" val sdf1 = new SimpleDateFormat(fmt1, Locale.US) val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" @@ -758,8 +758,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("unix_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" val sdf2 = new SimpleDateFormat(fmt2, Locale.US) @@ -824,8 +824,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("to_unix_timestamp") { - Seq(false, true).foreach 
{ legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val fmt1 = "yyyy-MM-dd HH:mm:ss" val sdf1 = new SimpleDateFormat(fmt1, Locale.US) val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" @@ -1164,4 +1164,25 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Literal(LocalDate.of(1, 1, 1))), IntervalUtils.stringToInterval(UTF8String.fromString("interval 9999 years"))) } + + test("to_timestamp exception mode") { + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + checkEvaluation( + GetTimestamp( + Literal("2020-01-27T20:06:11.847-0800"), + Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), 1580184371847000L) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { + checkEvaluation( + GetTimestamp( + Literal("2020-01-27T20:06:11.847-0800"), + Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), null) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + checkExceptionInExpression[SparkUpgradeException]( + GetTimestamp( + Literal("2020-01-27T20:06:11.847-0800"), + Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), "Fail to parse") + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index c2e03bd..bce917c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -40,8 +40,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("inferring timestamp type") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { checkTimestampType("yyyy", """{"a": "2018"}""") checkTimestampType("yyyy=MM", """{"a": "2018=12"}""") checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""") @@ -56,8 +56,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("prefer decimals over timestamps") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParser) { checkType( options = Map( "prefersDecimal" -> "true", @@ -71,8 +71,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("skip decimal type inferring") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { checkType( options = Map( "prefersDecimal" -> "false", @@ -86,8 +86,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("fallback to string type") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { checkType( options = Map("timestampFormat" -> 
"yyyy,MM,dd.HHmmssSSS"), json = """{"a": "20181202.210400123"}""", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 61f0e13..89fb4d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -59,10 +59,22 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { val df2 = df .select(from_csv($"value", schemaWithCorrField1, Map( "mode" -> "Permissive", "columnNameOfCorruptRecord" -> columnNameOfCorruptRecord))) - - checkAnswer(df2, Seq( - Row(Row(0, null, "0,2013-111-11 12:13:14")), - Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null)))) + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { + checkAnswer(df2, Seq( + Row(Row(0, null, "0,2013-111-11 12:13:14")), + Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null)))) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + checkAnswer(df2, Seq( + Row(Row(0, java.sql.Date.valueOf("2022-03-11"), null)), + Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null)))) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + val msg = intercept[SparkException] { + df2.collect() + }.getCause.getMessage + assert(msg.contains("Fail to parse")) + } } test("schema_of_csv - infers schemas") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index fd65f75..3865012 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -23,7 +23,8 @@ import java.time.{Instant, LocalDateTime, ZoneId} import java.util.{Locale, TimeZone} import java.util.concurrent.TimeUnit -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils} +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -96,8 +97,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("date format") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c") checkAnswer( @@ -377,6 +378,13 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30")))) } + def checkExceptionMessage(df: DataFrame): Unit = { + val message = intercept[SparkException] { + df.collect() + }.getCause.getMessage + assert(message.contains("Fail to parse")) + } + test("function to_date") { val d1 = Date.valueOf("2015-07-22") val d2 = Date.valueOf("2015-07-01") @@ -422,9 +430,15 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { df.select(to_date(col("d"), "yyyy-MM-dd")), Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")), Row(Date.valueOf("2014-12-31")))) - checkAnswer( - df.select(to_date(col("s"), "yyyy-MM-dd")), - Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null))) + val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key + withSQLConf(confKey -> 
"corrected") { + checkAnswer( + df.select(to_date(col("s"), "yyyy-MM-dd")), + Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null))) + } + withSQLConf(confKey -> "exception") { + checkExceptionMessage(df.select(to_date(col("s"), "yyyy-MM-dd"))) + } // now switch format checkAnswer( @@ -529,8 +543,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("from_unixtime") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("corrected", "legacy").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" val sdf2 = new SimpleDateFormat(fmt2, Locale.US) @@ -562,8 +576,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis) test("unix_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("corrected", "legacy").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val date1 = Date.valueOf("2015-07-24") val date2 = Date.valueOf("2015-07-25") val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") @@ -629,8 +643,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("to_unix_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("corrected", "legacy").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val date1 = Date.valueOf("2015-07-24") val date2 = Date.valueOf("2015-07-25") val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") @@ -680,8 +694,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { test("to_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val date1 = Date.valueOf("2015-07-24") val date2 = Date.valueOf("2015-07-25") val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00") @@ -701,7 +715,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { df.select(unix_timestamp(col("ss")).cast("timestamp"))) checkAnswer(df.select(to_timestamp(col("ss"))), Seq( Row(ts1), Row(ts2))) - if (legacyParser) { + if (legacyParserPolicy == "legacy") { // In Spark 2.4 and earlier, to_timestamp() parses in seconds precision and cuts off // the fractional part of seconds. The behavior was changed by SPARK-27438. 
val legacyFmt = "yyyy/MM/dd HH:mm:ss" @@ -819,16 +833,18 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("SPARK-30668: use legacy timestamp parser in to_timestamp") { - def checkTimeZoneParsing(expected: Any): Unit = { - val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") + val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key + val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") + withSQLConf(confKey -> "legacy") { + val expected = Timestamp.valueOf("2020-01-27 20:06:11.847") checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(expected)) } - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") { - checkTimeZoneParsing(Timestamp.valueOf("2020-01-27 20:06:11.847")) + withSQLConf(confKey -> "corrected") { + checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(null)) } - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "false") { - checkTimeZoneParsing(null) + withSQLConf(confKey -> "exception") { + checkExceptionMessage(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz"))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 43553df..30ae9dc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2307,6 +2307,26 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))) } + + test("exception mode for parsing date/timestamp string") { + val ds = Seq("2020-01-27T20:06:11.847-0800").toDS() + val csv = spark.read + .option("header", false) + .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz") + .schema("t timestamp").csv(ds) + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + val msg = intercept[SparkException] { + csv.collect() + }.getCause.getMessage + assert(msg.contains("Fail to parse")) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + checkAnswer(csv, Row(Timestamp.valueOf("2020-01-27 20:06:11.847"))) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { + checkAnswer(csv, Row(null)) + } + } } class CSVv1Suite extends CSVSuite { @@ -2327,5 +2347,5 @@ class CSVLegacyTimeParserSuite extends CSVSuite { override protected def sparkConf: SparkConf = super .sparkConf - .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) + .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 26d600e..917da5e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2669,6 +2669,26 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson Date.valueOf("2020-1-12"), Date.valueOf(LocalDate.ofEpochDay(12345)))) } + + test("exception mode for parsing date/timestamp string") { + val ds = Seq("{'t': '2020-01-27T20:06:11.847-0800'}").toDS() + val json = spark.read + .schema("t timestamp") + 
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz") + .json(ds) + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + val msg = intercept[SparkException] { + json.collect() + }.getCause.getMessage + assert(msg.contains("Fail to parse")) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + checkAnswer(json, Row(Timestamp.valueOf("2020-01-27 20:06:11.847"))) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { + checkAnswer(json, Row(null)) + } + } } class JsonV1Suite extends JsonSuite { @@ -2689,5 +2709,5 @@ class JsonLegacyTimeParserSuite extends JsonSuite { override protected def sparkConf: SparkConf = super .sparkConf - .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) + .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org