Hi,

Eventually flatMapWithState solved the problem. I started by looking into
KeyedProcessFunction, which led me to flatMapWithState. It's working very
well.

.keyBy(…)
.flatMapWithState[Event, Int] { (event, countOpt) =>
  val count = countOpt.getOrElse(0)
  if (count < config.limit) (List(event), Some(count + 1))
  else (List.empty, Some(count))
}
.keyBy(…)
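
For reference, here is a self-contained sketch of the same pipeline. The
key selector, the Event fields, and the Config class are illustrative
assumptions, not the actual job:

import org.apache.flink.streaming.api.scala._

case class Event(sessionId: String, payload: String)
case class Config(limit: Int)

def capPerKey(events: DataStream[Event], config: Config): DataStream[Event] =
  events
    .keyBy(_.sessionId)  // hypothetical key selector
    .flatMapWithState[Event, Int] { (event, countOpt) =>
      val count = countOpt.getOrElse(0)  // events emitted so far for this key
      if (count < config.limit) (List(event), Some(count + 1))  // forward and count
      else (List.empty, Some(count))  // drop once the per-key cap is reached
    }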

Using .aggregate(…, new MyProcessFunction) with an aggregation that
collects the events into a list worked really badly and caused serious
performance issues.
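
For context, the slow variant looked roughly like the sketch below. The key
selector is a hypothetical placeholder; the 30-minute session gap matches
the 1800000 ms in the stack trace further down, and CappingAggregator and
MyProcessFunction are the classes mentioned in this thread:

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// An AggregateFunction collects events into a Vector, and the
// ProcessWindowFunction then receives the whole capped list at once.
events
  .keyBy(_.sessionId)  // hypothetical key selector
  .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
  .aggregate(new CappingAggregator(config.limit), new MyProcessFunction)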

Thanks!

On Sun, Jul 12, 2020 at 10:32 AM Ori Popowski <ori....@gmail.com> wrote:

> > AFAIK, the 2GB limit is currently still there. As a workaround, maybe
> > you can reduce the state size. If this cannot be done using the window
> > operator, could the KeyedProcessFunction[1] work for you?
>
> I'll see if I can introduce it to the code.
>
> > If you do, the ProcessWindowFunction receives as an argument an Iterable
> > with ALL the elements collected along the session. This will make the
> > state per key potentially huge (as you're experiencing).
>
> Thanks for noticing that. It's indeed true that we do this. The reason is
> the nature of the computation, which unfortunately cannot be done
> incrementally. It's not a classic avg(), max(), last(), etc. computation
> that can be reduced at each step.
> I'm thinking of a way to cap the volume of the state per key, using an
> aggregate function that limits the number of elements and returns a list
> of the collected events.
>
> class CappingAggregator(limit: Int)
>     extends AggregateFunction[Event, Vector[Event], Vector[Event]] {
>
>   override def createAccumulator(): Vector[Event] = Vector.empty
>
>   override def add(value: Event, acc: Vector[Event]): Vector[Event] =
>     if (acc.size < limit) acc :+ value
>     else acc
>
>   override def getResult(acc: Vector[Event]): Vector[Event] = Vector(acc: _*)
>
>   override def merge(a: Vector[Event], b: Vector[Event]): Vector[Event] =
>     (a ++ b).slice(0, limit)
> }
>
> My only problem is with merge(): I'm not sure whether b's elements always
> come later than a's, or whether I must sort and only then slice.
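>
> A sketch of a defensive merge that sorts before capping, assuming Event
> carries an event-time field (the timestamp field name is hypothetical):
>
> // Sort the combined elements by event time before applying the cap, in
> // case b's elements do not always come after a's.
> override def merge(a: Vector[Event], b: Vector[Event]): Vector[Event] =
>   (a ++ b).sortBy(_.timestamp).take(limit)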
>
> On Sat, Jul 11, 2020 at 10:16 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Ori,
>>
>> In your code, are you using the process() API?
>>
>> .process(new MyProcessWindowFunction());
>>
>> If you do, the ProcessWindowFunction receives as an argument an Iterable
>> with ALL the elements collected along the session. This will make the
>> state per key potentially huge (as you're experiencing).
>>
>> As Aljoscha Krettek suggested in the JIRA, you can instead use the
>> aggregate() API and store in state only an aggregate that is
>> incrementally updated on every incoming event (this could be ONE class /
>> Map / Tuple / etc.) rather than keeping ALL the elements.
>>
>> See example here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
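>>
>> As a minimal sketch of the idea (a hypothetical per-session event count,
>> keeping ONE Long in state instead of all the elements):
>>
>> import org.apache.flink.api.common.functions.AggregateFunction
>>
>> // Incrementally updated aggregate: the accumulator is a single counter,
>> // so the state per key stays constant-size regardless of event volume.
>> class CountAggregator extends AggregateFunction[Event, Long, Long] {
>>   override def createAccumulator(): Long = 0L
>>   override def add(value: Event, acc: Long): Long = acc + 1  // per event
>>   override def getResult(acc: Long): Long = acc
>>   override def merge(a: Long, b: Long): Long = a + b
>> }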
>>
>> Thanks,
>> Rafi
>>
>>
>> On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu <qcx978132...@gmail.com>
>> wrote:
>>
>>> Hi Ori
>>>
>>> AFAIK, the 2GB limit is currently still there. As a workaround, maybe
>>> you can reduce the state size. If this cannot be done using the window
>>> operator, could the KeyedProcessFunction[1] work for you?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
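>>>
>>> For illustration, a minimal KeyedProcessFunction that caps how many
>>> events are forwarded per key might look like the sketch below. The key
>>> type, the Event class, and the cap logic are assumptions, not a drop-in
>>> replacement for the windowed job:
>>>
>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>> import org.apache.flink.configuration.Configuration
>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
>>> import org.apache.flink.util.Collector
>>>
>>> class CappingFunction(limit: Int)
>>>     extends KeyedProcessFunction[String, Event, Event] {
>>>
>>>   @transient private var count: ValueState[Int] = _
>>>
>>>   override def open(parameters: Configuration): Unit =
>>>     count = getRuntimeContext.getState(
>>>       new ValueStateDescriptor[Int]("count", classOf[Int]))
>>>
>>>   override def processElement(
>>>       event: Event,
>>>       ctx: KeyedProcessFunction[String, Event, Event]#Context,
>>>       out: Collector[Event]): Unit = {
>>>     val c = count.value()  // 0 when no state yet (Scala unboxes null to 0)
>>>     if (c < limit) {
>>>       out.collect(event)   // forward until the per-key cap is reached
>>>       count.update(c + 1)
>>>     }
>>>   }
>>> }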
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Ori Popowski <ori....@gmail.com> wrote on Wed, Jul 8, 2020 at 8:30 PM:
>>>
>>>> I've asked this question in
>>>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been
>>>> inactive for two years, so I'm not sure it will be visible.
>>>>
>>>> While creating a savepoint I get an
>>>> org.apache.flink.util.SerializedThrowable:
>>>> java.lang.NegativeArraySizeException. It's happening because some of my
>>>> windows have a keyed state of more than 2GiB, hitting RocksDB's 2GB
>>>> limit.
>>>>
>>>> How can I prevent this?
>>>>
>>>> As I understand it, I need to somehow limit the accumulated state size
>>>> of the window I'm using, which is an EventTimeWindow. However, I have
>>>> no way of doing so, because the WindowOperator manages its state on its
>>>> own.
>>>>
>>>> Below is a full stack trace.
>>>>
>>>> org.apache.flink.util.SerializedThrowable: Could not materialize
>>>> checkpoint 139 for operator Window(EventTimeSessionWindows(1800000),
>>>> EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink:
>>>> Unnamed (23/189).
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>   at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NegativeArraySizeException
>>>>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>   at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>>>>   at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>>>>   ... 3 common frames omitted
>>>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>>>   at org.rocksdb.RocksIterator.value0(Native Method)
>>>>   at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>>>>   at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
>>>>   at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
>>>>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
>>>>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>>>>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>>>>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>>>>   at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>   at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>>>>   ... 5 common frames omitted
>>>>
>>>
