arunmahadevan commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-501348243

The output watermark can be computed as a function of the input watermark and the timestamps of the events held at that operator (e.g. min(input watermarks, timestamp of the oldest event at that node)). Since those event timestamps are already in the operator's state, we could derive the output watermark while storing only the input watermark.

For now, we require the user to provide a timestamp column and a lag using `withWatermark()` before each aggregate operation. E.g.

```
input
  .withWatermark("inputtime", ...)
  .groupBy(window(...))
  .select($"window.end".as("windowtime"))
  .withWatermark("windowtime", ...)
  .groupBy(...)
  ...
```

Here the `window.end` of the first `groupBy` is the output watermark, which becomes the input watermark of the second `groupBy`. Also note that the input watermark of an operator is propagated to the next operator only in the next batch. This way the downstream operator first processes the events it receives and only then applies the watermark.

Let me know the specific cases where you found issues.
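To make the two rules above concrete, here is a minimal plain-Scala simulation with no Spark dependency. It is only a sketch under stated assumptions: `WatermarkSketch`, `outputWatermark`, `downstreamInputWatermarks` and the sample timestamps are hypothetical names and data, not Spark internals. It applies the example rule min(input watermark, oldest event timestamp at the node) and hands each batch's output watermark to the downstream operator only in the following batch.

```scala
// Hypothetical model of watermark propagation between two chained stateful
// operators; not Spark's implementation.
object WatermarkSketch {

  // Example rule from the comment: the output watermark is the minimum of the
  // operator's input watermark and the oldest event timestamp still held in its state.
  def outputWatermark(inputWatermark: Long, stateTimestamps: Seq[Long]): Long =
    if (stateTimestamps.isEmpty) inputWatermark
    else math.min(inputWatermark, stateTimestamps.min)

  // The upstream output watermark computed in batch N is used by the downstream
  // operator only in batch N + 1, so downstream processes the events first.
  // In the first batch the downstream operator has no watermark yet (Long.MinValue).
  def downstreamInputWatermarks(upstreamOutput: Seq[Long]): Seq[Long] =
    (Long.MinValue +: upstreamOutput).dropRight(1)

  def main(args: Array[String]): Unit = {
    // Per batch: (upstream input watermark, event timestamps still in upstream state).
    val batches = Seq(
      (100L, Seq(120L, 150L)),
      (200L, Seq(180L, 250L)),
      (300L, Seq.empty[Long])
    )
    val upstreamOut = batches.map { case (wm, state) => outputWatermark(wm, state) }
    val downstreamIn = downstreamInputWatermarks(upstreamOut)
    println(s"upstream output watermarks: ${upstreamOut.mkString(", ")}")
    println(s"downstream input watermarks: ${downstreamIn.mkString(", ")}")
  }
}
```

Running `main` prints upstream output watermarks `100, 180, 300` and downstream input watermarks `-9223372036854775808, 100, 180`, i.e. the second operator always sees the first one's watermark one batch late. A real engine would also keep each watermark non-decreasing across batches; this sketch does not enforce that.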