sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1589380529
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -78,15 +78,32 @@ case class TransformWithStateExec(
   override def shortName: String = "transformWithStateExec"
 
   override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
+    if (timeMode == ProcessingTime) {
+      // TODO: check if we can return true only if actual timers are registered, or there is
+      // expired state
+      true
+    } else if (outputMode == OutputMode.Append || outputMode == OutputMode.Update) {
+      eventTimeWatermarkForEviction.isDefined &&
+      newInputWatermark > eventTimeWatermarkForEviction.get
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Controls watermark propagation to downstream modes. If timeMode is
+   * ProcessingTime, the output rows cannot be interpreted in eventTime, hence
+   * this node will not propagate watermark in this timeMode.
+   *
+   * For timeMode EventTime, output watermark is same as input Watermark because
+   * transformWithState node does not buffer any input rows between micro-batches.

Review Comment:
I meant to say that TransformWithState operator itself passes all the inputRows to the StatefulProcessor function, and does not buffer any input data. I can see that it can be confusing when we implement other functionality (like SessionWindow) on top of TWS, updated the doc as suggested.
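
For readers following the thread, here is a minimal standalone sketch of the two behaviors discussed in this hunk: when another micro-batch is requested, and when the watermark is passed downstream. It is not the code from the PR. The `TimeMode` and `OutputMode` types below are simplified stand-ins rather than Spark's classes, only the two time modes named in the doc comment are modeled, and `WatermarkSketch` and `outputWatermark` are hypothetical names chosen for illustration.

```scala
// Simplified stand-ins for illustration only; these are NOT Spark's classes.
sealed trait TimeMode
case object ProcessingTime extends TimeMode
case object EventTime extends TimeMode

sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode
case object Complete extends OutputMode

object WatermarkSketch {

  // Mirrors the branching quoted in the diff: ProcessingTime always asks for
  // another batch; otherwise, in Append/Update mode, another batch is needed
  // only if the eviction watermark is defined and the new input watermark
  // has moved past it.
  def shouldRunAnotherBatch(
      timeMode: TimeMode,
      outputMode: OutputMode,
      eventTimeWatermarkForEviction: Option[Long],
      newInputWatermark: Long): Boolean = {
    if (timeMode == ProcessingTime) {
      true
    } else if (outputMode == Append || outputMode == Update) {
      eventTimeWatermarkForEviction.exists(newInputWatermark > _)
    } else {
      false
    }
  }

  // Hypothetical helper modeling the doc comment: no watermark is propagated
  // in ProcessingTime; in EventTime the output watermark equals the input
  // watermark, since the operator itself buffers no input rows.
  def outputWatermark(timeMode: TimeMode, inputWatermarkMs: Long): Option[Long] =
    timeMode match {
      case ProcessingTime => None
      case EventTime => Some(inputWatermarkMs)
    }
}
```

Under these assumptions, `WatermarkSketch.outputWatermark(EventTime, w)` returns the input watermark unchanged. That describes only the operator itself; a StatefulProcessor that holds rows in its own state, as a SessionWindow-style implementation on top of TWS would, is outside what this sketch models.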