I asked this question on https://issues.apache.org/jira/browse/FLINK-9268, but that issue has been inactive for two years, so I'm not sure anyone will see it there.
While creating a savepoint, I get an org.apache.flink.util.SerializedThrowable: java.lang.NegativeArraySizeException. It happens because some of my windows have keyed state larger than 2 GiB, which hits RocksDB's memory limit.

How can I prevent this? As I understand it, I need to somehow limit the accumulated size of the window I'm using, which is EventTimeSessionWindows. However, I have no way of doing so, because the WindowOperator manages its state on its own.

The full stack trace is below.

org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 139 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink: Unnamed (23/189).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NegativeArraySizeException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    ... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.rocksdb.RocksIterator.value0(Native Method)
    at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
    at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
    at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
    ... 5 common frames omitted
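
For context on the 2 GiB figure, here is a minimal sketch of how I think a value of that size could surface as a NegativeArraySizeException. It rests on an assumption I have not confirmed in the RocksDB JNI source: that the value's length is narrowed to a 32-bit int before a Java byte[] is allocated in RocksIterator.value0. The class and method names (ArraySizeOverflowDemo, narrowToInt, allocationFails) are hypothetical and are not part of Flink or RocksDB.

```java
public class ArraySizeOverflowDemo {

    // Narrow a 64-bit length to a 32-bit int. Java array lengths are ints,
    // so a native size above Integer.MAX_VALUE cannot be represented.
    // (Assumption: the JNI layer performs a narrowing like this one.)
    static int narrowToInt(long length) {
        return (int) length;
    }

    // Returns true if allocating a byte[] with the narrowed length throws
    // NegativeArraySizeException. Non-negative lengths are not allocated,
    // so the demo never reserves a large array.
    static boolean allocationFails(long length) {
        int narrowed = narrowToInt(length);
        if (narrowed >= 0) {
            return false;
        }
        try {
            byte[] unused = new byte[narrowed];
            return false;
        } catch (NegativeArraySizeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // 2 GiB is 2147483648 bytes, one more than Integer.MAX_VALUE.
        long twoGiB = 2L * 1024 * 1024 * 1024;
        System.out.println(narrowToInt(twoGiB));     // prints -2147483648
        System.out.println(allocationFails(twoGiB)); // prints true
    }
}
```

If that assumption holds, the failure depends on the size of a single serialized value rather than on the total state size, which is why I believe I need to cap what each window accumulates.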