Hi All,

I am trying to process a Savepoint in order to produce basic statistics on it for 
monitoring. I'm running into an issue where, for a large Savepoint, the job runs 
out of memory before the Savepoint has been processed completely.

One thing I noticed while profiling the code is that a lot of memory appears to be 
held by the RocksDB ColumnFamilyOptions class, because it is producing a lot of 
java.lang.ref.Finalizer objects that don't seem to be garbage collected.

I see in the RocksDB code that these objects are supposed to be closed explicitly, 
but it doesn't look like that is happening here:
https://github.com/facebook/rocksdb/blob/f57745814f2d9e937383b4bfea55373306182c14/java/src/main/java/org/rocksdb/AbstractNativeReference.java#L71
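
For context, when using the RocksDB Java API directly these options objects are 
AutoCloseable and are meant to be released eagerly, with the finalizer only as a 
fallback. A minimal sketch of plain RocksDB usage (not Flink code) would be:

import org.rocksdb.ColumnFamilyOptions;

// The options object holds native memory, so it is normally freed with
// try-with-resources rather than being left to the finalizer.
try (ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
    cfOptions.setMaxWriteBufferNumber(4);
    // ... build column family descriptors with cfOptions ...
}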

Is there a way to close these via the Flink API? Also, more generally, why am I 
seeing hundreds of thousands of these generated?

In case it’s helpful, here’s a genericized/simplified version of the code:


import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;

Configuration config = new Configuration();
config.setInteger("state.backend.rocksdb.files.open", 20000);
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
env.getConfig().enableObjectReuse();

EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();

final EmbeddedRocksDBStateBackend configuredRocksDBStateBackend =
        stateBackend.configure(
                config, Thread.currentThread().getContextClassLoader());

// The function below just downloads the savepoint from our cloud storage and runs Savepoint.load()
ExistingSavepoint savepoint =
        loadSavepoint(env, configuredRocksDBStateBackend, savepointPath);
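
// (For reference, loadSavepoint boils down to roughly the sketch below once the
// savepoint has been copied to an accessible path; the method name and signature
// are just how we wrapped it, not Flink API.)
//
// ExistingSavepoint loadSavepoint(
//         ExecutionEnvironment env, EmbeddedRocksDBStateBackend backend, String path)
//         throws IOException {
//     return Savepoint.load(env, path, backend);
// }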


// FunctionStateReader() is a KeyedStateReaderFunction and does basic processing in readKey
DataSource<StateReport> source =
        savepoint.readKeyedState("process-1", new FunctionStateReader());

final MapOperator<StateReport, Metrics> sizes =
        source
                .map(s -> new Metrics(s.key,
                        s.stateFields.values().stream().mapToInt(Integer::parseInt).sum(),
                        0, 0, 0, 0, 0, 0, 0))
                .returns(TypeInformation.of(new TypeHint<Metrics>() {}));

// MetricsRed() is a RichReduceFunction
DataSet<Metrics> stats = sizes.reduce(new MetricsRed());
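
In case the reader itself is relevant: FunctionStateReader is shaped roughly like the 
sketch below. The state descriptor, field names, and the StateReport constructor here 
are placeholders standing in for our real ones.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class FunctionStateReader extends KeyedStateReaderFunction<String, StateReport> {

    private transient MapState<String, String> stateFields;

    @Override
    public void open(Configuration parameters) {
        // The descriptor has to match the one used by the original job (placeholder name here).
        stateFields = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("state-fields", String.class, String.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<StateReport> out) throws Exception {
        // Copy the map state for this key into a plain map and emit one report per key.
        Map<String, String> values = new HashMap<>();
        for (Map.Entry<String, String> entry : stateFields.entries()) {
            values.put(entry.getKey(), entry.getValue());
        }
        out.collect(new StateReport(key, values));
    }
}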


If you spot anything wrong with this approach that could cause memory issues, please 
let me know. I am not 100% sure that the specific issue/question above is the full 
cause of the memory problems I have been having.

Thank you!
Natalie
