Hello Caio,

Based on the pseudocode, there is no keyBy before the window, so this is a
non-keyed window and it will not be processed in parallel (non-keyed windows
run with a parallelism of 1). Please check and let us know.

val windowDataStream =
  inputDataStream
    .window(TumblingEventTimeWindows of 1 hour)
    .trigger(custom trigger)
    .aggregate(
      preAggregator = custom AggregateFunction,
      windowFunction = custom ProcessWindowFunction
    )

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
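To make this concrete, here is a minimal sketch of where the keyBy would go so that window panes are distributed across parallel subtasks. The event type, key field, and the counting aggregate are hypothetical stand-ins for your actual functions:

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical event type -- substitute your own.
case class Event(userId: String, value: Long)

// Pre-aggregator: incrementally counts events per pane (stand-in for
// your custom AggregateFunction).
class CountAgg extends AggregateFunction[Event, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(e: Event, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Window function: emits (key, count) when the window fires (stand-in
// for your custom ProcessWindowFunction).
class EmitCount extends ProcessWindowFunction[Long, (String, Long), String, TimeWindow] {
  override def process(key: String, ctx: Context,
                       counts: Iterable[Long], out: Collector[(String, Long)]): Unit =
    out.collect((key, counts.iterator.next()))
}

def windowed(input: DataStream[Event]): DataStream[(String, Long)] =
  input
    .keyBy(_.userId) // the missing piece: keying makes this a keyed window,
                     // so panes are partitioned across parallel subtasks
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new CountAgg, new EmitCount)
```

Without the keyBy, the DataStream API only offers windowAll, which runs on a single subtask regardless of the operator's parallelism.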

-A


On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <user@flink.apache.org>
wrote:

> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
> and we’re seeing lag spikes on window closure. I’m curious if others have
> encountered similar issues before and if anyone has suggestions for how to
> tackle this problem (other than simply increasing parallelism).
> Context
>
> Lag definition
>
> We define end-to-end lag as the delta between the time when the event was
> persisted in Kafka and the time when Flink finishes processing the event.
>
> Window operator definition
>
> The important parts (in pseudocode):
>
> val windowDataStream =
>
>   inputDataStream
>     .window(TumblingEventTimeWindows of 1 hour)
>     .trigger(custom trigger)
>     .aggregate(
>       preAggregator = custom AggregateFunction,
>       windowFunction = custom ProcessWindowFunction
>     )
>
> The custom trigger emits a TriggerResult.CONTINUE in onEventTime, i.e. we
> don’t run any user-defined logic at the end of the window. (This trigger
> only fires while the window is active via custom logic in onElement.)
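A trigger of the shape described above might look like the following sketch. The class name and the per-element firing condition are illustrative placeholders, not the actual implementation; the point is that onEventTime returns CONTINUE so no user logic runs when the window's end-of-window timer fires:

```scala
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Illustrative sketch: fires only from onElement and deliberately
// returns CONTINUE when the event-time timer at the window end fires.
class FireWhileActiveTrigger[T] extends Trigger[T, TimeWindow] {

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult =
    // Placeholder for the "custom logic in onElement" described above.
    if (shouldFire(element)) TriggerResult.FIRE else TriggerResult.CONTINUE

  // End-of-window event-time timer: no user-defined logic, just CONTINUE.
  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()

  private def shouldFire(element: T): Boolean = true // hypothetical condition
}
```

Note that even with CONTINUE here, the WindowOperator itself still clears window state when the window expires, which is the clearAllState cost discussed below.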
>
> Numbers
>
> Our Flink app processes ~3K events per second and I’ve calculated that
> there are around 200-300K panes to close per Task at the end of the 1-hour
> window. Our lag is fairly stable at a few hundred milliseconds during the
> window but spikes to 5-10 seconds when the window expires, which is a
> problem for us.
>
> The issue
>
> The magnitude of the lag spikes on window closure correlates with:
>
>    - the size of the window (a 1-hour window has bigger spikes than a
>      5-minute window),
>    - the cardinality of the keys in the event stream, and
>    - the number of events being processed per second.
> In other words, the more panes there are to close, the bigger the lag
> spike. I'm fairly sure the lag comes entirely from the WindowOperator's
> clearAllState, and CPU profiles confirm that clearAllState consumes a
> significant amount of CPU.
>
> Does this theory sound plausible? What could we do to minimize the effects
> of window clean-up? It would be nice to do it incrementally or
> asynchronously but I'm not sure if Flink provides this functionality today.
> Thanks,
> Caio
>
