Hi Ori

AFAIK, the 2GB limit is still there at the moment. As a workaround, maybe
you can reduce the state size. If this cannot be done with the window
operator, would the KeyedProcessFunction[1] work for you?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
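
To illustrate what reducing the state size could look like, here is a
minimal plain-Java model of the idea, with no Flink dependency. It is only
a sketch: BoundedKeyBuffer, maxEventsPerKey, onEvent and onSessionEnd are
hypothetical names, the map stands in for keyed state, and the cap value
is an assumption you would have to tune. In a real KeyedProcessFunction
the buffering would happen in processElement and the final flush in
onTimer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of capping per-key buffered state (hypothetical names). */
final class BoundedKeyBuffer {
    // Assumed cap on buffered events per key; flushing at the cap keeps
    // any single key's state from growing without bound.
    private final int maxEventsPerKey;
    // Stands in for keyed state (for example a ListState per key).
    private final Map<String, List<String>> buffers = new HashMap<>();
    // Stands in for the output collector; each entry is one emitted batch.
    private final List<List<String>> emitted = new ArrayList<>();

    BoundedKeyBuffer(int maxEventsPerKey) {
        if (maxEventsPerKey <= 0) {
            throw new IllegalArgumentException("maxEventsPerKey must be positive");
        }
        this.maxEventsPerKey = maxEventsPerKey;
    }

    /** Roughly the role of processElement: buffer, then flush early at the cap. */
    void onEvent(String key, String event) {
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(event);
        if (buffer.size() >= maxEventsPerKey) {
            flush(key);
        }
    }

    /** Roughly the role of onTimer: the session gap expired, emit what is left. */
    void onSessionEnd(String key) {
        flush(key);
    }

    // Emits the buffered events for the key as one batch and clears its state.
    private void flush(String key) {
        List<String> buffer = buffers.remove(key);
        if (buffer != null && !buffer.isEmpty()) {
            emitted.add(buffer);
        }
    }

    /** Number of events currently buffered for the key. */
    int buffered(String key) {
        List<String> buffer = buffers.get(key);
        return buffer == null ? 0 : buffer.size();
    }

    /** Batches emitted so far, in emission order. */
    List<List<String>> emitted() {
        return emitted;
    }
}
```

Note that with this approach a long session is emitted as several partial
batches instead of one, so whatever consumes the output has to be able to
combine partial results.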

Best,
Congxian


Ori Popowski <ori....@gmail.com> wrote on Wed, Jul 8, 2020 at 8:30 PM:

> I've asked this question in
> https://issues.apache.org/jira/browse/FLINK-9268 but it's been inactive
> for two years so I'm not sure it will be visible.
>
> While creating a savepoint I get an org.apache.flink.util.SerializedThrowable:
> java.lang.NegativeArraySizeException. It's happening because some of my
> windows have a keyed state of more than 2GiB, hitting the RocksDB memory limit.
>
> How can I prevent this?
>
> As I understand it, I need somehow to limit the accumulated size of the
> window I'm using, which is EventTimeWindow. However, I have no way of
> doing so, because the WindowOperator manages its state on its own.
>
> Below is a full stack trace.
>
> org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 139 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink: Unnamed (23/189).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NegativeArraySizeException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>     ... 3 common frames omitted
> Caused by: org.apache.flink.util.SerializedThrowable: null
>     at org.rocksdb.RocksIterator.value0(Native Method)
>     at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>     at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
>     at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>     ... 5 common frames omitted
>
