This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new fa6bf22112b4 [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan fa6bf22112b4 is described below commit fa6bf22112b4300dae1e7617f1480c0d12124b90 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Fri Jan 19 11:38:53 2024 +0900 [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan ### What changes were proposed in this pull request? This PR proposes to fix the bug on canonicalizing the plan which contains the physical node of dropDuplicatesWithinWatermark (`StreamingDeduplicateWithinWatermarkExec`). ### Why are the changes needed? Canonicalization of the plan will replace the expressions (including attributes) to remove out cosmetic, including name, "and metadata", which denotes the event time column marker. StreamingDeduplicateWithinWatermarkExec assumes that the input attributes of child node contain the event time column, and it is determined at the initialization of the node instance. Once canonicalization is being triggered, child node will lose the notion of event time column from its attributes, and copy of StreamingDeduplicateWithinWatermarkExec will be performed which instantiates a new node of `StreamingDeduplicateWithinWatermarkExec` with a new child node, which no longer has an [...] ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UT added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44688 from HeartSaVioR/SPARK-46676. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit c1ed3e60e67f53bb323e2b9fa47789fcde70a75a) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/statefulOperators.scala | 10 +++++++--- ...StreamingDeduplicationWithinWatermarkSuite.scala | 21 +++++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index b31f6151fce2..b597c9723f5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -1037,10 +1037,14 @@ case class StreamingDeduplicateWithinWatermarkExec( protected val extraOptionOnStateStore: Map[String, String] = Map.empty - private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, + // Below three variables are defined as lazy, as evaluating these variables does not work with + // canonicalized plan. Specifically, attributes in child won't have an event time column in + // the canonicalized plan. These variables are NOT referenced in canonicalized plan, hence + // defining these variables as lazy would avoid such error. 
+ private lazy val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, allowMultipleEventTimeColumns = false).get - private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) - private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) + private lazy val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) + private lazy val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = { val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala index 595fc1cb9cea..9a02ab3df7dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala @@ -199,4 +199,25 @@ class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest { ) } } + + test("SPARK-46676: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") { + withTempDir { checkpoint => + val dedupeInputData = MemoryStream[(String, Int)] + val dedupe = dedupeInputData.toDS() + .withColumn("eventTime", timestamp_seconds($"_2")) + .withWatermark("eventTime", "10 second") + .dropDuplicatesWithinWatermark("_1") + .select($"_1", $"eventTime".cast("long").as[Long]) + + testStream(dedupe, Append)( + StartStream(checkpointLocation = checkpoint.getCanonicalPath), + AddData(dedupeInputData, "a" -> 1), + CheckNewAnswer("a" -> 1), + Execute { q => + // This threw out error before SPARK-46676. 
+ q.lastExecution.executedPlan.canonicalized + } + ) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org