This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a703dace0aa [SPARK-46064][SQL][SS] Move out 
EliminateEventTimeWatermark to the analyzer and change to only take effect on 
resolved child
a703dace0aa is described below

commit a703dace0aa400fa24b2bded1500f44ae7ac8db0
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Nov 23 20:11:43 2023 +0900

    [SPARK-46064][SQL][SS] Move out EliminateEventTimeWatermark to the analyzer 
and change to only take effect on resolved child
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to move EliminateEventTimeWatermark into the analyzer 
(as one of the analysis rules), and also to change the rule so that it 
eliminates the EventTimeWatermark node only when the child of 
EventTimeWatermark is "resolved".
    
    ### Why are the changes needed?
    
    Currently, we apply EliminateEventTimeWatermark immediately when 
withWatermark is called, which means the rule is applied immediately against 
the child, regardless of whether the child is resolved or not.
    
    This is not an issue for usage of the DataFrame API initiated by read / 
readStream, because streaming sources have the isStreaming flag set to true 
even before they are resolved. But mixing the SQL and DataFrame APIs would 
expose the issue; we may not know the exact value of the isStreaming flag on 
an unresolved node, and it is subject to change upon resolution.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43971 from HeartSaVioR/SPARK-46064.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  6 ++++--
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 23 ++++++++++++++++++++++
 .../spark/sql/catalyst/analysis/AnalysisTest.scala |  2 ++
 .../sql/catalyst/analysis/TestRelations.scala      | 14 +++++++++++++
 .../catalyst/optimizer/FilterPushdownSuite.scala   |  8 ++++----
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  3 +--
 6 files changed, 48 insertions(+), 8 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6f201ba3a84..5fef7900d79 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -347,7 +347,9 @@ class Analyzer(override val catalogManager: CatalogManager) 
extends RuleExecutor
     Batch("Cleanup", fixedPoint,
       CleanupAliases),
     Batch("HandleSpecialCommand", Once,
-      HandleSpecialCommand)
+      HandleSpecialCommand),
+    Batch("Remove watermark for batch query", Once,
+      EliminateEventTimeWatermark)
   )
 
   /**
@@ -3938,7 +3940,7 @@ object CleanupAliases extends Rule[LogicalPlan] with 
AliasHelper {
 object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
     _.containsPattern(EVENT_TIME_WATERMARK)) {
-    case EventTimeWatermark(_, _, child) if !child.isStreaming => child
+    case EventTimeWatermark(_, _, child) if child.resolved && 
!child.isStreaming => child
   }
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 441b5fb6ca6..d035dcad81e 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -1724,4 +1724,27 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       checkAnalysis(ident2.select($"a"), testRelation.select($"a").analyze)
     }
   }
+
+  test("SPARK-46064 Basic functionality of elimination for watermark node in 
batch query") {
+    val dfWithEventTimeWatermark = EventTimeWatermark($"ts",
+      IntervalUtils.fromIntervalString("10 seconds"), batchRelationWithTs)
+
+    val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new 
QueryPlanningTracker)
+
+    // EventTimeWatermark node is eliminated via EliminateEventTimeWatermark.
+    assert(!analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
+  }
+
+  test("SPARK-46064 EliminateEventTimeWatermark properly handles the case 
where the child of " +
+    "EventTimeWatermark changes the isStreaming flag during resolution") {
+    // UnresolvedRelation which is batch initially and will be resolved as 
streaming
+    val dfWithTempView = UnresolvedRelation(TableIdentifier("streamingTable"))
+    val dfWithEventTimeWatermark = EventTimeWatermark($"ts",
+      IntervalUtils.fromIntervalString("10 seconds"), dfWithTempView)
+
+    val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new 
QueryPlanningTracker)
+
+    // EventTimeWatermark node is NOT eliminated.
+    assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
+  }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index ba4e7b279f5..2ee434704d6 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -82,6 +82,8 @@ trait AnalysisTest extends PlanTest {
     createTempView(catalog, "TaBlE3", TestRelations.testRelation3, 
overrideIfExists = true)
     createGlobalTempView(catalog, "TaBlE4", TestRelations.testRelation4, 
overrideIfExists = true)
     createGlobalTempView(catalog, "TaBlE5", TestRelations.testRelation5, 
overrideIfExists = true)
+    createTempView(catalog, "streamingTable", TestRelations.streamingRelation,
+      overrideIfExists = true)
     new Analyzer(catalog) {
       override val extendedResolutionRules = extendedAnalysisRules
     }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
index d54237fcc14..01b1a627e28 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
@@ -68,4 +68,18 @@ object TestRelations {
 
   val mapRelation = LocalRelation(
     AttributeReference("map", MapType(IntegerType, IntegerType))())
+
+  val streamingRelation = LocalRelation(
+    Seq(
+      AttributeReference("a", IntegerType)(),
+      AttributeReference("ts", TimestampType)()
+    ),
+    isStreaming = true)
+
+  val batchRelationWithTs = LocalRelation(
+    Seq(
+      AttributeReference("a", IntegerType)(),
+      AttributeReference("ts", TimestampType)()
+    ),
+    isStreaming = false)
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ee56d1fa9ac..2ebb43d4fba 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -1190,7 +1190,7 @@ class FilterPushdownSuite extends PlanTest {
 
   test("watermark pushdown: no pushdown on watermark attribute #1") {
     val interval = new CalendarInterval(2, 2, 2000L)
-    val relation = LocalRelation(attrA, $"b".timestamp, attrC)
+    val relation = LocalRelation(Seq(attrA, $"b".timestamp, attrC), Nil, 
isStreaming = true)
 
     // Verify that all conditions except the watermark touching condition are 
pushed down
     // by the optimizer and others are not.
@@ -1205,7 +1205,7 @@ class FilterPushdownSuite extends PlanTest {
 
   test("watermark pushdown: no pushdown for nondeterministic filter") {
     val interval = new CalendarInterval(2, 2, 2000L)
-    val relation = LocalRelation(attrA, attrB, $"c".timestamp)
+    val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, 
isStreaming = true)
 
     // Verify that all conditions except the watermark touching condition are 
pushed down
     // by the optimizer and others are not.
@@ -1221,7 +1221,7 @@ class FilterPushdownSuite extends PlanTest {
 
   test("watermark pushdown: full pushdown") {
     val interval = new CalendarInterval(2, 2, 2000L)
-    val relation = LocalRelation(attrA, attrB, $"c".timestamp)
+    val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, 
isStreaming = true)
 
     // Verify that all conditions except the watermark touching condition are 
pushed down
     // by the optimizer and others are not.
@@ -1236,7 +1236,7 @@ class FilterPushdownSuite extends PlanTest {
 
   test("watermark pushdown: no pushdown on watermark attribute #2") {
     val interval = new CalendarInterval(2, 2, 2000L)
-    val relation = LocalRelation($"a".timestamp, attrB, attrC)
+    val relation = LocalRelation(Seq($"a".timestamp, attrB, attrC), Nil, 
isStreaming = true)
 
     val originalQuery = EventTimeWatermark($"a", interval, relation)
       .where($"a" === new java.sql.Timestamp(0) && $"b" === 10)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 04a45633960..d36aaef5586 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -778,8 +778,7 @@ class Dataset[T] private[sql](
       val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold)
       require(!IntervalUtils.isNegative(parsedDelay),
         s"delay threshold ($delayThreshold) should not be negative.")
-      EliminateEventTimeWatermark(
-        EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, 
logicalPlan))
+      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, 
logicalPlan)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to