Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-24 Thread Andrzej Zera
Hi,

I'm sorry, I got confused about the inner workings of the late-events
watermark. You're completely right. Thanks for clarifying.

Regards,
Andrzej

On Thu, 11 Jan 2024 at 13:02, Jungtaek Lim
wrote:

> Hi,
>
> The time window is closed and evicted as soon as the "eviction watermark"
> passes the end of the window. The late-events watermark only deals with
> discarding late events from the "inputs". We did not introduce any
> additional delay when multiple stateful operators are chained; we just
> allowed more late events to be accepted.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jan 11, 2024 at 6:13 AM Andrzej Zera 
> wrote:
>
>> I'm struggling with the following issue in Spark >=3.4, related to
>> multiple stateful operations.
>>
>> When spark.sql.streaming.statefulOperator.allowMultiple is enabled,
>> Spark keeps track of two types of watermarks:
>> eventTimeWatermarkForEviction and eventTimeWatermarkForLateEvents.
>> Introducing them allowed chaining multiple stateful operations but also
>> introduced an additional delay for getting the output out of the streaming
>> query.
>>
>> I'll illustrate this with an example. Assume we have a stream of click
>> events and we aggregate it first by 1-min window and then by 5-min window.
>> With a trigger interval of 30s, in most cases we'll get output 30s later
>> than with a query that has a single stateful operation. To see why, let's
>> look at the following examples:
>>
>> Example 1. Single stateful operation (aggregation by 5-min window,
>> assuming a watermark delay of 0 seconds)
>>
>> Wall clock (batch start) | Max event time read from Kafka | Global watermark | Output
>> 14:10:00                 | 14:09:56                       | 0                | -
>> 14:10:30                 | 14:10:26                       | 14:09:56         | -
>> 14:11:00                 | 14:10:56                       | 14:10:26         | window <14:05, 14:10)
>>
>> Example 2. Multiple stateful operations (aggregation by 1-min window
>> followed by aggregation by 5-min window, assuming a watermark delay of
>> 0 seconds)
>>
>> Wall clock (batch start) | Max event time read from Kafka | Late events watermark | Eviction watermark | Output
>> 14:10:00                 | 14:09:56                       | 0                     | 0                  | -
>> 14:10:30                 | 14:10:26                       | 0                     | 14:09:56           | -
>> 14:11:00                 | 14:10:56                       | 14:09:56              | 14:10:26           | -
>> 14:11:30                 | 14:11:26                       | 14:10:26              | 14:10:56           | window <14:05, 14:10)
>>
>> In Example 2, we need to wait until both watermarks cross the end of the
>> window to get the output for that window, which happens one iteration later
>> than in Example 1.
>>
>> In use cases that require near-real-time processing, this one-iteration
>> delay can make a significant difference.
>>
>> Is there any option to make streaming queries with multiple stateful
>> operations output data without waiting for this extra iteration? One of my
>> ideas was to force an empty microbatch to run and propagate the late-events
>> watermark without any new data. While this works conceptually, I didn't
>> find a way to trigger an empty microbatch while connected to a Kafka topic
>> that constantly receives new data and with a constant 30s trigger
>> interval.
>>
>> Thanks,
>> Andrzej
>>
>


Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-11 Thread Jungtaek Lim
Hi,

The time window is closed and evicted as soon as the "eviction watermark"
passes the end of the window. The late-events watermark only deals with
discarding late events from the "inputs". We did not introduce any
additional delay when multiple stateful operators are chained; we just
allowed more late events to be accepted.
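A toy model may help make the distinction concrete. The Python below is not Spark code: `WindowCount`, `run`, `emitted` and the sample click timestamps are all hypothetical, chosen only so that each batch's maximum event time matches Andrzej's tables. It encodes the rules stated in this thread as assumptions: a batch's eviction watermark is the maximum event time seen in earlier batches (delay 0), its late-events watermark is the previous batch's eviction watermark, inputs at or before the late-events watermark are dropped, and a window is emitted once the eviction watermark reaches its end. Each closed 1-min window is re-stamped with its end minus one second as its event time (Spark's `window_time` uses end minus one microsecond).

```python
from collections import defaultdict


def ts(hhmmss):
    """Parse 'HH:MM:SS' into seconds since midnight (the toy time unit)."""
    h, m, s = map(int, hhmmss.split(":"))
    return h * 3600 + m * 60 + s


def hhmm(t):
    """Format seconds since midnight as 'HH:MM'."""
    return f"{t // 3600:02d}:{t % 3600 // 60:02d}"


class WindowCount:
    """Toy tumbling-window count operator (hypothetical, NOT Spark's code)."""

    def __init__(self, size_s):
        self.size = size_s
        self.state = defaultdict(int)  # window start -> running count

    def process(self, rows, late_wm, evict_wm):
        # Assumption: rows at or before the late-events watermark are dropped.
        for t, n in rows:
            if t > late_wm:
                self.state[t - t % self.size] += n
        # Emit and evict every window whose end the eviction watermark reached.
        out = []
        for start in sorted(self.state):
            if start + self.size <= evict_wm:
                out.append((start, start + self.size, self.state.pop(start)))
        return out


def run(batches, chained, lagging_late_wm=True, delay_s=0):
    """Replay (wall_clock, event_times) batches; return the 5-min operator's output."""
    one_min, five_min = WindowCount(60), WindowCount(300)
    max_seen = evict_wm = prev_evict_wm = 0
    log = []
    for wall, events in batches:
        # Chained queries filter inputs with the previous batch's eviction
        # watermark; a single operator (or lagging_late_wm=False) uses one value.
        late_wm = prev_evict_wm if chained and lagging_late_wm else evict_wm
        rows = [(ts(e), 1) for e in events]
        if chained:
            closed = one_min.process(rows, late_wm, evict_wm)
            # Downstream input: each closed 1-min window stamped at end - 1 s.
            rows = [(end - 1, n) for _, end, n in closed]
        log.append((wall, five_min.process(rows, late_wm, evict_wm)))
        # Watermarks for the next batch come from the data seen so far.
        prev_evict_wm = evict_wm
        max_seen = max(max_seen, max(ts(e) for e in events))
        evict_wm = max(evict_wm, max_seen - delay_s)
    return log


def emitted(chained, lagging_late_wm):
    """Only the batches that produced 5-min windows, formatted for reading."""
    return [(wall, [(hhmm(s), hhmm(e), n) for s, e, n in out])
            for wall, out in run(BATCHES, chained, lagging_late_wm) if out]


# Hypothetical clicks; each batch's max matches Andrzej's tables.
BATCHES = [
    ("14:10:00", ["14:07:12", "14:09:40", "14:09:56"]),
    ("14:10:30", ["14:10:10", "14:10:26"]),
    ("14:11:00", ["14:10:40", "14:10:56"]),
    ("14:11:30", ["14:11:10", "14:11:26"]),
]

print("single operator:       ", emitted(chained=False, lagging_late_wm=False))
print("chained, two WMs:      ", emitted(chained=True, lagging_late_wm=True))
print("chained, one shared WM:", emitted(chained=True, lagging_late_wm=False))
```

In this toy, the single-operator query and the chained query both emit window <14:05, 14:10) in the batch starting at 14:11:00, as in Example 1. With one shared watermark, the 5-min operator discards the 1-min results as late and emits nothing across these four batches; that is the kind of input the lagging late-events watermark lets through.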

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)



Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-10 Thread Ant Kutschera
Hi

*Is there any option to make streaming queries with multiple stateful
operations output data without waiting for this extra iteration? One of my
ideas was to force an empty microbatch to run and propagate the late-events
watermark without any new data. While this works conceptually, I didn't
find a way to trigger an empty microbatch while connected to a Kafka topic
that constantly receives new data and with a constant 30s trigger
interval.*

Not sure if this helps or works, but in a Kafka Streams API solution
(without Spark) that I built a few years ago, we published artificial events
once a second to ensure that windows were closed, because by design Kafka
Streams only closes windows while events are flowing. So you could
artificially trigger an 'empty' microbatch: it would contain only artificial
events, which you can of course filter out in the microbatch processing.
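The heartbeat idea can be sketched as a toy in plain Python (not Kafka Streams or Spark code). `Event`, `heartbeats` and `windowed_counts` are hypothetical names, and the sketch assumes events arrive in timestamp order with no grace period. It shows the two halves of the idea: marker events advance stream time, so the last window closes even after real clicks stop, while the markers themselves are filtered out of the counts.

```python
from typing import Dict, Iterable, List, NamedTuple, Optional


class Event(NamedTuple):
    ts: int                  # event time in seconds
    heartbeat: bool = False  # True for artificial marker events


def heartbeats(start: int, stop: int, every: int = 1) -> List[Event]:
    """Hypothetical marker events, one every `every` seconds in [start, stop)."""
    return [Event(t, heartbeat=True) for t in range(start, stop, every)]


def windowed_counts(events: Iterable[Event], size: int = 60) -> Dict[int, int]:
    """Count REAL events per tumbling window, returning only closed windows.

    Stream time (the max timestamp seen so far) advances on every event,
    heartbeats included, and a window closes once stream time reaches its end.
    Heartbeats never contribute to the counts.
    """
    open_windows: Dict[int, int] = {}
    closed: Dict[int, int] = {}
    stream_time: Optional[int] = None
    for e in events:  # assumed to arrive in timestamp order
        stream_time = e.ts if stream_time is None else max(stream_time, e.ts)
        if not e.heartbeat:  # filter the markers out of the aggregation
            start = e.ts - e.ts % size
            open_windows[start] = open_windows.get(start, 0) + 1
        for s in [w for w in open_windows if w + size <= stream_time]:
            closed[s] = open_windows.pop(s)
    return closed


clicks = [Event(5), Event(20), Event(61)]  # real traffic stops after t=61
with_markers = sorted(clicks + heartbeats(0, 121), key=lambda e: e.ts)

print(windowed_counts(clicks))        # {0: 2}  (window [60, 120) stays open)
print(windowed_counts(with_markers))  # {0: 2, 60: 1}
```

The order of operations is the point of the design: stream time is updated before the markers are filtered out, so they can move time forward without ever showing up in the results.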
