Re: End-to-end lag spikes when closing a large number of panes
Hey Caio,

Your analysis of the problem sounds right to me. I don't have a good solution for you :(

> I've validated that CPU profiles show clearAllState using a significant amount of CPU.

Did you use something like async-profiler here? Do you have more info on the breakdown of what used the CPU time? Once we know that, there might be an opportunity to do such operations async/lazily, or to fix something in the underlying platform (e.g. RocksDB is slow, ...).
Re: End-to-end lag spikes when closing a large number of panes
Hey Asimansu,

The inputDataStream is a KeyedStream, I forgot to mention that.

Caio
Re: End-to-end lag spikes when closing a large number of panes
Hello Caio,

Based on the pseudocode, there is no keyed function present, so the window will not be processed in parallel. Please check again and respond back.

    val windowDataStream =
      inputDataStream
        .window(TumblingEventTimeWindows of 1 hour)
        .trigger(custom trigger)
        .aggregate(
          preAggregator = custom AggregateFunction,
          windowFunction = custom ProcessWindowFunction
        )

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows

-A
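P.S. A minimal Scala sketch of the difference the linked docs describe, using a hypothetical Event type and key field (not from the thread): windowAll runs a single, non-parallel window operator, while keyBy(...).window(...) distributes panes across parallel subtasks.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // Hypothetical event type; the real schema isn't shown in the thread.
    case class Event(key: String, value: Long)

    object KeyedVsNonKeyedSketch {
      def sketch(inputDataStream: DataStream[Event]): Unit = {
        // Non-keyed: windowAll produces a single window operator with parallelism 1.
        val nonKeyed = inputDataStream
          .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
          .reduce((a, b) => Event(a.key, a.value + b.value))

        // Keyed: one pane per key per window, processed across parallel subtasks.
        val keyed = inputDataStream
          .keyBy(_.key)
          .window(TumblingEventTimeWindows.of(Time.hours(1)))
          .reduce((a, b) => Event(a.key, a.value + b.value))
      }
    }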
End-to-end lag spikes when closing a large number of panes
We run a large-scale Flink 1.16 cluster that uses windowed aggregations and we're seeing lag spikes on window closure. I'm curious if others have encountered similar issues before and if anyone has suggestions for how to tackle this problem (other than simply increasing parallelism).

Context

Lag definition

We define end-to-end lag as the delta between the time when the event was persisted in Kafka and the time when Flink finishes processing the event.

Window operator definition

The important parts (in pseudocode):

    val windowDataStream =
      inputDataStream
        .window(TumblingEventTimeWindows of 1 hour)
        .trigger(custom trigger)
        .aggregate(
          preAggregator = custom AggregateFunction,
          windowFunction = custom ProcessWindowFunction
        )

The custom trigger returns TriggerResult.CONTINUE in onEventTime, i.e. we don't run any user-defined logic at the end of the window. (This trigger only fires while the window is active, via custom logic in onElement.)

Numbers

Our Flink app processes ~3K events per second and I've calculated that there are around 200-300K panes to close per Task at the end of the 1-hour window. Our lag is fairly stable at a few hundred milliseconds during the window but spikes to 5-10 seconds when the window expires, which is a problem for us.

The issue

The magnitude of the lag spikes on window closure correlates with:

- the size of the window (a 1-hour window has bigger spikes than a 5-minute window),
- the cardinality of the keys in the event stream,
- the number of events being processed per second.

In other words, the more panes to close, the bigger the lag spike. I'm fairly sure that the lag is coming entirely from the WindowOperator's clearAllState, and I've validated that CPU profiles show clearAllState using a significant amount of CPU.

Does this theory sound plausible? What could we do to minimize the effects of window clean-up? It would be nice to do it incrementally or asynchronously but I'm not sure if Flink provides this functionality today.

Thanks,
Caio
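P.S. For concreteness, here is a minimal Scala sketch of the shape of the pipeline. All names here (Event, emitNow, ElementDrivenTrigger, the counting aggregate, EmitCount) are hypothetical stand-ins, not our real classes; the relevant parts are the trigger behaviour (CONTINUE from onEventTime, element-driven FIRE in onElement) and the keyed window wiring.

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Hypothetical event shape; the real schema isn't shown in the thread.
    case class Event(key: String, emitNow: Boolean)

    // A trigger with the behaviour described above: it does no user-defined work
    // when the window's event-time timer fires, and only fires mid-window based
    // on per-element logic (the emitNow condition is a stand-in).
    class ElementDrivenTrigger extends Trigger[Event, TimeWindow] {

      override def onElement(element: Event, timestamp: Long, window: TimeWindow,
                             ctx: Trigger.TriggerContext): TriggerResult =
        if (element.emitNow) TriggerResult.FIRE else TriggerResult.CONTINUE

      override def onEventTime(time: Long, window: TimeWindow,
                               ctx: Trigger.TriggerContext): TriggerResult =
        TriggerResult.CONTINUE // nothing user-defined happens at window end

      override def onProcessingTime(time: Long, window: TimeWindow,
                                    ctx: Trigger.TriggerContext): TriggerResult =
        TriggerResult.CONTINUE

      override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
    }

    // Placeholder pre-aggregator: counts events per key per window.
    class CountAggregate extends AggregateFunction[Event, Long, Long] {
      override def createAccumulator(): Long = 0L
      override def add(value: Event, acc: Long): Long = acc + 1
      override def getResult(acc: Long): Long = acc
      override def merge(a: Long, b: Long): Long = a + b
    }

    // Placeholder window function: emits (key, count) for each fired pane.
    class EmitCount extends ProcessWindowFunction[Long, (String, Long), String, TimeWindow] {
      override def process(key: String, context: Context,
                           elements: Iterable[Long], out: Collector[(String, Long)]): Unit =
        out.collect((key, elements.head)) // the single pre-aggregated value for this pane
    }

    object WindowedAggregationSketch {
      def buildWindowStream(inputDataStream: DataStream[Event]): DataStream[(String, Long)] =
        inputDataStream
          .keyBy(_.key) // the keying step that was omitted from the pseudocode
          .window(TumblingEventTimeWindows.of(Time.hours(1)))
          .trigger(new ElementDrivenTrigger)
          .aggregate(new CountAggregate, new EmitCount)
    }

The keyBy is the piece that didn't appear in the pseudocode above; the input is already a KeyedStream in our real job.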