This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5fe963f8560e [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions 5fe963f8560e is described below commit 5fe963f8560ef05925d127e82ab7ef28d6a1d7bc Author: Aleksandar Tomic <aleksandar.to...@databricks.com> AuthorDate: Thu Dec 21 15:58:15 2023 +0800 [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions ### What changes were proposed in this pull request? With this PR proposal is to do inline table resolution in two phases: 1) If there are no expressions that depend on current context (e.g. expressions that depend on CURRENT_DATABASE, CURRENT_USER, CURRENT_TIME etc.) they will be evaluated as part of ResolveInlineTable rule. 2) Expressions that do depend on CURRENT_* evaluation will be kept as expressions and they evaluation will be delayed to post analysis phase. ### Why are the changes needed? This PR aims to solve two problems with inline tables. Example1: ```sql SELECT COUNT(DISTINCT ct) FROM VALUES (CURRENT_TIMESTAMP()), (CURRENT_TIMESTAMP()), (CURRENT_TIMESTAMP()) as data(ct) ``` Prior to this change this example would return 3 (i.e. all CURRENT_TIMESTAMP expressions would return different value since they would be evaluated individually as part of inline table evaluation). After this change result is 1. Example 2: ```sql CREATE VIEW V as (SELECT * FROM VALUES(CURRENT_TIMESTAMP()) ``` In this example VIEW would be saved with literal evaluated during VIEW creation. After this change CURRENT_TIMESTAMP() will eval during VIEW execution. ### Does this PR introduce _any_ user-facing change? See section above. ### How was this patch tested? New test that validates this behaviour is introduced. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44316 from dbatomic/inline_tables_curr_time_fix. Lead-authored-by: Aleksandar Tomic <aleksandar.to...@databricks.com> Co-authored-by: Aleksandar Tomic <150942779+dbato...@users.noreply.github.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../catalyst/analysis/ResolveInlineTables.scala | 68 ++++++++++++---------- .../spark/sql/catalyst/analysis/unresolved.scala | 15 +++++ .../spark/sql/catalyst/optimizer/Optimizer.scala | 4 +- .../sql/catalyst/optimizer/finishAnalysis.scala | 34 ++++++++++- .../sql/catalyst/rules/RuleIdCollection.scala | 1 + .../spark/sql/catalyst/trees/TreePatterns.scala | 1 + .../analysis/ResolveInlineTablesSuite.scala | 31 ++++++++-- .../analyzer-results/inline-table.sql.out | 16 ++++- .../postgreSQL/create_view.sql.out | 2 +- .../resources/sql-tests/inputs/inline-table.sql | 6 ++ .../sql-tests/results/inline-table.sql.out | 16 +++++ .../apache/spark/sql/execution/SQLViewSuite.scala | 14 +++++ 12 files changed, 165 insertions(+), 43 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index 760ea466b857..73600f5c7064 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -17,28 +17,29 @@ package org.apache.spark.sql.catalyst.analysis -import scala.util.control.NonFatal - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper} -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, Expression} +import org.apache.spark.sql.catalyst.optimizer.EvalInlineTables +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.AlwaysProcess +import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.TypeUtils.{toSQLExpr, toSQLId} import org.apache.spark.sql.types.{StructField, StructType} /** - * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[ResolvedInlineTable]]. */ object ResolveInlineTables extends Rule[LogicalPlan] with CastSupport with AliasHelper with EvalHelper { - override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning( - AlwaysProcess.fn, ruleId) { - case table: UnresolvedInlineTable if table.expressionsResolved => - validateInputDimension(table) - validateInputEvaluable(table) - convert(table) + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsWithPruning(AlwaysProcess.fn, ruleId) { + case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputEvaluable(table) + val resolvedTable = findCommonTypesAndCast(table) + earlyEvalIfPossible(resolvedTable) + } } /** @@ -74,7 +75,10 @@ object ResolveInlineTables extends Rule[LogicalPlan] table.rows.foreach { row => row.foreach { e => // Note that nondeterministic expressions are not supported since they are not foldable. - if (!e.resolved || !trimAliases(prepareForEval(e)).foldable) { + // Only exception are CURRENT_LIKE expressions, which are replaced by a literal + // In later stages. + if ((!e.resolved && !e.containsPattern(CURRENT_LIKE)) + || !trimAliases(prepareForEval(e)).foldable) { e.failAnalysis( errorClass = "INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE", messageParameters = Map("expr" -> toSQLExpr(e))) @@ -84,14 +88,12 @@ object ResolveInlineTables extends Rule[LogicalPlan] } /** - * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]] - * into a [[LocalRelation]]. - * * This function attempts to coerce inputs into consistent types. * * This is package visible for unit testing. */ - private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = { + private[analysis] def findCommonTypesAndCast(table: UnresolvedInlineTable): + ResolvedInlineTable = { // For each column, traverse all the values and find a common data type and nullability. val fields = table.rows.transpose.zip(table.names).map { case (column, name) => val inputTypes = column.map(_.dataType) @@ -105,26 +107,30 @@ object ResolveInlineTables extends Rule[LogicalPlan] val attributes = DataTypeUtils.toAttributes(StructType(fields)) assert(fields.size == table.names.size) - val newRows: Seq[InternalRow] = table.rows.map { row => - InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) => - val targetType = fields(ci).dataType - try { + val castedRows: Seq[Seq[Expression]] = table.rows.map { row => + row.zipWithIndex.map { + case (e, ci) => + val targetType = fields(ci).dataType val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) { e } else { cast(e, targetType) } - prepareForEval(castedExpr).eval() - } catch { - case NonFatal(ex) => - table.failAnalysis( - errorClass = "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION", - messageParameters = Map("sqlExpr" -> toSQLExpr(e)), - cause = ex) - } - }) + castedExpr + } } - LocalRelation(attributes, newRows) + ResolvedInlineTable(castedRows, attributes) + } + + /** + * This function attempts to early evaluate rows in inline table. + * If evaluation doesn't rely on non-deterministic expressions (e.g. current_like) + * expressions will be evaluated and inlined as [[LocalRelation]] + * This is package visible for unit testing. + */ + private[analysis] def earlyEvalIfPossible(table: ResolvedInlineTable): LogicalPlan = { + val earlyEvalPossible = table.rows.flatten.forall(!_.containsPattern(CURRENT_LIKE)) + if (earlyEvalPossible) EvalInlineTables(table) else table } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index e1dec5955a7f..b32ff671b2b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -131,6 +131,21 @@ case class UnresolvedInlineTable( lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved)) } +/** + * An resolved inline table that holds all the expressions that were checked for + * the right shape and common data types. + * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which + * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]] + * for this inline table. + * + * @param output list of column attributes + * @param rows expressions for the data rows + */ +case class ResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute]) + extends LeafNode { + final override val nodePatterns: Seq[TreePattern] = Seq(INLINE_TABLE_EVAL) +} + /** * A table-valued function, e.g. * {{{ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a4b25cbd1d2e..5a19c5e3c241 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -293,7 +293,9 @@ abstract class Optimizer(catalogManager: CatalogManager) ComputeCurrentTime, ReplaceCurrentLike(catalogManager), SpecialDatetimeValues, - RewriteAsOfJoin) + RewriteAsOfJoin, + EvalInlineTables + ) override def apply(plan: LogicalPlan): LogicalPlan = { rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 18c85999312d..92ac7599a8ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -19,7 +19,11 @@ package org.apache.spark.sql.catalyst.optimizer import java.time.{Instant, LocalDateTime, ZoneId} -import org.apache.spark.sql.catalyst.CurrentUserContext +import scala.util.control.NonFatal + +import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow} +import org.apache.spark.sql.catalyst.analysis.{CastSupport, ResolvedInlineTable} +import org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.prepareForEval import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -27,6 +31,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.trees.TreePatternBits import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros} +import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.types._ @@ -70,6 +75,33 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] { } } +/** + * Computes expressions in inline tables. This rule is supposed to be called at the very end + * of the analysis phase, given that all the expressions need to be fully resolved/replaced + * at this point. + */ +object EvalInlineTables extends Rule[LogicalPlan] with CastSupport { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(INLINE_TABLE_EVAL)) { + case table: ResolvedInlineTable => + val newRows: Seq[InternalRow] = + table.rows.map { row => InternalRow.fromSeq(row.map { e => + try { + prepareForEval(e).eval() + } catch { + case NonFatal(ex) => + table.failAnalysis( + errorClass = "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION", + messageParameters = Map("sqlExpr" -> toSQLExpr(e)), + cause = ex) + }}) + } + + LocalRelation(table.output, newRows) + } + } +} + /** * Computes the current date and time to make sure we return the same result in a single query. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index a6d03692646c..8eeea74b5376 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -168,6 +168,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals" :: "org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps" :: "org.apache.spark.sql.catalyst.optimizer.TransposeWindow" :: + "org.apache.spark.sql.catalyst.optimizer.EvalInlineTables" :: "org.apache.spark.sql.catalyst.optimizer.UnwrapCastInBinaryComparison" :: Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index fc869bce2772..daa4ea0c8616 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -54,6 +54,7 @@ object TreePattern extends Enumeration { val IF: Value = Value val IN: Value = Value val IN_SUBQUERY: Value = Value + val INLINE_TABLE_EVAL: Value = Value val INSET: Value = Value val INTERSECT: Value = Value val INVOKE: Value = Value diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala index 2e6c6e4eaf4c..758b6b73e4eb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala @@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.BeforeAndAfter import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, Rand} +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentTimestamp, Literal, Rand} import org.apache.spark.sql.catalyst.expressions.aggregate.Count +import org.apache.spark.sql.catalyst.optimizer.{ComputeCurrentTime, EvalInlineTables} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.types.{LongType, NullType, TimestampType} @@ -83,9 +84,10 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter { assert(ResolveInlineTables(table) == table) } - test("convert") { + test("cast and execute") { val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) - val converted = ResolveInlineTables.convert(table) + val resolved = ResolveInlineTables.findCommonTypesAndCast(table) + val converted = ResolveInlineTables.earlyEvalIfPossible(resolved).asInstanceOf[LocalRelation] assert(converted.output.map(_.dataType) == Seq(LongType)) assert(converted.data.size == 2) @@ -93,11 +95,28 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter { assert(converted.data(1).getLong(0) == 2L) } + test("cast and execute CURRENT_LIKE expressions") { + val table = UnresolvedInlineTable(Seq("c1"), Seq( + Seq(CurrentTimestamp()), Seq(CurrentTimestamp()))) + val casted = ResolveInlineTables.findCommonTypesAndCast(table) + val earlyEval = ResolveInlineTables.earlyEvalIfPossible(casted) + // Early eval should keep it in expression form. + assert(earlyEval.isInstanceOf[ResolvedInlineTable]) + + EvalInlineTables(ComputeCurrentTime(earlyEval)) match { + case LocalRelation(output, data, _) => + assert(output.map(_.dataType) == Seq(TimestampType)) + assert(data.size == 2) + // Make sure that both CURRENT_TIMESTAMP expressions are evaluated to the same value. + assert(data(0).getLong(0) == data(1).getLong(0)) + } + } + test("convert TimeZoneAwareExpression") { val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType)))) val withTimeZone = ResolveTimeZone.apply(table) - val LocalRelation(output, data, _) = ResolveInlineTables.apply(withTimeZone) + val LocalRelation(output, data, _) = EvalInlineTables(ResolveInlineTables.apply(withTimeZone)) val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType) .withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long] assert(output.map(_.dataType) == Seq(TimestampType)) @@ -107,11 +126,11 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter { test("nullability inference in convert") { val table1 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) - val converted1 = ResolveInlineTables.convert(table1) + val converted1 = ResolveInlineTables.findCommonTypesAndCast(table1) assert(!converted1.schema.fields(0).nullable) val table2 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(Literal(null, NullType)))) - val converted2 = ResolveInlineTables.convert(table2) + val converted2 = ResolveInlineTables.findCommonTypesAndCast(table2) assert(converted2.schema.fields(0).nullable) } } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out index 0d79168651fd..988df7de1a3c 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out @@ -73,9 +73,7 @@ Project [a#x, b#x] -- !query select a from values ("one", current_timestamp) as data(a, b) -- !query analysis -Project [a#x] -+- SubqueryAlias data - +- LocalRelation [a#x, b#x] +[Analyzer test output redacted due to nondeterminism] -- !query @@ -246,3 +244,15 @@ select * from values (10 + try_divide(5, 0)) -- !query analysis Project [col1#x] +- LocalRelation [col1#x] + + +-- !query +select count(distinct ct) from values now(), now(), now() as data(ct) +-- !query analysis +[Analyzer test output redacted due to nondeterminism] + + +-- !query +select count(distinct ct) from values current_timestamp(), current_timestamp() as data(ct) +-- !query analysis +[Analyzer test output redacted due to nondeterminism] diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out index 1adc3ae0fa65..0a74ec87eb83 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out @@ -1674,7 +1674,7 @@ select * from tt7a left join tt8a using (x), tt8a tt8ax, false, false, Persisted :- Project [a#x, b#x, c#x, d#x, e#x] : +- SubqueryAlias v : +- Project [col1#x AS a#x, col2#x AS b#x, col3#x AS c#x, col4#x AS d#x, col5#x AS e#x] - : +- LocalRelation [col1#x, col2#x, col3#x, col4#x, col5#x] + : +- ResolvedInlineTable [[now(), 2, 3, now(), 5]], [col1#x, col2#x, col3#x, col4#x, col5#x] +- Project [cast(x#x as timestamp) AS x#x, y#x, z#x, x#x, z#x] +- Project [x#x, y#x, z#x, x#x, z#x] +- Join Inner diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 6867248f5765..8f65dc77c960 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -60,3 +60,9 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991- select * from values (try_add(5, 0)); select * from values (try_divide(5, 0)); select * from values (10 + try_divide(5, 0)); + +-- now() should be kept as tempResolved inline expression. +select count(distinct ct) from values now(), now(), now() as data(ct); + +-- current_timestamp() should be kept as tempResolved inline expression. +select count(distinct ct) from values current_timestamp(), current_timestamp() as data(ct); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index f34b2fbd8724..4dcdf8ac3e98 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -271,3 +271,19 @@ select * from values (10 + try_divide(5, 0)) struct<col1:double> -- !query output NULL + + +-- !query +select count(distinct ct) from values now(), now(), now() as data(ct) +-- !query schema +struct<count(DISTINCT ct):bigint> +-- !query output +1 + + +-- !query +select count(distinct ct) from values current_timestamp(), current_timestamp() as data(ct) +-- !query schema +struct<count(DISTINCT ct):bigint> +-- !query output +1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index a7cab381c7f6..bca16579acff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -1306,4 +1306,18 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { } } } + + test("Inline table with current time expression") { + withView("v1") { + sql("CREATE VIEW v1 (t1, t2) AS SELECT * FROM VALUES (now(), now())") + val r1 = sql("select t1, t2 from v1").collect()(0) + val ts1 = (r1.getTimestamp(0), r1.getTimestamp(1)) + assert(ts1._1 == ts1._2) + Thread.sleep(1) + val r2 = sql("select t1, t2 from v1").collect()(0) + val ts2 = (r2.getTimestamp(0), r2.getTimestamp(1)) + assert(ts2._1 == ts2._2) + assert(ts1._1.getTime < ts2._1.getTime) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org