This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c1ed3e60e67f [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan c1ed3e60e67f is described below commit c1ed3e60e67f53bb323e2b9fa47789fcde70a75a Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Fri Jan 19 11:38:53 2024 +0900 [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan ### What changes were proposed in this pull request? This PR proposes to fix the bug on canonicalizing the plan which contains the physical node of dropDuplicatesWithinWatermark (`StreamingDeduplicateWithinWatermarkExec`). ### Why are the changes needed? Canonicalization of the plan will replace the expressions (including attributes) to remove out cosmetic, including name, "and metadata", which denotes the event time column marker. StreamingDeduplicateWithinWatermarkExec assumes that the input attributes of child node contain the event time column, and it is determined at the initialization of the node instance. Once canonicalization is being triggered, child node will lose the notion of event time column from its attributes, and copy of StreamingDeduplicateWithinWatermarkExec will be performed which instantiating a new node of `StreamingDeduplicateWithinWatermarkExec` with new child node, which no longer has an [...] ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UT added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44688 from HeartSaVioR/SPARK-46676. Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/statefulOperators.scala | 10 +++++++--- ...StreamingDeduplicationWithinWatermarkSuite.scala | 21 +++++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 8cb99a162ab2..c8a55ed679d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -1097,10 +1097,14 @@ case class StreamingDeduplicateWithinWatermarkExec( protected val extraOptionOnStateStore: Map[String, String] = Map.empty - private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, + // Below three variables are defined as lazy, as evaluating these variables does not work with + // canonicalized plan. Specifically, attributes in child won't have an event time column in + // the canonicalized plan. These variables are NOT referenced in canonicalized plan, hence + // defining these variables as lazy would avoid such error. + private lazy val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, allowMultipleEventTimeColumns = false).get - private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) - private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) + private lazy val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) + private lazy val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = { val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala index 595fc1cb9cea..9a02ab3df7dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala @@ -199,4 +199,25 @@ class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest { ) } } + + test("SPARK-46676: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") { + withTempDir { checkpoint => + val dedupeInputData = MemoryStream[(String, Int)] + val dedupe = dedupeInputData.toDS() + .withColumn("eventTime", timestamp_seconds($"_2")) + .withWatermark("eventTime", "10 second") + .dropDuplicatesWithinWatermark("_1") + .select($"_1", $"eventTime".cast("long").as[Long]) + + testStream(dedupe, Append)( + StartStream(checkpointLocation = checkpoint.getCanonicalPath), + AddData(dedupeInputData, "a" -> 1), + CheckNewAnswer("a" -> 1), + Execute { q => + // This threw out error before SPARK-46676. + q.lastExecution.executedPlan.canonicalized + } + ) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org