sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1589380529


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -78,15 +78,32 @@ case class TransformWithStateExec(
   override def shortName: String = "transformWithStateExec"
 
   override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
+    if (timeMode == ProcessingTime) {
+      // TODO: check if we can return true only if actual timers are registered, or there is
+      // expired state
+      true
+    } else if (outputMode == OutputMode.Append || outputMode == OutputMode.Update) {
+      eventTimeWatermarkForEviction.isDefined &&
+      newInputWatermark > eventTimeWatermarkForEviction.get
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Controls watermark propagation to downstream modes. If timeMode is
+   * ProcessingTime, the output rows cannot be interpreted in eventTime, hence
+   * this node will not propagate watermark in this timeMode.
+   *
+   * For timeMode EventTime, output watermark is same as input Watermark because
+   * transformWithState node does not buffer any input rows between micro-batches.

Review Comment:
   I meant that the TransformWithState operator itself passes all input rows
   to the StatefulProcessor function and does not buffer any input data.

   I can see how this could be confusing once other functionality (like
   SessionWindow) is implemented on top of TWS, so I updated the doc as
   suggested.
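
For readers following the thread, here is a minimal standalone sketch of the two behaviours the hunk above describes: when another batch is requested, and when the watermark is passed downstream. It is not the Spark implementation. `TimeMode`, `OutputMode`, `WatermarkSketch`, and `outputWatermark` are simplified, hypothetical stand-ins introduced only for illustration, and the pass-through rule for EventTime is an assumption read from the doc comment in the diff.

```scala
// Hypothetical stand-ins for Spark's time and output modes (not the real classes).
sealed trait TimeMode
case object ProcessingTime extends TimeMode
case object EventTime extends TimeMode

sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode
case object Complete extends OutputMode

object WatermarkSketch {
  // Mirrors the branch structure of shouldRunAnotherBatch in the diff.
  def shouldRunAnotherBatch(
      timeMode: TimeMode,
      outputMode: OutputMode,
      eventTimeWatermarkForEviction: Option[Long],
      newInputWatermark: Long): Boolean = {
    if (timeMode == ProcessingTime) {
      // Conservative: always ask for another batch (see the TODO in the diff).
      true
    } else if (outputMode == Append || outputMode == Update) {
      // Only worthwhile if the new watermark moved past the eviction watermark.
      eventTimeWatermarkForEviction.exists(newInputWatermark > _)
    } else {
      false
    }
  }

  // Assumed from the doc comment: no watermark is propagated in ProcessingTime,
  // and the input watermark is passed through unchanged in EventTime.
  def outputWatermark(timeMode: TimeMode, inputWatermarkMs: Long): Option[Long] =
    timeMode match {
      case ProcessingTime => None
      case EventTime => Some(inputWatermarkMs)
    }
}
```

The pass-through in EventTime only holds under the point made above: the operator hands every input row to the StatefulProcessor and keeps none back between micro-batches. Logic built on top of it that does hold rows back would need a different rule.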



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

