This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d7534a3ec1ea [SPARK-46380][SQL] Replace current time/date prior to 
evaluating inline table expressions
d7534a3ec1ea is described below

commit d7534a3ec1eab53bbd349f9ae31684337c734958
Author: Aleksandar Tomic <aleksandar.to...@databricks.com>
AuthorDate: Thu Dec 21 15:58:15 2023 +0800

    [SPARK-46380][SQL] Replace current time/date prior to evaluating inline 
table expressions
    
    This PR proposes to do inline table resolution in two phases:
    1) If there are no expressions that depend on the current context (e.g. 
expressions that depend on CURRENT_DATABASE, CURRENT_USER, CURRENT_TIME etc.), 
they will be evaluated as part of the ResolveInlineTables rule.
    2) Expressions that do depend on CURRENT_* evaluation will be kept as 
expressions and their evaluation will be delayed to the post-analysis phase.
    
    This PR aims to solve two problems with inline tables.
    
    Example1:
    ```sql
    SELECT COUNT(DISTINCT ct) FROM VALUES
    (CURRENT_TIMESTAMP()),
    (CURRENT_TIMESTAMP()),
    (CURRENT_TIMESTAMP()) as data(ct)
    ```
    Prior to this change, this example would return 3 (i.e. each 
CURRENT_TIMESTAMP expression would return a different value, since they were 
evaluated individually as part of inline table evaluation). After this change, 
the result is 1.
    
    Example 2:
    ```sql
    CREATE VIEW V as (SELECT * FROM VALUES(CURRENT_TIMESTAMP()))
    ```
    Prior to this change, the VIEW in this example would be saved with the 
literal evaluated during VIEW creation. After this change, CURRENT_TIMESTAMP() 
is evaluated during VIEW execution.
    
    See section above.
    
    New test that validates this behaviour is introduced.
    
    No.
    
    Closes #44316 from dbatomic/inline_tables_curr_time_fix.
    
    Lead-authored-by: Aleksandar Tomic <aleksandar.to...@databricks.com>
    Co-authored-by: Aleksandar Tomic 
<150942779+dbato...@users.noreply.github.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 5fe963f8560ef05925d127e82ab7ef28d6a1d7bc)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/analysis/ResolveInlineTables.scala    | 68 ++++++++++++----------
 .../spark/sql/catalyst/analysis/unresolved.scala   | 15 +++++
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala    | 33 +++++++++++
 .../sql/catalyst/rules/RuleIdCollection.scala      |  1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala    |  1 +
 .../analysis/ResolveInlineTablesSuite.scala        | 31 ++++++++--
 .../analyzer-results/inline-table.sql.out          | 16 ++++-
 .../postgreSQL/create_view.sql.out                 |  2 +-
 .../resources/sql-tests/inputs/inline-table.sql    |  6 ++
 .../sql-tests/results/inline-table.sql.out         | 16 +++++
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 14 +++++
 12 files changed, 165 insertions(+), 42 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 760ea466b857..73600f5c7064 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -17,28 +17,29 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, 
Expression}
+import org.apache.spark.sql.catalyst.optimizer.EvalInlineTables
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
+import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.{toSQLExpr, toSQLId}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
- * An analyzer rule that replaces [[UnresolvedInlineTable]] with 
[[LocalRelation]].
+ * An analyzer rule that replaces [[UnresolvedInlineTable]] with 
[[ResolvedInlineTable]].
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsWithPruning(AlwaysProcess.fn, ruleId) {
+      case table: UnresolvedInlineTable if table.expressionsResolved =>
+        validateInputDimension(table)
+        validateInputEvaluable(table)
+        val resolvedTable = findCommonTypesAndCast(table)
+        earlyEvalIfPossible(resolvedTable)
+    }
   }
 
   /**
@@ -74,7 +75,10 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     table.rows.foreach { row =>
       row.foreach { e =>
         // Note that nondeterministic expressions are not supported since they 
are not foldable.
-        if (!e.resolved || !trimAliases(prepareForEval(e)).foldable) {
+        // Only exception are CURRENT_LIKE expressions, which are replaced by 
a literal
+        // In later stages.
+        if ((!e.resolved && !e.containsPattern(CURRENT_LIKE))
+          || !trimAliases(prepareForEval(e)).foldable) {
           e.failAnalysis(
             errorClass = 
"INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE",
             messageParameters = Map("expr" -> toSQLExpr(e)))
@@ -84,14 +88,12 @@ object ResolveInlineTables extends Rule[LogicalPlan]
   }
 
   /**
-   * Convert a valid (with right shape and foldable inputs) 
[[UnresolvedInlineTable]]
-   * into a [[LocalRelation]].
-   *
    * This function attempts to coerce inputs into consistent types.
    *
    * This is package visible for unit testing.
    */
