This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d20d13  [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
1d20d13 is described below

commit 1d20d13149140f53df307f47420740f45b4fa5f6
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Wed Apr 3 10:55:56 2019 +0800

    [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
    
    ## What changes were proposed in this pull request?
    
    In this PR, I propose to deprecate `from_utc_timestamp()` and `to_utc_timestamp()`
    and to disable them by default. The functions can be re-enabled via the SQL config
    `spark.sql.legacy.utcTimestampFunc.enabled`. By default, any call to these
    functions throws an analysis exception.
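
    For illustration, a minimal sketch (assuming a running `SparkSession` named `spark`)
    of how the legacy config can temporarily re-enable the functions from user code:

    ```scala
    // Sketch only: re-enable the deprecated functions for a single block of queries.
    import org.apache.spark.sql.functions.{col, from_utc_timestamp}

    spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
    try {
      import spark.implicits._
      val df = Seq(java.sql.Timestamp.valueOf("2015-07-24 00:00:00")).toDF("ts")
      df.select(from_utc_timestamp(col("ts"), "PST")).show()
    } finally {
      // Revert to the default so later calls fail fast with an AnalysisException.
      spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "false")
    }
    ```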
    
    One of the reasons for the deprecation is that the functions violate the semantics
    of `TimestampType`, which is defined as the number of microseconds since the epoch
    in the UTC time zone. Shifting microseconds since the epoch by a time zone offset
    does not make sense, because the result no longer represents microseconds since
    the epoch in UTC and therefore cannot be considered a `TimestampType` value.
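
    To make the semantic point concrete, here is a hedged sketch (the expected value
    matches the existing `DateFunctionsSuite` expectations; it assumes the legacy
    config above is enabled):

    ```scala
    import org.apache.spark.sql.functions.{from_utc_timestamp, lit, to_timestamp}

    // '2015-07-24 00:00:00' in UTC denotes one specific instant, i.e. one fixed
    // count of microseconds since the epoch. from_utc_timestamp shifts that count
    // by the PST offset, so the resulting column encodes a different instant
    // (2015-07-23 17:00:00) while still being typed as TimestampType.
    val shifted = from_utc_timestamp(to_timestamp(lit("2015-07-24 00:00:00")), "PST")
    ```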
    
    ## How was this patch tested?
    
    The changes were tested by `DateExpressionsSuite` and `DateFunctionsSuite`.
    
    Closes #24195 from MaxGekk/conv-utc-timestamp-deprecate.
    
    Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
    Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Co-authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 R/pkg/R/functions.R                                |   2 +
 R/pkg/tests/fulltests/test_sparkSQL.R              |  18 +++-
 python/pyspark/sql/functions.py                    |  10 ++
 .../catalyst/expressions/datetimeExpressions.scala |  12 +++
 .../org/apache/spark/sql/internal/SQLConf.scala    |   8 ++
 .../catalyst/expressions/CodeGenerationSuite.scala |  61 ++++++-----
 .../expressions/DateExpressionsSuite.scala         |  63 ++++++++----
 .../scala/org/apache/spark/sql/functions.scala     |   4 +
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 112 ++++++++++++---------
 .../execution/benchmark/DateTimeBenchmark.scala    |   9 +-
 .../sql/streaming/StreamingAggregationSuite.scala  |  99 +++++++++---------
 11 files changed, 246 insertions(+), 152 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index d91896a..0566a47 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2459,6 +2459,7 @@ setMethod("schema_of_csv", signature(x = "characterOrColumn"),
 #' @note from_utc_timestamp since 1.5.0
 setMethod("from_utc_timestamp", signature(y = "Column", x = "character"),
           function(y, x) {
+            .Deprecated(msg = "from_utc_timestamp is deprecated. See 
SPARK-25496.")
             jc <- callJStatic("org.apache.spark.sql.functions", 
"from_utc_timestamp", y@jc, x)
             column(jc)
           })
@@ -2517,6 +2518,7 @@ setMethod("next_day", signature(y = "Column", x = "character"),
 #' @note to_utc_timestamp since 1.5.0
 setMethod("to_utc_timestamp", signature(y = "Column", x = "character"),
           function(y, x) {
+            .Deprecated(msg = "to_utc_timestamp is deprecated. See 
SPARK-25496.")
             jc <- callJStatic("org.apache.spark.sql.functions", 
"to_utc_timestamp", y@jc, x)
             column(jc)
           })
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 2394f74..fdc7474 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1905,10 +1905,20 @@ test_that("date functions on a DataFrame", {
   df2 <- createDataFrame(l2)
   expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24))
   expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34))
-  expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1],
-               c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC")))
-  expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1],
-               c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC")))
+  conf <- callJMethod(sparkSession, "conf")
+  isUtcTimestampFuncEnabled <- callJMethod(conf, "get", "spark.sql.legacy.utcTimestampFunc.enabled")
+  callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", "true")
+  tryCatch({
+    # Both from_utc_timestamp and to_utc_timestamp are deprecated as of SPARK-25496
+    expect_equal(suppressWarnings(collect(select(df2, from_utc_timestamp(df2$b, "JST"))))[, 1],
+                 c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC")))
+    expect_equal(suppressWarnings(collect(select(df2, to_utc_timestamp(df2$b, "JST"))))[, 1],
+                 c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC")))
+  },
+  finally = {
+    # Reverting the conf back
+    callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", isUtcTimestampFuncEnabled)
+  })
   expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0)
   expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0)
   expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 6ae2357..22163f5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1306,7 +1306,10 @@ def from_utc_timestamp(timestamp, tz):
     [Row(local_time=datetime.datetime(1997, 2, 28, 2, 30))]
     >>> df.select(from_utc_timestamp(df.ts, df.tz).alias('local_time')).collect()
     [Row(local_time=datetime.datetime(1997, 2, 28, 19, 30))]
+
+    .. note:: Deprecated in 3.0. See SPARK-25496
     """
+    warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning)
     sc = SparkContext._active_spark_context
     if isinstance(tz, Column):
         tz = _to_java_column(tz)
@@ -1340,7 +1343,10 @@ def to_utc_timestamp(timestamp, tz):
     [Row(utc_time=datetime.datetime(1997, 2, 28, 18, 30))]
     >>> df.select(to_utc_timestamp(df.ts, df.tz).alias('utc_time')).collect()
     [Row(utc_time=datetime.datetime(1997, 2, 28, 1, 30))]
+
+    .. note:: Deprecated in 3.0. See SPARK-25496
     """
+    warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning)
     sc = SparkContext._active_spark_context
     if isinstance(tz, Column):
         tz = _to_java_column(tz)
@@ -3191,9 +3197,13 @@ def _test():
     globs['sc'] = sc
     globs['spark'] = spark
     globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)])
+
+    spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.functions, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+    spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled")
+
     spark.stop()
     if failure_count:
         sys.exit(-1)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 3cda989..784425e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -26,11 +26,13 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringEscapeUtils
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
@@ -1021,6 +1023,11 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S
 case class FromUTCTimestamp(left: Expression, right: Expression)
   extends BinaryExpression with ImplicitCastInputTypes {
 
+  if (!SQLConf.get.utcTimestampFuncEnabled) {
+    throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. " +
+      s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.")
+  }
+
   override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
   override def dataType: DataType = TimestampType
   override def prettyName: String = "from_utc_timestamp"
@@ -1227,6 +1234,11 @@ case class MonthsBetween(
 case class ToUTCTimestamp(left: Expression, right: Expression)
   extends BinaryExpression with ImplicitCastInputTypes {
 
+  if (!SQLConf.get.utcTimestampFuncEnabled) {
+    throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. " +
+      s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.")
+  }
+
   override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
   override def dataType: DataType = TimestampType
   override def prettyName: String = "to_utc_timestamp"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 411805e..71a49c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1723,6 +1723,12 @@ object SQLConf {
       "and java.sql.Date are used for the same purpose.")
     .booleanConf
     .createWithDefault(false)
+
+  val UTC_TIMESTAMP_FUNC_ENABLED = buildConf("spark.sql.legacy.utcTimestampFunc.enabled")
+    .doc("The configuration property enables the to_utc_timestamp() " +
+         "and from_utc_timestamp() functions.")
+    .booleanConf
+    .createWithDefault(false)
 }
 
 /**
@@ -1916,6 +1922,8 @@ class SQLConf extends Serializable with Logging {
 
   def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED)
 
+  def utcTimestampFuncEnabled: Boolean = getConf(UTC_TIMESTAMP_FUNC_ENABLED)
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 7d656fc..4e64313 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.Timestamp
 
-import org.apache.log4j.{Appender, AppenderSkeleton, Logger}
+import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.spi.LoggingEvent
 
 import org.apache.spark.SparkFunSuite
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.ThreadUtils
@@ -189,36 +190,42 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("SPARK-17702: split wide constructor into blocks due to JVM code size 
limit") {
-    val length = 5000
-    val expressions = Seq.fill(length) {
-      ToUTCTimestamp(
-        Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), 
TimestampType),
-        Literal.create("PST", StringType))
-    }
-    val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
-    val expected = Seq.fill(length)(
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
-
-    if (actual != expected) {
-      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, 
expected: $expected")
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val length = 5000
+      val expressions = Seq.fill(length) {
+        ToUTCTimestamp(
+          Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), 
TimestampType),
+          Literal.create("PST", StringType))
+      }
+      val plan = GenerateMutableProjection.generate(expressions)
+      val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+      val expected = Seq.fill(length)(
+        DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
+
+      if (actual != expected) {
+        fail(
+          s"Incorrect Evaluation: expressions: $expressions, actual: $actual, 
expected: $expected")
+      }
     }
   }
 
   test("SPARK-22226: group splitted expressions into one method per nested 
class") {
-    val length = 10000
-    val expressions = Seq.fill(length) {
-      ToUTCTimestamp(
-        Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), 
TimestampType),
-        Literal.create("PST", StringType))
-    }
-    val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
-    val expected = Seq.fill(length)(
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
-
-    if (actual != expected) {
-      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, 
expected: $expected")
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val length = 10000
+      val expressions = Seq.fill(length) {
+        ToUTCTimestamp(
+          Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), 
TimestampType),
+          Literal.create("PST", StringType))
+      }
+      val plan = GenerateMutableProjection.generate(expressions)
+      val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+      val expected = Seq.fill(length)(
+        DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
+
+      if (actual != expected) {
+        fail(
+          s"Incorrect Evaluation: expressions: $expressions, actual: $actual, 
expected: $expected")
+      }
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 8823fe7..bc2c575 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -25,10 +25,12 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeUnit._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
@@ -816,21 +818,29 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
           NonFoldableLiteral.create(tz, StringType)),
         if (expected != null) Timestamp.valueOf(expected) else null)
     }
-    test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
-    test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
-    test(null, "UTC", null)
-    test("2015-07-24 00:00:00", null, null)
-    test(null, null, null)
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
+      test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
+      test(null, "UTC", null)
+      test("2015-07-24 00:00:00", null, null)
+      test(null, null, null)
+    }
+    val msg = intercept[AnalysisException] {
+      test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("to_utc_timestamp - invalid time zone id") {
-    Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
-      val msg = intercept[java.time.DateTimeException] {
-        GenerateUnsafeProjection.generate(
-          ToUTCTimestamp(
-            Literal(Timestamp.valueOf("2015-07-24 00:00:00")), 
Literal(invalidTz)) :: Nil)
-      }.getMessage
-      assert(msg.contains(invalidTz))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
+        val msg = intercept[java.time.DateTimeException] {
+          GenerateUnsafeProjection.generate(
+            ToUTCTimestamp(
+              Literal(Timestamp.valueOf("2015-07-24 00:00:00")), 
Literal(invalidTz)) :: Nil)
+        }.getMessage
+        assert(msg.contains(invalidTz))
+      }
     }
   }
 
@@ -847,19 +857,28 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
           NonFoldableLiteral.create(tz, StringType)),
         if (expected != null) Timestamp.valueOf(expected) else null)
     }
-    test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
-    test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00")
-    test(null, "UTC", null)
-    test("2015-07-24 00:00:00", null, null)
-    test(null, null, null)
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
+      test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00")
+      test(null, "UTC", null)
+      test("2015-07-24 00:00:00", null, null)
+      test(null, null, null)
+    }
+    val msg = intercept[AnalysisException] {
+      test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("from_utc_timestamp - invalid time zone id") {
-    Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
-      val msg = intercept[java.time.DateTimeException] {
-        GenerateUnsafeProjection.generate(FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil)
-      }.getMessage
-      assert(msg.contains(invalidTz))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
+        val msg = intercept[java.time.DateTimeException] {
+          GenerateUnsafeProjection.generate(
+            FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil)
+        }.getMessage
+        assert(msg.contains(invalidTz))
+      }
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index d6d1f6a..d0be216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2988,6 +2988,7 @@ object functions {
    * @group datetime_funcs
    * @since 1.5.0
    */
+  @deprecated("This function is deprecated and will be removed in future 
versions.", "3.0.0")
   def from_utc_timestamp(ts: Column, tz: String): Column = withExpr {
     FromUTCTimestamp(ts.expr, Literal(tz))
   }
@@ -2999,6 +3000,7 @@ object functions {
    * @group datetime_funcs
    * @since 2.4.0
    */
+  @deprecated("This function is deprecated and will be removed in future 
versions.", "3.0.0")
   def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
     FromUTCTimestamp(ts.expr, tz.expr)
   }
@@ -3017,6 +3019,7 @@ object functions {
    * @group datetime_funcs
    * @since 1.5.0
    */
+  @deprecated("This function is deprecated and will be removed in future 
versions.", "3.0.0")
   def to_utc_timestamp(ts: Column, tz: String): Column = withExpr {
     ToUTCTimestamp(ts.expr, Literal(tz))
   }
@@ -3028,6 +3031,7 @@ object functions {
    * @group datetime_funcs
    * @since 2.4.0
    */
+  @deprecated("This function is deprecated and will be removed in future 
versions.", "3.0.0")
   def to_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
     ToUTCTimestamp(ts.expr, tz.expr)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index a9435e0..5ad1cb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -675,33 +675,41 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
       (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"),
       (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00")
     ).toDF("a", "b")
-    checkAnswer(
-      df.select(from_utc_timestamp(col("a"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-23 17:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
-    checkAnswer(
-      df.select(from_utc_timestamp(col("b"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-23 17:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      checkAnswer(
+        df.select(from_utc_timestamp(col("a"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-23 17:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+      checkAnswer(
+        df.select(from_utc_timestamp(col("b"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-23 17:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    }
+    val msg = intercept[AnalysisException] {
+      df.select(from_utc_timestamp(col("a"), "PST")).collect()
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("from_utc_timestamp with column zone") {
-    val df = Seq(
-      (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "CET"),
-      (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "PST")
-    ).toDF("a", "b", "c")
-    checkAnswer(
-      df.select(from_utc_timestamp(col("a"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 02:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
-    checkAnswer(
-      df.select(from_utc_timestamp(col("b"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 02:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val df = Seq(
+        (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", 
"CET"),
+        (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", 
"PST")
+      ).toDF("a", "b", "c")
+      checkAnswer(
+        df.select(from_utc_timestamp(col("a"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 02:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+      checkAnswer(
+        df.select(from_utc_timestamp(col("b"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 02:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    }
   }
 
   test("to_utc_timestamp with literal zone") {
@@ -709,32 +717,40 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
       (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"),
       (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00")
     ).toDF("a", "b")
-    checkAnswer(
-      df.select(to_utc_timestamp(col("a"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
-    checkAnswer(
-      df.select(to_utc_timestamp(col("b"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      checkAnswer(
+        df.select(to_utc_timestamp(col("a"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+      checkAnswer(
+        df.select(to_utc_timestamp(col("b"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+    }
+    val msg = intercept[AnalysisException] {
+      df.select(to_utc_timestamp(col("a"), "PST")).collect()
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("to_utc_timestamp with column zone") {
-    val df = Seq(
-      (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "PST"),
-      (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "CET")
-    ).toDF("a", "b", "c")
-    checkAnswer(
-      df.select(to_utc_timestamp(col("a"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
-    checkAnswer(
-      df.select(to_utc_timestamp(col("b"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val df = Seq(
+        (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", 
"PST"),
+        (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", 
"CET")
+      ).toDF("a", "b", "c")
+      checkAnswer(
+        df.select(to_utc_timestamp(col("a"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+      checkAnswer(
+        df.select(to_utc_timestamp(col("b"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
index cbd51b4..17bdd21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Synthetic benchmark for date and timestamp functions.
@@ -86,9 +87,11 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
       run(N, "from_unixtime", "from_unixtime(id, 'yyyy-MM-dd 
HH:mm:ss.SSSSSS')")
     }
     runBenchmark("Convert timestamps") {
-      val timestampExpr = "cast(id as timestamp)"
-      run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 
'CET')")
-      run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
+      withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+        val timestampExpr = "cast(id as timestamp)"
+        run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 
'CET')")
+        run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
+      }
     }
     runBenchmark("Intervals") {
       val (start, end) = ("cast(id as timestamp)", "cast((id+8640000) as 
timestamp)")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 116fd74..2a9e6b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.util.{Locale, TimeZone}
 
 import org.apache.commons.io.FileUtils
-import org.scalatest.{Assertions, BeforeAndAfterAll}
+import org.scalatest.Assertions
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.rdd.BlockRDD
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.state.{StateStore, StreamingAggregationStateManager}
+import org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManager
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -342,52 +342,55 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {
-    import testImplicits._
-    val clock = new StreamManualClock
-    val tz = TimeZone.getDefault.getID
-    val inputData = MemoryStream[Long]
-    val aggregated =
-      inputData.toDF()
-        .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
-        .toDF("value")
-        .groupBy($"value")
-        .agg(count("*"))
-        .where($"value".cast("date") >= date_sub(current_date(), 10))
-        .select(($"value".cast("long") / 
DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-    testStream(aggregated, Complete)(
-      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-      // advance clock to 10 days, should retain all keys
-      AddData(inputData, 0L, 5L, 5L, 10L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-      // advance clock to 20 days, should retain keys >= 10
-      AddData(inputData, 15L, 15L, 20L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-      // advance clock to 30 days, should retain keys >= 20
-      AddData(inputData, 85L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // bounce stream and ensure correct batch timestamp is used
-      // i.e., we don't take it from the clock, which is at 90 days.
-      StopStream,
-      AssertOnQuery { q => // clear the sink
-        q.sink.asInstanceOf[MemorySink].clear()
-        q.commitLog.purge(3)
-        // advance by 60 days i.e., 90 days total
-        clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-        true
-      },
-      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-      // Commit log blown, causing a re-run of the last batch
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // advance clock to 100 days, should retain keys >= 90
-      AddData(inputData, 85L, 90L, 100L, 105L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-    )
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      import testImplicits._
+      val clock = new StreamManualClock
+      val tz = TimeZone.getDefault.getID
+      val inputData = MemoryStream[Long]
+      val aggregated =
+        inputData.toDF()
+          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+          .toDF("value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .where($"value".cast("date") >= date_sub(current_date(), 10))
+          .select(
+            ($"value".cast("long") / 
DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+      testStream(aggregated, Complete)(
+        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+        // advance clock to 10 days, should retain all keys
+        AddData(inputData, 0L, 5L, 5L, 10L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+        // advance clock to 20 days, should retain keys >= 10
+        AddData(inputData, 15L, 15L, 20L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+        // advance clock to 30 days, should retain keys >= 20
+        AddData(inputData, 85L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // bounce stream and ensure correct batch timestamp is used
+        // i.e., we don't take it from the clock, which is at 90 days.
+        StopStream,
+        AssertOnQuery { q => // clear the sink
+          q.sink.asInstanceOf[MemorySink].clear()
+          q.commitLog.purge(3)
+          // advance by 60 days i.e., 90 days total
+          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+          true
+        },
+        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+        // Commit log blown, causing a re-run of the last batch
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // advance clock to 100 days, should retain keys >= 90
+        AddData(inputData, 85L, 90L, 100L, 105L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+      )
+    }
   }
 
   testWithAllStateVersions("SPARK-19690: do not convert batch aggregation in 
streaming query " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
