rkhachatryan commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1448593360


##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java:
##########
@@ -234,7 +247,8 @@ public void setupRocksKeyedStateBackend() throws Exception {
                                 spy(db),
                                 defaultCFHandle,
                                 optionsContainer.getColumnOptions())
-                        .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing);
+                        .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
+                        .setUseIngestDbRestoreMode(useIngestDB);

Review Comment:
   I see that `testCorrectMergeOperatorSet` sets up the backend independently 
of this method - we could also add `setUseIngestDbRestoreMode` there.
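
   A minimal sketch of that suggestion (assuming `testCorrectMergeOperatorSet` builds its backend through the same kind of builder chain shown in this hunk; `builder` is a placeholder, not the actual test code):

   ```java
   // Hypothetical fragment: `builder` stands in for whatever builder
   // testCorrectMergeOperatorSet constructs independently of setupRocksKeyedStateBackend.
   builder.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
           .setUseIngestDbRestoreMode(useIngestDB); // mirror the parameterized flag here too
   ```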



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java:
##########
@@ -154,6 +164,67 @@ private static void deleteRange(
         }
     }
 
+    /**
+     * Clip the entries in the CF according to the range [begin_key, end_key). Any entries outside
+     * this range will be completely deleted (including tombstones).
+     *
+     * @param db the target DB that needs to be clipped.
+     * @param columnFamilyHandles the column families that need to be clipped.
+     * @param beginKeyBytes the begin key bytes
+     * @param endKeyBytes the end key bytes
+     */
+    public static void clipColumnFamilies(
+            RocksDB db,
+            List<ColumnFamilyHandle> columnFamilyHandles,
+            byte[] beginKeyBytes,
+            byte[] endKeyBytes)
+            throws RocksDBException {
+
+        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+            db.clipColumnFamily(columnFamilyHandle, beginKeyBytes, endKeyBytes);
+            // TODO: temporary fix until https://github.com/facebook/rocksdb/pull/12219
+            //  is in the frocksDB release.
+            db.compactRange(columnFamilyHandle);
+        }
+    }
+
+    public static void exportColumnFamilies(
+            RocksDB db,
+            List<ColumnFamilyHandle> columnFamilyHandles,
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            Path exportBasePath,
+            Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> resultOutput)
+            throws RocksDBException {
+
+        Preconditions.checkArgument(
+                columnFamilyHandles.size() == stateMetaInfoSnapshots.size(),
+                "Lists are aligned by index and must be of the same size!");
+
+        try (final Checkpoint checkpoint = Checkpoint.create(db)) {
+            for (int i = 0; i < columnFamilyHandles.size(); i++) {
+                StateMetaInfoSnapshot metaInfoSnapshot = stateMetaInfoSnapshots.get(i);
+
+                RegisteredStateMetaInfoBase stateMetaInfo =
+                        RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(metaInfoSnapshot);
+
+                Path subPath = exportBasePath.resolve(UUID.randomUUID().toString());
+                ExportImportFilesMetaData cfMetaData =
+                        checkpoint.exportColumnFamily(
+                                columnFamilyHandles.get(i), subPath.toString());
+
+                File[] exportedSstFiles =
+                        subPath.toFile()
+                                .listFiles((file, name) -> name.toLowerCase().endsWith(".sst"));
+
+                if (exportedSstFiles != null && exportedSstFiles.length > 0) {
+                    resultOutput
+                            .computeIfAbsent(stateMetaInfo, (key) -> new ArrayList<>())
+                            .add(cfMetaData);
+                }

Review Comment:
   `else cfMetaData.close();` ?
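
   If adopted, the tail of the loop above could look roughly like this (a sketch of the suggestion; the `if` branch is copied from the hunk, only the `else` branch is new):

   ```java
   if (exportedSstFiles != null && exportedSstFiles.length > 0) {
       resultOutput
               .computeIfAbsent(stateMetaInfo, (key) -> new ArrayList<>())
               .add(cfMetaData);
   } else {
       // Nothing was exported, so the metadata is never handed to the caller;
       // close it here to release the native RocksDB handle.
       cfMetaData.close();
   }
   ```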


