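Hello Caio,

Based on the pseudocode, there is no keyed function (keyBy) before the window, so this is a non-keyed window. As the Flink documentation notes, for a non-keyed stream all of the windowing logic is performed by a single task, i.e. with parallelism 1, so the window will not be processed in parallel. Please check again and reply.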
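To illustrate the keyed vs. non-keyed distinction above: after keyBy, each key is hashed into one of maxParallelism key groups, and each parallel subtask owns a contiguous range of key groups. The panes, and their cleanup, are therefore spread across subtasks. The following is a minimal, hypothetical Java model, not Flink code. The class and method names are made up, and it uses a plain floorMod where Flink applies a murmur hash to key.hashCode(), so the concrete subtask chosen for a given key will differ from a real job.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative model (not Flink code) of how keyBy spreads keyed window state
// across parallel subtasks. Class and method names are hypothetical.
public class KeyGroupSketch {

    // Assumption: a plain floorMod stands in for Flink's murmur hash of
    // key.hashCode(), so concrete assignments differ from a real job.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    // Number of distinct keys (one pane each per window) owned by each subtask.
    static Map<Integer, Integer> keysPerSubtask(int numKeys, int maxParallelism, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int k = 0; k < numKeys; k++) {
            counts.merge(subtaskFor("key-" + k, maxParallelism, parallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example value
        // Keyed, parallelism 8: the panes are split across subtasks 0..7.
        System.out.println(keysPerSubtask(250_000, maxParallelism, 8));
        // Parallelism 1: every key group, and so every pane, maps to subtask 0.
        System.out.println(keysPerSubtask(250_000, maxParallelism, 1));
    }
}
```

The point of the model is the second mapping: once parallelism is 1, all state ends up in one task no matter how many keys there are.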
    val windowDataStream =
      inputDataStream
        .window(TumblingEventTimeWindows of 1 hour)
        .trigger(custom trigger)
        .aggregate(
          preAggregator = custom AggregateFunction,
          windowFunction = custom ProcessWindowFunction
        )

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows

-A

On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <user@flink.apache.org> wrote:

> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
> and we're seeing lag spikes on window closure. I'm curious whether others have
> encountered similar issues and whether anyone has suggestions for how to
> tackle this problem (other than simply increasing parallelism).
>
> Context
>
> Lag definition
>
> We define end-to-end lag as the delta between the time when the event was
> persisted in Kafka and the time when Flink finishes processing the event.
>
> Window operator definition
>
> The important parts (in pseudocode):
>
>     val windowDataStream =
>       inputDataStream
>         .window(TumblingEventTimeWindows of 1 hour)
>         .trigger(custom trigger)
>         .aggregate(
>           preAggregator = custom AggregateFunction,
>           windowFunction = custom ProcessWindowFunction
>         )
>
> The custom trigger emits a TriggerResult.CONTINUE in onEventTime, i.e. we
> don't run any user-defined logic at the end of the window. (This trigger
> only fires while the window is active, via custom logic in onElement.)
>
> Numbers
>
> Our Flink app processes ~3K events per second, and I've calculated that
> there are around 200-300K panes to close per task at the end of the 1-hour
> window. Our lag is fairly stable at a few hundred milliseconds during the
> window but spikes to 5-10 seconds when the window expires, which is a
> problem for us.
>
> The issue
>
> The magnitude of the lag spikes on window closure correlates with
>
> - the size of the window (a 1-hour window has bigger spikes than a
>   5-minute window).
> - the cardinality of the keys in the event stream.
> - the number of events being processed per second.
>
> In other words, the more panes to close, the bigger the lag spike. I'm
> fairly sure that the lag is coming entirely from the WindowOperator's
> clearAllState, and I've validated that CPU profiles show clearAllState
> using a significant amount of CPU.
>
> Does this theory sound plausible? What could we do to minimize the effects
> of window clean-up? It would be nice to do it incrementally or
> asynchronously, but I'm not sure whether Flink provides this functionality today.
>
> Thanks,
> Caio
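The question of why all the panes close at once can be modeled roughly. With tumbling event-time windows, every key's pane for a given hour has the same end timestamp. The window operator registers a cleanup timer per pane at the window's last timestamp plus allowed lateness, and it clears the pane's state when that timer fires, independently of what the custom trigger returns. So one watermark step past the end of the hour fires every cleanup timer in a row. The Java below is a simplified, hypothetical model of that timing, not Flink's WindowOperator. It assumes a window offset of 0 and zero allowed lateness, and the class name is made up.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Simplified model (not Flink's WindowOperator) of when tumbling event-time
// panes are cleaned up. Assumes window offset 0; class name is hypothetical.
public class WindowCleanupSketch {
    static final long HOUR_MS = 3_600_000L;

    // Start of the tumbling window that contains the timestamp (offset 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Cleanup time: the window's last timestamp (end - 1) plus allowed lateness.
    static long cleanupTime(long timestamp, long size, long allowedLateness) {
        long end = windowStart(timestamp, size) + size;
        return (end - 1) + allowedLateness;
    }

    // Distinct cleanup timestamps for numKeys keys whose events fall anywhere
    // inside the hour starting at hourStart (hourStart must be hour-aligned).
    static Set<Long> distinctCleanupTimes(int numKeys, long hourStart, long seed) {
        Random rnd = new Random(seed);
        Set<Long> times = new HashSet<>();
        for (int k = 0; k < numKeys; k++) {
            long ts = hourStart + rnd.nextInt((int) HOUR_MS);
            times.add(cleanupTime(ts, HOUR_MS, 0L));
        }
        return times;
    }

    public static void main(String[] args) {
        long hourStart = 10 * HOUR_MS;
        // 250,000 keys (one pane each) but a single cleanup timestamp,
        // so one watermark advance clears them all. Prints [39599999].
        System.out.println(distinctCleanupTimes(250_000, hourStart, 42L));
    }
}
```

In this model, a shorter window or fewer distinct keys per window shrinks the batch of panes sharing that one timestamp. This is consistent with the reported correlation with window size, key cardinality, and event rate.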