This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c1ed3e60e67f [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan
c1ed3e60e67f is described below

commit c1ed3e60e67f53bb323e2b9fa47789fcde70a75a
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Fri Jan 19 11:38:53 2024 +0900

    [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to fix a bug in canonicalization of a plan containing the physical node of dropDuplicatesWithinWatermark (`StreamingDeduplicateWithinWatermarkExec`).
    
    ### Why are the changes needed?
    
    Canonicalization of the plan replaces expressions (including attributes) to strip cosmetic differences such as names and metadata; here, the metadata is what denotes the event time column marker.
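
    As a rough illustration (a sketch, not the actual internals; the key name below is the value of `EventTimeWatermark.delayKey`, as used in the diff in this commit), the event time column is an ordinary attribute distinguished only by a metadata entry, and canonicalization strips attribute names and metadata alike. Runnable in a Spark REPL:

    ```scala
    import org.apache.spark.sql.types.MetadataBuilder

    // Metadata as written by withWatermark("eventTime", "10 seconds"): the event
    // time marker is just a long-valued entry under EventTimeWatermark.delayKey.
    val marked = new MetadataBuilder()
      .putLong("spark.watermarkDelayMs", 10000L)
      .build()
    println(marked.contains("spark.watermarkDelayMs")) // true: marker present

    // A canonicalized attribute carries empty metadata, so the same lookup
    // returns false and WatermarkSupport.findEventTimeColumn(...) finds nothing.
    ```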
    
    StreamingDeduplicateWithinWatermarkExec assumes that the input attributes of its child node contain the event time column, and this is determined when the node instance is initialized. Once canonicalization is triggered, the child node loses the notion of the event time column in its attributes, and a copy of StreamingDeduplicateWithinWatermarkExec is made, instantiating a new `StreamingDeduplicateWithinWatermarkExec` with a new child node which no longer has an [...]
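
    To make the failure concrete, here is a minimal, self-contained sketch (plain Scala; `Child`, `EagerNode`, and `LazyNode` are hypothetical stand-ins, not Spark classes) of why deferring the lookup with `lazy val`, as this patch does, avoids the error:

    ```scala
    object LazySketch extends App {
      case class Child(output: Seq[String])

      case class EagerNode(child: Child) {
        // Evaluated at construction: throws as soon as the node is instantiated
        // with a child whose event time column was stripped.
        private val eventTimeCol: String = child.output.find(_ == "eventTime").get
      }

      case class LazyNode(child: Child) {
        // Deferred: fails only if actually referenced, and the canonicalized
        // copy of the plan never references it.
        private lazy val eventTimeCol: String = child.output.find(_ == "eventTime").get
      }

      val canonicalizedChild = Child(Seq("value")) // marker already stripped
      // EagerNode(canonicalizedChild)             // would throw NoSuchElementException
      println(LazyNode(canonicalizedChild))        // constructs fine
    }
    ```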
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New UT added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44688 from HeartSaVioR/SPARK-46676.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/statefulOperators.scala | 10 +++++++---
 ...StreamingDeduplicationWithinWatermarkSuite.scala | 21 +++++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 8cb99a162ab2..c8a55ed679d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -1097,10 +1097,14 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+  // Below three variables are defined as lazy, as evaluating these variables does not work with
+  // a canonicalized plan. Specifically, attributes in child won't have an event time column in
+  // the canonicalized plan. These variables are NOT referenced in the canonicalized plan, hence
+  // defining these variables as lazy avoids such an error.
+  private lazy val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
     allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+  private lazy val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private lazy val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
 
   protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = {
     val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
index 595fc1cb9cea..9a02ab3df7dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
@@ -199,4 +199,25 @@ class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest {
       )
     }
   }
+
+  test("SPARK-46676: canonicalization of 
StreamingDeduplicateWithinWatermarkExec should work") {
+    withTempDir { checkpoint =>
+      val dedupeInputData = MemoryStream[(String, Int)]
+      val dedupe = dedupeInputData.toDS()
+        .withColumn("eventTime", timestamp_seconds($"_2"))
+        .withWatermark("eventTime", "10 second")
+        .dropDuplicatesWithinWatermark("_1")
+        .select($"_1", $"eventTime".cast("long").as[Long])
+
+      testStream(dedupe, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+        AddData(dedupeInputData, "a" -> 1),
+        CheckNewAnswer("a" -> 1),
+        Execute { q =>
+          // This threw an error before SPARK-46676.
+          q.lastExecution.executedPlan.canonicalized
+        }
+      )
+    }
+  }
 }

