This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c5e83ab  [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
c5e83ab is described below

commit c5e83ab92c0cb514963209dc3e70ba0e24570082
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Tue Apr 2 10:20:06 2019 +0800

    [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp

    ## What changes were proposed in this pull request?

    In the PR, I propose to deprecate the `from_utc_timestamp()` and `to_utc_timestamp()`
    functions and to disable them by default. The functions can be re-enabled via the SQL
    config `spark.sql.legacy.utcTimestampFunc.enabled`. By default, any call of the
    functions throws an analysis exception.

    One of the reasons for the deprecation is that the functions violate the semantics of
    `TimestampType`, which is the number of microseconds since the epoch in the UTC time
    zone. Shifting microseconds since the epoch by a time zone offset doesn't make sense,
    because the result no longer represents microseconds since the epoch in UTC and
    therefore cannot be considered a `TimestampType` value.

    ## How was this patch tested?

    The changes were tested by `DateExpressionsSuite` and `DateFunctionsSuite`.

    Closes #24195 from MaxGekk/conv-utc-timestamp-deprecate.

    Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
    Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Co-authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
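For readers trying the change out, a minimal sketch of the new behavior (assuming a running `SparkSession` named `spark`, e.g. in `spark-shell`; the config key and the error come from the diff below):

```scala
import org.apache.spark.sql.AnalysisException

// With default settings, resolving the function now fails at analysis time.
try {
  spark.sql("SELECT to_utc_timestamp(timestamp '2019-04-02 10:20:06', 'CET')").show()
} catch {
  case e: AnalysisException =>
    // The message points at spark.sql.legacy.utcTimestampFunc.enabled.
    println(e.getMessage)
}

// Opting back in to the legacy behavior:
spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
spark.sql("SELECT to_utc_timestamp(timestamp '2019-04-02 10:20:06', 'CET')").show()
```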
See SPARK-25496.") jc <- callJStatic("org.apache.spark.sql.functions", "to_utc_timestamp", y@jc, x) column(jc) }) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 2394f74..fdc7474 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1905,10 +1905,20 @@ test_that("date functions on a DataFrame", { df2 <- createDataFrame(l2) expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24)) expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34)) - expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1], - c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC"))) - expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1], - c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC"))) + conf <- callJMethod(sparkSession, "conf") + isUtcTimestampFuncEnabled <- callJMethod(conf, "get", "spark.sql.legacy.utcTimestampFunc.enabled") + callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", "true") + tryCatch({ + # Both from_utc_timestamp and to_utc_timestamp are deprecated as of SPARK-25496 + expect_equal(suppressWarnings(collect(select(df2, from_utc_timestamp(df2$b, "JST"))))[, 1], + c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC"))) + expect_equal(suppressWarnings(collect(select(df2, to_utc_timestamp(df2$b, "JST"))))[, 1], + c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC"))) + }, + finally = { + # Reverting the conf back + callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", isUtcTimestampFuncEnabled) + }) expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0) expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0) expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 6ae2357..22163f5 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1306,7 +1306,10 @@ def from_utc_timestamp(timestamp, tz): [Row(local_time=datetime.datetime(1997, 2, 28, 2, 30))] >>> df.select(from_utc_timestamp(df.ts, df.tz).alias('local_time')).collect() [Row(local_time=datetime.datetime(1997, 2, 28, 19, 30))] + + .. note:: Deprecated in 3.0. See SPARK-25496 """ + warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning) sc = SparkContext._active_spark_context if isinstance(tz, Column): tz = _to_java_column(tz) @@ -1340,7 +1343,10 @@ def to_utc_timestamp(timestamp, tz): [Row(utc_time=datetime.datetime(1997, 2, 28, 18, 30))] >>> df.select(to_utc_timestamp(df.ts, df.tz).alias('utc_time')).collect() [Row(utc_time=datetime.datetime(1997, 2, 28, 1, 30))] + + .. note:: Deprecated in 3.0. See SPARK-25496 """ + warnings.warn("Deprecated in 3.0. 
See SPARK-25496", DeprecationWarning) sc = SparkContext._active_spark_context if isinstance(tz, Column): tz = _to_java_column(tz) @@ -3191,9 +3197,13 @@ def _test(): globs['sc'] = sc globs['spark'] = spark globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)]) + + spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true") (failure_count, test_count) = doctest.testmod( pyspark.sql.functions, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) + spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled") + spark.stop() if failure_count: sys.exit(-1) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 3cda989..784425e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -26,11 +26,13 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringEscapeUtils +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -1021,6 +1023,11 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S case class FromUTCTimestamp(left: Expression, right: Expression) extends BinaryExpression with ImplicitCastInputTypes { + if (!SQLConf.get.utcTimestampFuncEnabled) { + throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0." + + s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.") + } + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) override def dataType: DataType = TimestampType override def prettyName: String = "from_utc_timestamp" @@ -1227,6 +1234,11 @@ case class MonthsBetween( case class ToUTCTimestamp(left: Expression, right: Expression) extends BinaryExpression with ImplicitCastInputTypes { + if (!SQLConf.get.utcTimestampFuncEnabled) { + throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. 
" + + s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.") + } + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) override def dataType: DataType = TimestampType override def prettyName: String = "to_utc_timestamp" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 411805e..71a49c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1723,6 +1723,12 @@ object SQLConf { "and java.sql.Date are used for the same purpose.") .booleanConf .createWithDefault(false) + + val UTC_TIMESTAMP_FUNC_ENABLED = buildConf("spark.sql.legacy.utcTimestampFunc.enabled") + .doc("The configuration property enables the to_utc_timestamp() " + + "and from_utc_timestamp() functions.") + .booleanConf + .createWithDefault(false) } /** @@ -1916,6 +1922,8 @@ class SQLConf extends Serializable with Logging { def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED) + def utcTimestampFuncEnabled: Boolean = getConf(UTC_TIMESTAMP_FUNC_ENABLED) + /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two * identifiers are equal. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 7d656fc..4e64313 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.Timestamp -import org.apache.log4j.{Appender, AppenderSkeleton, Logger} +import org.apache.log4j.AppenderSkeleton import org.apache.log4j.spi.LoggingEvent import org.apache.spark.SparkFunSuite @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.ThreadUtils @@ -189,36 +190,42 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { } test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") { - val length = 5000 - val expressions = Seq.fill(length) { - ToUTCTimestamp( - Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType), - Literal.create("PST", StringType)) - } - val plan = GenerateMutableProjection.generate(expressions) - val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType)) - val expected = Seq.fill(length)( - DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00"))) - - if (actual != expected) { - fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected") + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + val length = 5000 + val expressions = Seq.fill(length) { + ToUTCTimestamp( + Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType), + Literal.create("PST", StringType)) + } + val 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 8823fe7..bc2c575 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -25,10 +25,12 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeUnit._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
@@ -816,21 +818,29 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
           NonFoldableLiteral.create(tz, StringType)),
         if (expected != null) Timestamp.valueOf(expected) else null)
     }
-    test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
-    test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
-    test(null, "UTC", null)
-    test("2015-07-24 00:00:00", null, null)
-    test(null, null, null)
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
+      test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
+      test(null, "UTC", null)
+      test("2015-07-24 00:00:00", null, null)
+      test(null, null, null)
+    }
+    val msg = intercept[AnalysisException] {
+      test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("to_utc_timestamp - invalid time zone id") {
-    Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
-      val msg = intercept[java.time.DateTimeException] {
-        GenerateUnsafeProjection.generate(
-          ToUTCTimestamp(
-            Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil)
-      }.getMessage
-      assert(msg.contains(invalidTz))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
+        val msg = intercept[java.time.DateTimeException] {
+          GenerateUnsafeProjection.generate(
+            ToUTCTimestamp(
+              Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil)
+        }.getMessage
+        assert(msg.contains(invalidTz))
+      }
     }
   }
 
@@ -847,19 +857,28 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
           NonFoldableLiteral.create(tz, StringType)),
        if (expected != null) Timestamp.valueOf(expected) else null)
     }
-    test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
-    test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00")
-    test(null, "UTC", null)
-    test("2015-07-24 00:00:00", null, null)
-    test(null, null, null)
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
+      test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00")
+      test(null, "UTC", null)
+      test("2015-07-24 00:00:00", null, null)
+      test(null, null, null)
+    }
+    val msg = intercept[AnalysisException] {
+      test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("from_utc_timestamp - invalid time zone id") {
-    Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
-      val msg = intercept[java.time.DateTimeException] {
-        GenerateUnsafeProjection.generate(FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil)
-      }.getMessage
-      assert(msg.contains(invalidTz))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
+        val msg = intercept[java.time.DateTimeException] {
+          GenerateUnsafeProjection.generate(
+            FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil)
+        }.getMessage
+        assert(msg.contains(invalidTz))
+      }
     }
   }
 }
"2015-07-24 07:00:00") + }.getMessage + assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key)) } test("to_utc_timestamp - invalid time zone id") { - Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => - val msg = intercept[java.time.DateTimeException] { - GenerateUnsafeProjection.generate( - ToUTCTimestamp( - Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil) - }.getMessage - assert(msg.contains(invalidTz)) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => + val msg = intercept[java.time.DateTimeException] { + GenerateUnsafeProjection.generate( + ToUTCTimestamp( + Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil) + }.getMessage + assert(msg.contains(invalidTz)) + } } } @@ -847,19 +857,28 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { NonFoldableLiteral.create(tz, StringType)), if (expected != null) Timestamp.valueOf(expected) else null) } - test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") - test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00") - test(null, "UTC", null) - test("2015-07-24 00:00:00", null, null) - test(null, null, null) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") + test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00") + test(null, "UTC", null) + test("2015-07-24 00:00:00", null, null) + test(null, null, null) + } + val msg = intercept[AnalysisException] { + test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") + }.getMessage + assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key)) } test("from_utc_timestamp - invalid time zone id") { - Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => - val msg = intercept[java.time.DateTimeException] { - GenerateUnsafeProjection.generate(FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil) - }.getMessage - assert(msg.contains(invalidTz)) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => + val msg = intercept[java.time.DateTimeException] { + GenerateUnsafeProjection.generate( + FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil) + }.getMessage + assert(msg.contains(invalidTz)) + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index d6d1f6a..d0be216 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2988,6 +2988,7 @@ object functions { * @group datetime_funcs * @since 1.5.0 */ + @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0") def from_utc_timestamp(ts: Column, tz: String): Column = withExpr { FromUTCTimestamp(ts.expr, Literal(tz)) } @@ -2999,6 +3000,7 @@ object functions { * @group datetime_funcs * @since 2.4.0 */ + @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0") def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr { FromUTCTimestamp(ts.expr, tz.expr) } @@ -3017,6 +3019,7 @@ object functions { * @group datetime_funcs * @since 1.5.0 */ + @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0") def to_utc_timestamp(ts: Column, tz: String): Column = withExpr { ToUTCTimestamp(ts.expr, Literal(tz)) } @@ -3028,6 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index a9435e0..5ad1cb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -675,33 +675,41 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
       (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"),
       (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00")
     ).toDF("a", "b")
-    checkAnswer(
-      df.select(from_utc_timestamp(col("a"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-23 17:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
-    checkAnswer(
-      df.select(from_utc_timestamp(col("b"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-23 17:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      checkAnswer(
+        df.select(from_utc_timestamp(col("a"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-23 17:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+      checkAnswer(
+        df.select(from_utc_timestamp(col("b"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-23 17:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    }
+    val msg = intercept[AnalysisException] {
+      df.select(from_utc_timestamp(col("a"), "PST")).collect()
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("from_utc_timestamp with column zone") {
-    val df = Seq(
-      (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "CET"),
-      (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "PST")
-    ).toDF("a", "b", "c")
-    checkAnswer(
-      df.select(from_utc_timestamp(col("a"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 02:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
-    checkAnswer(
-      df.select(from_utc_timestamp(col("b"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 02:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val df = Seq(
+        (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "CET"),
+        (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "PST")
+      ).toDF("a", "b", "c")
+      checkAnswer(
+        df.select(from_utc_timestamp(col("a"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 02:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+      checkAnswer(
+        df.select(from_utc_timestamp(col("b"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 02:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    }
   }
 
   test("to_utc_timestamp with literal zone") {
@@ -709,32 +717,40 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
       (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"),
       (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00")
     ).toDF("a", "b")
-    checkAnswer(
-      df.select(to_utc_timestamp(col("a"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
-    checkAnswer(
-      df.select(to_utc_timestamp(col("b"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      checkAnswer(
+        df.select(to_utc_timestamp(col("a"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+      checkAnswer(
+        df.select(to_utc_timestamp(col("b"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+    }
+    val msg = intercept[AnalysisException] {
+      df.select(to_utc_timestamp(col("a"), "PST")).collect()
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("to_utc_timestamp with column zone") {
-    val df = Seq(
-      (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "PST"),
-      (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "CET")
-    ).toDF("a", "b", "c")
-    checkAnswer(
-      df.select(to_utc_timestamp(col("a"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
-    checkAnswer(
-      df.select(to_utc_timestamp(col("b"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val df = Seq(
+        (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "PST"),
+        (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "CET")
+      ).toDF("a", "b", "c")
+      checkAnswer(
+        df.select(to_utc_timestamp(col("a"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+      checkAnswer(
+        df.select(to_utc_timestamp(col("b"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
index cbd51b4..17bdd21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Synthetic benchmark for date and timestamp functions.
@@ -86,9 +87,11 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
       run(N, "from_unixtime", "from_unixtime(id, 'yyyy-MM-dd HH:mm:ss.SSSSSS')")
     }
     runBenchmark("Convert timestamps") {
-      val timestampExpr = "cast(id as timestamp)"
-      run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
-      run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
+      withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+        val timestampExpr = "cast(id as timestamp)"
+        run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
+        run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
+      }
     }
     runBenchmark("Intervals") {
       val (start, end) = ("cast(id as timestamp)", "cast((id+8640000) as timestamp)")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 116fd74..485eb7b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -18,10 +18,10 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
-import java.util.{Locale, TimeZone}
+import java.util.Locale
 
 import org.apache.commons.io.FileUtils
-import org.scalatest.{Assertions, BeforeAndAfterAll}
+import org.scalatest.Assertions
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.rdd.BlockRDD
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.state.{StateStore, StreamingAggregationStateManager}
+import org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManager
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -344,11 +344,10 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   testWithAllStateVersions("prune results by current_date, complete mode") {
     import testImplicits._
     val clock = new StreamManualClock
-    val tz = TimeZone.getDefault.getID
     val inputData = MemoryStream[Long]
     val aggregated =
       inputData.toDF()
-        .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+        .select(to_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY)))
         .toDF("value")
         .groupBy($"value")
         .agg(count("*"))
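The streaming test above drops the zone shift entirely: `to_timestamp(from_unixtime(...))` parses the rendered string back into a timestamp without applying any offset arithmetic to the underlying instant. A minimal standalone sketch of the same substitution in user code (session setup and sample values are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_unixtime, to_timestamp}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(0L, 86400L).toDF("value") // seconds since the epoch

// Before (deprecated): to_utc_timestamp(from_unixtime($"value"), tz)
// After: round-trip through a formatted string; no zone-offset shift applied.
df.select(to_timestamp(from_unixtime($"value")).as("ts")).show()
```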
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org