Hello Caio,
Based on the pseudocode, there is no key selector (keyBy) before the window
assignment, so the window will not be processed in parallel. Please check
again and let us know.

val windowDataStream =
  inputDataStream
    .window(TumblingEventTimeWindows of 1 hour)
    .trigger(custom trigger)
    .aggregate(
      preAggregator = custom AggregateFunction,
      windowFunction = custom ProcessWindowFunction
    )

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
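
For comparison, a keyed version would look roughly like the sketch below
(the key selector, stream types, and the custom trigger/aggregate names are
placeholders, not your actual code):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// keyBy partitions the stream, so each key's windows can be handled by a
// different parallel subtask instead of a single non-keyed window operator.
val windowDataStream =
  inputDataStream
    .keyBy(event => event.key)  // placeholder key selector
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .trigger(customTrigger)
    .aggregate(customAggregateFunction, customProcessWindowFunction)
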
-A
On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <[email protected]>
wrote:
> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
> and we’re seeing lag spikes on window closure. I’m curious if others have
> encountered similar issues before and if anyone has suggestions for how to
> tackle this problem (other than simply increasing parallelism).
> Context
>
> Lag definition
>
> We define end-to-end lag as the delta between the time when the event was
> persisted in Kafka and the time when Flink finishes processing the event.
> Window operator definition
>
> The important parts (in pseudocode):
>
> val windowDataStream =
>   inputDataStream
>     .window(TumblingEventTimeWindows of 1 hour)
>     .trigger(custom trigger)
>     .aggregate(
>       preAggregator = custom AggregateFunction,
>       windowFunction = custom ProcessWindowFunction
>     )
>
> The custom trigger emits a TriggerResult.CONTINUE in onEventTime, i.e. we
> don’t run any user-defined logic at the end of the window. (This trigger
> only fires while the window is active via custom logic in onElement.)
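>
> Roughly, the trigger is shaped like the sketch below (shouldFire is an
> illustrative stand-in for our actual per-element firing logic):
>
> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>
> class MidWindowTrigger[T] extends Trigger[T, TimeWindow] {
>
>   // Fires only while the window is active, based on per-element logic.
>   override def onElement(element: T, timestamp: Long, window: TimeWindow,
>                          ctx: Trigger.TriggerContext): TriggerResult =
>     if (shouldFire(element)) TriggerResult.FIRE else TriggerResult.CONTINUE
>
>   // No user-defined logic at the end of the window.
>   override def onEventTime(time: Long, window: TimeWindow,
>                            ctx: Trigger.TriggerContext): TriggerResult =
>     TriggerResult.CONTINUE
>
>   override def onProcessingTime(time: Long, window: TimeWindow,
>                                 ctx: Trigger.TriggerContext): TriggerResult =
>     TriggerResult.CONTINUE
>
>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
>
>   private def shouldFire(element: T): Boolean = ??? // placeholder
> }
>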
> Numbers
>
> Our Flink app processes ~3K events per second and I’ve calculated that
> there are around 200-300K panes to close per Task at the end of the 1-hour
> window. Our lag is fairly stable at a few hundred milliseconds during the
> window but spikes to 5-10 seconds when the window expires, which is a
> problem for us.
> The issue
>
> The magnitude of the lag spikes on window closure correlates with:
>
>    - the size of the window (a 1-hour window has bigger spikes than a
>    5-minute window),
>    - the cardinality of the keys in the event stream, and
>    - the number of events being processed per second.
>
> In other words, the more panes there are to close, the bigger the lag spike.
> I'm fairly sure the lag comes entirely from the WindowOperator’s
> clearAllState, and CPU profiles confirm that clearAllState consumes a
> significant amount of CPU.
>
> Does this theory sound plausible? What could we do to minimize the effects
> of window clean-up? It would be nice to do it incrementally or
> asynchronously but I'm not sure if Flink provides this functionality today.
> Thanks,
> Caio
>