RE: Huge backpressure when using AggregateFunction with Session Window

2021-10-26 Thread Schwalbe Matthias
Hi Ori, … answering from remote …


  *   If not completely mistaken, Scala Vector is immutable, creating a copy 
whenever you append, but
  *   This is not the main problem, though: the vectors collected so far get deserialized from state storage with every incoming event and afterwards serialized back into state storage
  *   This won’t matter so much if you only collect 2 or 3 events into a session window, but with maybe 1000 such events it does (you didn’t share your numbers)
  *   For the ProcessFunction implementation you could use a VectorBuilder and then assign the result (see the sketch after this list)
  *   Regarding the "without touching the previously stored event" question, in more detail (I was in a rush):
 *   Windowing with ProcessFunction collects every event assigned to a session window into a list state … iterating/aggregating over the collected events only once when the window is triggered (i.e. the session is finished)
 *   While collecting, each incoming event is add()-ed to the list state
 *   For RocksDB this involves only serializing the single added event and appending its binary representation to the list state of the respective (key, session window) pair (the window being the namespace in Flink speak), i.e.
 *   The previously stored events for the session window are not touched 
when a new event is added
  *   Next question: the overhead can easily be the cause of such backpressure, 
depending on the numbers:
 *   Serialized size of your accumulator, proportional to the number of 
aggregated events
 *   Size, entropy, and frequency of your key space -> cache hits vs. cache misses in RocksDB
  *   Of course there could be additional sources of backpressure
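
A minimal sketch of the VectorBuilder suggestion (illustrative only; Event.getEnvUrl is the getter from the code further down this thread, everything else is assumed):

  // build each per-session Vector once, in a single pass when the window fires,
  // instead of appending to an immutable Vector for every incoming event
  def collectUrls(events: Iterable[Event]): Vector[String] = {
    val urls = Vector.newBuilder[String]      // mutable builder, amortised appends
    events.foreach(e => urls += e.getEnvUrl)  // one pass over the buffered events
    urls.result()
  }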

I hope this helps, … I’ll be back next week

Thias

From: Ori Popowski 
Sent: Donnerstag, 21. Oktober 2021 15:32
To: Schwalbe Matthias 
Cc: user 
Subject: Re: Huge backpressure when using AggregateFunction with Session Window


Thanks for taking the time to answer this.

  *   You're correct that the SimpleAggregator is not used in the job setup. I 
didn't copy the correct piece of code.
  *   I understand the overhead involved. But I do not agree with the O(n^2) 
complexity. Are you implying that Vector append is O(n) by itself?
  *   I understand your points regarding ProcessFunction except for the 
"without touching the previously stored event". Also with AggregateFunction + 
concatenation I don't touch the elements other than the new element. I forgot 
to mention by the way, that the issue reproduces also with Lists which should 
be much faster for appends and concats.
Could overhead by itself account for the backpressure?
From this job the only conclusion is that Flink just cannot do aggregating 
operations which collect values, only simple operations which produce scalar 
values (like sum/avg). It seems weird to me that Flink would be so limited in 
such a way.



On Wed, Oct 20, 2021 at 7:03 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
Hi Ori,

Just a couple of comments (some code is missing for a concise explanation):

  *   SimpleAggregator is not used in the job setup below (assuming another job 
setup)
  *   SimpleAggregator is called for each event that goes into a specific 
session window, however

 *   The Scala vectors keep growing with the number of events that end up 
in a single window, hence
 *   Your BigO complexity will be O(n^2), n: number of events in window (or 
worse)
 *   For each event the accumulator is retrieved from window state and 
stored to window state (and serialized, if on RocksDB Backend)

  *   On the other hand when you use a process function

 *   Flink keeps a list state of events belonging to the session window, and
 *   Only when the window is triggered (on session gap timeout) all events 
are retrieved from window state and processed
 *   On RocksDbBackend the new events added to the window are appended to 
the existing window state key without touching the previously stored events, 
hence
 *   Serialization is only done once per incoming event, and
 *   BigO complexity is around O(n)

… much simplified

When I started with similar questions I spent quite some time in the debugger, 
breaking into the windowing functions and going up the call stack, in order to 
understand how Flink works … time well spent


I hope this helps …

I won’t be able to follow up for the next 1 ½ weeks, unless you try to meet me 
at the Flink Forward conference …

Thias

From: Ori Popowski <ori@gmail.com>
Sent: Mittwoch, 20. Oktober 2021 16:17
To: user <user@flink.apache.org>
Subject: Huge backpressure when using AggregateFunction with Session Window

I have a simple Flink application with a simple keyBy, a SessionWindow, and I 
use an AggregateFunction to incrementally aggregate a result, and write to a 
Sink.

Some of the requirements involve accumulating lists of fields from the events 
(for example, all URLs)

Re: Huge backpressure when using AggregateFunction with Session Window

2021-10-21 Thread Ori Popowski
Thanks for taking the time to answer this.

   - You're correct that the SimpleAggregator is not used in the job setup.
   I didn't copy the correct piece of code.
   - I understand the overhead involved. But I do not agree with the O(n^2)
   complexity. Are you implying that Vector append is O(n) by itself?
   - I understand your points regarding ProcessFunction except for the "without
   touching the previously stored event". Also with AggregateFunction +
   concatenation I don't touch the elements other than the new element. I
   forgot to mention by the way, that the issue reproduces also with Lists
   which should be much faster for appends and concats.

Could overhead by itself account for the backpressure?
From this job the only conclusion is that Flink just cannot do aggregating
operations which collect values, only simple operations which produce
scalar values (like sum/avg). It seems weird to me that Flink would be so
limited in such a way.



On Wed, Oct 20, 2021 at 7:03 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Ori,
>
>
>
> Just a couple of comments (some code is missing for a concise explanation):
>
>- SimpleAggregator is not used in the job setup below (assuming
>another job setup)
>- SimpleAggregator is called for each event that goes into a specific
>session window, however
>   - The Scala vectors keep growing with the number of events that
>   end up in a single window, hence
>   - Your BigO complexity will be O(n^2), n: number of events in
>   window (or worse)
>   - For each event the accumulator is retrieved from window state and
>   stored to window state (and serialized, if on RocksDB Backend)
>- On the other hand when you use a process function
>   - Flink keeps a list state of events belonging to the session
>   window, and
>   - Only when the window is triggered (on session gap timeout) all
>   events are retrieved from window state and processed
>   - On RocksDbBackend the new events added to the window are appended
>   to the existing window state key without touching the previously stored
>   events, hence
>   - Serialization is only done once per incoming event, and
>   - BigO complexity is around O(n)
>
>
>
> … much simplified
>
>
>
> When I started with similar questions I spent quite some time in the
> debugger, breaking into the windowing functions and going up the call
> stack, in order to understand how Flink works … time well spent
>
>
>
>
>
> I hope this helps …
>
>
>
> I won’t be able to follow up for the next 1 ½ weeks, unless you try to
> meet me at the Flink Forward conference …
>
>
>
> Thias
>
>
>
> From: Ori Popowski
> Sent: Mittwoch, 20. Oktober 2021 16:17
> To: user
> Subject: Huge backpressure when using AggregateFunction with Session Window
>
>
>
> I have a simple Flink application with a simple keyBy, a SessionWindow,
> and I use an AggregateFunction to incrementally aggregate a result, and
> write to a Sink.
>
>
>
> Some of the requirements involve accumulating lists of fields from the
> events (for example, all URLs), so not all the values in the end should be
> primitives (although some are, like total number of events, and session
> duration).
>
>
>
> This job is experiencing a huge backpressure 40 minutes after launching.
>
>
>
> I've found out that the append and concatenate operations in the logic of
> my AggregateFunction's add() and merge() functions are what's ruining the
> job (i.e. causing the backpressure).
>
>
>
> I've managed to create a reduced version of my job, where I just append
> and concatenate some of the event values and I can confirm that a
> backpressure starts just 40 minutes after launching the job:
>
>
>
> class SimpleAggregator extends AggregateFunction[Event, Accumulator,
> Session] with LazyLogging {
>
>   override def createAccumulator(): Accumulator = (
>     Vector.empty,
>     Vector.empty,
>     Vector.empty,
>     Vector.empty,
>     Vector.empty
>   )
>
>   override def add(value: Event, accumulator: Accumulator): Accumulator = {
>     (
>       accumulator._1 :+ value.getEnvUrl,
>       accumulator._2 :+ value.getCtxVisitId,
>       accumulator._3 :+ value.getVisionsSId,
>       accumulator._4 :+ value.getTime.longValue(),
>       accumulator._5 :+ value.getTime.longValue()
>     )
>   }
>
>   override def merge(a: Accumulator, b: Accumulator): Accumulator = {
>     (
>       a._1 ++ b._1,
>       a._2 ++ b._2,
>       a._3 ++ b._3,
>       a._4 ++ b._4,
>       a._5 ++ b._5
>     )
>   }
>
>   override def getResult(accumulator: Accumulator): Session = {
>     Session.newBuilder()
>       .setSessionDuration(1000)
>       .setSessionTotalEvents(1000)
>       .setSId("-" + UUID.randomUUID().toString)
>       .build()
>   }
> }
>
>
>
> 

Re: Huge backpressure when using AggregateFunction with Session Window

2021-10-21 Thread Ori Popowski
I didn't try to reproduce it locally since this job reads 14K events per
second.
I am using Flink version 1.12.1 and RocksDB state backend. It also happens
with Flink 1.10.

I tried to profile with JVisualVM and I didn't see any bottleneck. All the
user functions almost didn't take any CPU time.

On Wed, Oct 20, 2021 at 6:50 PM Timo Walther  wrote:

> Hi Ori,
>
> this sounds indeed strange. Can you also reproduce this behavior locally
> with a faker source? We should definitely add a profiler and see where
> the bottleneck lies.
>
> Which Flink version and state backend are you using?
>
> Regards,
> Timo
>
> On 20.10.21 16:17, Ori Popowski wrote:
> > I have a simple Flink application with a simple keyBy, a SessionWindow,
> > and I use an AggregateFunction to incrementally aggregate a result, and
> > write to a Sink.
> >
> > Some of the requirements involve accumulating lists of fields from the
> > events (for example, all URLs), so not all the values in the end should
> > be primitives (although some are, like total number of events, and
> > session duration).
> >
> > This job is experiencing a huge backpressure 40 minutes after launching.
> >
> > I've found out that the append and concatenate operations in the logic
> > of my AggregateFunction's add() and merge() functions are what's ruining
> > the job (i.e. causing the backpressure).
> >
> > I've managed to create a reduced version of my job, where I just append
> > and concatenate some of the event values and I can confirm that a
> > backpressure starts just 40 minutes after launching the job:
> >
> > class SimpleAggregator extends AggregateFunction[Event, Accumulator,
> > Session] with LazyLogging {
> >
> >   override def createAccumulator(): Accumulator = (
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty
> >   )
> >
> >   override def add(value: Event, accumulator: Accumulator): Accumulator = {
> >     (
> >       accumulator._1 :+ value.getEnvUrl,
> >       accumulator._2 :+ value.getCtxVisitId,
> >       accumulator._3 :+ value.getVisionsSId,
> >       accumulator._4 :+ value.getTime.longValue(),
> >       accumulator._5 :+ value.getTime.longValue()
> >     )
> >   }
> >
> >   override def merge(a: Accumulator, b: Accumulator): Accumulator = {
> >     (
> >       a._1 ++ b._1,
> >       a._2 ++ b._2,
> >       a._3 ++ b._3,
> >       a._4 ++ b._4,
> >       a._5 ++ b._5
> >     )
> >   }
> >
> >   override def getResult(accumulator: Accumulator): Session = {
> >     Session.newBuilder()
> >       .setSessionDuration(1000)
> >       .setSessionTotalEvents(1000)
> >       .setSId("-" + UUID.randomUUID().toString)
> >       .build()
> >   }
> > }
> >
> >
> > This is the job overall (simplified version):
> >
> > class App(
> >   source: SourceFunction[Event],
> >   sink: SinkFunction[Session]
> > ) {
> >
> >   def run(config: Config): Unit = {
> >     val senv = StreamExecutionEnvironment.getExecutionEnvironment
> >     senv.setMaxParallelism(256)
> >     val dataStream = senv.addSource(source).uid("source")
> >     dataStream
> >       .assignAscendingTimestamps(_.getTime)
> >       .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
> >       .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
> >       .allowedLateness(0.seconds.asFlinkTime)
> >       .process(new ProcessFunction).uid("process-session")
> >       .addSink(sink).uid("sink")
> >
> >     senv.execute("session-aggregation")
> >   }
> > }
> >
> >
> > After 3 weeks of grueling debugging, profiling, checking the
> > serialization and more I couldn't solve the backpressure issue.
> > However, I got an idea and used Flink's ProcessWindowFunction which just
> > aggregates all the events behind the scenes and just gives them to me as
> > an iterator, where I can then do all my calculations.
> > Surprisingly, there's no backpressure. So even though the
> > ProcessWindowFunction actually aggregates more data, and also does
> > concatenations and appends, for some reason there's no backpressure.
> >
> > To finish this long post, what I'm trying to understand here is why when
> > I collected the events using an AggregateFunction there was a
> > backpressure, and when Flink does this for me with ProcessWindowFunction
> > there's no backpressure? It seems to me something is fundamentally wrong
> > here, since it means I cannot do any non-reducing operations without
> > creating backpressure. I think it shouldn't cause the backpressure I
> > experienced. I'm trying to understand what I did wrong here.
> >
> > Thanks!
>
>


RE: Huge backpressure when using AggregateFunction with Session Window

2021-10-20 Thread Schwalbe Matthias
Hi Ori,

Just a couple of comments (some code is missing for a concise explanation):

  *   SimpleAggregator is not used in the job setup below (assuming another job 
setup)
  *   SimpleAggregator is called for each event that goes into a specific 
session window, however
 *   The Scala vectors keep growing with the number of events that end up 
in a single window, hence
 *   Your BigO complexity will be O(n^2), n: number of events in window (or 
worse)
 *   For each event the accumulator is retrieved from window state and 
stored to window state (and serialized, if on RocksDB Backend)
  *   On the other hand when you use a process function
 *   Flink keeps a list state of events belonging to the session window, and
 *   Only when the window is triggered (on session gap timeout) all events 
are retrieved from window state and processed
 *   On RocksDbBackend the new events added to the window are appended to 
the existing window state key without touching the previously stored events, 
hence
 *   Serialization is only done once per incoming event, and
 *   BigO complexity is around O(n)

… much simplified
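
To make the numbers concrete (an illustrative sketch only, not Flink internals): with a hypothetical 1000-event session and roughly 100 bytes per event, the aggregate path re-reads and re-writes an accumulator that grows towards ~100 KB on every single incoming event, while the list-state path only ever serializes the ~100 bytes of the new event:

import scala.collection.mutable

// Toy model of the RocksDB-backed window state as serialized bytes (sketch, not Flink code)
object WindowStateCostSketch {
  private val store = mutable.Map.empty[String, Array[Byte]]   // (key, window) -> bytes

  private def serialize(urls: Seq[String]): Array[Byte] = urls.mkString("\n").getBytes("UTF-8")
  private def deserialize(bytes: Array[Byte]): Vector[String] =
    new String(bytes, "UTF-8").split('\n').toVector.filter(_.nonEmpty)

  // AggregateFunction path: the WHOLE accumulator is deserialized, appended to,
  // and re-serialized for every incoming event
  def addViaAggregate(key: String, url: String): Unit = {
    val acc = store.get(key).map(deserialize).getOrElse(Vector.empty)  // cost ~ accumulator size
    store(key) = serialize(acc :+ url)                                 // cost ~ accumulator size again
  }

  // ListState.add() path: only the NEW element is serialized and its bytes appended;
  // the previously stored events are not touched
  def addViaListState(key: String, url: String): Unit = {
    val newBytes = (url + "\n").getBytes("UTF-8")                      // cost ~ one event
    store(key) = store.get(key).fold(newBytes)(_ ++ newBytes)
  }
}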

When I started with similar questions I spent quite some time in the debugger, 
breaking into the windowing functions and going up the call stack, in order to 
understand how Flink works … time well spent


I hope this helps …

I won’t be able to follow up for the next 1 ½ weeks, unless you try to meet me 
at the Flink Forward conference …

Thias

From: Ori Popowski 
Sent: Mittwoch, 20. Oktober 2021 16:17
To: user 
Subject: Huge backpressure when using AggregateFunction with Session Window

I have a simple Flink application with a simple keyBy, a SessionWindow, and I 
use an AggregateFunction to incrementally aggregate a result, and write to a 
Sink.

Some of the requirements involve accumulating lists of fields from the events 
(for example, all URLs), so not all the values in the end should be primitives 
(although some are, like total number of events, and session duration).

This job is experiencing a huge backpressure 40 minutes after launching.

I've found out that the append and concatenate operations in the logic of my 
AggregateFunction's add() and merge() functions are what's ruining the job 
(i.e. causing the backpressure).

I've managed to create a reduced version of my job, where I just append and 
concatenate some of the event values and I can confirm that a backpressure 
starts just 40 minutes after launching the job:


class SimpleAggregator extends AggregateFunction[Event, Accumulator, 
Session] with LazyLogging {

  override def createAccumulator(): Accumulator = (
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty
  )

  override def add(value: Event, accumulator: Accumulator): Accumulator = {
(
  accumulator._1 :+ value.getEnvUrl,
  accumulator._2 :+ value.getCtxVisitId,
  accumulator._3 :+ value.getVisionsSId,
  accumulator._4 :+ value.getTime.longValue(),
  accumulator._5 :+ value.getTime.longValue()
)
  }

  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
(
  a._1 ++ b._1,
  a._2 ++ b._2,
  a._3 ++ b._3,
  a._4 ++ b._4,
  a._5 ++ b._5
)
  }

  override def getResult(accumulator: Accumulator): Session = {
Session.newBuilder()
  .setSessionDuration(1000)
  .setSessionTotalEvents(1000)
  .setSId("-" + UUID.randomUUID().toString)
  .build()
  }
}

This is the job overall (simplified version):


class App(
  source: SourceFunction[Event],
  sink: SinkFunction[Session]
) {

  def run(config: Config): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setMaxParallelism(256)
val dataStream = senv.addSource(source).uid("source")
dataStream
  .assignAscendingTimestamps(_.getTime)
  .keyBy(event => (event.getWmUId, event.getWmEnv, 
event.getSId).toString())
  
.window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
  .allowedLateness(0.seconds.asFlinkTime)
  .process(new ProcessFunction).uid("process-session")
  .addSink(sink).uid("sink")

senv.execute("session-aggregation")
  }
}

After 3 weeks of grueling debugging, profiling, checking the serialization and 
more I couldn't solve the backpressure issue.
However, I got an idea and used Flink's ProcessWindowFunction which just 
aggregates all the events behind the scenes and just gives them to me as an 
iterator, where I can then do all my calculations.
Surprisingly, there's no backpressure. So even though the ProcessWindowFunction 
actually aggregates more data, and also does concatenations and appends, for 
some reason there's no backpressure.

To finish this long post, what I'm trying to understand here is why when I 
collected the events using an AggregateFunction there was a backpressure, and 
when Flink does this for me with ProcessWindowFunction there's no backpressure? 
It seems to me something is fundamentally wrong here, since it means I cannot 
do any non-reducing operations without creating backpressure. I think it 
shouldn't cause the backpressure I experienced. I'm trying to understand what I 
did wrong here.

Re: Huge backpressure when using AggregateFunction with Session Window

2021-10-20 Thread Timo Walther

Hi Ori,

this sounds indeed strange. Can you also reproduce this behavior locally 
with a faker source? We should definitely add a profiler and see where 
the bottleneck lies.
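
A minimal local source for such a reproduction might look like the sketch below (illustrative only; FakeEvent and its fields are stand-ins for the real Avro event type, and the key cardinality is arbitrary):

import org.apache.flink.streaming.api.functions.source.SourceFunction

// stand-in event type for local testing
case class FakeEvent(wmUId: String, wmEnv: String, sId: String, envUrl: String, time: Long)

// emits synthetic events for a bounded set of keys as fast as possible, which is
// usually enough to surface state-backend bottlenecks locally
class FakeEventSource(numKeys: Int = 100) extends SourceFunction[FakeEvent] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[FakeEvent]): Unit = {
    val rnd = new scala.util.Random(42)
    while (running) {
      val k = rnd.nextInt(numKeys)
      ctx.collect(FakeEvent(
        wmUId  = s"user-$k",
        wmEnv  = "test",
        sId    = s"session-$k",
        envUrl = s"https://example.com/page/${rnd.nextInt(1000)}",
        time   = System.currentTimeMillis()))
    }
  }

  override def cancel(): Unit = running = false
}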


Which Flink version and state backend are you using?

Regards,
Timo

On 20.10.21 16:17, Ori Popowski wrote:
I have a simple Flink application with a simple keyBy, a SessionWindow, 
and I use an AggregateFunction to incrementally aggregate a result, and 
write to a Sink.


Some of the requirements involve accumulating lists of fields from the 
events (for example, all URLs), so not all the values in the end should 
be primitives (although some are, like total number of events, and 
session duration).


This job is experiencing a huge backpressure 40 minutes after launching.

I've found out that the append and concatenate operations in the logic 
of my AggregateFunction's add() and merge() functions are what's ruining 
the job (i.e. causing the backpressure).


I've managed to create a reduced version of my job, where I just append 
and concatenate some of the event values and I can confirm that a 
backpressure starts just 40 minutes after launching the job:


class SimpleAggregator extends AggregateFunction[Event, Accumulator, 
Session] with LazyLogging {

  override def createAccumulator(): Accumulator = (
    Vector.empty,
    Vector.empty,
    Vector.empty,
    Vector.empty,
    Vector.empty
  )

  override def add(value: Event, accumulator: Accumulator): Accumulator = {
    (
      accumulator._1 :+ value.getEnvUrl,
      accumulator._2 :+ value.getCtxVisitId,
      accumulator._3 :+ value.getVisionsSId,
      accumulator._4 :+ value.getTime.longValue(),
      accumulator._5 :+ value.getTime.longValue()
    )
  }

  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
    (
      a._1 ++ b._1,
      a._2 ++ b._2,
      a._3 ++ b._3,
      a._4 ++ b._4,
      a._5 ++ b._5
    )
  }

  override def getResult(accumulator: Accumulator): Session = {
    Session.newBuilder()
      .setSessionDuration(1000)
      .setSessionTotalEvents(1000)
      .setSId("-" + UUID.randomUUID().toString)
      .build()
  }
}


This is the job overall (simplified version):

class App(
  source: SourceFunction[Event],
  sink: SinkFunction[Session]
) {

  def run(config: Config): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setMaxParallelism(256)
    val dataStream = senv.addSource(source).uid("source")
    dataStream
      .assignAscendingTimestamps(_.getTime)
      .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
      .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
      .allowedLateness(0.seconds.asFlinkTime)
      .process(new ProcessFunction).uid("process-session")
      .addSink(sink).uid("sink")

    senv.execute("session-aggregation")
  }
}


After 3 weeks of grueling debugging, profiling, checking the 
serialization and more I couldn't solve the backpressure issue.
However, I got an idea and used Flink's ProcessWindowFunction which just 
aggregates all the events behind the scenes and just gives them to me as 
an iterator, where I can then do all my calculations.
Surprisingly, there's no backpressure. So even though the 
ProcessWindowFunction actually aggregates more data, and also does 
concatenations and appends, for some reason there's no backpressure.
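
For reference, a sketch of what such a ProcessWindowFunction might look like (the actual ProcessFunction class used in the job isn't shown in this thread; the Event/Session types and builder calls below mirror the SimpleAggregator snippet, and the field types are assumptions):

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class SessionWindowFunction
  extends ProcessWindowFunction[Event, Session, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[Event],
                       out: Collector[Session]): Unit = {
    // single pass over the buffered events, only when the session window fires
    var count = 0
    var minTime = Long.MaxValue
    var maxTime = Long.MinValue
    for (event <- elements) {
      count += 1
      val t = event.getTime.longValue()
      if (t < minTime) minTime = t
      if (t > maxTime) maxTime = t
    }
    val duration = if (count == 0) 0L else maxTime - minTime  // assumes a long duration field
    out.collect(
      Session.newBuilder()
        .setSessionDuration(duration)
        .setSessionTotalEvents(count)
        .setSId("-" + java.util.UUID.randomUUID().toString)
        .build())
  }
}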


To finish this long post, what I'm trying to understand here is why when 
I collected the events using an AggregateFunction there was a 
backpressure, and when Flink does this for me with ProcessWindowFunction 
there's no backpressure? It seems to me something is fundamentally wrong 
here, since it means I cannot do any non-reducing operations without 
creating backpressure. I think it shouldn't cause the backpressure I 
experienced. I'm trying to understand what I did wrong here.


Thanks!