Repository: spark
Updated Branches:
  refs/heads/branch-1.6 eb1ba1e2e -> 0c970fd2c


[SPARK-11849][SQL] Analyzer should replace current_date and current_timestamp 
with literals

We currently rely on the optimizer's constant folding to replace
current_timestamp and current_date. However, constant folding evaluates
each instance independently, so two instances of
current_timestamp/current_date in the same query can still produce
different values if the optimizer does not fold them at the same instant.

A better solution is to replace these functions in the analyzer in one shot.
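
In other words, the clock is read once per query and that single literal
is substituted for every occurrence. A minimal, self-contained Scala
sketch of the idea (hypothetical simplified types, for illustration only;
the actual rule added by this patch is ComputeCurrentTime, shown in the
diff below):

    object ReplaceCurrentTimeSketch {
      sealed trait Expr
      case object CurrentTs extends Expr
      case class Lit(micros: Long) extends Expr

      // Read the wall clock once, then substitute that single literal for
      // every occurrence, so all instances in one query agree exactly.
      def replaceAll(exprs: Seq[Expr]): Seq[Expr] = {
        val now = Lit(System.currentTimeMillis() * 1000L) // micros, as in the patch
        exprs.map {
          case CurrentTs => now
          case e         => e
        }
      }
    }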

Author: Reynold Xin <r...@databricks.com>

Closes #9833 from rxin/SPARK-11849.

(cherry picked from commit f449992009becc8f7c7f06cda522b9beaa1e263c)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c970fd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c970fd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c970fd2

Branch: refs/heads/branch-1.6
Commit: 0c970fd2cb79f97827fe6f133856687568f5a530
Parents: eb1ba1e
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Nov 19 10:48:04 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Nov 19 10:48:11 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 27 +++++++++++---
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 38 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c970fd2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f00c451..84781cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -65,9 +65,8 @@ class Analyzer(
 
   lazy val batches: Seq[Batch] = Seq(
     Batch("Substitution", fixedPoint,
-      CTESubstitution ::
-      WindowsSubstitution ::
-      Nil : _*),
+      CTESubstitution,
+      WindowsSubstitution),
     Batch("Resolution", fixedPoint,
       ResolveRelations ::
       ResolveReferences ::
@@ -84,7 +83,8 @@ class Analyzer(
       HiveTypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Nondeterministic", Once,
-      PullOutNondeterministic),
+      PullOutNondeterministic,
+      ComputeCurrentTime),
     Batch("UDF", Once,
       HandleNullInputsForUDF),
     Batch("Cleanup", fixedPoint,
@@ -1076,7 +1076,7 @@ class Analyzer(
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case p if !p.resolved => p // Skip unresolved nodes.
 
-      case plan => plan transformExpressionsUp {
+      case p => p transformExpressionsUp {
 
         case udf @ ScalaUDF(func, _, inputs, _) =>
           val parameterTypes = ScalaReflection.getParameterTypes(func)
@@ -1162,3 +1162,20 @@ object CleanupAliases extends Rule[LogicalPlan] {
       }
   }
 }
+
+/**
+ * Computes the current date and time to make sure we return the same result in a single query.
+ */
+object ComputeCurrentTime extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    val dateExpr = CurrentDate()
+    val timeExpr = CurrentTimestamp()
+    val currentDate = Literal.create(dateExpr.eval(EmptyRow), dateExpr.dataType)
+    val currentTime = Literal.create(timeExpr.eval(EmptyRow), timeExpr.dataType)
+
+    plan transformAllExpressions {
+      case CurrentDate() => currentDate
+      case CurrentTimestamp() => currentTime
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0c970fd2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 08586a9..e051069 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 
 class AnalysisSuite extends AnalysisTest {
@@ -218,4 +219,41 @@ class AnalysisSuite extends AnalysisTest {
       udf4)
     // checkUDF(udf4, expected4)
   }
+
+  test("analyzer should replace current_timestamp with literals") {
+    val in = Project(Seq(Alias(CurrentTimestamp(), "a")(), Alias(CurrentTimestamp(), "b")()),
+      LocalRelation())
+
+    val min = System.currentTimeMillis() * 1000
+    val plan = in.analyze.asInstanceOf[Project]
+    val max = (System.currentTimeMillis() + 1) * 1000
+
+    val lits = new scala.collection.mutable.ArrayBuffer[Long]
+    plan.transformAllExpressions { case e: Literal =>
+      lits += e.value.asInstanceOf[Long]
+      e
+    }
+    assert(lits.size == 2)
+    assert(lits(0) >= min && lits(0) <= max)
+    assert(lits(1) >= min && lits(1) <= max)
+    assert(lits(0) == lits(1))
+  }
+
+  test("analyzer should replace current_date with literals") {
+    val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())
+
+    val min = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    val plan = in.analyze.asInstanceOf[Project]
+    val max = DateTimeUtils.millisToDays(System.currentTimeMillis())
+
+    val lits = new scala.collection.mutable.ArrayBuffer[Int]
+    plan.transformAllExpressions { case e: Literal =>
+      lits += e.value.asInstanceOf[Int]
+      e
+    }
+    assert(lits.size == 2)
+    assert(lits(0) >= min && lits(0) <= max)
+    assert(lits(1) >= min && lits(1) <= max)
+    assert(lits(0) == lits(1))
+  }
 }

