Hi Ori,

Just a couple of comments (some of the code needed for a precise explanation is missing):

  *   SimpleAggregator is not used in the job setup below (I assume it comes from another job setup)
  *   SimpleAggregator is called once for each event that goes into a given session window, however
     *   The Scala vectors keep growing with the number of events that end up in a single window, hence
     *   Your big-O complexity will be O(n^2) or worse, with n = number of events in the window (rough numbers below)
     *   For each event the accumulator is retrieved from window state and stored back to window state (and deserialized/serialized, if on the RocksDB backend)
  *   On the other hand, when you use a process function
     *   Flink keeps a list state of the events belonging to the session window, and
     *   Only when the window is triggered (on session gap timeout) are all events retrieved from window state and processed
     *   On the RocksDB backend, new events added to the window are appended to the existing window state key without touching the previously stored events, hence
     *   Serialization is done only once per incoming event, and
     *   Big-O complexity is around O(n)

… much simplified
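
To put rough numbers on it: if the accumulator already holds k events when the next one arrives, that event costs a read and a write (and, on RocksDB, a deserialize and a serialize) of roughly k elements, so a window of n events costs about 1 + 2 + … + n = n(n+1)/2 element copies with the AggregateFunction. With the list state behind a process function it is n appends plus one read of n elements at trigger time, i.e. about 2n.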

When I started out with similar questions, I spent quite some time in the debugger, breaking into the windowing functions and walking up the call stack, in order to understand how Flink works … time well spent


I hope this helps …

I won’t be able to follow up for the next 1 ½ weeks, unless you try to meet me at the Flink Forward conference …

Thias

From: Ori Popowski <ori....@gmail.com>
Sent: Wednesday, 20 October 2021 16:17
To: user <user@flink.apache.org>
Subject: Huge backpressure when using AggregateFunction with Session Window

I have a simple Flink application with a keyBy, a session window, and an AggregateFunction that I use to incrementally aggregate a result and write to a sink.

Some of the requirements involve accumulating lists of fields from the events (for example, all URLs), so not all of the final values are primitives (although some are, like the total number of events and the session duration).

This job starts experiencing huge backpressure 40 minutes after launching.

I've found out that the append and concatenate operations in the logic of my 
AggregateFunction's add() and merge() functions are what's ruining the job 
(i.e. causing the backpressure).

I've managed to create a reduced version of my job, where I just append and concatenate some of the event values, and I can confirm that backpressure starts just 40 minutes after launching the job:


    // Imports added for completeness (they are not in the original mail); Event and
    // Session are assumed to be generated (e.g. Avro) classes from the job's schema.
    import java.util.UUID
    import com.typesafe.scalalogging.LazyLogging
    import org.apache.flink.api.common.functions.AggregateFunction

    // Accumulator is presumably an alias for a tuple of five vectors, roughly:
    //   type Accumulator = (Vector[String], Vector[String], Vector[String], Vector[Long], Vector[Long])
    class SimpleAggregator extends AggregateFunction[Event, Accumulator, Session] with LazyLogging {

      override def createAccumulator(): Accumulator = (
        Vector.empty,
        Vector.empty,
        Vector.empty,
        Vector.empty,
        Vector.empty
      )

      // Called once per event assigned to the window; appends the event's fields to
      // the ever-growing vectors.
      override def add(value: Event, accumulator: Accumulator): Accumulator = {
        (
          accumulator._1 :+ value.getEnvUrl,
          accumulator._2 :+ value.getCtxVisitId,
          accumulator._3 :+ value.getVisionsSId,
          accumulator._4 :+ value.getTime.longValue(),
          accumulator._5 :+ value.getTime.longValue()
        )
      }

      // Called when Flink merges two session windows into one.
      override def merge(a: Accumulator, b: Accumulator): Accumulator = {
        (
          a._1 ++ b._1,
          a._2 ++ b._2,
          a._3 ++ b._3,
          a._4 ++ b._4,
          a._5 ++ b._5
        )
      }

      // Called when the window fires; the real field values are stubbed out in this
      // reduced version.
      override def getResult(accumulator: Accumulator): Session = {
        Session.newBuilder()
          .setSessionDuration(1000)
          .setSessionTotalEvents(1000)
          .setSId("-" + UUID.randomUUID().toString)
          .build()
      }
    }

This is the job overall (simplified version):


    // Imports added for completeness (not in the original mail); `.asFlinkTime` is
    // presumably a custom extension converting Scala durations to Flink Time.
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.sink.SinkFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows

    class App(
      source: SourceFunction[Event],
      sink: SinkFunction[Session]
    ) {

      def run(config: Config): Unit = {
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
        senv.setMaxParallelism(256)
        val dataStream = senv.addSource(source).uid("source")
        dataStream
          .assignAscendingTimestamps(_.getTime)
          .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
          .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
          .allowedLateness(0.seconds.asFlinkTime)
          .process(new ProcessFunction).uid("process-session")
          .addSink(sink).uid("sink")

        senv.execute("session-aggregation")
      }
    }
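
For reference, wiring the aggregator into this pipeline instead of the process function would look roughly like this (a sketch only, reusing the names above; "aggregate-session" is just a placeholder uid):

        // Sketch: same pipeline, but with the incremental aggregator plugged in where
        // the process function sits above.
        dataStream
          .assignAscendingTimestamps(_.getTime)
          .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
          .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
          .allowedLateness(0.seconds.asFlinkTime)
          .aggregate(new SimpleAggregator).uid("aggregate-session")
          .addSink(sink).uid("sink")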

After 3 weeks of grueling debugging, profiling, checking the serialization, and more, I couldn't solve the backpressure issue.
However, I got an idea and used Flink's ProcessWindowFunction, which collects all the events behind the scenes and just gives them to me as an iterable, where I can then do all my calculations.
Surprisingly, there's no backpressure. So even though the ProcessWindowFunction actually accumulates more data, and also does concatenations and appends, for some reason there's no backpressure.
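
Roughly, the working version looks like the sketch below (SessionProcessor is a made-up name; the Session fields are left as the same placeholders as in getResult above). It is plugged in with .process(new SessionProcessor) exactly where the process function sits in the setup above:

    import java.util.UUID
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Sketch: Flink buffers the session's events in list state and hands them over
    // as one Iterable when the window fires, so the per-field vectors are built
    // exactly once per session instead of once per incoming event.
    class SessionProcessor
        extends ProcessWindowFunction[Event, Session, String, TimeWindow] {

      override def process(
          key: String,
          context: Context,
          elements: Iterable[Event],
          out: Collector[Session]): Unit = {
        val urls     = elements.map(_.getEnvUrl).toVector       // built once, on trigger
        val visitIds = elements.map(_.getCtxVisitId).toVector
        val times    = elements.map(_.getTime.longValue()).toVector
        // ... urls/visitIds/times would feed the real Session fields here
        out.collect(
          Session.newBuilder()
            .setSessionDuration(1000)                  // placeholders, as in getResult above
            .setSessionTotalEvents(1000)
            .setSId("-" + UUID.randomUUID().toString)
            .build())
      }
    }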

To finish this long post: what I'm trying to understand is why there was backpressure when I collected the events myself with an AggregateFunction, but none when Flink collects them for me behind a ProcessWindowFunction. It seems to me something is fundamentally wrong here, since it means I cannot do any non-reducing operations without creating backpressure. I don't think it should cause the backpressure I experienced, and I'm trying to understand what I did wrong.

Thanks!
