Hi,

I'm sorry, I was confused about how the late events watermark works. You're completely right. Thanks for clarifying.

Regards,
Andrzej

On Thu, 11 Jan 2024 at 13:02, Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

> Hi,
>
> The time window is closed and evicted as soon as the "eviction watermark"
> passes the end of the window. The late events watermark only deals with
> discarding late events from "inputs". We did not introduce additional delay
> on the work of multiple stateful operators. We just allowed more late
> events to be accepted.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jan 11, 2024 at 6:13 AM Andrzej Zera <andrzejz...@gmail.com>
> wrote:
>
>> I'm struggling with the following issue in Spark >=3.4, related to
>> multiple stateful operations.
>>
>> When spark.sql.streaming.statefulOperator.allowMultiple is enabled,
>> Spark keeps track of two types of watermarks:
>> eventTimeWatermarkForEviction and eventTimeWatermarkForLateEvents.
>> Introducing them allowed chaining multiple stateful operations but also
>> introduced an additional delay for getting the output out of the streaming
>> query.
>>
>> I'll show this with an example. Assume we have a stream of click events
>> and we aggregate it first by 1-min window and then by 5-min window. If we
>> have a trigger interval of 30s, then in most cases we'll get output 30s
>> later compared to queries with a single stateful operation. To see why,
>> let's look at the following examples:
>>
>> Example 1. Single stateful operation (aggregation by 5-min window, assume
>> watermark is 0 seconds)
>>
>> Wall clock          Max event timestamp   Global      Output
>> (microbatch         (at the time of       watermark
>> processing starts)  getting data from
>>                     Kafka)
>> 14:10:00            14:09:56              0           -
>> 14:10:30            14:10:26              14:09:56    -
>> 14:11:00            14:10:56              14:10:26    window <14:05, 14:10)
>>
>> Example 2. Multiple stateful operations (aggregation by 1-min window
>> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>>
>> Wall clock          Max event timestamp   Late events  Eviction   Output
>> (microbatch         (at the time of       watermark    watermark
>> processing starts)  getting data from
>>                     Kafka)
>> 14:10:00            14:09:56              0            0          -
>> 14:10:30            14:10:26              0            14:09:56   -
>> 14:11:00            14:10:56              14:09:56     14:10:26   -
>> 14:11:30            14:11:26              14:10:26     14:10:56   window <14:05, 14:10)
>>
>> In Example 2, we need to wait until both watermarks cross the end of the
>> window to get the output for that window, which happens one iteration later
>> compared to Example 1.
>>
>> Now, in use cases that require near-real-time processing, this one
>> iteration delay can be quite a significant difference.
>>
>> Do we have any option to make streaming queries with multiple stateful
>> operations output data without waiting this extra iteration? One of my
>> ideas was to force an empty microbatch to run and propagate the late events
>> watermark without any new data. While this conceptually works, I didn't
>> find a way to trigger an empty microbatch while being connected to Kafka
>> that constantly receives new data and while having a constant 30s trigger
>> interval.
>>
>> Thanks,
>> Andrzej
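
For anyone finding this thread later, the corrected picture can be sketched with a small toy model of the two watermarks from Example 2. This is not Spark's implementation: the names `ts` and `simulate` are made up for illustration, the watermark delay is assumed to be 0, event times are assumed to only increase, and "passes the end of the window" is modelled as "eviction watermark >= window end". Under those assumptions the window <14:05, 14:10) closes in the 14:11:00 microbatch, the same one as in Example 1, because only the eviction watermark decides when a window is closed.

```python
from datetime import datetime, timedelta


def ts(hhmmss):
    """Parse HH:MM:SS on an arbitrary fixed date (hypothetical helper)."""
    return datetime.strptime("2024-01-11 " + hhmmss, "%Y-%m-%d %H:%M:%S")


def simulate(max_event_times, window_end, delay=timedelta(0)):
    """Toy model: one (late_wm, eviction_wm, window_closed) row per microbatch.

    Assumptions (not taken from Spark's source code):
      * eviction watermark of batch n = max event time of batch n-1 minus delay
      * late events watermark of batch n = eviction watermark of batch n-1
      * a window closes once the eviction watermark reaches its end;
        the late events watermark only filters late input rows
    None stands for the "0" entries in the tables (no watermark yet).
    """
    rows = []
    prev_max_event = None  # max event time seen by the previous batch
    prev_eviction = None   # eviction watermark used by the previous batch
    for max_event in max_event_times:
        late_wm = prev_eviction
        eviction_wm = None if prev_max_event is None else prev_max_event - delay
        closed = eviction_wm is not None and eviction_wm >= window_end
        rows.append((late_wm, eviction_wm, closed))
        prev_max_event, prev_eviction = max_event, eviction_wm
    return rows


# Max event timestamps of the four microbatches from Example 2.
batches = [ts("14:09:56"), ts("14:10:26"), ts("14:10:56"), ts("14:11:26")]
rows = simulate(batches, window_end=ts("14:10:00"))

for late_wm, eviction_wm, closed in rows:
    print(late_wm, eviction_wm, closed)
```

The watermark columns produced by the model match the Example 2 table; the only difference is the batch in which the window is reported as closed.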