Hey Asimansu,

The inputDataStream is a KeyedStream; I forgot to mention that.

Caio

On Wed, Mar 20, 2024 at 6:56 PM Asimansu Bera <asimansu.b...@gmail.com>
wrote:

> Hello Caio,
>
> Based on the pseudocode, there is no keying (keyBy) present, so the
> window will not be processed in parallel. Please check again and respond.
>
> val windowDataStream =
>   inputDataStream
>     .window(TumblingEventTimeWindows of 1 hour)
>     .trigger(custom trigger)
>     .aggregate(
>       preAggregator = custom AggregateFunction,
>       windowFunction = custom ProcessWindowFunction
>     )
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
>
> -A
>
>
> On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <
> user@flink.apache.org> wrote:
>
>> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
>> and we’re seeing lag spikes on window closure. I’m curious if others have
>> encountered similar issues before and if anyone has suggestions for how to
>> tackle this problem (other than simply increasing parallelism).
>> Context
>>
>> Lag definition
>>
>> We define end-to-end lag as the delta between the time when the event
>> was persisted in Kafka and the time when Flink finishes processing the
>> event.
>> Window operator definition
>>
>> The important parts (in pseudocode):
>>
>> val windowDataStream =
>>   inputDataStream
>>     .window(TumblingEventTimeWindows of 1 hour)
>>     .trigger(custom trigger)
>>     .aggregate(
>>       preAggregator = custom AggregateFunction,
>>       windowFunction = custom ProcessWindowFunction
>>     )
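>>
>> For reference, a minimal sketch of that shape in the Scala API (class
>> names hypothetical, not our exact code) would be:
>>
>>   val windowDataStream = inputDataStream
>>     .window(TumblingEventTimeWindows.of(Time.hours(1)))
>>     .trigger(new ActivityTrigger()) // hypothetical custom trigger
>>     // PreAggregator extends AggregateFunction and folds each event into
>>     // an accumulator incrementally; EmitWindowFunction extends
>>     // ProcessWindowFunction and only sees the final accumulator per key.
>>     .aggregate(new PreAggregator(), new EmitWindowFunction())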
>>
>> The custom trigger emits a TriggerResult.CONTINUE in onEventTime, i.e.
>> we don’t run any user-defined logic at the end of the window. (This trigger
>> only fires while the window is active via custom logic in onElement.)
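>>
>> Roughly, the trigger looks like this (a simplified sketch, with the
>> actual firing condition in onElement elided as a placeholder):
>>
>>   class ActivityTrigger extends Trigger[Event, TimeWindow] {
>>     override def onElement(e: Event, ts: Long, w: TimeWindow,
>>         ctx: Trigger.TriggerContext): TriggerResult =
>>       if (shouldFire(e)) TriggerResult.FIRE else TriggerResult.CONTINUE
>>     // No user logic at window end: panes are only cleaned up, not fired.
>>     override def onEventTime(t: Long, w: TimeWindow,
>>         ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
>>     override def onProcessingTime(t: Long, w: TimeWindow,
>>         ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
>>     override def clear(w: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
>>     private def shouldFire(e: Event): Boolean = ??? // custom logic
>>   }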
>> Numbers
>>
>> Our Flink app processes ~3K events per second and I’ve calculated that
>> there are around 200-300K panes to close per Task at the end of the 1-hour
>> window. Our lag is fairly stable at a few hundred milliseconds during
>> the window but spikes to 5-10 seconds when the window expires, which is a
>> problem for us.
>> The issue
>>
>> The magnitude of the lag spikes on window closure correlates with:
>>
>>    - the size of the window (a 1-hour window has bigger spikes than a
>>      5-minute window),
>>    - the cardinality of the keys in the event stream, and
>>    - the number of events being processed per second.
>>
>> In other words, the more panes there are to close, the bigger the lag
>> spike. I'm fairly sure the lag comes entirely from the WindowOperator’s
>> clearAllState; CPU profiles confirm that clearAllState consumes a
>> significant share of CPU at window closure.
>>
>> Does this theory sound plausible? What could we do to minimize the
>> effects of window clean-up? It would be nice to do it incrementally or
>> asynchronously but I'm not sure if Flink provides this functionality today.
>> Thanks,
>> Caio
>>
>
