Dear Flink Team,

Over the last few weeks I have been dealing with a large savepoint (around 40 GiB)
that contained lots of obsolete data points and overwhelmed our infrastructure
(i.e. it failed to load/restart).
We could not afford to lose the state, so I spent the time to transcode the
savepoint into something smaller (it ended up at 2.5 GiB).
During this work I ran into a couple of issues that make the savepoint API
hard to use with larger savepoints, and I found simple solutions for them.

I would like to contribute my findings and 'fixes'; however, on my corporate
infrastructure I can neither fork/build Flink locally nor open PRs for the
changes later on.

Before creating Jira tickets I wanted to quickly discuss the matter.

Findings:


  *   (We are currently on Flink 1.13 (RocksDB state backend), but all findings
apply to the latest version as well)
  *   WritableSavepoint.write(...) falls back to JobManagerCheckpointStorage,
which limits the (memory-backed) state size to 5 MiB
     *   See the relevant exception stack here [1]
     *   This is because SavepointTaskManagerRuntimeInfo.getConfiguration()
always returns an empty Configuration, hence
     *   neither "state.checkpoint-storage" nor "state.checkpoints.dir" can
be configured
     *   'fix': give SavepointTaskManagerRuntimeInfo.getConfiguration() a
meaningful implementation and set the configuration in
SavepointEnvironment.getTaskManagerInfo()
  *   When loading state, MultiStateKeyIterator loads and buffers the whole
state in memory before it even processes a single data point
     *   This is no problem at all for small state (hence the unit tests
work fine)
     *   The MultiStateKeyIterator constructor sets up a Java Stream that
iterates all state descriptors and flattens all data points contained in them
     *   The java.util.stream.Stream#flatMap function causes the whole data
set to be buffered when it is enumerated later on
     *   See call stack [2]
        *   In our case this is 150e6 data points (> 1 GiB just for the
references to the data, let alone the data itself, ~30 GiB)
     *   I'm not aware of any way to configure Stream to avoid this
problem, hence
     *   I coded an alternative implementation of MultiStateKeyIterator that
avoids using Java Streams
     *   I can contribute our implementation (MultiStateKeyIteratorNoStreams)
  *   I found out that, at least when using LocalFileSystem on a Windows
system, read I/O to load a savepoint is unbuffered
     *   See example stack [3]
     *   i.e. to load just a long in a serializer, it has to go into kernel
mode 8 times and read the 8 bytes one by one
     *   I coded a BufferedFSDataInputStreamWrapper that allows opting in to
buffered reads on any FileSystem implementation
     *   In our setting, savepoint loading is now 30 times faster
     *   I once saw a Jira ticket about improving savepoint load time in
general (unfortunately I lost the link); maybe this approach can help with it
     *   I'm not sure whether HDFS has the same problem
     *   I can contribute my implementation
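For illustration, the first finding can be reduced to a toy model in plain Java (no Flink dependency, so none of this is Flink's actual code). The option keys and the two sizes come from the finding and from stack [1]; the resolution order (an explicit "state.checkpoint-storage" wins, else a "state.checkpoints.dir" implies file system storage, else JobManager storage) and all helper names are my assumptions:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (plain Java, not Flink code) of why an empty task manager
 * Configuration ends up with the 5 MiB memory-backed limit seen in [1].
 */
public class CheckpointStorageFallbackSketch {

    // Option keys as quoted in the finding.
    static final String STORAGE_KEY = "state.checkpoint-storage";
    static final String DIR_KEY = "state.checkpoints.dir";

    // Values taken from exception stack [1].
    static final long MEMORY_MAX_STATE_SIZE = 5_242_880L; // 5 MiB
    static final long REPORTED_STATE_SIZE = 180_075_318L;

    /** Assumed resolution order: explicit storage, else a checkpoint dir, else JobManager. */
    static String resolveStorage(Map<String, String> config) {
        String explicit = config.get(STORAGE_KEY);
        if (explicit != null) {
            return explicit;
        }
        return config.containsKey(DIR_KEY) ? "filesystem" : "jobmanager";
    }

    /** Mirrors the size check that fails in [1]; in this model only JobManager storage is capped. */
    static void checkSize(String storage, long size) throws IOException {
        if ("jobmanager".equals(storage) && size > MEMORY_MAX_STATE_SIZE) {
            throw new IOException("Size=" + size + " , maxSize=" + MEMORY_MAX_STATE_SIZE);
        }
    }

    /** Runs the model with an empty config, or with a (hypothetical) checkpoint dir set. */
    static String outcome(boolean withCheckpointDir) {
        Map<String, String> config = new HashMap<>(); // empty, like getConfiguration() today
        if (withCheckpointDir) {
            config.put(DIR_KEY, "file:///tmp/flink-checkpoints"); // hypothetical path
        }
        String storage = resolveStorage(config);
        try {
            checkSize(storage, REPORTED_STATE_SIZE);
            return "ok (" + storage + ")";
        } catch (IOException e) {
            return "rejected (" + storage + "): " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("empty configuration: " + outcome(false));
        System.out.println("checkpoint dir set:  " + outcome(true));
    }
}
```

In this model the 180075318-byte state is rejected with an empty configuration, while merely having a checkpoint directory configured avoids the cap; that is the effect I would expect from the proposed getConfiguration() fix.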
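A minimal sketch of the idea behind MultiStateKeyIteratorNoStreams (not that implementation itself): plain iterators are chained instead of using Stream#flatMap, so the key iterator of the next state is only opened once the previous one is exhausted, and keys are pulled one at a time. States and their key iterators are replaced by hypothetical stand-ins (ConcatenatingKeyIterator, countingKeys, consume), and everything Flink-specific is left out:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class LazyKeyIteratorSketch {

    /**
     * Concatenates the key iterators of several states without java.util.stream.
     * The next state's iterator is opened only when the current one is exhausted,
     * and keys are handed out one at a time, so nothing is buffered.
     */
    static final class ConcatenatingKeyIterator<S, K> implements Iterator<K> {
        private final Iterator<S> states;
        private final Function<S, Iterator<K>> keysOf;
        private Iterator<K> current = Collections.emptyIterator();

        ConcatenatingKeyIterator(Iterable<S> states, Function<S, Iterator<K>> keysOf) {
            this.states = states.iterator();
            this.keysOf = keysOf;
        }

        @Override
        public boolean hasNext() {
            // Skip exhausted (or empty) states until a key is available.
            while (!current.hasNext()) {
                if (!states.hasNext()) {
                    return false;
                }
                current = keysOf.apply(states.next());
            }
            return true;
        }

        @Override
        public K next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }

    /** Stand-in for one state's key iterator; counts every key it hands out. */
    static Iterator<Integer> countingKeys(int size, int[] produced) {
        return new Iterator<Integer>() {
            private int cursor;

            @Override
            public boolean hasNext() {
                return cursor < size;
            }

            @Override
            public Integer next() {
                if (cursor >= size) {
                    throw new NoSuchElementException();
                }
                produced[0]++;
                return cursor++;
            }
        };
    }

    /** Returns {keys produced when the first key arrived, total keys iterated}. */
    static long[] consume(int... stateSizes) {
        List<Integer> sizes = new ArrayList<>();
        for (int s : stateSizes) {
            sizes.add(s);
        }
        int[] produced = {0};
        Iterator<Integer> keys = new ConcatenatingKeyIterator<>(sizes, n -> countingKeys(n, produced));
        long total = 0;
        long producedAtFirstKey = -1;
        while (keys.hasNext()) {
            keys.next();
            total++;
            if (total == 1) {
                producedAtFirstKey = produced[0];
            }
        }
        return new long[] {producedAtFirstKey, total};
    }

    public static void main(String[] args) {
        long[] r = consume(1_000_000, 3, 0, 2);
        System.out.println("keys produced when the first key arrived: " + r[0]);
        System.out.println("total keys: " + r[1]);
    }
}
```

With one state of a million keys followed by smaller ones, only a single key has been produced when the first key arrives, i.e. the iterator itself never holds more than the current key, regardless of state size.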
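A sketch of the opt-in buffering idea, again in plain Java: SeekableInputStream is a hypothetical stand-in for FSDataInputStream (an InputStream with seek/getPos), CountingSource stands in for an unbuffered local file, and BufferedSeekable is my own illustration, not the BufferedFSDataInputStreamWrapper mentioned above. The only subtle part is positioning: seek() simply discards the buffer, and getPos() has to subtract the bytes that are buffered but not yet consumed:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class BufferedSeekableSketch {

    /** Hypothetical stand-in for Flink's FSDataInputStream: an InputStream with seek/getPos. */
    abstract static class SeekableInputStream extends InputStream {
        abstract void seek(long pos) throws IOException;
        abstract long getPos() throws IOException;
    }

    /** In-memory "file" counting read calls; on a raw file each call would be a system call. */
    static final class CountingSource extends SeekableInputStream {
        private final byte[] data;
        private int pos;
        int readCalls;
        CountingSource(byte[] data) { this.data = data; }

        @Override public int read() {
            readCalls++;
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }

        @Override public int read(byte[] b, int off, int len) {
            readCalls++;
            if (len == 0) return 0;
            if (pos >= data.length) return -1;
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }

        @Override void seek(long p) { pos = (int) p; }
        @Override long getPos() { return pos; }
    }

    /** Opt-in buffering wrapper: small reads are served from buf; a seek discards buf. */
    static final class BufferedSeekable extends SeekableInputStream {
        private final SeekableInputStream in;
        private final byte[] buf;
        private int start; // index of the next unread byte in buf
        private int end;   // number of valid bytes in buf

        BufferedSeekable(SeekableInputStream in, int bufferSize) {
            this.in = in;
            this.buf = new byte[bufferSize];
        }

        private boolean fill() throws IOException {
            int n = in.read(buf, 0, buf.length); // one bulk read instead of many small ones
            start = 0;
            end = Math.max(n, 0);
            return n > 0;
        }

        @Override public int read() throws IOException {
            if (start == end && !fill()) return -1;
            return buf[start++] & 0xFF;
        }

        @Override public int read(byte[] b, int off, int len) throws IOException {
            if (len == 0) return 0;
            if (start == end && !fill()) return -1;
            int n = Math.min(len, end - start);
            System.arraycopy(buf, start, b, off, n);
            start += n;
            return n;
        }

        @Override void seek(long pos) throws IOException {
            in.seek(pos);
            start = end = 0; // simplest correct choice: drop the buffered bytes
        }
        // Logical position = underlying position minus bytes buffered but not yet consumed.
        @Override long getPos() throws IOException { return in.getPos() - (end - start); }
        @Override public void close() throws IOException { in.close(); }
    }

    /** Reads 1000 ints through DataInputStream; returns how many reads reached the source. */
    static int readCallsFor(boolean buffered) {
        CountingSource raw = new CountingSource(new byte[4 * 1000]);
        InputStream s = buffered ? new BufferedSeekable(raw, 4096) : raw;
        try (DataInputStream din = new DataInputStream(s)) {
            for (int i = 0; i < 1000; i++) din.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return raw.readCalls;
    }

    public static void main(String[] args) {
        System.out.println("unbuffered read calls: " + readCallsFor(false));
        System.out.println("buffered read calls:   " + readCallsFor(true));
    }
}
```

Without the wrapper, every readInt() reaches the source at least once (once per byte where DataInputStream reads byte by byte); with it, the 4000 bytes arrive in a single bulk read. A smarter seek() could keep the buffer when the target position falls inside it.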

Looking forward to your comments


Matthias (Thias) Schwalbe


[1] exception stack:
8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] ERROR BatchTask - Error in task code:  MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
            at java.util.concurrent.FutureTask.report(FutureTask.java:122)
            at java.util.concurrent.FutureTask.get(FutureTask.java:192)
            at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
            at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
            at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
            at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
            at org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
            at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
            at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:107)
            at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
            at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
            at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
            at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:61)
            at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:141)
            at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:121)
            at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75)
            at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:87)
            at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
            at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
            at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
            at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
            at java.util.concurrent.FutureTask.run(FutureTask.java)
            at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
            ... 14 more

[2] Streams call stack:
hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
forEachRemaining:116, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
forEach:580, ReferencePipeline$Head (java.util.stream)
accept:270, ReferencePipeline$7$1 (java.util.stream)    # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
accept:373, ReferencePipeline$11$1 (java.util.stream)   # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
accept:193, ReferencePipeline$3$1 (java.util.stream)    # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
doRun:776, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

[3] unbuffered reads stack:
read:207, FileInputStream (java.io)
read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
read:42, ForwardingInputStream (org.apache.flink.runtime.util)
readInt:390, DataInputStream (java.io)
deserialize:80, BytePrimitiveArraySerializer (org.apache.flink.api.common.typeutils.base.array)
next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator (org.apache.flink.runtime.state.restore)
next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator (org.apache.flink.runtime.state.restore)
restoreKVStateData:147, RocksDBFullRestoreOperation (org.apache.flink.contrib.streaming.state.restore)


