sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1589375804


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala:
##########
@@ -107,25 +108,67 @@ case class EventTimeWatermarkExec(
   }
 
   // Update the metadata on the eventTime column to include the desired delay.
-  override val output: Seq[Attribute] = child.output.map { a =>
-    if (a semanticEquals eventTime) {
-      val updatedMetadata = new MetadataBuilder()
-        .withMetadata(a.metadata)
-        .putLong(EventTimeWatermark.delayKey, delayMs)
-        .build()
-      a.withMetadata(updatedMetadata)
-    } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
-      // Remove existing watermark
-      val updatedMetadata = new MetadataBuilder()
-        .withMetadata(a.metadata)
-        .remove(EventTimeWatermark.delayKey)
-        .build()
-      a.withMetadata(updatedMetadata)
-    } else {
-      a
-    }
+  override val output: Seq[Attribute] = {
+    val delayMs = EventTimeWatermark.getDelayMs(delay)
+    updateEventTimeColumn(child.output, delayMs, eventTime)
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
EventTimeWatermarkExec =
     copy(child = newChild)
 }
+
+/**
+ * Updates the event time column to [[eventTime]] in the child output.
+ * Any watermark calculations performed after this node will use the
+ * updated eventTimeColumn.
+ *
+ * This node also ensures that output emitted by the child node adheres
+ * to the watermark. If the child node emits rows that are older than the
+ * global watermark, the node will throw a query execution error and fail
+ * the user query.
+ */
+case class UpdateEventTimeColumnExec(
+    eventTime: Attribute,
+    delay: CalendarInterval,
+    eventTimeWatermarkForEviction: Option[Long],
+    child: SparkPlan) extends UnaryExecNode {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions[InternalRow](

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to