This is an automated email from the ASF dual-hosted git repository.

RocMarshal pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new fb6c4c2c774 [FLINK-39753][state/rocksdb] Close ColumnFamilyOptions from getDescriptor() in Compactor (#28280)
fb6c4c2c774 is described below

commit fb6c4c2c774f6dba914ebeeaa1d78ac839b7053c
Author: Keith Lee <[email protected]>
AuthorDate: Mon Jun 1 11:53:39 2026 +0100

    [FLINK-39753][state/rocksdb] Close ColumnFamilyOptions from getDescriptor() in Compactor (#28280)
    
    ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions
    on every call and does not close it, preventing the shared block cache from
    being freed. Wrap the call in try-with-resources so the options are closed
    after reading numLevels().
---
 .../java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java  | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java
index 2a4d7b26c73..e6e0e65d46e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.rocksdb.sstmerge;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionJobInfo;
 import org.rocksdb.CompactionOptions;
 import org.rocksdb.RocksDB;
@@ -51,7 +52,13 @@ class Compactor {
     }
 
     void compact(ColumnFamilyHandle cfName, int level, List<String> files) 
throws RocksDBException {
-        int outputLevel = Math.min(level + 1, 
cfName.getDescriptor().getOptions().numLevels() - 1);
+        // FLINK-39753: getDescriptor() allocates a new native 
ColumnFamilyOptions on every call.
+        // Closing it is required to release the native object and its 
reference to the shared
+        // block cache; leaking it keeps the LRUCache alive and grows native 
memory until OOM.
+        final int outputLevel;
+        try (ColumnFamilyOptions cfOptions = 
cfName.getDescriptor().getOptions()) {
+            outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);
+        }
         LOG.debug(
                 "Manually compacting {} files from level {} to {}: {}",
                 files.size(),

Reply via email to