-  private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = 
{
+  private[analysis] def findCommonTypesAndCast(table: UnresolvedInlineTable):
+    ResolvedInlineTable = {
     // For each column, traverse all the values and find a common data type 
and nullability.
     val fields = table.rows.transpose.zip(table.names).map { case (column, 
name) =>
       val inputTypes = column.map(_.dataType)
@@ -105,26 +107,30 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>
+      row.zipWithIndex.map {
+        case (e, ci) =>
+          val targetType = fields(ci).dataType
           val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) 
{
             e
           } else {
             cast(e, targetType)
           }
-          prepareForEval(castedExpr).eval()
-        } catch {
-          case NonFatal(ex) =>
-            table.failAnalysis(
-              errorClass = 
"INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
-              messageParameters = Map("sqlExpr" -> toSQLExpr(e)),
-              cause = ex)
-        }
-      })
+          castedExpr
+      }
     }
 
-    LocalRelation(attributes, newRows)
+    ResolvedInlineTable(castedRows, attributes)
+  }
+
+  /**
+   * This function attempts to early evaluate rows in inline table.
+   * If evaluation doesn't rely on non-deterministic expressions (e.g. 
current_like)
+   * expressions will be evaluated and inlined as [[LocalRelation]]
+   * This is package visible for unit testing.
+   */
+  private[analysis] def earlyEvalIfPossible(table: ResolvedInlineTable): 
LogicalPlan = {
+      val earlyEvalPossible = 
table.rows.flatten.forall(!_.containsPattern(CURRENT_LIKE))
+      if (earlyEvalPossible) EvalInlineTables(table) else table
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index b1dcb465b477..07ad5e57306a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -126,6 +126,21 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An resolved inline table that holds all the expressions that were checked 
for
+ * the right shape and common data types.
+ * This is a preparation step for 
[[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class ResolvedInlineTable(rows: Seq[Seq[Expression]], output: 
Seq[Attribute])
+  extends LeafNode {
+  final override val nodePatterns: Seq[TreePattern] = Seq(INLINE_TABLE_EVAL)
+}
+
 /**
  * A table-valued function, e.g.
  * {{{
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bb2a86556c03..ec5f00d34cd8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -287,7 +287,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
       ComputeCurrentTime,
       ReplaceCurrentLike(catalogManager),
       SpecialDatetimeValues,
-      RewriteAsOfJoin)
+      RewriteAsOfJoin,
+      EvalInlineTables
+    )
 
     override def apply(plan: LogicalPlan): LogicalPlan = {
       rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 466781fa1def..d7efc16a514b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -19,7 +19,12 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import java.time.{Instant, LocalDateTime}
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{CastSupport, 
ResolvedInlineTable}
+import 
org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.prepareForEval
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -27,6 +32,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.trees.TreePatternBits
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, 
localDateTimeToMicros}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -71,6 +77,33 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Computes expressions in inline tables. This rule is supposed to be called 
at the very end
+ * of the analysis phase, given that all the expressions need to be fully 
resolved/replaced
+ * at this point.
+ */
+object EvalInlineTables extends Rule[LogicalPlan] with CastSupport {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    
plan.transformDownWithSubqueriesAndPruning(_.containsPattern(INLINE_TABLE_EVAL))
 {
+      case table: ResolvedInlineTable =>
+        val newRows: Seq[InternalRow] =
+          table.rows.map { row => InternalRow.fromSeq(row.map { e =>
+              try {
+                prepareForEval(e).eval()
+              } catch {
+                case NonFatal(ex) =>
+                  table.failAnalysis(
+                    errorClass = 
"INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
+                    messageParameters = Map("sqlExpr" -> toSQLExpr(e)),
+                    cause = ex)
+              }})
+          }
+
+        LocalRelation(table.output, newRows)
+    }
+  }
+}
+
 /**
  * Computes the current date and time to make sure we return the same result 
in a single query.
  */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index caf679f3e7a7..96f78d251c39 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -166,6 +166,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals" ::
       "org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps" ::
       "org.apache.spark.sql.catalyst.optimizer.TransposeWindow" ::
+      "org.apache.spark.sql.catalyst.optimizer.EvalInlineTables" ::
       "org.apache.spark.sql.catalyst.optimizer.UnwrapCastInBinaryComparison" 
::  Nil
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index bf7b2db1719f..ce8f5951839e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -53,6 +53,7 @@ object TreePattern extends Enumeration  {
   val IF: Value = Value
   val IN: Value = Value
   val IN_SUBQUERY: Value = Value
+  val INLINE_TABLE_EVAL: Value = Value
   val INSET: Value = Value
   val INTERSECT: Value = Value
   val INVOKE: Value = Value
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index 2e6c6e4eaf4c..758b6b73e4eb 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.analysis
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, Rand}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, 
CurrentTimestamp, Literal, Rand}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.optimizer.{ComputeCurrentTime, 
EvalInlineTables}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types.{LongType, NullType, TimestampType}
 
@@ -83,9 +84,10 @@ class ResolveInlineTablesSuite extends AnalysisTest with 
BeforeAndAfter {
     assert(ResolveInlineTables(table) == table)
   }
 
-  test("convert") {
+  test("cast and execute") {
     val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), 
Seq(lit(2L))))
-    val converted = ResolveInlineTables.convert(table)
+    val resolved = ResolveInlineTables.findCommonTypesAndCast(table)
+    val converted = 
ResolveInlineTables.earlyEvalIfPossible(resolved).asInstanceOf[LocalRelation]
 
     assert(converted.output.map(_.dataType) == Seq(LongType))
     assert(converted.data.size == 2)
