Hi,

The time window is closed and evicted as soon as the "eviction watermark"
passes the end of the window. The "late events watermark" is only used to
discard late events from an operator's inputs. We did not introduce any
additional delay for queries with multiple stateful operators. We just
allowed more late events to be accepted.
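
To make the rule above concrete, here is a minimal sketch in plain Python
(a toy model, not Spark code). It assumes that the eviction watermark of a
microbatch is the max event time seen in earlier batches minus the delay,
and that the late events watermark is the previous batch's eviction
watermark, which matches the watermark columns in Example 2 of the quoted
message. The names simulate and t are hypothetical helpers used only for
illustration.

```python
from datetime import datetime, timedelta


def simulate(max_event_times, window_end, delay=timedelta(0)):
    """Toy model of the two watermarks across microbatches (not Spark code)."""
    rows = []
    seen_max = None   # max event time observed in earlier batches
    eviction = None   # None means "no watermark yet" (shown as 0 in the tables)
    emitted = False
    for batch_max in max_event_times:
        # Assumption: late events watermark = previous batch's eviction watermark.
        late_events = eviction
        # Assumption: eviction watermark = max event time seen so far minus delay.
        eviction = None if seen_max is None else seen_max - delay
        # The window is emitted once the eviction watermark reaches its end.
        emit = not emitted and eviction is not None and eviction >= window_end
        emitted = emitted or emit
        rows.append((late_events, eviction, emit))
        seen_max = batch_max if seen_max is None else max(seen_max, batch_max)
    return rows


def t(hms):
    """Hypothetical helper: build a timestamp on an arbitrary fixed date."""
    return datetime.strptime("2024-01-11 " + hms, "%Y-%m-%d %H:%M:%S")


rows = simulate([t("14:09:56"), t("14:10:26"), t("14:10:56"), t("14:11:26")],
                window_end=t("14:10:00"))
for late_events, eviction, emit in rows:
    print(late_events, eviction, "window <14:05, 14:10)" if emit else "-")
```

In this model, with the event times from Example 2, the window is emitted
in the third microbatch (14:11:00), when the eviction watermark is
14:10:26. The late events watermark plays no part in the emit decision.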

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Jan 11, 2024 at 6:13 AM Andrzej Zera <andrzejz...@gmail.com> wrote:

> I'm struggling with the following issue in Spark >=3.4, related to
> multiple stateful operations.
>
> When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
> keeps track of two types of watermarks: eventTimeWatermarkForEviction and
> eventTimeWatermarkForLateEvents. Introducing them allowed chaining
> multiple stateful operations but also introduced an additional delay for
> getting the output out of the streaming query.
>
> I'll illustrate this with an example. Assume we have a stream of click
> events that we aggregate first by a 1-min window and then by a 5-min
> window. With a trigger interval of 30s, in most cases we'll get output 30s
> later than with a single-stateful-operation query. To see why, let's look
> at the following examples:
>
> Example 1. Single stateful operation (aggregation by 5-min window, assume
> watermark is 0 seconds)
>
> Wall clock         | Max event timestamp    | Global    | Output
> (microbatch        | at the time of getting | watermark |
> processing starts) | data from Kafka        |           |
> -------------------+------------------------+-----------+----------------------
> 14:10:00           | 14:09:56               | 0         | -
> 14:10:30           | 14:10:26               | 14:09:56  | -
> 14:11:00           | 14:10:56               | 14:10:26  | window <14:05, 14:10)
>
> Example 2. Multiple stateful operations (aggregation by 1-min window
> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>
> Wall clock         | Max event timestamp    | Late events | Eviction  | Output
> (microbatch        | at the time of getting | watermark   | watermark |
> processing starts) | data from Kafka        |             |           |
> -------------------+------------------------+-------------+-----------+----------------------
> 14:10:00           | 14:09:56               | 0           | 0         | -
> 14:10:30           | 14:10:26               | 0           | 14:09:56  | -
> 14:11:00           | 14:10:56               | 14:09:56    | 14:10:26  | -
> 14:11:30           | 14:11:26               | 14:10:26    | 14:10:56  | window <14:05, 14:10)
>
> In Example 2, we need to wait until both watermarks cross the end of the
> window to get the output for that window, which happens one iteration later
> compared to Example 1.
>
> Now, in use cases that require near-real-time processing, this one
> iteration delay can be quite a significant difference.
>
> Do we have any option to make streaming queries with multiple stateful
> operations output data without waiting this extra iteration? One of my
> ideas was to force an empty microbatch to run and propagate late events
> watermark without any new data. While this conceptually works, I didn't
> find a way to trigger an empty microbatch while being connected to Kafka
> that constantly receives new data and while having a constant 30s trigger
> interval.
>
> Thanks,
> Andrzej
>
