Hi,

Eventually flatMapWithState solved the problem. I started by looking into KeyedProcessFunction, which led me to flatMapWithState. It's working very well.
    .keyBy(…)
    .flatMapWithState[Event, Int] { (event, countOpt) =>
      // Per-key count of events emitted so far (None for a key seen for the first time).
      val count = countOpt.getOrElse(0)
      // Emit the event and bump the counter until config.limit is reached;
      // after that, drop events and leave the counter unchanged.
      if (count < config.limit) (List(event), Some(count + 1))
      else (List.empty, Some(count))
    }
    .keyBy(…)

Using .aggregate(…, new MyProcessFunction), with an aggregate function that collected the events into a list, performed very badly and caused serious performance issues.

Thanks!

On Sun, Jul 12, 2020 at 10:32 AM Ori Popowski <ori....@gmail.com> wrote:

> > AFAIK, currently the 2GB limit is still there. As a workaround, maybe you
> > can reduce the state size. If this cannot be done using the window
> > operator, would the KeyedProcessFunction[1] be OK for you?
>
> I'll see if I can introduce it to the code.
>
> > If you do, the ProcessWindowFunction receives as its argument an Iterable
> > with ALL elements collected along the session. This will make the state per
> > key potentially huge (like you're experiencing).
>
> Thanks for noticing that. It's indeed true that we do this. The reason is
> the nature of the computation, which unfortunately cannot be done
> incrementally. It's not a classic avg(), max(), last() etc. computation
> which can be reduced in each step.
> I'm thinking of a way to cap the volume of the state per key using an
> aggregate function that limits the number of elements and returns a list of
> the collected events.
>
> import org.apache.flink.api.common.functions.AggregateFunction
>
> class CappingAggregator(limit: Int)
>     extends AggregateFunction[Event, Vector[Event], Vector[Event]] {
>   override def createAccumulator(): Vector[Event] = Vector.empty
>
>   override def add(value: Event, acc: Vector[Event]): Vector[Event] =
>     if (acc.size < limit) acc :+ value
>     else acc
>
>   override def getResult(acc: Vector[Event]): Vector[Event] = Vector(acc: _*)
>
>   override def merge(a: Vector[Event], b: Vector[Event]): Vector[Event] =
>     (a ++ b).slice(0, limit)
> }
>
> My only problem is with merge(). I'm not sure if b always holds later
> elements than a, or if I must sort and only then slice.
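On the merge() question: one way to avoid depending on whether a or b holds the later elements is to sort the combined accumulators by event time before truncating. The sketch below is plain Scala with no Flink dependency, so it can be tested on its own. The Event fields (id, timestamp in epoch milliseconds) and the CappedAccumulator object are hypothetical; in the real job the method bodies would go into CappingAggregator. Because each accumulator has already dropped events beyond limit in arrival order, with out-of-order input the result is the earliest limit of the retained events, not necessarily of the whole session.

```scala
// Hypothetical event type: the thread never shows Event's fields,
// so `id` and `timestamp` (epoch millis) are assumptions for this sketch.
final case class Event(id: String, timestamp: Long)

object CappedAccumulator {
  // Same logic as CappingAggregator.add: keep at most `limit` events, in arrival order.
  def add(value: Event, acc: Vector[Event], limit: Int): Vector[Event] =
    if (acc.size < limit) acc :+ value else acc

  // Sorting by event time before truncating means the result does not depend on
  // whether `a` or `b` holds the earlier events. Ties on equal timestamps keep
  // their order from `a ++ b`, because sortBy is stable.
  def merge(a: Vector[Event], b: Vector[Event], limit: Int): Vector[Event] =
    (a ++ b).sortBy(_.timestamp).take(limit)
}
```

Sorting costs O(n log n) per merge, but n is at most 2 × limit here, so it stays cheap as long as the cap is small.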
>
> On Sat, Jul 11, 2020 at 10:16 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Ori,
>>
>> In your code, are you using the process() API?
>>
>> .process(new MyProcessWindowFunction());
>>
>> If you do, the ProcessWindowFunction receives as its argument an Iterable
>> with ALL elements collected along the session. This will make the state per
>> key potentially huge (like you're experiencing).
>>
>> As Aljoscha Krettek suggested in the JIRA, if you can, use the aggregate()
>> API and store in state only an aggregate that is incrementally updated
>> on every incoming event (this could be ONE Class / Map / Tuple / etc.)
>> rather than keeping ALL elements.
>>
>> See the example here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
>>
>> Thanks,
>> Rafi
>>
>>
>> On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu <qcx978132...@gmail.com>
>> wrote:
>>
>>> Hi Ori,
>>>
>>> AFAIK, currently the 2GB limit is still there. As a workaround, maybe you
>>> can reduce the state size. If this cannot be done using the window
>>> operator, would the KeyedProcessFunction[1] be OK for you?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> On Wed, Jul 8, 2020 at 8:30 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> I've asked this question in
>>>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been
>>>> inactive for two years, so I'm not sure it will be visible.
>>>>
>>>> While creating a savepoint I get an
>>>> org.apache.flink.util.SerializedThrowable:
>>>> java.lang.NegativeArraySizeException. It's happening because some of
>>>> my windows have a keyed state of more than 2GiB, hitting the RocksDB
>>>> memory limit.
>>>>
>>>> How can I prevent this?
>>>>
>>>> As I understand it, I somehow need to limit the accumulated size of the
>>>> window I'm using, which is EventTimeWindow. However, I have no way of
>>>> doing so, because the WindowOperator manages its state on its own.
>>>>
>>>> Below is the full stack trace.
>>>>
>>>> org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 139 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink: Unnamed (23/189).
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NegativeArraySizeException
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>>>>     ... 3 common frames omitted
>>>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>>>     at org.rocksdb.RocksIterator.value0(Native Method)
>>>>     at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>>>>     at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
>>>>     at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
>>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
>>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>>>>     ... 5 common frames omitted
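Congxian's KeyedProcessFunction suggestion and the flatMapWithState fix at the top of the thread rest on the same idea: keep a small, bounded value per key (here a counter) instead of every event of the session, so this operator's per-key state stays a single Int. A minimal plain-Scala sketch of that per-key logic, runnable without Flink; KeyedEvent, PerKeyCap and capPerKey are hypothetical names, and the mutable map only stands in for Flink's keyed state.

```scala
import scala.collection.mutable

// Hypothetical event type; `key` stands in for whatever the job's keyBy(…) extracts.
final case class KeyedEvent(key: String, payload: String)

object PerKeyCap {
  // Imitates the flatMapWithState fix: per key, only an Int counter is kept,
  // never the events themselves.
  def capPerKey(events: Seq[KeyedEvent], limit: Int): Vector[KeyedEvent] = {
    val counts = mutable.Map.empty[String, Int] // stand-in for keyed ValueState[Int]
    val out = Vector.newBuilder[KeyedEvent]
    for (event <- events) {
      val count = counts.getOrElse(event.key, 0) // 0 when the key has no state yet
      if (count < limit) {
        out += event                  // emit while under the cap
        counts(event.key) = count + 1 // and bump the per-key counter
      }
      // otherwise: drop the event and leave the counter unchanged
    }
    out.result()
  }
}
```

As in the snippet at the top of the thread, the counter is never reset, so once a key reaches limit, all of its later events are dropped.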