@@ -93,11 +95,28 @@ class ResolveInlineTablesSuite extends AnalysisTest with 
BeforeAndAfter {
     assert(converted.data(1).getLong(0) == 2L)
   }
 
+  test("cast and execute CURRENT_LIKE expressions") {
+    val table = UnresolvedInlineTable(Seq("c1"), Seq(
+      Seq(CurrentTimestamp()), Seq(CurrentTimestamp())))
+    val casted = ResolveInlineTables.findCommonTypesAndCast(table)
+    val earlyEval = ResolveInlineTables.earlyEvalIfPossible(casted)
+    // Early eval should keep it in expression form.
+    assert(earlyEval.isInstanceOf[ResolvedInlineTable])
+
+    EvalInlineTables(ComputeCurrentTime(earlyEval)) match {
+      case LocalRelation(output, data, _) =>
+        assert(output.map(_.dataType) == Seq(TimestampType))
+        assert(data.size == 2)
+        // Make sure that both CURRENT_TIMESTAMP expressions are evaluated to 
the same value.
+        assert(data(0).getLong(0) == data(1).getLong(0))
+    }
+  }
+
   test("convert TimeZoneAwareExpression") {
     val table = UnresolvedInlineTable(Seq("c1"),
       Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
     val withTimeZone = ResolveTimeZone.apply(table)
-    val LocalRelation(output, data, _) = 
ResolveInlineTables.apply(withTimeZone)
+    val LocalRelation(output, data, _) = 
EvalInlineTables(ResolveInlineTables.apply(withTimeZone))
     val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
       .withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
     assert(output.map(_.dataType) == Seq(TimestampType))
@@ -107,11 +126,11 @@ class ResolveInlineTablesSuite extends AnalysisTest with 
BeforeAndAfter {
 
   test("nullability inference in convert") {
     val table1 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), 
Seq(lit(2L))))
-    val converted1 = ResolveInlineTables.convert(table1)
+    val converted1 = ResolveInlineTables.findCommonTypesAndCast(table1)
     assert(!converted1.schema.fields(0).nullable)
 
     val table2 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), 
