This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2b6588ef39cbeb1825cacf553a80272ae7b9418
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Tue Feb 27 11:54:28 2024 +0100

    [hotfix] Reduce code duplication and add logging in RocksDBIncrementalCheckpointUtils#deleteRangeData.
---
 .../state/RocksDBIncrementalCheckpointUtils.java   | 66 +++++++++++++++-------
 1 file changed, 46 insertions(+), 20 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index bdb94466de9..67bf02cd31a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -29,21 +29,28 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /** Utils for RocksDB Incremental Checkpoint. */
 public class RocksDBIncrementalCheckpointUtils {
 
+    private static final Logger logger =
+            LoggerFactory.getLogger(RocksDBIncrementalCheckpointUtils.class);
+
     /**
      * Evaluates state handle's "score" regarding the target range when choosing the best state
      * handle to init the initial db for recovery, if the overlap fraction is less than
@@ -112,7 +119,7 @@ public class RocksDBIncrementalCheckpointUtils {
      * @param targetKeyGroupRange the target key group range.
      * @param currentKeyGroupRange the key group range of the db instance.
      * @param keyGroupPrefixBytes Number of bytes required to prefix the key groups.
-     * @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files.
+     * @param useDeleteFilesInRange whether to call db.deleteFilesInRanges for the deleted ranges.
      */
     public static void clipDBWithKeyGroupRange(
             @Nonnull RocksDB db,
@@ -122,31 +129,45 @@ public class RocksDBIncrementalCheckpointUtils {
             @Nonnegative int keyGroupPrefixBytes,
             boolean useDeleteFilesInRange)
             throws RocksDBException {
-        List<byte[]> deletedRanges = new ArrayList<>(4);
+
+        List<byte[]> deleteFilesRanges = new ArrayList<>(4);
 
         if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
-            final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
-            final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
-            deletedRanges.add(beginKeyGroupBytes);
-            deletedRanges.add(endKeyGroupBytes);
+            prepareRangeDeletes(
+                    keyGroupPrefixBytes,
+                    currentKeyGroupRange.getStartKeyGroup(),
+                    targetKeyGroupRange.getStartKeyGroup(),
+                    deleteFilesRanges);
         }
 
         if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
-            final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
-            final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
-            deletedRanges.add(beginKeyGroupBytes);
-            deletedRanges.add(endKeyGroupBytes);
+            prepareRangeDeletes(
+                    keyGroupPrefixBytes,
+                    targetKeyGroupRange.getEndKeyGroup() + 1,
+                    currentKeyGroupRange.getEndKeyGroup() + 1,
+                    deleteFilesRanges);
         }
 
-        deleteRangeData(db, columnFamilyHandles, deletedRanges, useDeleteFilesInRange);
+        logger.info(
+                "Performing range delete for backend with target key-groups range {} with boundaries set {} - deleteFilesInRanges = {}.",
+                targetKeyGroupRange.prettyPrintInterval(),
+                deleteFilesRanges.stream().map(Arrays::toString).collect(Collectors.toList()),
+                useDeleteFilesInRange);
+
+        deleteRangeData(db, columnFamilyHandles, deleteFilesRanges, useDeleteFilesInRange);
+    }
+
+    private static void prepareRangeDeletes(
+            int keyGroupPrefixBytes,
+            int beginKeyGroup,
+            int endKeyGroup,
+            List<byte[]> deleteFilesRangesOut) {
+        byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
+        byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(beginKeyGroup, beginKeyGroupBytes);
+        CompositeKeySerializationUtils.serializeKeyGroup(endKeyGroup, endKeyGroupBytes);
+        deleteFilesRangesOut.add(beginKeyGroupBytes);
+        deleteFilesRangesOut.add(endKeyGroupBytes);
     }
 
     /**
@@ -156,7 +177,7 @@ public class RocksDBIncrementalCheckpointUtils {
      * @param columnFamilyHandles the column family need to be clipped.
      * @param deleteRanges - pairs of deleted ranges (from1, to1, from2, to2, ...). For each pair
      *     [from, to), the startKey ('from') is inclusive, the endKey ('to') is exclusive.
-     * @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files.
+     * @param useDeleteFilesInRange whether to use deleteFilesInRange to clean up redundant files.
      */
     private static void deleteRangeData(
             RocksDB db,
@@ -165,6 +186,11 @@ public class RocksDBIncrementalCheckpointUtils {
             boolean useDeleteFilesInRange)
             throws RocksDBException {
 
+        if (deleteRanges.isEmpty()) {
+            // nothing to do.
+            return;
+        }
+
         Preconditions.checkArgument(deleteRanges.size() % 2 == 0);
         for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
             // First delete the files in ranges

Reply via email to