This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7db0af5  [SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter
7db0af5 is described below

commit 7db0af578585ecaeee9fd23f8189292289b52a97
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Thu Mar 5 15:29:39 2020 +0800

    [SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter
    
    ### What changes were proposed in this pull request?
    This is a follow-up to #27441. For cases where the new TimestampFormatter returns
    null while the legacy formatter can return a value, we need to throw an exception
    instead of silently changing the result. The legacy config is referenced in the
    error message.
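    
    Below is an illustrative, simplified sketch of the pattern this change introduces
    (the real implementation is the `checkDiffResult` helper added to
    `DateTimeFormatterHelper` in the diff); the class and pattern names here are
    reduced stand-ins for illustration only:
    
    ```scala
    import java.text.SimpleDateFormat
    import java.time.LocalDate
    import java.time.format.{DateTimeFormatter, DateTimeParseException}
    
    // Illustrative stand-in for org.apache.spark.SparkUpgradeException.
    class SparkUpgradeException(version: String, message: String, cause: Throwable)
      extends RuntimeException(
        s"You may get a different result due to the upgrading of Spark $version: $message", cause)
    
    object ParseWithUpgradeCheck {
      private val newFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd")
      private val legacyFormatter = new SimpleDateFormat("yyyy-MM-dd")
    
      def parse(s: String): LocalDate = {
        try {
          LocalDate.parse(s, newFormatter) // new java.time-based parser
        } catch {
          case e: DateTimeParseException =>
            // If the legacy parser accepts the string, returning null would be a silent
            // behavior change, so raise an exception that points at the legacy config.
            val legacyParses = try { legacyFormatter.parse(s); true } catch { case _: Throwable => false }
            if (legacyParses) {
              throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. You can set " +
                "spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0.", e)
            } else {
              throw e
            }
        }
      }
    }
    ```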
    
    ### Why are the changes needed?
    Avoid silently changing results under the new behavior in 3.0.
    
    ### Does this PR introduce any user-facing change?
    Yes, an exception is thrown when we detect that the legacy formatter can parse
    the string while the new formatter returns null.
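    
    As a sketch of what this looks like from the user side (mirroring the new
    `DateFunctionsSuite` test further down; assumes a running `SparkSession` named
    `spark`, e.g. in spark-shell):
    
    ```scala
    import org.apache.spark.sql.functions.{col, to_timestamp}
    import spark.implicits._
    
    val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
    val parsed = df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz"))
    
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "legacy")
    parsed.show()   // 2020-01-27 20:06:11.847 -- old SimpleDateFormat behavior
    
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "corrected")
    parsed.show()   // null -- the new parser rejects the string for this pattern
    
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "exception")
    parsed.show()   // fails, with a SparkUpgradeException ("Fail to parse ...") as the cause
    ```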
    
    ### How was this patch tested?
    Extended existing unit tests.
    
    Closes #27537 from xuanyuanking/SPARK-30668-follow.
    
    Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/SparkException.scala    |  7 +++
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +
 .../catalyst/expressions/datetimeExpressions.scala |  3 +
 .../spark/sql/catalyst/json/JacksonParser.scala    |  2 +
 .../spark/sql/catalyst/util/DateFormatter.scala    | 59 ++++++++++-------
 .../catalyst/util/DateTimeFormatterHelper.scala    | 26 +++++++-
 .../sql/catalyst/util/TimestampFormatter.scala     | 73 +++++++++++++---------
 .../org/apache/spark/sql/internal/SQLConf.scala    | 16 ++++-
 .../expressions/DateExpressionsSuite.scala         | 39 +++++++++---
 .../sql/catalyst/json/JsonInferSchemaSuite.scala   | 16 ++---
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 20 ++++--
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 58 ++++++++++-------
 .../sql/execution/datasources/csv/CSVSuite.scala   | 22 ++++++-
 .../sql/execution/datasources/json/JsonSuite.scala | 22 ++++++-
 14 files changed, 269 insertions(+), 96 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 4ad9a0c..81c087e 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -43,3 +43,10 @@ private[spark] case class SparkUserAppException(exitCode: Int)
  */
 private[spark] case class ExecutorDeadException(message: String)
   extends SparkException(message)
+
+/**
+ * Exception thrown when Spark returns different result after upgrading to a new version.
+ */
+private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
+  extends SparkException("You may get a different result due to the upgrading of Spark" +
+    s" $version: $message", cause)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index f829e6b..dd8537b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 
 import com.univocity.parsers.csv.CsvParser
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
@@ -285,6 +286,7 @@ class UnivocityParser(
           }
         }
       } catch {
+        case e: SparkUpgradeException => throw e
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
           row.setNullAt(i)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 767dacf..81815fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.text.StringEscapeUtils
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -789,6 +790,7 @@ abstract class ToTimestamp
               formatter.parse(
                 t.asInstanceOf[UTF8String].toString) / downScaleFactor
             } catch {
+              case e: SparkUpgradeException => throw e
               case NonFatal(_) => null
             }
           }
@@ -802,6 +804,7 @@ abstract class ToTimestamp
              TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
                 .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor
             } catch {
+              case e: SparkUpgradeException => throw e
               case NonFatal(_) => null
             }
           }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index da3b501..d0db06c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -382,6 +383,7 @@ class JacksonParser(
           try {
             row.update(index, fieldConverters(index).apply(parser))
           } catch {
+            case e: SparkUpgradeException => throw e
             case NonFatal(e) =>
               badRecordException = badRecordException.orElse(Some(e))
               parser.skipChildren()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index 941c8fc..06ec918 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -23,9 +23,9 @@ import java.util.{Date, Locale}
 
 import org.apache.commons.lang3.time.FastDateFormat
 
-import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 
 sealed trait DateFormatter extends Serializable {
   def parse(s: String): Int // returns days since epoch
@@ -35,16 +35,24 @@ sealed trait DateFormatter extends Serializable {
 class Iso8601DateFormatter(
     pattern: String,
     zoneId: ZoneId,
-    locale: Locale) extends DateFormatter with DateTimeFormatterHelper {
+    locale: Locale,
+    legacyFormat: LegacyDateFormats.LegacyDateFormat)
+  extends DateFormatter with DateTimeFormatterHelper {
 
   @transient
   private lazy val formatter = getOrCreateFormatter(pattern, locale)
 
+  @transient
+  private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(
+    pattern, zoneId, locale, legacyFormat)
+
   override def parse(s: String): Int = {
     val specialDate = convertSpecialDate(s.trim, zoneId)
     specialDate.getOrElse {
-      val localDate = LocalDate.parse(s, formatter)
-      localDateToDays(localDate)
+      try {
+        val localDate = LocalDate.parse(s, formatter)
+        localDateToDays(localDate)
+      } catch checkDiffResult(s, legacyFormatter.parse)
     }
   }
 
@@ -88,33 +96,40 @@ object DateFormatter {
   val defaultLocale: Locale = Locale.US
 
   def defaultPattern(): String = {
-    if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd"
+    if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd"
   }
 
   private def getFormatter(
-    format: Option[String],
-    zoneId: ZoneId,
-    locale: Locale = defaultLocale,
-    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = {
-
+      format: Option[String],
+      zoneId: ZoneId,
+      locale: Locale = defaultLocale,
+      legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = {
     val pattern = format.getOrElse(defaultPattern)
-    if (SQLConf.get.legacyTimeParserEnabled) {
-      legacyFormat match {
-        case FAST_DATE_FORMAT =>
-          new LegacyFastDateFormatter(pattern, locale)
-        case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
-          new LegacySimpleDateFormatter(pattern, locale)
-      }
+    if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
+      getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
     } else {
-      new Iso8601DateFormatter(pattern, zoneId, locale)
+      new Iso8601DateFormatter(pattern, zoneId, locale, legacyFormat)
+    }
+  }
+
+  def getLegacyFormatter(
+      pattern: String,
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat): DateFormatter = {
+    legacyFormat match {
+      case FAST_DATE_FORMAT =>
+        new LegacyFastDateFormatter(pattern, locale)
+      case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
+        new LegacySimpleDateFormatter(pattern, locale)
     }
   }
 
   def apply(
-    format: String,
-    zoneId: ZoneId,
-    locale: Locale,
-    legacyFormat: LegacyDateFormat): DateFormatter = {
+      format: String,
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat): DateFormatter = {
     getFormatter(Some(format), zoneId, locale, legacyFormat)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
index a7b6309..33aa733 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -19,13 +19,16 @@ package org.apache.spark.sql.catalyst.util
 
 import java.time._
 import java.time.chrono.IsoChronology
-import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle}
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, DateTimeParseException, ResolverStyle}
 import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
 import java.util.Locale
 
 import com.google.common.cache.CacheBuilder
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 
 trait DateTimeFormatterHelper {
   // Converts the parsed temporal object to ZonedDateTime. It sets time components to zeros
@@ -57,6 +60,27 @@ trait DateTimeFormatterHelper {
     }
     formatter
   }
+
+  // When legacy time parser policy set to EXCEPTION, check whether we will get different results
+  // between legacy parser and new parser. If new parser fails but legacy parser works, throw a
+  // SparkUpgradeException. On the contrary, if the legacy policy set to CORRECTED,
+  // DateTimeParseException will address by the caller side.
+  protected def checkDiffResult[T](
+      s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = {
+    case e: DateTimeParseException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION =>
+      val res = try {
+        Some(legacyParseFunc(s))
+      } catch {
+        case _: Throwable => None
+      }
+      if (res.nonEmpty) {
+        throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. You can " +
+          s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " +
+          s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e)
+      } else {
+        throw e
+      }
+  }
 }
 
 private object DateTimeFormatterHelper {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index b70a4ed..5c1a161 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -29,7 +29,9 @@ import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.types.Decimal
 
 sealed trait TimestampFormatter extends Serializable {
@@ -52,21 +54,29 @@ sealed trait TimestampFormatter extends Serializable {
 class Iso8601TimestampFormatter(
     pattern: String,
     zoneId: ZoneId,
-    locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
+    locale: Locale,
+    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT)
+  extends TimestampFormatter with DateTimeFormatterHelper {
   @transient
   protected lazy val formatter = getOrCreateFormatter(pattern, locale)
 
+  @transient
+  protected lazy val legacyFormatter = TimestampFormatter.getLegacyFormatter(
+    pattern, zoneId, locale, legacyFormat)
+
   override def parse(s: String): Long = {
     val specialDate = convertSpecialTimestamp(s.trim, zoneId)
     specialDate.getOrElse {
-      val parsed = formatter.parse(s)
-      val parsedZoneId = parsed.query(TemporalQueries.zone())
-      val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
-      val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
-      val epochSeconds = zonedDateTime.toEpochSecond
-      val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)
-
-      Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
+      try {
+        val parsed = formatter.parse(s)
+        val parsedZoneId = parsed.query(TemporalQueries.zone())
+        val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
+        val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
+        val epochSeconds = zonedDateTime.toEpochSecond
+        val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)
+
+        Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
+      } catch checkDiffResult(s, legacyFormatter.parse)
     }
   }
 
@@ -186,31 +196,38 @@ object TimestampFormatter {
   def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss"
 
   private def getFormatter(
-    format: Option[String],
-    zoneId: ZoneId,
-    locale: Locale = defaultLocale,
-    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = {
-
+      format: Option[String],
+      zoneId: ZoneId,
+      locale: Locale = defaultLocale,
+      legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = {
     val pattern = format.getOrElse(defaultPattern)
-    if (SQLConf.get.legacyTimeParserEnabled) {
-      legacyFormat match {
-        case FAST_DATE_FORMAT =>
-          new LegacyFastTimestampFormatter(pattern, zoneId, locale)
-        case SIMPLE_DATE_FORMAT =>
-          new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false)
-        case LENIENT_SIMPLE_DATE_FORMAT =>
-          new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true)
-      }
+    if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
+      getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
     } else {
-      new Iso8601TimestampFormatter(pattern, zoneId, locale)
+      new Iso8601TimestampFormatter(pattern, zoneId, locale, legacyFormat)
+    }
+  }
+
+  def getLegacyFormatter(
+      pattern: String,
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat): TimestampFormatter = {
+    legacyFormat match {
+      case FAST_DATE_FORMAT =>
+        new LegacyFastTimestampFormatter(pattern, zoneId, locale)
+      case SIMPLE_DATE_FORMAT =>
+        new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false)
+      case LENIENT_SIMPLE_DATE_FORMAT =>
+        new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true)
     }
   }
 
   def apply(
-    format: String,
-    zoneId: ZoneId,
-    locale: Locale,
-    legacyFormat: LegacyDateFormat): TimestampFormatter = {
+      format: String,
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat): TimestampFormatter = {
     getFormatter(Some(format), zoneId, locale, legacyFormat)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7f55f22..2d17fb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2352,6 +2352,18 @@ object SQLConf {
     .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
     .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
+  val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
+    .internal()
+    .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +
+      "dates/timestamps in a locale-sensitive manner, which is the approach before Spark 3.0. " +
+      "When set to CORRECTED, classes from java.time.* packages are used for the same purpose. " +
+      "The default value is EXCEPTION, RuntimeException is thrown when we will get different " +
+      "results.")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+    .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
   val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC =
     buildConf("spark.sql.legacy.followThreeValuedLogicInArrayExists")
       .internal()
@@ -2743,7 +2755,9 @@ class SQLConf extends Serializable with Logging {
   def legacyMsSqlServerNumericMappingEnabled: Boolean =
     getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)
 
-  def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
+  def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
+    LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
+  }
 
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index e43eb59..7fced04 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -23,7 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime, ZoneId, ZoneOffset}
 import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUpgradeException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter}
@@ -241,8 +241,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("DateFormat") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         checkEvaluation(
          DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId),
           null)
@@ -710,8 +710,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("from_unixtime") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val fmt1 = "yyyy-MM-dd HH:mm:ss"
         val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
         val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
@@ -758,8 +758,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("unix_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
         val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
         val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
@@ -824,8 +824,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("to_unix_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val fmt1 = "yyyy-MM-dd HH:mm:ss"
         val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
         val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
@@ -1164,4 +1164,25 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         Literal(LocalDate.of(1, 1, 1))),
      IntervalUtils.stringToInterval(UTF8String.fromString("interval 9999 years")))
   }
+
+  test("to_timestamp exception mode") {
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+      checkEvaluation(
+        GetTimestamp(
+          Literal("2020-01-27T20:06:11.847-0800"),
+          Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), 1580184371847000L)
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+      checkEvaluation(
+        GetTimestamp(
+          Literal("2020-01-27T20:06:11.847-0800"),
+          Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), null)
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+      checkExceptionInExpression[SparkUpgradeException](
+        GetTimestamp(
+          Literal("2020-01-27T20:06:11.847-0800"),
+          Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), "Fail to parse")
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
index c2e03bd..bce917c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
@@ -40,8 +40,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("inferring timestamp type") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         checkTimestampType("yyyy", """{"a": "2018"}""")
         checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
         checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
@@ -56,8 +56,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("prefer decimals over timestamps") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParser) {
         checkType(
           options = Map(
             "prefersDecimal" -> "true",
@@ -71,8 +71,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("skip decimal type inferring") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         checkType(
           options = Map(
             "prefersDecimal" -> "false",
@@ -86,8 +86,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("fallback to string type") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         checkType(
           options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
           json = """{"a": "20181202.210400123"}""",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 61f0e13..89fb4d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -59,10 +59,22 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
     val df2 = df
       .select(from_csv($"value", schemaWithCorrField1, Map(
         "mode" -> "Permissive", "columnNameOfCorruptRecord" -> 
columnNameOfCorruptRecord)))
-
-    checkAnswer(df2, Seq(
-      Row(Row(0, null, "0,2013-111-11 12:13:14")),
-      Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+      checkAnswer(df2, Seq(
+        Row(Row(0, null, "0,2013-111-11 12:13:14")),
+        Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+      checkAnswer(df2, Seq(
+        Row(Row(0, java.sql.Date.valueOf("2022-03-11"), null)),
+        Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+      val msg = intercept[SparkException] {
+        df2.collect()
+      }.getCause.getMessage
+      assert(msg.contains("Fail to parse"))
+    }
   }
 
   test("schema_of_csv - infers schemas") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index fd65f75..3865012 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -23,7 +23,8 @@ import java.time.{Instant, LocalDateTime, ZoneId}
 import java.util.{Locale, TimeZone}
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -96,8 +97,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("date format") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c")
 
         checkAnswer(
@@ -377,6 +378,13 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
       Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30"))))
   }
 
+  def checkExceptionMessage(df: DataFrame): Unit = {
+    val message = intercept[SparkException] {
+      df.collect()
+    }.getCause.getMessage
+    assert(message.contains("Fail to parse"))
+  }
+
   test("function to_date") {
     val d1 = Date.valueOf("2015-07-22")
     val d2 = Date.valueOf("2015-07-01")
@@ -422,9 +430,15 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
       df.select(to_date(col("d"), "yyyy-MM-dd")),
       Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")),
         Row(Date.valueOf("2014-12-31"))))
-    checkAnswer(
-      df.select(to_date(col("s"), "yyyy-MM-dd")),
-      Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null)))
+    val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key
+    withSQLConf(confKey -> "corrected") {
+      checkAnswer(
+        df.select(to_date(col("s"), "yyyy-MM-dd")),
+        Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null)))
+    }
+    withSQLConf(confKey -> "exception") {
+      checkExceptionMessage(df.select(to_date(col("s"), "yyyy-MM-dd")))
+    }
 
     // now switch format
     checkAnswer(
@@ -529,8 +543,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("from_unixtime") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
         val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
         val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
@@ -562,8 +576,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
  private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis)
 
   test("unix_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val date1 = Date.valueOf("2015-07-24")
         val date2 = Date.valueOf("2015-07-25")
         val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
@@ -629,8 +643,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("to_unix_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val date1 = Date.valueOf("2015-07-24")
         val date2 = Date.valueOf("2015-07-25")
         val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
@@ -680,8 +694,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
 
 
   test("to_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val date1 = Date.valueOf("2015-07-24")
         val date2 = Date.valueOf("2015-07-25")
         val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00")
@@ -701,7 +715,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
           df.select(unix_timestamp(col("ss")).cast("timestamp")))
         checkAnswer(df.select(to_timestamp(col("ss"))), Seq(
           Row(ts1), Row(ts2)))
-        if (legacyParser) {
+        if (legacyParserPolicy == "legacy") {
           // In Spark 2.4 and earlier, to_timestamp() parses in seconds precision and cuts off
           // the fractional part of seconds. The behavior was changed by SPARK-27438.
           val legacyFmt = "yyyy/MM/dd HH:mm:ss"
@@ -819,16 +833,18 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-30668: use legacy timestamp parser in to_timestamp") {
-    def checkTimeZoneParsing(expected: Any): Unit = {
-      val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
+    val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key
+    val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
+    withSQLConf(confKey -> "legacy") {
+      val expected = Timestamp.valueOf("2020-01-27 20:06:11.847")
       checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")),
         Row(expected))
     }
-    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
-      checkTimeZoneParsing(Timestamp.valueOf("2020-01-27 20:06:11.847"))
+    withSQLConf(confKey -> "corrected") {
+      checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(null))
     }
-    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "false") {
-      checkTimeZoneParsing(null)
+    withSQLConf(confKey -> "exception") {
+      checkExceptionMessage(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 43553df..30ae9dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2307,6 +2307,26 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData
     val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds)
     checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12")))
   }
+
+  test("exception mode for parsing date/timestamp string") {
+    val ds = Seq("2020-01-27T20:06:11.847-0800").toDS()
+    val csv = spark.read
+      .option("header", false)
+      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz")
+      .schema("t timestamp").csv(ds)
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+      val msg = intercept[SparkException] {
+        csv.collect()
+      }.getCause.getMessage
+      assert(msg.contains("Fail to parse"))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+      checkAnswer(csv, Row(Timestamp.valueOf("2020-01-27 20:06:11.847")))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+      checkAnswer(csv, Row(null))
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
@@ -2327,5 +2347,5 @@ class CSVLegacyTimeParserSuite extends CSVSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
+      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 26d600e..917da5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2669,6 +2669,26 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJsonData
       Date.valueOf("2020-1-12"),
       Date.valueOf(LocalDate.ofEpochDay(12345))))
   }
+
+  test("exception mode for parsing date/timestamp string") {
+    val ds = Seq("{'t': '2020-01-27T20:06:11.847-0800'}").toDS()
+    val json = spark.read
+      .schema("t timestamp")
+      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz")
+      .json(ds)
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+      val msg = intercept[SparkException] {
+        json.collect()
+      }.getCause.getMessage
+      assert(msg.contains("Fail to parse"))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+      checkAnswer(json, Row(Timestamp.valueOf("2020-01-27 20:06:11.847")))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+      checkAnswer(json, Row(null))
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
@@ -2689,5 +2709,5 @@ class JsonLegacyTimeParserSuite extends JsonSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
+      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
