This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 344453761cb [SPARK-45660] Re-use Literal objects in ComputeCurrentTime 
rule
344453761cb is described below

commit 344453761cbca154a04a53d4c5d6c2b1eef59652
Author: Ole Sasse <ole.sa...@databricks.com>
AuthorDate: Wed Oct 25 19:56:15 2023 +0300

    [SPARK-45660] Re-use Literal objects in ComputeCurrentTime rule
    
    ### What changes were proposed in this pull request?
    
    The ComputeCurrentTime optimizer rule does produce unique timestamp 
Literals for current time expressions of a query. For CurrentDate and 
LocalTimestamp the Literal objects are not re-used though, but semantically 
equal objects are created for each instance. This can cost unnecessary much 
memory in case there are many such Literal objects.
    
    This PR adds a map that caches timestamp literals in case they are used 
more than once.
    
    ### Why are the changes needed?
    
    A query that has a lot of equal literals could use unnecessary high amounts 
of memory
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a new Unit Test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43524 from olaky/unique-timestamp-replacement-literals.
    
    Authored-by: Ole Sasse <ole.sa...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../sql/catalyst/optimizer/finishAnalysis.scala    | 15 ++++++++---
 .../optimizer/ComputeCurrentTimeSuite.scala        | 30 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 4052ccd6496..18c85999312 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.time.{Instant, LocalDateTime}
+import java.time.{Instant, LocalDateTime, ZoneId}
 
 import org.apache.spark.sql.catalyst.CurrentUserContext
 import org.apache.spark.sql.catalyst.expressions._
@@ -79,6 +79,8 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
     val currentTimestampMicros = instantToMicros(instant)
     val currentTime = Literal.create(currentTimestampMicros, TimestampType)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
+    val currentDates = collection.mutable.HashMap.empty[ZoneId, Literal]
+    val localTimestamps = collection.mutable.HashMap.empty[ZoneId, Literal]
 
     def transformCondition(treePatternbits: TreePatternBits): Boolean = {
       treePatternbits.containsPattern(CURRENT_LIKE)
@@ -88,12 +90,17 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
       case subQuery =>
         subQuery.transformAllExpressionsWithPruning(transformCondition) {
           case cd: CurrentDate =>
-            Literal.create(DateTimeUtils.microsToDays(currentTimestampMicros, 
cd.zoneId), DateType)
+            currentDates.getOrElseUpdate(cd.zoneId, {
+              Literal.create(
+                DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), 
DateType)
+            })
           case CurrentTimestamp() | Now() => currentTime
           case CurrentTimeZone() => timezone
           case localTimestamp: LocalTimestamp =>
-            val asDateTime = LocalDateTime.ofInstant(instant, 
localTimestamp.zoneId)
-            Literal.create(localDateTimeToMicros(asDateTime), TimestampNTZType)
+            localTimestamps.getOrElseUpdate(localTimestamp.zoneId, {
+              val asDateTime = LocalDateTime.ofInstant(instant, 
localTimestamp.zoneId)
+              Literal.create(localDateTimeToMicros(asDateTime), 
TimestampNTZType)
+            })
         }
     }
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 8b76cc383c5..447d77855fb 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -23,7 +23,7 @@ import scala.concurrent.duration._
 import scala.jdk.CollectionConverters.MapHasAsScala
 
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, 
CurrentTimestamp, CurrentTimeZone, InSubquery, ListQuery, Literal, 
LocalTimestamp, Now}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, 
CurrentTimestamp, CurrentTimeZone, Expression, InSubquery, ListQuery, Literal, 
LocalTimestamp, Now}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -135,6 +135,34 @@ class ComputeCurrentTimeSuite extends PlanTest {
     assert(offsetsFromQuarterHour.size == 1)
   }
 
+  test("No duplicate literals") {
+    def checkLiterals(f: (String) => Expression, expected: Int): Unit = {
+      val timestamps = ZoneId.SHORT_IDS.asScala.flatMap { case (zoneId, _) =>
+        // Request each timestamp multiple times.
+        (1 to 5).map { _ => Alias(f(zoneId), zoneId)() }
+      }.toSeq
+
+      val input = Project(timestamps, LocalRelation())
+      val plan = Optimize.execute(input).asInstanceOf[Project]
+
+      val uniqueLiteralObjectIds = new scala.collection.mutable.HashSet[Int]
+      plan.transformWithSubqueries { case subQuery =>
+        subQuery.transformAllExpressions { case literal: Literal =>
+          uniqueLiteralObjectIds += System.identityHashCode(literal)
+          literal
+        }
+      }
+
+      assert(expected === uniqueLiteralObjectIds.size)
+    }
+
+    val numTimezones = ZoneId.SHORT_IDS.size
+    checkLiterals({ _: String => CurrentTimestamp() }, 1)
+    checkLiterals({ zoneId: String => LocalTimestamp(Some(zoneId)) }, 
numTimezones)
+    checkLiterals({ _: String => Now() }, 1)
+    checkLiterals({ zoneId: String => CurrentDate(Some(zoneId)) }, 
numTimezones)
+  }
+
   private def literals[T](plan: LogicalPlan): 
scala.collection.mutable.ArrayBuffer[T] = {
     val literals = new scala.collection.mutable.ArrayBuffer[T]
     plan.transformWithSubqueries { case subQuery =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to