This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1686cff9  [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function
1686cff9 is described below

commit 1686cff9a15311b1ab909996186871e0b56aab91
Author: gengjiaan <gengji...@360.cn>
AuthorDate: Wed Jul 14 15:38:46 2021 +0800

    [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function

    ### What changes were proposed in this pull request?
    `LOCALTIMESTAMP()` is a datetime value function from ANSI SQL. The syntax is shown below:
    ```
    <datetime value function> ::=
        <current date value function>
      | <current time value function>
      | <current timestamp value function>
      | <current local time value function>
      | <current local timestamp value function>

    <current date value function> ::=
      CURRENT_DATE

    <current time value function> ::=
      CURRENT_TIME [ <left paren> <time precision> <right paren> ]

    <current local time value function> ::=
      LOCALTIME [ <left paren> <time precision> <right paren> ]

    <current timestamp value function> ::=
      CURRENT_TIMESTAMP [ <left paren> <timestamp precision> <right paren> ]

    <current local timestamp value function> ::=
      LOCALTIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
    ```

    `LOCALTIMESTAMP()` returns the current timestamp at the start of query evaluation as TIMESTAMP WITHOUT TIME ZONE. This is similar to `CURRENT_TIMESTAMP()`.
    Note that we need to update the optimization rule `ComputeCurrentTime` so that Spark returns the same result in a single query even if the function is called multiple times.

    ### Why are the changes needed?
    `CURRENT_TIMESTAMP()` returns the current timestamp at the start of query evaluation.
    `LOCALTIMESTAMP()` returns the current timestamp without time zone at the start of query evaluation.
    The `LOCALTIMESTAMP` function is part of the ANSI SQL standard, so supporting it improves both standard compliance and interoperability with other databases.

    ### Does this PR introduce _any_ user-facing change?
    Yes. This PR adds the new function `LOCALTIMESTAMP()`.

    ### How was this patch tested?
    New tests.

    Closes #33258 from beliefer/SPARK-36037.
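[Editor's note: for readers trying the feature out, a minimal usage sketch; it is not part of the commit itself and assumes Spark 3.2+ with a local `SparkSession`.]

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.localtimestamp

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// All calls within one query fold to the same literal, so this prints `true`.
spark.sql("SELECT localtimestamp() = localtimestamp()").show()

// DataFrame API equivalent; the resulting column's type is TIMESTAMP_NTZ.
Seq(1, 2).toDF("id").select(localtimestamp().as("now_ntz")).printSchema()
```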
Lead-authored-by: gengjiaan <gengji...@360.cn>
Co-authored-by: Jiaan Geng <belie...@163.com>
Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
(cherry picked from commit b4f7758944505c7c957e2e0c2c70da5ea746099b)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../analysis/UnsupportedOperationChecker.scala     |   4 +-
 .../catalyst/expressions/datetimeExpressions.scala |  42 ++++++++-
 .../sql/catalyst/optimizer/finishAnalysis.scala    |  10 +-
 .../expressions/DateExpressionsSuite.scala         |   9 ++
 .../optimizer/ComputeCurrentTimeSuite.scala        |  25 ++++-
 .../execution/streaming/MicroBatchExecution.scala  |   5 +-
 .../streaming/continuous/ContinuousExecution.scala |   8 +-
 .../scala/org/apache/spark/sql/functions.scala     |  10 ++
 .../sql-functions/sql-expression-schema.md         |   3 +-
 .../test/resources/sql-tests/inputs/datetime.sql   |   1 +
 .../sql-tests/results/ansi/datetime.sql.out        |  10 +-
 .../sql-tests/results/datetime-legacy.sql.out      |  10 +-
 .../resources/sql-tests/results/datetime.sql.out   |  10 +-
 .../results/timestampNTZ/datetime.sql.out          |  10 +-
 .../sql/expressions/ExpressionInfoSuite.scala      |   1 +
 .../sql/streaming/StreamingAggregationSuite.scala  | 102 +++++++++++----------
 17 files changed, 196 insertions(+), 65 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d518bf3..60ca1e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -519,6 +519,7 @@ object FunctionRegistry {
     expression[CurrentDate]("current_date"),
     expression[CurrentTimestamp]("current_timestamp"),
     expression[CurrentTimeZone]("current_timezone"),
+    expression[LocalTimestamp]("localtimestamp"),
     expression[DateDiff]("datediff"),
     expression[DateAdd]("date_add"),
     expression[DateFormatClass]("date_format"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8629300..13c7f75 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestamp, GroupingSets, MonotonicallyIncreasingID, Now}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestampLike, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -417,7 +417,7 @@ object UnsupportedOperationChecker extends Logging {
 
       subPlan.expressions.foreach { e =>
         if (e.collectLeaves().exists {
-          case (_: CurrentTimestamp | _: Now | _: CurrentDate) => true
+          case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) => true
          case _ => false
         }) {
          throwError(s"Continuous processing does not support current time operations.")
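[Editor's note: to make the checker change above concrete, continuous processing rejects any plan whose expression trees contain a time-dependent leaf. The sketch below is a simplified, self-contained model of that leaf scan; the `Expr` hierarchy is invented for illustration and is not Spark's catalyst API.]

```scala
// Simplified model of UnsupportedOperationChecker's leaf scan. Spark's
// real expressions live in org.apache.spark.sql.catalyst.expressions.
sealed trait Expr { def children: Seq[Expr] }
case class Add(left: Expr, right: Expr) extends Expr { def children = Seq(left, right) }
case class Column(name: String) extends Expr { def children = Nil }
case object LocalTimestampExpr extends Expr { def children = Nil }

// Recursively collect all leaf expressions of a tree.
def collectLeaves(e: Expr): Seq[Expr] =
  if (e.children.isEmpty) Seq(e) else e.children.flatMap(collectLeaves)

// Reject the plan if any leaf is a current-time expression.
def checkNoCurrentTime(e: Expr): Unit =
  if (collectLeaves(e).exists { case LocalTimestampExpr => true; case _ => false }) {
    throw new IllegalArgumentException(
      "Continuous processing does not support current time operations.")
  }

// checkNoCurrentTime(Add(Column("value"), LocalTimestampExpr))  // would throw
```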
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index ca8dea8..1146ba7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.text.ParseException
-import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId}
+import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId, ZoneOffset}
 import java.time.format.DateTimeParseException
 import java.util.Locale
 
@@ -201,6 +201,44 @@ case class Now() extends CurrentTimestampLike {
 }
 
 /**
+ * Returns the current timestamp without time zone at the start of query evaluation.
+ * There is no code generation since this expression should get constant folded by the optimizer.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_() - Returns the current timestamp without time zone at the start of query evaluation. All calls of localtimestamp within the same query return the same value.
+
+    _FUNC_ - Returns the current local date-time at the session time zone at the start of query evaluation.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_();
+       2020-04-25 15:49:11.914
+  """,
+  group = "datetime_funcs",
+  since = "3.2.0")
+case class LocalTimestamp(timeZoneId: Option[String] = None) extends LeafExpression
+  with TimeZoneAwareExpression with CodegenFallback {
+
+  def this() = this(None)
+
+  override def foldable: Boolean = true
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = TimestampNTZType
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
+  override def eval(input: InternalRow): Any = localDateTimeToMicros(LocalDateTime.now(zoneId))
+
+  override def prettyName: String = "localtimestamp"
+}
+
+/**
  * Expression representing the current batch time, which is used by StreamExecution to
  * 1. prevent optimizer from pushing this expression below a stateful operator
  * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
@@ -236,6 +274,8 @@ case class CurrentBatchTimestamp(
     val timestampUs = millisToMicros(timestampMs)
     dataType match {
       case _: TimestampType => Literal(timestampUs, TimestampType)
+      case _: TimestampNTZType =>
+        Literal(convertTz(timestampUs, ZoneOffset.UTC, zoneId), TimestampNTZType)
       case _: DateType => Literal(microsToDays(timestampUs, zoneId), DateType)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 116ccde..deacc3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -81,16 +80,19 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
     val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
     val currentTime = Literal.create(timestamp, timeExpr.dataType)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
+    val localTimestamps = mutable.Map.empty[String, Literal]
 
     plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
       case currentDate @ CurrentDate(Some(timeZoneId)) =>
         currentDates.getOrElseUpdate(timeZoneId, {
-          Literal.create(
-            DateTimeUtils.microsToDays(timestamp, currentDate.zoneId),
-            DateType)
+          Literal.create(currentDate.eval().asInstanceOf[Int], DateType)
         })
       case CurrentTimestamp() | Now() => currentTime
       case CurrentTimeZone() => timezone
+      case localTimestamp @ LocalTimestamp(Some(timeZoneId)) =>
+        localTimestamps.getOrElseUpdate(timeZoneId, {
+          Literal.create(localTimestamp.eval().asInstanceOf[Long], TimestampNTZType)
+        })
     }
   }
 }
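[Editor's note: on the semantics that `ComputeCurrentTime` now folds, a TIMESTAMP_NTZ value is the local wall-clock time encoded as microseconds since the epoch, as if that wall-clock reading were UTC. The sketch below re-derives that encoding with plain `java.time` and models the per-zone memoization; `localDateTimeToMicros` here is an assumed re-implementation of the `DateTimeUtils` helper, not the real one.]

```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}
import scala.collection.mutable

// Assumed re-derivation of DateTimeUtils.localDateTimeToMicros: interpret
// the local wall-clock time as if it were UTC and count micros since epoch.
def localDateTimeToMicros(ldt: LocalDateTime): Long =
  ldt.toEpochSecond(ZoneOffset.UTC) * 1000000L + ldt.getNano / 1000L

// What LocalTimestamp.eval computes for a given session time zone.
def evalLocalTimestamp(zoneId: String): Long =
  localDateTimeToMicros(LocalDateTime.now(ZoneId.of(zoneId)))

// ComputeCurrentTime-style memoization: one literal per time zone per
// query, so repeated localtimestamp() calls fold to the same value.
val cache = mutable.Map.empty[String, Long]
def foldedValue(zoneId: String): Long =
  cache.getOrElseUpdate(zoneId, evalLocalTimestamp(zoneId))
```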
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index f288d82..93fc775 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -96,6 +96,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     assert(math.abs(t1 - ct.getTime) < 5000)
   }
 
+  test("datetime function localtimestamp") {
+    outstandingTimezonesIds.foreach { zid =>
+      val ct = LocalTimestamp(Some(zid)).eval(EmptyRow).asInstanceOf[Long]
+      val t1 = DateTimeUtils.localDateTimeToMicros(
+        LocalDateTime.now(DateTimeUtils.getZoneId(zid)))
+      assert(math.abs(t1 - ct) < 5000)
+    }
+  }
+
   test("DayOfYear") {
     val sdfDay = new SimpleDateFormat("D", Locale.US)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 82d6757..9b04dcd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.time.ZoneId
+import java.time.{LocalDateTime, ZoneId}
 
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -81,4 +81,25 @@ class ComputeCurrentTimeSuite extends PlanTest {
     assert(lits.size == 1)
     assert(lits.head == SQLConf.get.sessionLocalTimeZone)
   }
+
+  test("analyzer should replace localtimestamp with literals") {
+    val in = Project(Seq(Alias(LocalTimestamp(), "a")(), Alias(LocalTimestamp(), "b")()),
+      LocalRelation())
+
+    val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
+
+    val min = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
+    val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
+    val max = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
+
+    val lits = new scala.collection.mutable.ArrayBuffer[Long]
+    plan.transformAllExpressions { case e: Literal =>
+      lits += e.value.asInstanceOf[Long]
+      e
+    }
+    assert(lits.size == 2)
+    assert(lits(0) >= min && lits(0) <= max)
+    assert(lits(1) >= min && lits(1) <= max)
+    assert(lits(0) == lits(1))
+  }
 }
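[Editor's note: the new `ComputeCurrentTimeSuite` test uses a bracketing idiom worth naming: sample a lower bound, run the code under test, sample an upper bound, then assert the captured value lies in between. A tiny standalone sketch of the same idiom, using the system clock rather than the optimizer.]

```scala
// Bracketing idiom: any clock reading captured by the code under test
// must fall between samples taken immediately before and after it.
def nowMillis(): Long = System.currentTimeMillis()

val min = nowMillis()
val captured = nowMillis() // stand-in for "run the optimizer, inspect the literal"
val max = nowMillis()
assert(captured >= min && captured <= max)
```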
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 1ffcaf5..c31307f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -558,6 +558,9 @@ class MicroBatchExecution(
         // dummy string to prevent UnresolvedException and to prevent to be used in the future.
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           ct.dataType, Some("Dummy TimeZoneId"))
+      case lt: LocalTimestamp =>
+        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
+          lt.dataType, lt.timeZoneId)
       case cd: CurrentDate =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           cd.dataType, cd.timeZoneId)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 7c52b80..5e40860 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -172,9 +172,9 @@ class ContinuousExecution(
     }
 
     withNewSources.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
-      case (_: CurrentTimestamp | _: CurrentDate) =>
-        throw new IllegalStateException(
-          "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
+      case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) =>
+        throw new IllegalStateException("CurrentTimestamp, Now, CurrentDate and LocalTimestamp" +
+          " not yet supported for continuous processing")
     }
 
     reportTimeTaken("queryPlanning") {
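[Editor's note: in micro-batch mode, the rewrite above pins every `localtimestamp()` in a batch to that batch's timestamp. The sketch below models what `CurrentBatchTimestamp.toLiteral` plausibly computes for a TIMESTAMP_NTZ column, under the assumption that `convertTz(micros, UTC, zone)` re-encodes an instant as wall-clock micros in the target zone; it is a model, not the Spark implementation.]

```scala
import java.time.{Instant, ZoneId, ZoneOffset}

// Model (assumption) of the TIMESTAMP_NTZ literal produced for a batch:
// interpret the batch's epoch millis in the session zone, then re-encode
// the resulting wall-clock time as micros-as-if-UTC (the NTZ layout).
def batchNtzMicros(batchTimestampMs: Long, sessionZone: String): Long = {
  val local = Instant.ofEpochMilli(batchTimestampMs)
    .atZone(ZoneId.of(sessionZone))
    .toLocalDateTime
  local.toEpochSecond(ZoneOffset.UTC) * 1000000L + local.getNano / 1000L
}

// All rows of one micro-batch share batchTimestampMs, so they all see
// the same localtimestamp() value.
val a = batchNtzMicros(1626246000000L, "Asia/Shanghai")
val b = batchNtzMicros(1626246000000L, "Asia/Shanghai")
assert(a == b)
```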
   *
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index f71f3a8..c13a1d4 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -1,6 +1,6 @@
 <!-- Automatically generated by ExpressionsSchemaSuite -->
 ## Summary
-  - Number of queries: 359
+  - Number of queries: 360
   - Number of expressions that missing example: 13
   - Expressions missing examples: bigint,binary,boolean,date,decimal,double,float,int,smallint,string,timestamp,tinyint,window
 ## Schema of Built-in Functions
@@ -162,6 +162,7 @@
 | org.apache.spark.sql.catalyst.expressions.LessThanOrEqual | <= | SELECT 2 <= 2 | struct<(2 <= 2):boolean> |
 | org.apache.spark.sql.catalyst.expressions.Levenshtein | levenshtein | SELECT levenshtein('kitten', 'sitting') | struct<levenshtein(kitten, sitting):int> |
 | org.apache.spark.sql.catalyst.expressions.Like | like | SELECT like('Spark', '_park') | struct<Spark LIKE _park:boolean> |
+| org.apache.spark.sql.catalyst.expressions.LocalTimestamp | localtimestamp | SELECT localtimestamp() | struct<localtimestamp():timestamp_ntz> |
 | org.apache.spark.sql.catalyst.expressions.Log | ln | SELECT ln(1) | struct<ln(1):double> |
 | org.apache.spark.sql.catalyst.expressions.Log10 | log10 | SELECT log10(10) | struct<LOG10(10):double> |
 | org.apache.spark.sql.catalyst.expressions.Log1p | log1p | SELECT log1p(0) | struct<LOG1P(0):double> |
diff --git a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
index db30c22..74a451e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
@@ -24,6 +24,7 @@ select DATE_FROM_UNIX_DATE(0), DATE_FROM_UNIX_DATE(1000), DATE_FROM_UNIX_DATE(nu
 select UNIX_DATE(DATE('1970-01-01')), UNIX_DATE(DATE('2020-12-04')), UNIX_DATE(null);
 -- [SPARK-16836] current_date and current_timestamp literals
 select current_date = current_date(), current_timestamp = current_timestamp();
+select localtimestamp() = localtimestamp();
 
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd');
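[Editor's note: looking ahead to the `StreamingAggregationSuite` change at the end of this diff, TIMESTAMP_NTZ has no zone, so comparing it against epoch seconds takes a double cast, first to a zoned timestamp and then to long. A hedged sketch of that cast chain on a static DataFrame; the streaming test harness is omitted for brevity.]

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Same predicate shape as the updated test: keep keys within 10 seconds
// of the query-local time. localtimestamp() is TIMESTAMP_NTZ, so it is
// first cast to TimestampType and then to epoch seconds ("long").
val pruned = Seq(0L, 5L, 10L).toDF("value")
  .groupBy($"value")
  .agg(count("*"))
  .where($"value" >= localtimestamp().cast(TimestampType).cast("long") - 10L)
pruned.show()
```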
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
index beff665..4e999f3 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
 
 
 -- !query
@@ -142,6 +142,14 @@ select current_date = current_date(), current_timestamp = current_timestamp()
 
 
 -- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
 -- !query schema
 struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
index b6fe551..2108808 100644
--- a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
 
 
 -- !query
@@ -136,6 +136,14 @@ true true
 
 
 -- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
 -- !query schema
 struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
index c865f2c..f6278f6 100755
--- a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
 
 
 -- !query
@@ -136,6 +136,14 @@ true true
 
 
 -- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
 -- !query schema
 struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
index a5659e1..69c4b8f 100644
--- a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
 
 
 -- !query
@@ -136,6 +136,14 @@ true true
 
 
 -- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
 -- !query schema
 struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 589ac51..30ee97a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -175,6 +175,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       "org.apache.spark.sql.catalyst.expressions.CurrentTimestamp",
       "org.apache.spark.sql.catalyst.expressions.CurrentTimeZone",
       "org.apache.spark.sql.catalyst.expressions.Now",
+      "org.apache.spark.sql.catalyst.expressions.LocalTimestamp",
       // Random output without a seed
       "org.apache.spark.sql.catalyst.expressions.Rand",
       "org.apache.spark.sql.catalyst.expressions.Randn",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index eef13ca..77334ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode._
 import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructType, TimestampType}
 import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 import org.apache.spark.util.Utils
 
@@ -406,56 +406,66 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
     )
   }
 
-  testWithAllStateVersions("prune results by current_time, complete mode") {
+  testWithAllStateVersions("prune results by current_time or localtimestamp, complete mode") {
     import testImplicits._
-    val clock = new StreamManualClock
-    val inputData = MemoryStream[Long]
-    val aggregated =
-      inputData.toDF()
+    val inputDataOne = MemoryStream[Long]
+    val aggregatedOne =
+      inputDataOne.toDF()
         .groupBy($"value")
         .agg(count("*"))
         .where('value >= current_timestamp().cast("long") - 10L)
+    val inputDataTwo = MemoryStream[Long]
+    val aggregatedTwo =
+      inputDataTwo.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .where('value >= localtimestamp().cast(TimestampType).cast("long") - 10L)
+
+    Seq((inputDataOne, aggregatedOne), (inputDataTwo, aggregatedTwo)).foreach { x =>
+      val inputData = x._1
+      val aggregated = x._2
+      val clock = new StreamManualClock
+      testStream(aggregated, Complete)(
+        StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
+
+        // advance clock to 10 seconds, all keys retained
+        AddData(inputData, 0L, 5L, 5L, 10L),
+        AdvanceManualClock(10 * 1000),
+        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+        // advance clock to 20 seconds, should retain keys >= 10
+        AddData(inputData, 15L, 15L, 20L),
+        AdvanceManualClock(10 * 1000),
+        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+        // advance clock to 30 seconds, should retain keys >= 20
+        AddData(inputData, 0L, 85L),
+        AdvanceManualClock(10 * 1000),
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // bounce stream and ensure correct batch timestamp is used
+        // i.e., we don't take it from the clock, which is at 90 seconds.
+        StopStream,
+        AssertOnQuery { q => // clear the sink
+          q.sink.asInstanceOf[MemorySink].clear()
+          q.commitLog.purge(3)
+          // advance by a minute i.e., 90 seconds total
+          clock.advance(60 * 1000L)
+          true
+        },
+        StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
+        // The commit log blown, causing the last batch to re-run
+        CheckLastBatch((20L, 1), (85L, 1)),
+        AssertOnQuery { q =>
+          clock.getTimeMillis() == 90000L
+        },
 
-    testStream(aggregated, Complete)(
-      StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
-
-      // advance clock to 10 seconds, all keys retained
-      AddData(inputData, 0L, 5L, 5L, 10L),
-      AdvanceManualClock(10 * 1000),
-      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-
-      // advance clock to 20 seconds, should retain keys >= 10
-      AddData(inputData, 15L, 15L, 20L),
-      AdvanceManualClock(10 * 1000),
-      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-
-      // advance clock to 30 seconds, should retain keys >= 20
-      AddData(inputData, 0L, 85L),
-      AdvanceManualClock(10 * 1000),
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // bounce stream and ensure correct batch timestamp is used
-      // i.e., we don't take it from the clock, which is at 90 seconds.
-      StopStream,
-      AssertOnQuery { q => // clear the sink
-        q.sink.asInstanceOf[MemorySink].clear()
-        q.commitLog.purge(3)
-        // advance by a minute i.e., 90 seconds total
-        clock.advance(60 * 1000L)
-        true
-      },
-      StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
-      // The commit log blown, causing the last batch to re-run
-      CheckLastBatch((20L, 1), (85L, 1)),
-      AssertOnQuery { q =>
-        clock.getTimeMillis() == 90000L
-      },
-
-      // advance clock to 100 seconds, should retain keys >= 90
-      AddData(inputData, 85L, 90L, 100L, 105L),
-      AdvanceManualClock(10 * 1000),
-      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-    )
+        // advance clock to 100 seconds, should retain keys >= 90
+        AddData(inputData, 85L, 90L, 100L, 105L),
+        AdvanceManualClock(10 * 1000),
+        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+      )
+    }
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org