I've asked this question in https://issues.apache.org/jira/browse/FLINK-9268
but it's been inactive for two years so I'm not sure it will be visible.

While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException. It's happening because some of my
windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.

How can I prevent this?

As I understand it, I need somehow to limit the accumulated size of the
window I'm using, which is EventTimeWindow. However, I have no way of doing
so, because the WindowOperator manages its state on its own.

Below is a full stack trace.

org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint
139 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink: Unnamed (23/189).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: null
at org.rocksdb.RocksIterator.value0(Native Method)
at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
at
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
at
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
... 5 common frames omitted

Reply via email to