[ https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860481#comment-16860481 ]
Congxian Qiu(klion26) edited comment on FLINK-12785 at 6/11/19 2:34 AM:
------------------------------------------------------------------------

I think your analysis is correct; we should add another flush strategy based on the in-flight byte size in RocksDBWriteBatchWrapper. Do you want to fix it? Otherwise, if you don't mind, I'll put together a patch for this. [~mikekap] cc [~srichter]


was (Author: klion26):
I think your analysis is correct; we should add another flush strategy based on the in-flight byte size in RocksDBWriteBatchWrapper. If you don't mind, I'll put together a patch for this. [~mikekap]

> RocksDB savepoint recovery can use a lot of unmanaged memory
> -------------------------------------------------------------
>
>                 Key: FLINK-12785
>                 URL: https://issues.apache.org/jira/browse/FLINK-12785
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>            Reporter: Mike Kaplinskiy
>            Priority: Major
>
> I'm running an application that's backfilling data from Kafka. There's approximately 3 years' worth of data, with a lot of watermark skew (i.e. new partitions were created over time), and I'm using daily windows. This makes a lot of the windows buffer their contents before the watermark catches up to "release" them. In turn, this gives me a lot of in-flight windows (200-300) with very large state keys in RocksDB (on the order of 40-50 MB per key).
>
> Running the pipeline tends to be mostly fine - it's not terribly fast when appends happen, but everything works. The problem comes when doing a savepoint restore - specifically, the taskmanagers eat RAM until the kernel kills them for being out of memory. The extra memory isn't JVM heap, since the memory usage of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} exceptions.
>
> I traced the culprit of the memory growth to [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]. Specifically, while the keys/values are deserialized on the Java heap, {{RocksDBWriteBatchWrapper}} forwards them to RocksDB's {{WriteBatch}}, which buffers them in unmanaged memory. That's not in itself an issue, but {{RocksDBWriteBatchWrapper}} flushes based only on the number of records, not the number of bytes in flight. Specifically, {{RocksDBWriteBatchWrapper}} will flush only once it has 500 records, and at 40 MB per key, that's at least 20 GB of unmanaged memory before a flush.
>
> My suggestion would be to add an additional flush criterion to {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. flush at 500 records or 5 MB buffered, whichever comes first). This way, large key writes would be flushed to RocksDB immediately, on recovery or even regular writes. I applied this approach and was able to complete a savepoint restore for my job. That said, I'm not entirely sure what else this change would impact, since I'm not very familiar with Flink.
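For illustration only, below is a minimal sketch of what a byte-size flush criterion like the one suggested above could look like, written against the public RocksJava API rather than Flink's actual {{RocksDBWriteBatchWrapper}}. The class name, constructor, and thresholds are placeholder assumptions, not the eventual patch.

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

/**
 * Illustrative sketch only: a write-batch wrapper that flushes when either a
 * record-count or an in-flight byte-size threshold is reached. Names and
 * defaults are placeholders, not the actual Flink implementation.
 */
public class SizeCappedWriteBatchWrapper implements AutoCloseable {

    // Placeholder thresholds: flush at 500 records or 5 MB buffered, whichever comes first.
    private static final int MAX_BATCH_RECORDS = 500;
    private static final long MAX_BATCH_BYTES = 5L * 1024 * 1024;

    private final RocksDB db;
    private final WriteOptions writeOptions;
    private final WriteBatch batch = new WriteBatch();

    public SizeCappedWriteBatchWrapper(RocksDB db, WriteOptions writeOptions) {
        this.db = db;
        this.writeOptions = writeOptions;
    }

    public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
        batch.put(handle, key, value);
        // WriteBatch buffers entries in native (unmanaged) memory until written,
        // so checking getDataSize() caps native memory even for very large values.
        if (batch.count() >= MAX_BATCH_RECORDS || batch.getDataSize() >= MAX_BATCH_BYTES) {
            flush();
        }
    }

    public void flush() throws RocksDBException {
        if (batch.count() > 0) {
            db.write(writeOptions, batch);
            batch.clear();
        }
    }

    @Override
    public void close() throws RocksDBException {
        flush();
        batch.close();
    }
}
{code}

Checking both limits on every {{put}} keeps the existing 500-record behavior for small values while bounding the {{WriteBatch}}'s native buffer for large ones, so the 40-50 MB keys from this ticket would trigger a flush well before 20 GB of unmanaged memory accumulates.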