Hi All, I am trying to process a Savepoint in order to produce basic statistics on it for monitoring. I'm running into an issue where processing a large Savepoint runs out of memory before I can process the Savepoint completely.
One thing I noticed while profiling the code is that a lot of memory is held by the RocksDB ColumnFamilyOptions class: it is producing a large number of java.lang.ref.Finalizer objects that don't seem to be garbage collected. I see in the RocksDB code that these objects are meant to be closed, but it doesn't look like that is happening:

https://github.com/facebook/rocksdb/blob/f57745814f2d9e937383b4bfea55373306182c14/java/src/main/java/org/rocksdb/AbstractNativeReference.java#L71

Is there a way to close these via the Flink API? Also, more generally, why am I seeing hundreds of thousands of them generated?

In case it's helpful, here's a genericized/simplified version of the code:

import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;

Configuration config = new Configuration();
config.setInteger("state.backend.rocksdb.files.open", 20000);

ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
env.getConfig().enableObjectReuse();

EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
final EmbeddedRocksDBStateBackend configuredRocksDBStateBackend =
    stateBackend.configure(config, Thread.currentThread().getContextClassLoader());

// The function below just downloads the savepoint from our cloud storage and runs Savepoint.load()
ExistingSavepoint savepoint = loadSavepoint(env, configuredRocksDBStateBackend, savepointPath);

// FunctionStateReader is a KeyedStateReaderFunction and does basic processing in readKey
DataSource<StateReport> source =
    savepoint.readKeyedState("process-1", new FunctionStateReader());

final MapOperator<StateReport, Metrics> sizes = source
    .map(s -> new Metrics(
        s.key,
        s.stateFields.values().stream().mapToInt(Integer::parseInt).sum(),
        0, 0, 0, 0, 0, 0, 0))
    .returns(TypeInformation.of(new TypeHint<>() { }));

// MetricsRed is a RichReduceFunction
DataSet<Metrics> stats = sizes.reduce(new MetricsRed());

If you spot anything wrong with this approach that would cause memory issues, please let me know; I am not 100% sure that the specific Finalizer issue above is the full cause of the memory problems I have been seeing.

Thank you!
Natalie
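For context, here is my understanding of why these objects show up in the heap dump, sketched with a plain-Java stand-in class (FakeNativeRef is hypothetical, not a real RocksDB class; I don't know how to reach the actual ColumnFamilyOptions instances Flink creates):

```java
// Illustration of why unclosed native wrappers pile up. FakeNativeRef is a
// stand-in for a native-backed class like org.rocksdb.ColumnFamilyOptions.
public class FinalizerDemo {

    static class FakeNativeRef implements AutoCloseable {
        private boolean open = true;

        @Override
        public void close() {
            // With a real native wrapper, this is where the native handle
            // would be freed deterministically.
            open = false;
        }

        // Overriding finalize() is what makes the JVM register a
        // java.lang.ref.Finalizer entry for every instance. Those entries
        // only disappear after the finalizer thread has run, so a large
        // number of unclosed instances shows up in a heap dump as exactly
        // the kind of Finalizer buildup described above.
        @SuppressWarnings("deprecation")
        @Override
        protected void finalize() {
            if (open) {
                close();
            }
        }

        boolean isOpen() {
            return open;
        }
    }

    public static void main(String[] args) {
        // Closing deterministically (e.g. try-with-resources) releases the
        // resource without waiting on the finalizer thread:
        FakeNativeRef ref = new FakeNativeRef();
        try (ref) {
            // ... use the handle ...
        }
        System.out.println(ref.isOpen()); // prints "false"
    }
}
```

So my question boils down to: is there a hook in the Flink API that lets me apply this close pattern to the options objects it creates internally?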