sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1589359837


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala:
##########
@@ -49,26 +80,31 @@ case class EventTimeWatermark(
   // logic here because we also maintain the compatibility flag. (See
   // SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE for details.)
   // TODO: Disallow updating the metadata once we remove the compatibility flag.
-  override val output: Seq[Attribute] = child.output.map { a =>
-    if (a semanticEquals eventTime) {
-      val delayMs = EventTimeWatermark.getDelayMs(delay)
-      val updatedMetadata = new MetadataBuilder()
-        .withMetadata(a.metadata)
-        .putLong(EventTimeWatermark.delayKey, delayMs)
-        .build()
-      a.withMetadata(updatedMetadata)
-    } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
-      // Remove existing watermark
-      val updatedMetadata = new MetadataBuilder()
-        .withMetadata(a.metadata)
-        .remove(EventTimeWatermark.delayKey)
-        .build()
-      a.withMetadata(updatedMetadata)
-    } else {
-      a
-    }
+  override val output: Seq[Attribute] = {
+    val delayMs = EventTimeWatermark.getDelayMs(delay)
+    updateEventTimeColumn(child.output, delayMs, eventTime)
   }
 
   override protected def withNewChildInternal(newChild: LogicalPlan): EventTimeWatermark =
     copy(child = newChild)
 }
+
+/**
+ * Updates the event time column to [[eventTime]] in the child output.
+ *
+ * Any watermark calculations performed after this node will use the
+ * updated eventTimeColumn.
+ */
+case class UpdateEventTimeWatermarkColumn(
+    eventTime: Attribute,
+    delay: CalendarInterval,
+    child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = {
+    val delayMs = EventTimeWatermark.getDelayMs(delay)
+    updateEventTimeColumn(child.output, delayMs, eventTime)
+}

Review Comment:
   Fixed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to