This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a59063d  [SPARK-35581][SQL] Support special datetime values in typed literals only
a59063d is described below

commit a59063d5446a78d713c19a0d43f86c9f72ffd77d
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Jun 1 15:29:05 2021 +0300

    [SPARK-35581][SQL] Support special datetime values in typed literals only
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to support the special datetime values introduced by #25708 and #25716 only in typed literals, and to stop recognizing them when parsing strings to dates/timestamps. The following string values are supported only in typed timestamp literals:
    - `epoch [zoneId]` - `1970-01-01 00:00:00+00` (Unix system time zero)
    - `today [zoneId]` - midnight today
    - `yesterday [zoneId]` - midnight yesterday
    - `tomorrow [zoneId]` - midnight tomorrow
    - `now` - the current query start time
    
    For example:
    ```sql
    spark-sql> SELECT timestamp 'tomorrow';
    2019-09-07 00:00:00
    ```
    
    Similarly, the following special date values are supported only in typed 
date literals:
    - `epoch [zoneId]` - `1970-01-01`
    - `today [zoneId]` - the current date in the time zone specified by 
`spark.sql.session.timeZone`.
    - `yesterday [zoneId]` - the current date minus one day
    - `tomorrow [zoneId]` - the current date plus one day
    - `now` - the date the current query runs; it has the same meaning as `today`
    
    For example:
    ```sql
    spark-sql> SELECT date 'tomorrow' - date 'yesterday';
    2
    ```
    
    ### Why are the changes needed?
    In the current implementation, Spark recognizes the special date/timestamp values in any string cast to a date/timestamp, which leads to the following problems:
    - If executors have different system times, the results are inconsistent and effectively random; column values depend on where the conversions are performed (see the sketch below).
    - The special values behave like distributed non-deterministic functions, even though users might think of them as constants.
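    For illustration only (a hypothetical query, not part of this patch), a plain cast resolves `now` at conversion time on whichever executor processes the row:
    ```sql
    -- Spark 3.1 and earlier: 'now' is resolved inside the cast itself, so the two
    -- rows (and executors with skewed clocks) may produce different timestamps.
    SELECT CAST(t AS TIMESTAMP) FROM VALUES ('now'), ('now') AS tab(t);
    ```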
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, but the probability of hitting this change should be small; see the example below.
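    A sketch of the behavior change (the exact outcome depends on the session time zone and ANSI mode):
    ```sql
    -- Spark 3.1: the cast resolves 'now' to the current timestamp.
    -- Spark 3.2: 'now' is no longer special in casts, so the cast returns NULL
    -- (or fails under ANSI mode); the typed literal still works.
    SELECT CAST('now' AS TIMESTAMP), TIMESTAMP'now';
    ```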
    
    ### How was this patch tested?
    By running existing test suites:
    ```
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z 
interval.sql"
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z 
date.sql"
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z 
timestamp.sql"
    $ build/sbt "test:testOnly *DateTimeUtilsSuite"
    ```
    
    Closes #32714 from MaxGekk/remove-datetime-special-values.
    
    Lead-authored-by: Max Gekk <max.g...@gmail.com>
    Co-authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 docs/sql-migration-guide.md                        |  2 ++
 .../src/main/scala/org/apache/spark/sql/Row.scala  |  2 +-
 .../spark/sql/catalyst/catalog/interface.scala     |  4 +--
 .../sql/catalyst/csv/UnivocityGenerator.scala      |  1 -
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  3 +-
 .../spark/sql/catalyst/expressions/Cast.scala      | 20 +++++--------
 .../spark/sql/catalyst/expressions/literals.scala  |  2 +-
 .../spark/sql/catalyst/json/JacksonGenerator.scala |  1 -
 .../spark/sql/catalyst/json/JacksonParser.scala    |  3 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  9 ++++--
 .../spark/sql/catalyst/util/DateFormatter.scala    | 34 ++++++++-------------
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 35 +++++-----------------
 .../sql/catalyst/util/TimestampFormatter.scala     | 23 +++++++-------
 .../expressions/HashExpressionsSuite.scala         |  2 +-
 .../sql/catalyst/util/DateFormatterSuite.scala     | 34 ++++++---------------
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 32 ++++++++++----------
 .../sql/catalyst/util/DatetimeFormatterSuite.scala |  2 +-
 .../catalyst/util/TimestampFormatterSuite.scala    | 24 +--------------
 .../spark/sql/catalyst/util/UnsafeArraySuite.scala |  6 ++--
 .../execution/BaseScriptTransformationExec.scala   | 11 +++----
 .../apache/spark/sql/execution/HiveResult.scala    | 12 ++------
 .../execution/datasources/PartitioningUtils.scala  |  2 +-
 .../execution/datasources/jdbc/JDBCRelation.scala  |  5 ++--
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  4 +--
 .../sql-tests/inputs/postgreSQL/timestamp.sql      | 16 +++++-----
 .../sql-tests/results/postgreSQL/timestamp.sql.out | 16 +++++-----
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 19 ------------
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  | 19 ------------
 .../parquet/ParquetPartitionDiscoverySuite.scala   |  2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala       |  8 ++---
 .../apache/spark/sql/hive/client/HiveClient.scala  |  3 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     |  5 ++--
 .../apache/spark/sql/hive/client/HiveShim.scala    | 17 +++++------
 .../spark/sql/hive/client/FiltersSuite.scala       | 14 ++++-----
 .../hive/client/HivePartitionFilteringSuite.scala  |  5 ++--
 .../spark/sql/hive/client/VersionsSuite.scala      |  3 +-
 36 files changed, 131 insertions(+), 269 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 50d75ac..a5b2d8b 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -91,6 +91,8 @@ license: |
 
   - In Spark 3.2, `CREATE TABLE AS SELECT` with non-empty `LOCATION` will 
throw `AnalysisException`. To restore the behavior before Spark 3.2, you can 
set `spark.sql.legacy.allowNonEmptyLocationInCTAS` to `true`.
 
+  - In Spark 3.2, special datetime values such as `epoch`, `today`, 
`yesterday`, `tomorrow`, and `now` are supported in typed literals only, for 
instance, `select timestamp'now'`. In Spark 3.1 and 3.0, such special values 
are supported in any casts of strings to dates/timestamps. To keep these 
special values as dates/timestamps in Spark 3.1 and 3.0, you should replace 
them manually, e.g. `if (c in ('now', 'today'), current_date(), cast(c as 
date))`.
+
 ## Upgrading from Spark SQL 3.0 to 3.1
 
   - In Spark 3.1, statistical aggregation function includes `std`, `stddev`, 
`stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, 
`corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs 
during expression evaluation, for example, when `stddev_samp` applied on a 
single element set. In Spark version 3.0 and earlier, it will return 
`Double.NaN` in such case. To restore the behavior before Spark 3.1, you can 
set `spark.sql.legacy.statisticalAggrega [...]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index d43c57e..558fddb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -549,7 +549,7 @@ trait Row extends Serializable {
     require(schema != null, "JSON serialization requires a non-null schema.")
 
     lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
-    lazy val dateFormatter = DateFormatter.apply(zoneId)
+    lazy val dateFormatter = DateFormatter()
     lazy val timestampFormatter = TimestampFormatter(zoneId)
 
     // Convert an iterator of values to a json array
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 307c67b..3dc5aca 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -668,7 +668,7 @@ object CatalogColumnStat extends Logging {
     dataType match {
       case BooleanType => s.toBoolean
       case DateType if version == 1 => 
DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
-      case DateType => DateFormatter(ZoneOffset.UTC).parse(s)
+      case DateType => DateFormatter().parse(s)
       case TimestampType if version == 1 =>
         DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
       case TimestampType => getTimestampFormatter(isParsing = true).parse(s)
@@ -693,7 +693,7 @@ object CatalogColumnStat extends Logging {
    */
   def toExternalString(v: Any, colName: String, dataType: DataType): String = {
     val externalValue = dataType match {
-      case DateType => 
DateFormatter(ZoneOffset.UTC).format(v.asInstanceOf[Int])
+      case DateType => DateFormatter().format(v.asInstanceOf[Int])
       case TimestampType => getTimestampFormatter(isParsing = 
false).format(v.asInstanceOf[Long])
       case BooleanType | _: IntegralType | FloatType | DoubleType => v
       case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index a3ee129..11b31ce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -50,7 +50,6 @@ class UnivocityGenerator(
     isParsing = false)
   private val dateFormatter = DateFormatter(
     options.dateFormat,
-    options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 08d2273..672d133 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -95,7 +95,6 @@ class UnivocityParser(
     isParsing = true)
   private lazy val dateFormatter = DateFormatter(
     options.dateFormat,
-    options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
@@ -206,7 +205,7 @@ class UnivocityParser(
             // If fails to parse, then tries the way used in 2.0 and 1.x for 
backwards
             // compatibility.
             val str = 
DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToDate(str, options.zoneId).getOrElse(throw e)
+            DateTimeUtils.stringToDate(str).getOrElse(throw e)
         }
       }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 631f993..d879fb0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -304,7 +304,7 @@ abstract class CastBase extends UnaryExpression with 
TimeZoneAwareExpression wit
   // [[func]] assumes the input is no longer null because eval already does 
the null check.
   @inline protected[this] def buildCast[T](a: Any, func: T => Any): Any = 
func(a.asInstanceOf[T])
 
-  private lazy val dateFormatter = DateFormatter(zoneId)
+  private lazy val dateFormatter = DateFormatter()
   private lazy val timestampFormatter = 
TimestampFormatter.getFractionFormatter(zoneId)
 
   private val legacyCastToStr = 
SQLConf.get.getConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING)
@@ -520,9 +520,9 @@ abstract class CastBase extends UnaryExpression with 
TimeZoneAwareExpression wit
   private[this] def castToDate(from: DataType): Any => Any = from match {
     case StringType =>
       if (ansiEnabled) {
-        buildCast[UTF8String](_, s => DateTimeUtils.stringToDateAnsi(s, 
zoneId))
+        buildCast[UTF8String](_, s => DateTimeUtils.stringToDateAnsi(s))
       } else {
-        buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s, 
zoneId).orNull)
+        buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).orNull)
       }
     case TimestampType =>
       // throw valid precision more than seconds, according to Hive.
@@ -1167,25 +1167,18 @@ abstract class CastBase extends UnaryExpression with 
TimeZoneAwareExpression wit
   private[this] def castToDateCode(
       from: DataType,
       ctx: CodegenContext): CastFunction = {
-    def getZoneId() = {
-      val zoneIdClass = classOf[ZoneId]
-      JavaCode.global(
-        ctx.addReferenceObj("zoneId", zoneId, zoneIdClass.getName),
-        zoneIdClass)
-    }
     from match {
       case StringType =>
         val intOpt = ctx.freshVariable("intOpt", classOf[Option[Integer]])
-        val zid = getZoneId()
         (c, evPrim, evNull) =>
           if (ansiEnabled) {
             code"""
-              $evPrim = 
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDateAnsi($c, $zid);
+              $evPrim = 
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDateAnsi($c);
             """
           } else {
             code"""
               scala.Option<Integer> $intOpt =
-                
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDate($c, $zid);
+                
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDate($c);
               if ($intOpt.isDefined()) {
                 $evPrim = ((Integer) $intOpt.get()).intValue();
               } else {
@@ -1195,7 +1188,8 @@ abstract class CastBase extends UnaryExpression with 
TimeZoneAwareExpression wit
           }
 
       case TimestampType =>
-        val zid = getZoneId()
+        val zidClass = classOf[ZoneId]
+        val zid = JavaCode.global(ctx.addReferenceObj("zoneId", zoneId, 
zidClass.getName), zidClass)
         (c, evPrim, evNull) =>
           code"""$evPrim =
             org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToDays($c, 
$zid);"""
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 1a531e2..9ffa58b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -339,7 +339,7 @@ case class Literal (value: Any, dataType: DataType) extends 
LeafExpression {
     case other =>
       dataType match {
         case DateType =>
-          DateFormatter(timeZoneId).format(value.asInstanceOf[Int])
+          DateFormatter().format(value.asInstanceOf[Int])
         case TimestampType =>
           
TimestampFormatter.getFractionFormatter(timeZoneId).format(value.asInstanceOf[Long])
         case DayTimeIntervalType => 
toDayTimeIntervalString(value.asInstanceOf[Long], ANSI_STYLE)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 6187278..2567438 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -92,7 +92,6 @@ private[sql] class JacksonGenerator(
     isParsing = false)
   private val dateFormatter = DateFormatter(
     options.dateFormat,
-    options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 18f71b9..27e1411 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -67,7 +67,6 @@ class JacksonParser(
     isParsing = true)
   private lazy val dateFormatter = DateFormatter(
     options.dateFormat,
-    options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
@@ -264,7 +263,7 @@ class JacksonParser(
               // If fails to parse, then tries the way used in 2.0 and 1.x for 
backwards
               // compatibility.
               val str = 
DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
-              DateTimeUtils.stringToDate(str, options.zoneId).getOrElse {
+              DateTimeUtils.stringToDate(str).getOrElse {
                 // In Spark 1.5.0, we store the data as number of days since 
epoch in string.
                 // So, we just convert it to Int.
                 try {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c3f6b64..6c7c766 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, IntervalUtils}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, 
stringToDate, stringToTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit
 import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, 
TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -2094,10 +2094,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] 
with SQLConfHelper with Logg
     try {
       valueType match {
         case "DATE" =>
-          toLiteral(stringToDate(_, getZoneId(conf.sessionLocalTimeZone)), 
DateType)
+          val zoneId = getZoneId(conf.sessionLocalTimeZone)
+          val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, 
DateType))
+          specialDate.getOrElse(toLiteral(stringToDate, DateType))
         case "TIMESTAMP" =>
           val zoneId = getZoneId(conf.sessionLocalTimeZone)
-          toLiteral(stringToTimestamp(_, zoneId), TimestampType)
+          val specialTs = convertSpecialTimestamp(value, 
zoneId).map(Literal(_, TimestampType))
+          specialTs.getOrElse(toLiteral(stringToTimestamp(_, zoneId), 
TimestampType))
         case "INTERVAL" =>
           val interval = try {
             IntervalUtils.stringToInterval(UTF8String.fromString(value))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index da80e62..76bc196 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.util
 
 import java.text.SimpleDateFormat
-import java.time.{LocalDate, ZoneId}
+import java.time.LocalDate
 import java.util.{Date, Locale}
 
 import org.apache.commons.lang3.time.FastDateFormat
@@ -39,7 +39,6 @@ sealed trait DateFormatter extends Serializable {
 
 class Iso8601DateFormatter(
     pattern: String,
-    zoneId: ZoneId,
     locale: Locale,
     legacyFormat: LegacyDateFormats.LegacyDateFormat,
     isParsing: Boolean)
@@ -49,17 +48,13 @@ class Iso8601DateFormatter(
   private lazy val formatter = getOrCreateFormatter(pattern, locale, isParsing)
 
   @transient
-  private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(
-    pattern, zoneId, locale, legacyFormat)
+  private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(pattern, 
locale, legacyFormat)
 
   override def parse(s: String): Int = {
-    val specialDate = convertSpecialDate(s.trim, zoneId)
-    specialDate.getOrElse {
-      try {
-        val localDate = toLocalDate(formatter.parse(s))
-        localDateToDays(localDate)
-      } catch checkParsedDiff(s, legacyFormatter.parse)
-    }
+    try {
+      val localDate = toLocalDate(formatter.parse(s))
+      localDateToDays(localDate)
+    } catch checkParsedDiff(s, legacyFormatter.parse)
   }
 
   override def format(localDate: LocalDate): String = {
@@ -153,15 +148,14 @@ object DateFormatter {
 
   private def getFormatter(
       format: Option[String],
-      zoneId: ZoneId,
       locale: Locale = defaultLocale,
       legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
       isParsing: Boolean): DateFormatter = {
     val pattern = format.getOrElse(defaultPattern)
     if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
-      getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
+      getLegacyFormatter(pattern, locale, legacyFormat)
     } else {
-      val df = new Iso8601DateFormatter(pattern, zoneId, locale, legacyFormat, 
isParsing)
+      val df = new Iso8601DateFormatter(pattern, locale, legacyFormat, 
isParsing)
       df.validatePatternString()
       df
     }
@@ -169,7 +163,6 @@ object DateFormatter {
 
   def getLegacyFormatter(
       pattern: String,
-      zoneId: ZoneId,
       locale: Locale,
       legacyFormat: LegacyDateFormat): DateFormatter = {
     legacyFormat match {
@@ -182,18 +175,17 @@ object DateFormatter {
 
   def apply(
       format: String,
-      zoneId: ZoneId,
       locale: Locale,
       legacyFormat: LegacyDateFormat,
       isParsing: Boolean): DateFormatter = {
-    getFormatter(Some(format), zoneId, locale, legacyFormat, isParsing)
+    getFormatter(Some(format), locale, legacyFormat, isParsing)
   }
 
-  def apply(format: String, zoneId: ZoneId, isParsing: Boolean = false): 
DateFormatter = {
-    getFormatter(Some(format), zoneId, isParsing = isParsing)
+  def apply(format: String, isParsing: Boolean = false): DateFormatter = {
+    getFormatter(Some(format), isParsing = isParsing)
   }
 
-  def apply(zoneId: ZoneId): DateFormatter = {
-    getFormatter(None, zoneId, isParsing = false)
+  def apply(): DateFormatter = {
+    getFormatter(None, isParsing = false)
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index eeaf086..9f4abde 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time._
 import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
@@ -246,8 +245,6 @@ object DateTimeUtils {
     var i = 0
     var currentSegmentValue = 0
     val bytes = s.trimAll().getBytes
-    val specialTimestamp = convertSpecialTimestamp(bytes, timeZoneId)
-    if (specialTimestamp.isDefined) return specialTimestamp
     var j = 0
     var digitsMilli = 0
     var justTime = false
@@ -419,7 +416,7 @@ object DateTimeUtils {
    * `yyyy-[m]m-[d]d *`
    * `yyyy-[m]m-[d]dT*`
    */
-  def stringToDate(s: UTF8String, zoneId: ZoneId): Option[Int] = {
+  def stringToDate(s: UTF8String): Option[Int] = {
     if (s == null) {
       return None
     }
@@ -427,8 +424,6 @@ object DateTimeUtils {
     var i = 0
     var currentSegmentValue = 0
     val bytes = s.trimAll().getBytes
-    val specialDate = convertSpecialDate(bytes, zoneId)
-    if (specialDate.isDefined) return specialDate
     var j = 0
     while (j < bytes.length && (i < 3 && !(bytes(j) == ' ' || bytes(j) == 
'T'))) {
       val b = bytes(j)
@@ -467,8 +462,8 @@ object DateTimeUtils {
     }
   }
 
-  def stringToDateAnsi(s: UTF8String, zoneId: ZoneId): Int = {
-    stringToDate(s, zoneId).getOrElse {
+  def stringToDateAnsi(s: UTF8String): Int = {
+    stringToDate(s).getOrElse {
       throw QueryExecutionErrors.cannotCastUTF8StringToDataTypeError(s, 
DateType)
     }
   }
@@ -908,13 +903,13 @@ object DateTimeUtils {
   /**
    * Converts notational shorthands that are converted to ordinary timestamps.
    *
-   * @param input A trimmed string
+   * @param input A string to parse. It can contain trailing or leading 
whitespaces.
    * @param zoneId Zone identifier used to get the current date.
    * @return Some of microseconds since the epoch if the conversion completed
    *         successfully otherwise None.
    */
   def convertSpecialTimestamp(input: String, zoneId: ZoneId): Option[Long] = {
-    extractSpecialValue(input, zoneId).flatMap {
+    extractSpecialValue(input.trim, zoneId).flatMap {
       case "epoch" => Some(0)
       case "now" => Some(currentTimestamp())
       case "today" => Some(instantToMicros(today(zoneId).toInstant))
@@ -924,23 +919,15 @@ object DateTimeUtils {
     }
   }
 
-  private def convertSpecialTimestamp(bytes: Array[Byte], zoneId: ZoneId): 
Option[Long] = {
-    if (bytes.length > 0 && Character.isAlphabetic(bytes(0))) {
-      convertSpecialTimestamp(new String(bytes, StandardCharsets.UTF_8), 
zoneId)
-    } else {
-      None
-    }
-  }
-
   /**
    * Converts notational shorthands that are converted to ordinary dates.
    *
-   * @param input A trimmed string
+   * @param input A string to parse. It can contain trailing or leading 
whitespaces.
    * @param zoneId Zone identifier used to get the current date.
    * @return Some of days since the epoch if the conversion completed 
successfully otherwise None.
    */
   def convertSpecialDate(input: String, zoneId: ZoneId): Option[Int] = {
-    extractSpecialValue(input, zoneId).flatMap {
+    extractSpecialValue(input.trim, zoneId).flatMap {
       case "epoch" => Some(0)
       case "now" | "today" => Some(currentDate(zoneId))
       case "tomorrow" => Some(Math.addExact(currentDate(zoneId), 1))
@@ -949,14 +936,6 @@ object DateTimeUtils {
     }
   }
 
-  private def convertSpecialDate(bytes: Array[Byte], zoneId: ZoneId): 
Option[Int] = {
-    if (bytes.length > 0 && Character.isAlphabetic(bytes(0))) {
-      convertSpecialDate(new String(bytes, StandardCharsets.UTF_8), zoneId)
-    } else {
-      None
-    }
-  }
-
   /**
    * Subtracts two dates expressed as days since 1970-01-01.
    *
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 6832d1a..130bfa4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -72,19 +72,16 @@ class Iso8601TimestampFormatter(
     pattern, zoneId, locale, legacyFormat)
 
   override def parse(s: String): Long = {
-    val specialDate = convertSpecialTimestamp(s.trim, zoneId)
-    specialDate.getOrElse {
-      try {
-        val parsed = formatter.parse(s)
-        val parsedZoneId = parsed.query(TemporalQueries.zone())
-        val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
-        val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
-        val epochSeconds = zonedDateTime.toEpochSecond
-        val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)
-
-        Math.addExact(Math.multiplyExact(epochSeconds, MICROS_PER_SECOND), 
microsOfSecond)
-      } catch checkParsedDiff(s, legacyFormatter.parse)
-    }
+    try {
+      val parsed = formatter.parse(s)
+      val parsedZoneId = parsed.query(TemporalQueries.zone())
+      val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
+      val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
+      val epochSeconds = zonedDateTime.toEpochSecond
+      val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)
+
+      Math.addExact(Math.multiplyExact(epochSeconds, MICROS_PER_SECOND), 
microsOfSecond)
+    } catch checkParsedDiff(s, legacyFormatter.parse)
   }
 
   override def format(instant: Instant): String = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index bd981d1..97c2797 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -176,7 +176,7 @@ class HashExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   test("hive-hash for date type") {
     def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
       checkHiveHash(
-        DateTimeUtils.stringToDate(UTF8String.fromString(dateString), 
ZoneOffset.UTC).get,
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
         DateType,
         expected)
     }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateFormatterSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateFormatterSuite.scala
index 0a29d94..4c22e67 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateFormatterSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateFormatterSuite.scala
@@ -28,7 +28,7 @@ import 
org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 class DateFormatterSuite extends DatetimeFormatterSuite {
 
   override def checkFormatterCreation(pattern: String, isParsing: Boolean): 
Unit = {
-    DateFormatter(pattern, UTC, isParsing)
+    DateFormatter(pattern, isParsing)
   }
 
   override protected def useDateFormatter: Boolean = true
@@ -36,7 +36,7 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
   test("parsing dates") {
     outstandingTimezonesIds.foreach { timeZone =>
       withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-        val formatter = DateFormatter(getZoneId(timeZone))
+        val formatter = DateFormatter()
         val daysSinceEpoch = formatter.parse("2018-12-02")
         assert(daysSinceEpoch === 17867)
       }
@@ -46,7 +46,7 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
   test("format dates") {
     outstandingTimezonesIds.foreach { timeZone =>
       withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-        val formatter = DateFormatter(getZoneId(timeZone))
+        val formatter = DateFormatter()
         val (days, expected) = (17867, "2018-12-02")
         val date = formatter.format(days)
         assert(date === expected)
@@ -75,7 +75,6 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
               withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
                 val formatter = DateFormatter(
                   DateFormatter.defaultPattern,
-                  getZoneId(timeZone),
                   DateFormatter.defaultLocale,
                   legacyFormat,
                   isParsing = false)
@@ -110,7 +109,6 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
               withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
                 val formatter = DateFormatter(
                   DateFormatter.defaultPattern,
-                  getZoneId(timeZone),
                   DateFormatter.defaultLocale,
                   legacyFormat,
                   isParsing = false)
@@ -126,31 +124,18 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
   }
 
   test("parsing date without explicit day") {
-    val formatter = DateFormatter("yyyy MMM", UTC)
+    val formatter = DateFormatter("yyyy MMM")
     val daysSinceEpoch = formatter.parse("2018 Dec")
     assert(daysSinceEpoch === days(2018, 12, 1))
   }
 
   test("formatting negative years with default pattern") {
     val epochDays = days(-99, 1, 1)
-    assert(DateFormatter(UTC).format(epochDays) === "-0099-01-01")
-  }
-
-  test("special date values") {
-    testSpecialDatetimeValues { zoneId =>
-      val formatter = DateFormatter(zoneId)
-
-      assert(formatter.parse("EPOCH") === 0)
-      val today = localDateToDays(LocalDate.now(zoneId))
-      assert(formatter.parse("Yesterday") === today - 1)
-      assert(formatter.parse("now") === today)
-      assert(formatter.parse("today ") === today)
-      assert(formatter.parse("tomorrow UTC") === today + 1)
-    }
+    assert(DateFormatter().format(epochDays) === "-0099-01-01")
   }
 
   test("SPARK-30958: parse date with negative year") {
-    val formatter1 = DateFormatter("yyyy-MM-dd", UTC)
+    val formatter1 = DateFormatter("yyyy-MM-dd")
     assert(formatter1.parse("-1234-02-22") === days(-1234, 2, 22))
 
     def assertParsingError(f: => Unit): Unit = {
@@ -163,7 +148,7 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
     }
 
     // "yyyy" with "G" can't parse negative year or year 0000.
-    val formatter2 = DateFormatter("G yyyy-MM-dd", UTC)
+    val formatter2 = DateFormatter("G yyyy-MM-dd")
     assertParsingError(formatter2.parse("BC -1234-02-22"))
     assertParsingError(formatter2.parse("AD 0000-02-22"))
 
@@ -178,7 +163,6 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
           withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
             val formatter = DateFormatter(
               DateFormatter.defaultPattern,
-              getZoneId(timeZone),
               DateFormatter.defaultLocale,
               legacyFormat,
               isParsing = false)
@@ -193,13 +177,13 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
   }
 
   test("missing date fields") {
-    val formatter = DateFormatter("HH", UTC)
+    val formatter = DateFormatter("HH")
     val daysSinceEpoch = formatter.parse("20")
     assert(daysSinceEpoch === days(1970, 1, 1))
   }
 
   test("missing year field with invalid date") {
-    val formatter = DateFormatter("MM-dd", UTC)
+    val formatter = DateFormatter("MM-dd")
     // The date parser in 2.4 accepts 1970-02-29 and turn it into 1970-03-01, 
so we should get a
     // SparkUpgradeException here.
     intercept[SparkUpgradeException](formatter.parse("02-29"))
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 4de4397..5afd13a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -123,8 +123,8 @@ class DateTimeUtilsSuite extends SparkFunSuite with 
Matchers with SQLHelper {
     checkFromToJavaDate(new Date(df2.parse("1776-07-04 18:30:00 UTC").getTime))
   }
 
-  private def toDate(s: String, zoneId: ZoneId = UTC): Option[Int] = {
-    stringToDate(UTF8String.fromString(s), zoneId)
+  private def toDate(s: String): Option[Int] = {
+    stringToDate(UTF8String.fromString(s))
   }
 
   test("string to date") {
@@ -673,35 +673,35 @@ class DateTimeUtilsSuite extends SparkFunSuite with 
Matchers with SQLHelper {
     assert(DateTimeUtils.microsToMillis(-157700927876544L) === -157700927877L)
   }
 
-  test("special timestamp values") {
+  test("SPARK-29012: special timestamp values") {
     testSpecialDatetimeValues { zoneId =>
       val tolerance = TimeUnit.SECONDS.toMicros(30)
 
-      assert(toTimestamp("Epoch", zoneId).get === 0)
+      assert(convertSpecialTimestamp("Epoch", zoneId).get === 0)
       val now = instantToMicros(Instant.now())
-      toTimestamp("NOW", zoneId).get should be(now +- tolerance)
-      assert(toTimestamp("now UTC", zoneId) === None)
+      convertSpecialTimestamp("NOW", zoneId).get should be(now +- tolerance)
+      assert(convertSpecialTimestamp("now UTC", zoneId) === None)
       val localToday = LocalDateTime.now(zoneId)
         .`with`(LocalTime.MIDNIGHT)
         .atZone(zoneId)
       val yesterday = instantToMicros(localToday.minusDays(1).toInstant)
-      toTimestamp(" Yesterday", zoneId).get should be(yesterday +- tolerance)
+      convertSpecialTimestamp(" Yesterday", zoneId).get should be(yesterday +- 
tolerance)
       val today = instantToMicros(localToday.toInstant)
-      toTimestamp("Today ", zoneId).get should be(today +- tolerance)
+      convertSpecialTimestamp("Today ", zoneId).get should be(today +- 
tolerance)
       val tomorrow = instantToMicros(localToday.plusDays(1).toInstant)
-      toTimestamp(" tomorrow CET ", zoneId).get should be(tomorrow +- 
tolerance)
+      convertSpecialTimestamp(" tomorrow CET ", zoneId).get should be(tomorrow 
+- tolerance)
     }
   }
 
-  test("special date values") {
+  test("SPARK-28141: special date values") {
     testSpecialDatetimeValues { zoneId =>
-      assert(toDate("epoch", zoneId).get === 0)
+      assert(convertSpecialDate("epoch", zoneId).get === 0)
       val today = localDateToDays(LocalDate.now(zoneId))
-      assert(toDate("YESTERDAY", zoneId).get === today - 1)
-      assert(toDate(" Now ", zoneId).get === today)
-      assert(toDate("now UTC", zoneId) === None) // "now" does not accept time 
zones
-      assert(toDate("today", zoneId).get === today)
-      assert(toDate("tomorrow CET ", zoneId).get === today + 1)
+      assert(convertSpecialDate("YESTERDAY", zoneId).get === today - 1)
+      assert(convertSpecialDate(" Now ", zoneId).get === today)
+      assert(convertSpecialDate("now UTC", zoneId) === None) // "now" does not 
accept time zones
+      assert(convertSpecialDate("today", zoneId).get === today)
+      assert(convertSpecialDate("tomorrow CET ", zoneId).get === today + 1)
     }
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DatetimeFormatterSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DatetimeFormatterSuite.scala
index 54a0aec..0640dc7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DatetimeFormatterSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DatetimeFormatterSuite.scala
@@ -32,7 +32,7 @@ trait DatetimeFormatterSuite extends SparkFunSuite with 
SQLHelper with Matchers
 
   private def dateFormatter(
       pattern: String, ldf: LegacyDateFormat = FAST_DATE_FORMAT): 
DateFormatter = {
-    DateFormatter(pattern, UTC, DateFormatter.defaultLocale, ldf, isParsing = 
true)
+    DateFormatter(pattern, DateFormatter.defaultLocale, ldf, isParsing = true)
   }
 
   private def timestampFormatter(
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
index c65fec2..6e93866 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import java.time.{DateTimeException, Instant, LocalDateTime, LocalTime}
-import java.util.concurrent.TimeUnit
+import java.time.{DateTimeException, LocalDateTime}
 
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
-import org.scalatest.matchers.should.Matchers._
 
 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
@@ -167,26 +165,6 @@ class TimestampFormatterSuite extends 
DatetimeFormatterSuite {
     }
   }
 
-  test("special timestamp values") {
-    testSpecialDatetimeValues { zoneId =>
-      val formatter = TimestampFormatter(zoneId)
-      val tolerance = TimeUnit.SECONDS.toMicros(30)
-
-      assert(formatter.parse("EPOCH") === 0)
-      val now = instantToMicros(Instant.now())
-      formatter.parse("now") should be(now +- tolerance)
-      val localToday = LocalDateTime.now(zoneId)
-        .`with`(LocalTime.MIDNIGHT)
-        .atZone(zoneId)
-      val yesterday = instantToMicros(localToday.minusDays(1).toInstant)
-      formatter.parse("yesterday CET") should be(yesterday +- tolerance)
-      val today = instantToMicros(localToday.toInstant)
-      formatter.parse(" TODAY ") should be(today +- tolerance)
-      val tomorrow = instantToMicros(localToday.plusDays(1).toInstant)
-      formatter.parse("Tomorrow ") should be(tomorrow +- tolerance)
-    }
-  }
-
   test("parsing timestamp strings with various seconds fractions") {
     outstandingZoneIds.foreach { zoneId =>
       def check(pattern: String, input: String, reference: String): Unit = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
index 5729b02..34e1330 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import java.time.{ZoneId, ZoneOffset}
+import java.time.ZoneId
 
 import scala.reflect.runtime.universe.TypeTag
 
@@ -40,8 +40,8 @@ class UnsafeArraySuite extends SparkFunSuite {
   val doubleArray = Array(1.1, 2.2, 3.3)
   val stringArray = Array("1", "10", "100")
   val dateArray = Array(
-    DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), 
ZoneOffset.UTC).get,
-    DateTimeUtils.stringToDate(UTF8String.fromString("2016-7-26"), 
ZoneOffset.UTC).get)
+    DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1")).get,
+    DateTimeUtils.stringToDate(UTF8String.fromString("2016-7-26")).get)
   private def defaultZoneId = ZoneId.systemDefault()
   val timestampArray = Array(
     DateTimeUtils.stringToTimestamp(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index da5c6f0..498957b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -206,14 +206,11 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       case DoubleType => wrapperConvertException(data => data.toDouble, 
converter)
       case _: DecimalType => wrapperConvertException(data => BigDecimal(data), 
converter)
       case DateType if conf.datetimeJava8ApiEnabled =>
-        wrapperConvertException(data => DateTimeUtils.stringToDate(
-          UTF8String.fromString(data),
-          DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
+        wrapperConvertException(data => 
DateTimeUtils.stringToDate(UTF8String.fromString(data))
           .map(DateTimeUtils.daysToLocalDate).orNull, converter)
-      case DateType => wrapperConvertException(data => 
DateTimeUtils.stringToDate(
-        UTF8String.fromString(data),
-        DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
-        .map(DateTimeUtils.toJavaDate).orNull, converter)
+      case DateType =>
+        wrapperConvertException(data => 
DateTimeUtils.stringToDate(UTF8String.fromString(data))
+          .map(DateTimeUtils.toJavaDate).orNull, converter)
       case TimestampType if conf.datetimeJava8ApiEnabled =>
         wrapperConvertException(data => DateTimeUtils.stringToTimestamp(
           UTF8String.fromString(data),
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 52394c1..5e70649 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.time.{Duration, Instant, LocalDate, Period, ZoneOffset}
+import java.time.{Duration, Instant, LocalDate, Period}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, 
TimestampFormatter}
@@ -38,15 +38,7 @@ object HiveResult {
   case class TimeFormatters(date: DateFormatter, timestamp: TimestampFormatter)
 
   def getTimeFormatters: TimeFormatters = {
-    // The date formatter does not depend on Spark's session time zone 
controlled by
-    // the SQL config `spark.sql.session.timeZone`. The `zoneId` parameter is 
used only in
-    // parsing of special date values like `now`, `yesterday` and etc. but not 
in date formatting.
-    // While formatting of:
-    // - `java.time.LocalDate`, zone id is not used by `DateTimeFormatter` at 
all.
-    // - `java.sql.Date`, the date formatter delegates formatting to the 
legacy formatter
-    //   which uses the default system time zone `TimeZone.getDefault`. This 
works correctly
-    //   due to `DateTimeUtils.toJavaDate` which is based on the system time 
zone too.
-    val dateFormatter = DateFormatter(ZoneOffset.UTC)
+    val dateFormatter = DateFormatter()
     val timestampFormatter = TimestampFormatter.getFractionFormatter(
       DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
     TimeFormatters(dateFormatter, timestampFormatter)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 1cd5e14..91029ec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -135,7 +135,7 @@ object PartitioningUtils {
       Map.empty[String, String]
     }
 
-    val dateFormatter = DateFormatter(zoneId)
+    val dateFormatter = DateFormatter()
     val timestampFormatter = TimestampFormatter(
       timestampPartitionPattern,
       zoneId,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index e6d8819..b5a9c35 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -202,7 +202,7 @@ private[sql] object JDBCRelation extends Logging {
     }
     columnType match {
       case _: NumericType => value.toLong
-      case DateType => parse(stringToDate(_, getZoneId(timeZoneId))).toLong
+      case DateType => parse(stringToDate).toLong
       case TimestampType => parse(stringToTimestamp(_, getZoneId(timeZoneId)))
     }
   }
@@ -214,8 +214,7 @@ private[sql] object JDBCRelation extends Logging {
     def dateTimeToString(): String = {
       val dateTimeStr = columnType match {
         case DateType =>
-          val dateFormatter = 
DateFormatter(DateTimeUtils.getZoneId(timeZoneId))
-          dateFormatter.format(value.toInt)
+          DateFormatter().format(value.toInt)
         case TimestampType =>
           val timestampFormatter = TimestampFormatter.getFractionFormatter(
             DateTimeUtils.getZoneId(timeZoneId))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 6c72172..d2d1286 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -183,9 +183,7 @@ abstract class JdbcDialect extends Serializable with 
Logging{
         DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
       s"'${timestampFormatter.format(timestampValue)}'"
     case dateValue: Date => "'" + dateValue + "'"
-    case dateValue: LocalDate =>
-      val dateFormatter = 
DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
-      s"'${dateFormatter.format(dateValue)}'"
+    case dateValue: LocalDate => s"'${DateFormatter().format(dateValue)}'"
     case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
     case _ => value
   }
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/timestamp.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/timestamp.sql
index ade29cc..0630262 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/timestamp.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/timestamp.sql
@@ -18,21 +18,21 @@ CREATE TABLE TIMESTAMP_TBL (d1 timestamp) USING parquet;
 
 -- PostgreSQL implicitly casts string literals to data with timestamp types, 
but
 -- Spark does not support that kind of implicit casts.
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('now'));
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'now');
 -- SELECT pg_sleep(0.1);
 
 -- BEGIN;
 
 -- PostgreSQL implicitly casts string literals to data with timestamp types, 
but
 -- Spark does not support that kind of implicit casts.
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('now'));
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('today'));
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('yesterday'));
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('tomorrow'));
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'now');
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'today');
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'yesterday');
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'tomorrow');
 -- time zone should be ignored by this data type
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('tomorrow EST'));
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'tomorrow EST');
 -- [SPARK-29024] Ignore case while resolving time zones
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('tomorrow Zulu'));
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'tomorrow Zulu');
 
 SELECT count(*) AS One FROM TIMESTAMP_TBL WHERE d1 = timestamp 'today';
 SELECT count(*) AS Three FROM TIMESTAMP_TBL WHERE d1 = timestamp 'tomorrow';
@@ -60,7 +60,7 @@ TRUNCATE TABLE TIMESTAMP_TBL;
 -- INSERT INTO TIMESTAMP_TBL VALUES ('infinity');
 -- PostgreSQL implicitly casts string literals to data with timestamp types, 
but
 -- Spark does not support that kind of implicit casts.
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('epoch'));
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'epoch');
 -- [SPARK-27923] Spark SQL insert there obsolete special values to NULL
 -- Obsolete special values
 -- INSERT INTO TIMESTAMP_TBL VALUES ('invalid');
diff --git 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out
index 68d2b5c..9847386 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out
@@ -11,7 +11,7 @@ struct<>
 
 
 -- !query
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('now'))
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'now')
 -- !query schema
 struct<>
 -- !query output
@@ -19,7 +19,7 @@ struct<>
 
 
 -- !query
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('now'))
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'now')
 -- !query schema
 struct<>
 -- !query output
@@ -27,7 +27,7 @@ struct<>
 
 
 -- !query
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('today'))
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'today')
 -- !query schema
 struct<>
 -- !query output
@@ -35,7 +35,7 @@ struct<>
 
 
 -- !query
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('yesterday'))
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'yesterday')
 -- !query schema
 struct<>
 -- !query output
@@ -43,7 +43,7 @@ struct<>
 
 
 -- !query
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('tomorrow'))
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'tomorrow')
 -- !query schema
 struct<>
 -- !query output
@@ -51,7 +51,7 @@ struct<>
 
 
 -- !query
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('tomorrow EST'))
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'tomorrow EST')
 -- !query schema
 struct<>
 -- !query output
@@ -59,7 +59,7 @@ struct<>
 
 
 -- !query
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('tomorrow Zulu'))
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'tomorrow Zulu')
 -- !query schema
 struct<>
 -- !query output
@@ -99,7 +99,7 @@ struct<>
 
 
 -- !query
-INSERT INTO TIMESTAMP_TBL VALUES (timestamp('epoch'))
+INSERT INTO TIMESTAMP_TBL VALUES (timestamp'epoch')
 -- !query schema
 struct<>
 -- !query output
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 16b92d6..6ae57cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
 
@@ -195,24 +194,6 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
     }
   }
 
-  test("special timestamp values") {
-    Seq("now", "today", "epoch", "tomorrow", "yesterday").foreach { 
specialValue =>
-      val input = Seq(specialValue).toDS()
-      val readback = input.select(from_csv($"value", lit("t timestamp"),
-        Map.empty[String, String].asJava)).collect()
-      assert(readback(0).getAs[Row](0).getAs[Timestamp](0).getTime >= 0)
-    }
-  }
-
-  test("special date values") {
-    Seq("now", "today", "epoch", "tomorrow", "yesterday").foreach { 
specialValue =>
-      val input = Seq(specialValue).toDS()
-      val readback = input.select(from_csv($"value", lit("d date"),
-        Map.empty[String, String].asJava)).collect()
-      assert(readback(0).getAs[Row](0).getAs[Date](0).getTime >= 0)
-    }
-  }
-
   test("support foldable schema by from_csv") {
     val options = Map[String, String]().asJava
     val schema = concat_ws(",", lit("i int"), lit("s string"))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 310e170..5485cc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
 
@@ -620,24 +619,6 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSparkSession {
     }
   }
 
-  test("special timestamp values") {
-    Seq("now", "today", "epoch", "tomorrow", "yesterday").foreach { 
specialValue =>
-      val input = Seq(s"""{"t": "$specialValue"}""").toDS()
-      val readback = input.select(from_json($"value", lit("t timestamp"),
-        Map.empty[String, String].asJava)).collect()
-      assert(readback(0).getAs[Row](0).getAs[Timestamp](0).getTime >= 0)
-    }
-  }
-
-  test("special date values") {
-    Seq("now", "today", "epoch", "tomorrow", "yesterday").foreach { 
specialValue =>
-      val input = Seq(s"""{"d": "$specialValue"}""").toDS()
-      val readback = input.select(from_json($"value", lit("d date"),
-        Map.empty[String, String].asJava)).collect()
-      assert(readback(0).getAs[Row](0).getAs[Date](0).getTime >= 0)
-    }
-  }
-
   test("from_json - timestamp in micros") {
     val df = Seq("""{"time": "1970-01-01T00:00:00.123456"}""").toDS()
     val schema = new StructType().add("time", TimestampType)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 5ea8c61..c152761 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -56,7 +56,7 @@ abstract class ParquetPartitionDiscoverySuite
   val defaultPartitionName = ExternalCatalogUtils.DEFAULT_PARTITION_NAME
 
   val timeZoneId = ZoneId.systemDefault()
-  val df = DateFormatter(timeZoneId)
+  val df = DateFormatter()
   val tf = TimestampFormatter(
     timestampPartitionPattern, timeZoneId, isParsing = true)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 9bcc19b..5f4f2f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -1260,13 +1260,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
     val rawTable = getRawTable(db, table)
     val catalogTable = restoreTableMetadata(rawTable)
-    val timeZoneId = CaseInsensitiveMap(catalogTable.storage.properties).getOrElse(
-      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
-
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
-
     val clientPrunedPartitions =
-      client.getPartitionsByFilter(rawTable, predicates, timeZoneId).map { part =>
+      client.getPartitionsByFilter(rawTable, predicates).map { part =>
         part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
       }
     prunePartitionsByFilter(catalogTable, clientPrunedPartitions, predicates, defaultTimeZoneId)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 48f3837..3ea80ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -233,8 +233,7 @@ private[hive] trait HiveClient {
   /** Returns partitions filtered by predicates for the given table. */
   def getPartitionsByFilter(
       catalogTable: CatalogTable,
-      predicates: Seq[Expression],
-      timeZoneId: String): Seq[CatalogTablePartition]
+      predicates: Seq[Expression]): Seq[CatalogTablePartition]
 
   /** Loads a static partition into an existing table. */
   def loadPartition(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index ebef0b9..bdf4905 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -765,10 +765,9 @@ private[hive] class HiveClientImpl(
 
   override def getPartitionsByFilter(
       table: CatalogTable,
-      predicates: Seq[Expression],
-      timeZoneId: String): Seq[CatalogTablePartition] = withHiveState {
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
-    val parts = shim.getPartitionsByFilter(client, hiveTable, predicates, timeZoneId)
+    val parts = shim.getPartitionsByFilter(client, hiveTable, predicates)
       .map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 2f7fe96..0a5b514 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TypeUtils}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TypeUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{AtomicType, DateType, IntegralType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -82,8 +82,7 @@ private[client] sealed abstract class Shim {
   def getPartitionsByFilter(
       hive: Hive,
       table: Table,
-      predicates: Seq[Expression],
-      timeZoneId: String): Seq[Partition]
+      predicates: Seq[Expression]): Seq[Partition]
 
   def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
 
@@ -353,8 +352,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
   override def getPartitionsByFilter(
       hive: Hive,
       table: Table,
-      predicates: Seq[Expression],
-      timeZoneId: String): Seq[Partition] = {
+      predicates: Seq[Expression]): Seq[Partition] = {
     // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
     // See HIVE-4888.
     logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
@@ -637,8 +635,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
    *
    * Unsupported predicates are skipped.
    */
-  def convertFilters(table: Table, filters: Seq[Expression], timeZoneId: String): String = {
-    lazy val dateFormatter = DateFormatter(DateTimeUtils.getZoneId(timeZoneId))
+  def convertFilters(table: Table, filters: Seq[Expression]): String = {
+    lazy val dateFormatter = DateFormatter()
 
     /**
      * An extractor that matches all binary comparison operators except null-safe equality.
@@ -869,12 +867,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   override def getPartitionsByFilter(
       hive: Hive,
       table: Table,
-      predicates: Seq[Expression],
-      timeZoneId: String): Seq[Partition] = {
+      predicates: Seq[Expression]): Seq[Partition] = {
 
     // Hive getPartitionsByFilter() takes a string that represents partition
     // predicates like "str_key=\"value\" and int_key=1 ..."
-    val filter = convertFilters(table, predicates, timeZoneId)
+    val filter = convertFilters(table, predicates)
 
     val partitions =
       if (filter.isEmpty) {
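
In the HiveShim.scala hunks above, convertFilters now renders date literals with the zero-argument DateFormatter() factory instead of one built from a session time zone, so the filter string pushed to the metastore is the same on every node. A rough sketch of that property, assuming Catalyst's internal DateFormatter API and its format(days: Int) overload (names and values here are illustrative, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.util.DateFormatter

// Catalyst stores DateType values as days since 1970-01-01; 18262 days is 2020-01-01.
// The zero-argument factory is the one the shim now uses, so the rendered literal
// does not depend on spark.sql.session.timeZone.
val dateFormatter = DateFormatter()
val rendered = dateFormatter.format(18262) // assumed format(days: Int) overload; expected "2020-01-01"

// A value like this feeds the metastore filter string, e.g. the
// "(p = 2020-01-01 or p = 2021-01-01)" form asserted in FiltersSuite below.
println(rendered)
```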
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index fcdc973..29b51e1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -162,7 +162,7 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
   private def filterTest(name: String, filters: Seq[Expression], result: String) = {
     test(name) {
       withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> "true") {
-        val converted = shim.convertFilters(testTable, filters, conf.sessionLocalTimeZone)
+        val converted = shim.convertFilters(testTable, filters)
         if (converted != result) {
           fail(s"Expected ${filters.mkString(",")} to convert to '$result' but 
got '$converted'")
         }
@@ -177,7 +177,7 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
         val filters =
           (Literal(1) === a("intcol", IntegerType) ||
             Literal(2) === a("intcol", IntegerType)) :: Nil
-        val converted = shim.convertFilters(testTable, filters, conf.sessionLocalTimeZone)
+        val converted = shim.convertFilters(testTable, filters)
         if (enabled) {
           assert(converted == "(1 = intcol or 2 = intcol)")
         } else {
@@ -189,7 +189,7 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
 
   test("SPARK-33416: Avoid Hive metastore stack overflow when InSet predicate 
have many values") {
     def checkConverted(inSet: InSet, result: String): Unit = {
-      assert(shim.convertFilters(testTable, inSet :: Nil, conf.sessionLocalTimeZone) == result)
+      assert(shim.convertFilters(testTable, inSet :: Nil) == result)
     }
 
     withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "15") {
@@ -223,7 +223,7 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
   test("SPARK-34515: Fix NPE if InSet contains null value during 
getPartitionsByFilter") {
     withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "2") {
       val filter = InSet(a("p", IntegerType), Set(null, 1, 2))
-      val converted = shim.convertFilters(testTable, Seq(filter), conf.sessionLocalTimeZone)
+      val converted = shim.convertFilters(testTable, Seq(filter))
       assert(converted == "(p >= 1 and p <= 2)")
     }
   }
@@ -231,7 +231,7 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
   test("Don't push not inset if it's values exceeds the threshold") {
     withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "2") {
       val filter = Not(InSet(a("p", IntegerType), Set(1, 2, 3)))
-      val converted = shim.convertFilters(testTable, Seq(filter), conf.sessionLocalTimeZone)
+      val converted = shim.convertFilters(testTable, Seq(filter))
       assert(converted.isEmpty)
     }
   }
@@ -239,14 +239,14 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
   test("SPARK-34538: Skip InSet null value during push filter to Hive 
metastore") {
     withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "3") {
       val intFilter = InSet(a("p", IntegerType), Set(null, 1, 2))
-      val intConverted = shim.convertFilters(testTable, Seq(intFilter), conf.sessionLocalTimeZone)
+      val intConverted = shim.convertFilters(testTable, Seq(intFilter))
       assert(intConverted == "(p = 1 or p = 2)")
     }
 
     withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "3") {
       val dateFilter = InSet(a("p", DateType), Set(null,
         Literal(Date.valueOf("2020-01-01")).eval(), 
Literal(Date.valueOf("2021-01-01")).eval()))
-      val dateConverted = shim.convertFilters(testTable, Seq(dateFilter), conf.sessionLocalTimeZone)
+      val dateConverted = shim.convertFilters(testTable, Seq(dateFilter))
       assert(dateConverted == "(p = 2020-01-01 or p = 2021-01-01)")
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
index 16e1a41..07ecadc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, LongType, StringType, StructType}
 import org.apache.spark.util.Utils
 
@@ -114,7 +113,7 @@ class HivePartitionFilteringSuite(version: String)
   test(s"getPartitionsByFilter returns all partitions when 
$tryDirectSqlKey=false") {
     val client = init(false)
     val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
-      Seq(attr("ds") === 20170101), SQLConf.get.sessionLocalTimeZone)
+      Seq(attr("ds") === 20170101))
 
     assert(filteredPartitions.size == testPartitionCount)
   }
@@ -604,7 +603,7 @@ class HivePartitionFilteringSuite(version: String)
     val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
       Seq(
         transform(filterExpr)
-      ), SQLConf.get.sessionLocalTimeZone)
+      ))
 
     val expectedPartitionCount = expectedPartitionCubes.map {
       case (expectedDs, expectedH, expectedChunks, expectedD, expectedDatestr) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -488,8 +488,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: getPartitionsByFilter") {
       // Only one partition [1, 1] for key2 == 1
       val result = client.getPartitionsByFilter(client.getTable("default", 
"src_part"),
-        Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))),
-        versionSpark.conf.sessionLocalTimeZone)
+        Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
 
       // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
       if (version != "0.12") {