Seq(Literal(null, NullType))))
-    val converted2 = ResolveInlineTables.convert(table2)
+    val converted2 = ResolveInlineTables.findCommonTypesAndCast(table2)
     assert(converted2.schema.fields(0).nullable)
   }
 }
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
index 2a17f092a06b..adce16bf2357 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
@@ -73,9 +73,7 @@ Project [a#x, b#x]
 -- !query
 select a from values ("one", current_timestamp) as data(a, b)
 -- !query analysis
-Project [a#x]
-+- SubqueryAlias data
-   +- LocalRelation [a#x, b#x]
+[Analyzer test output redacted due to nondeterminism]
 
 
 -- !query
@@ -241,3 +239,15 @@ select * from values (10 + try_divide(5, 0))
 -- !query analysis
 Project [col1#x]
 +- LocalRelation [col1#x]
+
+
+-- !query
+select count(distinct ct) from values now(), now(), now() as data(ct)
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+select count(distinct ct) from values current_timestamp(), current_timestamp() 
as data(ct)
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out
index b199cb55f2a4..7f477c80d46c 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out
@@ -1661,7 +1661,7 @@ select * from tt7a left join tt8a using (x), tt8a tt8ax, 
false, false, Persisted
       :- Project [a#x, b#x, c#x, d#x, e#x]
       :  +- SubqueryAlias v
       :     +- Project [col1#x AS a#x, col2#x AS b#x, col3#x AS c#x, col4#x AS 
d#x, col5#x AS e#x]
-      :        +- LocalRelation [col1#x, col2#x, col3#x, col4#x, col5#x]
+      :        +- ResolvedInlineTable [[now(), 2, 3, now(), 5]], [col1#x, 
col2#x, col3#x, col4#x, col5#x]
       +- Project [cast(x#x as timestamp) AS x#x, y#x, z#x, x#x, z#x]
          +- Project [x#x, y#x, z#x, x#x, z#x]
             +- Join Inner
diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql 
b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
index 6867248f5765..8f65dc77c960 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
@@ -60,3 +60,9 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), 
array(timestamp('1991-
 select * from values (try_add(5, 0));
 select * from values (try_divide(5, 0));
 select * from values (10 + try_divide(5, 0));
+
+-- now() should be kept as tempResolved inline expression.
+select count(distinct ct) from values now(), now(), now() as data(ct);
+
+-- current_timestamp() should be kept as tempResolved inline expression.
+select count(distinct ct) from values current_timestamp(), current_timestamp() 
as data(ct);
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out 
b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index 709d7ab73f6c..b6c90b95c1d3 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -266,3 +266,19 @@ select * from values (10 + try_divide(5, 0))
 struct<col1:double>
 -- !query output
 NULL
+
+
+-- !query
+select count(distinct ct) from values now(), now(), now() as data(ct)
+-- !query schema
+struct<count(DISTINCT ct):bigint>
+-- !query output
+1
+
+
+-- !query
+select count(distinct ct) from values current_timestamp(), current_timestamp() 
as data(ct)
+-- !query schema
+struct<count(DISTINCT ct):bigint>
+-- !query output
+1
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index e258d600a2aa..a1147c16cc86 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -1216,4 +1216,18 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
       }
     }
   }
+
+  test("Inline table with current time expression") {
+    withView("v1") {
+      sql("CREATE VIEW v1 (t1, t2) AS SELECT * FROM VALUES (now(), now())")
+      val r1 = sql("select t1, t2 from v1").collect()(0)
+      val ts1 = (r1.getTimestamp(0), r1.getTimestamp(1))
+      assert(ts1._1 == ts1._2)
+      Thread.sleep(1)
+      val r2 = sql("select t1, t2 from v1").collect()(0)
+      val ts2 = (r2.getTimestamp(0), r2.getTimestamp(1))
+      assert(ts2._1 == ts2._2)
+      assert(ts1._1.getTime < ts2._1.getTime)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to