rkhachatryan commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1443420860


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java:
##########
@@ -305,6 +311,11 @@ private EmbeddedRocksDBStateBackend(
                 overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1,
                 "Overlap fraction threshold of restoring should be between 0 and 1");
 
+        useIngestDbRestoreMode =
+                original.useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE
+                        ? config.get(USE_INGEST_DB_RESTORE_MODE)
+                        : original.useIngestDbRestoreMode;

Review Comment:
   Won't this ignore `original.useIngestDbRestoreMode` if it's set to `false`?
   (I guess this is a copy-paste from other fields, but they aren't boolean)
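
   To make the concern concrete, here is a minimal, self-contained sketch (not
   Flink code; `TriState` and `resolve` are hypothetical names) of a tri-state
   flag that keeps an explicit `false` distinguishable from "not set". Flink's
   `org.apache.flink.util.TernaryBoolean` could presumably play the same role.

```java
/** Illustration only; a hypothetical stand-in for the copy-constructor logic. */
final class RestoreModeSketch {

    /** UNDEFINED means "not set programmatically, so consult the configuration". */
    enum TriState { UNDEFINED, FALSE, TRUE }

    /** Returns the explicitly set value if there is one, otherwise the configured value. */
    static boolean resolve(TriState original, boolean configured) {
        return original == TriState.UNDEFINED ? configured : original == TriState.TRUE;
    }

    public static void main(String[] args) {
        // An explicit FALSE must win over a config value of true.
        System.out.println(resolve(TriState.FALSE, true));
        // Only UNDEFINED falls back to the config value.
        System.out.println(resolve(TriState.UNDEFINED, true));
    }
}
```

   With a plain `boolean` field there is no third value left to mean "unset",
   which is why a sentinel comparison cannot work there.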



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -329,74 +332,164 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
             CompositeKeySerializationUtils.serializeKeyGroup(
                     keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
 
-            // Insert all remaining state through creating temporary RocksDB instances
+            rescalingRestoreFromLocalStateOperation.accept(
+                    localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
+        }
+    }
+
+    private void rescaleCopyFromTemporaryInstance(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        // Choose the best state handle for the initial DB
+        final IncrementalLocalKeyedStateHandle selectedInitialHandle =
+                RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+                        localKeyedStateHandles, keyGroupRange, overlapFractionThreshold);
+
+        Preconditions.checkNotNull(selectedInitialHandle);
+
+        // Remove the selected handle from the list so that we don't restore it twice.
+        localKeyedStateHandles.remove(selectedInitialHandle);
+
+        // Init the base DB instance with the initial state
+        initBaseDBForRescaling(selectedInitialHandle);
+
+        for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
+            logger.info("Starting to restore from state handle: {} with rescaling.", stateHandle);
+
+            try (RestoredDBInstance tmpRestoreDBInfo =
+                            restoreTempDBInstanceFromLocalState(stateHandle);
+                    RocksDBWriteBatchWrapper writeBatchWrapper =
+                            new RocksDBWriteBatchWrapper(
+                                    this.rocksHandle.getDb(), writeBatchSize)) {
+
+                List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+                        tmpRestoreDBInfo.columnFamilyDescriptors;
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // iterating only the requested descriptors automatically skips the default
+                // column
+                // family handle
+                for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
+                    ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx);
+
+                    ColumnFamilyHandle targetColumnFamilyHandle =
+                            this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                            null,
+                                            tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+                                    .columnFamilyHandle;
+
+                    try (RocksIteratorWrapper iterator =
+                            RocksDBOperationUtils.getRocksIterator(
+                                    tmpRestoreDBInfo.db,
+                                    tmpColumnFamilyHandle,
+                                    tmpRestoreDBInfo.readOptions)) {
+
+                        iterator.seek(startKeyGroupPrefixBytes);
+
+                        while (iterator.isValid()) {
+
+                            if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+                                    iterator.key(), stopKeyGroupPrefixBytes)) {
+                                writeBatchWrapper.put(
+                                        targetColumnFamilyHandle, iterator.key(), iterator.value());
+                            } else {
+                                // Since the iterator will visit the record according to the
+                                // sorted
+                                // order,
+                                // we can just break here.
+                                break;
+                            }
+
+                            iterator.next();
+                        }
+                    } // releases native iterator resources
+                }
+                logger.info(
+                        "Finished restoring from state handle: {} with rescaling.", stateHandle);
+            }
+        }
+    }
+
+    /**
+     * Recovery from multi incremental states with rescaling. For rescaling, this method creates a
+     * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance
+     * are copied into the real restore instance and then the temporary instance is discarded.
+     */
+    private void rescaleClipIngestDB(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
+        final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs");
+        Files.createDirectories(exportCfBasePath);
+
+        final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                columnFamilyMetaDataToImport = new HashMap<>();
+
+        try {
             for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
                 logger.info(
-                        "Starting to restore from state handle: {} with rescaling.", stateHandle);
+                        "Starting to restore from state handle: {} with rescaling using Clip/Ingest DB.",
+                        stateHandle);
 
                 try (RestoredDBInstance tmpRestoreDBInfo =
-                                restoreTempDBInstanceFromLocalState(stateHandle);
-                        RocksDBWriteBatchWrapper writeBatchWrapper =
-                                new RocksDBWriteBatchWrapper(
-                                        this.rocksHandle.getDb(), writeBatchSize)) {
+                        restoreTempDBInstanceFromLocalState(stateHandle)) {
 
-                    List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
-                            tmpRestoreDBInfo.columnFamilyDescriptors;
                     List<ColumnFamilyHandle> tmpColumnFamilyHandles =
                             tmpRestoreDBInfo.columnFamilyHandles;
 
-                    // iterating only the requested descriptors automatically skips the default
-                    // column
-                    // family handle
-                    for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
-                        ColumnFamilyHandle tmpColumnFamilyHandle =
-                                tmpColumnFamilyHandles.get(descIdx);
-
-                        ColumnFamilyHandle targetColumnFamilyHandle =
-                                this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
-                                                null,
-                                                tmpRestoreDBInfo.stateMetaInfoSnapshots.get(
-                                                        descIdx))
-                                        .columnFamilyHandle;
-
-                        try (RocksIteratorWrapper iterator =
-                                RocksDBOperationUtils.getRocksIterator(
-                                        tmpRestoreDBInfo.db,
-                                        tmpColumnFamilyHandle,
-                                        tmpRestoreDBInfo.readOptions)) {
-
-                            iterator.seek(startKeyGroupPrefixBytes);
-
-                            while (iterator.isValid()) {
-
-                                if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
-                                        iterator.key(), stopKeyGroupPrefixBytes)) {
-                                    writeBatchWrapper.put(
-                                            targetColumnFamilyHandle,
-                                            iterator.key(),
-                                            iterator.value());
-                                } else {
-                                    // Since the iterator will visit the record according to the
-                                    // sorted
-                                    // order,
-                                    // we can just break here.
-                                    break;
-                                }
-
-                                iterator.next();
-                            }
-                        } // releases native iterator resources
+                    // Clip all tmp db to Range [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes)
+                    RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
+                            tmpRestoreDBInfo.db,
+                            tmpColumnFamilyHandles,
+                            startKeyGroupPrefixBytes,
+                            stopKeyGroupPrefixBytes);
+
+                    // Export all the Column Families
+                    List<Pair<RegisteredStateMetaInfoBase, ExportImportFilesMetaData>>
+                            exportedCFAndMetaData =
+                                    RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
+                                            tmpRestoreDBInfo.db,
+                                            tmpColumnFamilyHandles,
+                                            tmpRestoreDBInfo.stateMetaInfoSnapshots,
+                                            exportCfBasePath);
+
+                    for (Pair<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> entry :
+                            exportedCFAndMetaData) {
+                        ExportImportFilesMetaData cfMetaData = entry.getValue();
+                        // TODO: method files() doesn't exist in the RocksDB API
+                        //                        if (cfMetaData.files().isEmpty()) {
+                        //                            continue;
+                        //                        }
+                        columnFamilyMetaDataToImport
+                                .computeIfAbsent(entry.getKey(), (k) -> new ArrayList<>())
+                                .add(cfMetaData);
                     }
-                    logger.info(
-                            "Finished restoring from state handle: {} with rescaling.",
-                            stateHandle);
                 }
+                logger.info(
+                        "Finished exporting column family from state handle: {} for rescaling.",
+                        stateHandle);
             }
+
+            // Open the target RocksDB and import the exported column families
+            this.rocksHandle.openDB();
+            columnFamilyMetaDataToImport.forEach(
+                    this.rocksHandle::registerStateColumnFamilyHandleWithImport);
+            logger.info(
+                    "Finished importing exported column families into target DB for rescaling.");
         } finally {
-            // Cleanup all download directories
-            allDownloadSpecs.stream()
-                    .map(StateHandleDownloadSpec::getDownloadDestination)
-                    .forEach(this::cleanUpPathQuietly);
+            cleanUpPathQuietly(exportCfBasePath);

Review Comment:
   Just to clarify: `cleanUpPathQuietly(exportCfBasePath)` doesn't affect the
   opened RocksDB instance by deleting the imported files?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java:
##########
@@ -154,6 +162,54 @@ private static void deleteRange(
         }
     }
 
+    /**
+     * Clip the entries in the CF according to the range [begin_key, end_key). Any entries outside
+     * this range will be completely deleted (including tombstones).
+     *
+     * @param db the target need to be clipped.
+     * @param columnFamilyHandles the column family need to be clipped.
+     * @param beginKeyBytes the begin key bytes
+     * @param endKeyBytes the end key bytes
+     */
+    public static void clipColumnFamilies(
+            RocksDB db,
+            List<ColumnFamilyHandle> columnFamilyHandles,
+            byte[] beginKeyBytes,
+            byte[] endKeyBytes)
+            throws RocksDBException {
+
+        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+            db.clipColumnFamily(columnFamilyHandle, beginKeyBytes, endKeyBytes);
+        }
+    }
+
+    public static List<Pair<RegisteredStateMetaInfoBase, ExportImportFilesMetaData>>
+            exportColumnFamilies(
+                    RocksDB db,
+                    List<ColumnFamilyHandle> columnFamilyHandles,
+                    List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,

Review Comment:
   These two lists should match each other, right?
   Should we check with `Preconditions.checkState(columnFamilyHandles.size() == stateMetaInfoSnapshots.size());`,
   or better pass RocksDB instance here?
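
   As a rough illustration of the suggested check, the following standalone
   sketch (plain JDK only; `ParallelListsSketch` and `zip` are hypothetical
   names, and `IllegalStateException` stands in for what
   `Preconditions.checkState` would throw) fails fast when two index-aligned
   lists differ in size before pairing their elements:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Illustration only; not the PR's exportColumnFamilies implementation. */
final class ParallelListsSketch {

    /** Pairs element i of the first list with element i of the second list. */
    static <A, B> List<Map.Entry<A, B>> zip(List<A> first, List<B> second) {
        // Fail fast instead of silently mispairing or hitting IndexOutOfBounds later.
        if (first.size() != second.size()) {
            throw new IllegalStateException(
                    "List sizes differ: " + first.size() + " vs " + second.size());
        }
        List<Map.Entry<A, B>> result = new ArrayList<>(first.size());
        for (int i = 0; i < first.size(); i++) {
            result.add(new SimpleImmutableEntry<>(first.get(i), second.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(zip(Arrays.asList("cf-a", "cf-b"), Arrays.asList(1, 2)));
    }
}
```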



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java:
##########
@@ -185,6 +187,33 @@ RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
         return registeredStateMetaInfoEntry;
     }
 
+    RocksDbKvStateInfo registerStateColumnFamilyHandleWithImport(
+            RegisteredStateMetaInfoBase stateMetaInfo,
+            List<ExportImportFilesMetaData> cfMetaDataList) {
+
+        RocksDbKvStateInfo registeredStateMetaInfoEntry =
+                kvStateInformation.get(stateMetaInfo.getName());
+
+        Preconditions.checkState(registeredStateMetaInfoEntry == null);

Review Comment:
   Replace with
   ```
   Preconditions.checkState(!kvStateInformation.containsKey(stateMetaInfo.getName()));
   ```
   ?



##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java:
##########
@@ -419,7 +431,12 @@ private KeyedOneInputStreamOperatorTestHarness<String, String, Integer> getHarne
     }
 
     private StateBackend getStateBackend() throws Exception {
-        return new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true);
+        RocksDBStateBackend rocksDBStateBackend =
+                new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true);
+        Configuration configuration = new Configuration();

Review Comment:
   Missing import.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java:
##########
@@ -179,6 +182,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
      */
     private double overlapFractionThreshold;
 
+    private boolean useIngestDbRestoreMode;
+

Review Comment:
   Can be `final` according to IntelliJ :)



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java:
##########
@@ -123,16 +125,36 @@ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
             RocksDB db,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
-            @Nullable Long writeBufferManagerCapacity) {
+            @Nullable Long writeBufferManagerCapacity,
+            List<ExportImportFilesMetaData> cfMetaDataList) {

Review Comment:
   Can't we use an empty list for `cfMetaDataList` instead of `null`?
   If not, mark `@Nullable`?
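
   A small sketch of the empty-list variant, for illustration (plain JDK;
   `EmptyListSketch` and `describeCreation` are hypothetical stand-ins, not the
   real `createStateInfo`). Callers with nothing to import pass
   `Collections.emptyList()`, so the callee branches on `isEmpty()` and never
   needs a null check:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustration only; a hypothetical stand-in for a method like createStateInfo. */
final class EmptyListSketch {

    /** An empty list means "nothing to import"; null is never a valid argument. */
    static String describeCreation(List<String> importMetaData) {
        if (importMetaData.isEmpty()) {
            return "create empty column family";
        }
        return "import " + importMetaData.size() + " exported file set(s)";
    }

    public static void main(String[] args) {
        System.out.println(describeCreation(Collections.emptyList()));
        System.out.println(describeCreation(Arrays.asList("meta-1", "meta-2")));
    }
}
```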



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -329,74 +332,164 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
             CompositeKeySerializationUtils.serializeKeyGroup(
                     keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
 
-            // Insert all remaining state through creating temporary RocksDB instances
+            rescalingRestoreFromLocalStateOperation.accept(
+                    localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
+        }
+    }
+
+    private void rescaleCopyFromTemporaryInstance(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        // Choose the best state handle for the initial DB
+        final IncrementalLocalKeyedStateHandle selectedInitialHandle =
+                RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+                        localKeyedStateHandles, keyGroupRange, overlapFractionThreshold);
+
+        Preconditions.checkNotNull(selectedInitialHandle);
+
+        // Remove the selected handle from the list so that we don't restore it twice.
+        localKeyedStateHandles.remove(selectedInitialHandle);
+
+        // Init the base DB instance with the initial state
+        initBaseDBForRescaling(selectedInitialHandle);
+
+        for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
+            logger.info("Starting to restore from state handle: {} with rescaling.", stateHandle);
+
+            try (RestoredDBInstance tmpRestoreDBInfo =
+                            restoreTempDBInstanceFromLocalState(stateHandle);
+                    RocksDBWriteBatchWrapper writeBatchWrapper =
+                            new RocksDBWriteBatchWrapper(
+                                    this.rocksHandle.getDb(), writeBatchSize)) {
+
+                List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+                        tmpRestoreDBInfo.columnFamilyDescriptors;
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // iterating only the requested descriptors automatically skips the default
+                // column
+                // family handle
+                for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
+                    ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx);
+
+                    ColumnFamilyHandle targetColumnFamilyHandle =
+                            this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                            null,
+                                            tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+                                    .columnFamilyHandle;
+
+                    try (RocksIteratorWrapper iterator =
+                            RocksDBOperationUtils.getRocksIterator(
+                                    tmpRestoreDBInfo.db,
+                                    tmpColumnFamilyHandle,
+                                    tmpRestoreDBInfo.readOptions)) {
+
+                        iterator.seek(startKeyGroupPrefixBytes);
+
+                        while (iterator.isValid()) {
+
+                            if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+                                    iterator.key(), stopKeyGroupPrefixBytes)) {
+                                writeBatchWrapper.put(
+                                        targetColumnFamilyHandle, iterator.key(), iterator.value());
+                            } else {
+                                // Since the iterator will visit the record according to the
+                                // sorted
+                                // order,
+                                // we can just break here.
+                                break;
+                            }
+
+                            iterator.next();
+                        }
+                    } // releases native iterator resources
+                }
+                logger.info(
+                        "Finished restoring from state handle: {} with rescaling.", stateHandle);
+            }
+        }
+    }
+
+    /**
+     * Recovery from multi incremental states with rescaling. For rescaling, this method creates a
+     * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance
+     * are copied into the real restore instance and then the temporary instance is discarded.
+     */
+    private void rescaleClipIngestDB(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
+        final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs");

Review Comment:
   Just to clarify, this is not a temporary folder so that the files can be 
efficiently sym-linked?



##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java:
##########
@@ -41,14 +41,26 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 /** Tests to guard rescaling from checkpoint. */
+@RunWith(Parameterized.class)
 public class RocksIncrementalCheckpointRescalingTest extends TestLogger {
 
     @Rule public TemporaryFolder rootFolder = new TemporaryFolder();
 
+    @Parameterized.Parameters(name = "useIngestDbRestoreMode: {0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @Parameterized.Parameter public boolean useIngestDbRestoreMode;

Review Comment:
   It would be great to parameterise other unit-tests. Maybe 
`EmbeddedRocksDBStateBackendTest` and `FileStateBackendTest`?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java:
##########
@@ -185,6 +187,33 @@ RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
         return registeredStateMetaInfoEntry;
     }
 
+    RocksDbKvStateInfo registerStateColumnFamilyHandleWithImport(
+            RegisteredStateMetaInfoBase stateMetaInfo,
+            List<ExportImportFilesMetaData> cfMetaDataList) {
+
+        RocksDbKvStateInfo registeredStateMetaInfoEntry =
+                kvStateInformation.get(stateMetaInfo.getName());
+
+        Preconditions.checkState(registeredStateMetaInfoEntry == null);
+
+        registeredStateMetaInfoEntry =
+                RocksDBOperationUtils.createStateInfo(
+                        stateMetaInfo,
+                        db,
+                        columnFamilyOptionsFactory,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity,
+                        cfMetaDataList);
+
+        RocksDBOperationUtils.registerKvStateInformation(
+                kvStateInformation,
+                nativeMetricMonitor,
+                stateMetaInfo.getName(),
+                registeredStateMetaInfoEntry);
+
+        return registeredStateMetaInfoEntry;

Review Comment:
   The return value isn't used, so return `void` and inline?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -105,6 +110,10 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
     private boolean isKeySerializerCompatibilityChecked;
 
+    private final TriConsumerWithException<
+                    Collection<IncrementalLocalKeyedStateHandle>, byte[], byte[], Exception>
+            rescalingRestoreFromLocalStateOperation;
+

Review Comment:
   This can be replaced by a boolean flag (`useIngestDbRestoreMode`) and a 
"normal" if and call in `restoreWithRescaling`.
   I think it would be more readable.
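
   Roughly what I have in mind, as a standalone sketch rather than the actual
   class (the two private methods only return labels here; in the PR they would
   take the state handles and the key-group prefix bytes):

```java
/** Illustration only; a hypothetical reduction of the restore operation to its dispatch. */
final class RestoreDispatchSketch {

    private final boolean useIngestDbRestoreMode;

    RestoreDispatchSketch(boolean useIngestDbRestoreMode) {
        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
    }

    /** A plain if/else replaces the stored functional-interface field. */
    String restoreWithRescaling() {
        if (useIngestDbRestoreMode) {
            return rescaleClipIngestDB();
        } else {
            return rescaleCopyFromTemporaryInstance();
        }
    }

    private String rescaleClipIngestDB() {
        return "clip-ingest";
    }

    private String rescaleCopyFromTemporaryInstance() {
        return "copy-from-temporary-instance";
    }

    public static void main(String[] args) {
        System.out.println(new RestoreDispatchSketch(true).restoreWithRescaling());
        System.out.println(new RestoreDispatchSketch(false).restoreWithRescaling());
    }
}
```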



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java:
##########
@@ -230,6 +252,19 @@ private static ColumnFamilyHandle createColumnFamily(
         }
     }
 
+    private static ColumnFamilyHandle createColumnFamilyWithImport(
+            ColumnFamilyDescriptor columnDescriptor,
+            RocksDB db,
+            List<ExportImportFilesMetaData> metaDataList) {
+        try {
+            return db.createColumnFamilyWithImport(
+                    columnDescriptor, new ImportColumnFamilyOptions(), metaDataList);
+        } catch (RocksDBException e) {
+            IOUtils.closeQuietly(columnDescriptor.getOptions());
+            throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e);

Review Comment:
   Missing `metaDataList.forEach(IOUtils::closeQuietly);` ?
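
   The pattern, sketched without RocksDB (all names below are hypothetical, and
   `TrackedResource` merely stands in for a closeable native object such as the
   metadata entries; whether ownership should really pass to this method is
   something to confirm):

```java
import java.util.Arrays;
import java.util.List;

/** Illustration only; shows releasing inputs when the operation consuming them fails. */
final class CloseOnFailureSketch {

    /** Stand-in for a native resource that must be closed explicitly. */
    static final class TrackedResource implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Closes the resource and swallows any exception, like a closeQuietly helper. */
    static void closeQuietly(AutoCloseable closeable) {
        try {
            if (closeable != null) {
                closeable.close();
            }
        } catch (Exception ignored) {
            // intentionally ignored
        }
    }

    /** Simulates an import; on failure every metadata object is released before rethrowing. */
    static String importAll(List<TrackedResource> metaDataList, boolean failImport) {
        try {
            if (failImport) {
                throw new IllegalStateException("simulated import failure");
            }
            return "handle";
        } catch (IllegalStateException e) {
            metaDataList.forEach(CloseOnFailureSketch::closeQuietly);
            throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
        }
    }

    public static void main(String[] args) {
        List<TrackedResource> metaData = Arrays.asList(new TrackedResource(), new TrackedResource());
        try {
            importAll(metaData, true);
        } catch (RuntimeException e) {
            System.out.println("all closed: " + metaData.stream().allMatch(r -> r.closed));
        }
    }
}
```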



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
