This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b048a9cd37d0f202117bfbb74f2991879223c160
Author: Jinzhong Li <lijinzhong2...@gmail.com>
AuthorDate: Mon Feb 26 20:08:56 2024 +0800

    [FLINK-34050][state] Introduce parameter protection for DeleteFilesInRange
---
 .../rocksdb_configurable_configuration.html        |  6 +++
 .../state/EmbeddedRocksDBStateBackend.java         | 22 +++++++-
 .../state/RocksDBConfigurableOptions.java          |  7 +++
 .../state/RocksDBIncrementalCheckpointUtils.java   | 59 ++++++++++++++--------
 .../state/RocksDBKeyedStateBackendBuilder.java     | 13 ++++-
 .../RocksDBIncrementalRestoreOperation.java        |  9 +++-
 .../RocksDBIncrementalCheckpointUtilsTest.java     |  5 +-
 .../streaming/state/RocksDBRecoveryTest.java       |  1 +
 8 files changed, 97 insertions(+), 25 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
index 4ec59a27110..bafad2db137 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
@@ -104,6 +104,12 @@
             <td>MemorySize</td>
             <td>The maximum size of RocksDB's file used for information logging. If the log files becomes larger than this, a new file will be created. If 0, all logs will be written to one log file. The default maximum file size is '25MB'. </td>
         </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.rescaling.use-delete-files-in-range</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.rocksdb.restore-overlap-fraction-threshold</h5></td>
             <td style="word-wrap: break-word;">0.0</td>
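The option documented above defaults to false. As a minimal sketch (not part of this patch) of switching it on programmatically, the snippet below sets the same key on a Flink Configuration and applies it to an EmbeddedRocksDBStateBackend; the class and variable names here are illustrative only:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

    public class EnableDeleteFilesInRangeSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Same key as in the generated table above; the default is false.
            conf.setString("state.backend.rocksdb.rescaling.use-delete-files-in-range", "true");
            // configure(...) resolves the flag through the EmbeddedRocksDBStateBackend changes below.
            EmbeddedRocksDBStateBackend backend =
                    new EmbeddedRocksDBStateBackend()
                            .configure(conf, EnableDeleteFilesInRangeSketch.class.getClassLoader());
        }
    }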
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index 0c5e748772e..1a489fad8c8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -72,6 +72,7 @@ import java.util.function.Supplier;
 import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
+import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
 import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
@@ -184,6 +185,12 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
      */
     private final TernaryBoolean incrementalRestoreAsyncCompactAfterRescale;
 
+    /**
+     * Whether to leverage deleteFilesInRange API to clean up useless rocksdb files during
+     * rescaling.
+     */
+    private final TernaryBoolean rescalingUseDeleteFilesInRange;
+
     /** Factory for Write Buffer Manager and Block Cache. */
     private RocksDBMemoryFactory rocksDBMemoryFactory;
     // ------------------------------------------------------------------------
@@ -218,6 +225,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
         this.priorityQueueConfig = new RocksDBPriorityQueueConfig();
         this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
         this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED;
+        this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
     }
 
     /**
@@ -323,6 +331,12 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                         ? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE))
                         : TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode());
 
+        rescalingUseDeleteFilesInRange =
+                original.rescalingUseDeleteFilesInRange == TernaryBoolean.UNDEFINED
+                        ? TernaryBoolean.fromBoxedBoolean(
+                                config.get(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING))
+                        : original.rescalingUseDeleteFilesInRange;
+
         this.rocksDBMemoryFactory = original.rocksDBMemoryFactory;
     }
 
@@ -496,7 +510,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                         .setOverlapFractionThreshold(getOverlapFractionThreshold())
                         .setIncrementalRestoreAsyncCompactAfterRescale(
                                 getIncrementalRestoreAsyncCompactAfterRescale())
-                        .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode());
+                        .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode())
+                        .setRescalingUseDeleteFilesInRange(isRescalingUseDeleteFilesInRange());
         return builder.build();
     }
 
@@ -844,6 +859,11 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
         return useIngestDbRestoreMode.getOrDefault(USE_INGEST_DB_RESTORE_MODE.defaultValue());
     }
 
+    boolean isRescalingUseDeleteFilesInRange() {
+        return rescalingUseDeleteFilesInRange.getOrDefault(
+                USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue());
+    }
+
     // ------------------------------------------------------------------------
     //  utilities
     // ------------------------------------------------------------------------
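The new field follows the same three-valued resolution as the neighbouring options: a value set explicitly on the backend wins, otherwise the value from the Flink configuration is used, otherwise the option default (false) applies. Below is a small self-contained sketch of that fallback chain, with hypothetical names, using the same TernaryBoolean utility as the code above:

    import org.apache.flink.util.TernaryBoolean;

    public class FallbackChainSketch {
        // backendSetting: value set directly on the backend (UNDEFINED when not set)
        // configValue:    value read from the Flink configuration (null when not set)
        static boolean resolve(TernaryBoolean backendSetting, Boolean configValue, boolean optionDefault) {
            TernaryBoolean effective =
                    backendSetting == TernaryBoolean.UNDEFINED
                            ? TernaryBoolean.fromBoxedBoolean(configValue)
                            : backendSetting;
            return effective.getOrDefault(optionDefault);
        }

        public static void main(String[] args) {
            System.out.println(resolve(TernaryBoolean.UNDEFINED, null, false)); // false: nothing set
            System.out.println(resolve(TernaryBoolean.UNDEFINED, true, false)); // true: enabled via config
            System.out.println(resolve(TernaryBoolean.FALSE, true, false));     // false: backend setting wins
        }
    }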
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 8acf8af11fa..47e66f03295 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -324,6 +324,13 @@ public class RocksDBConfigurableOptions implements Serializable {
                     .withDescription(
                             "If true, an async compaction of RocksDB is started after every restore after which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend.");
 
+    public static final ConfigOption<Boolean> USE_DELETE_FILES_IN_RANGE_DURING_RESCALING =
+            key("state.backend.rocksdb.rescaling.use-delete-files-in-range")
+                    .booleanType()
+                    .defaultValue(Boolean.FALSE)
+                    .withDescription(
+                            "If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.");
+
     static final ConfigOption<?>[] CANDIDATE_CONFIGS =
             new ConfigOption<?>[] {
                 // configurable DBOptions
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 6409840ed4d..bdb94466de9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes;
@@ -33,7 +34,7 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -111,57 +112,75 @@ public class RocksDBIncrementalCheckpointUtils {
      * @param targetKeyGroupRange the target key group range.
      * @param currentKeyGroupRange the key group range of the db instance.
      * @param keyGroupPrefixBytes Number of bytes required to prefix the key groups.
+     * @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files.
      */
     public static void clipDBWithKeyGroupRange(
             @Nonnull RocksDB db,
             @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
             @Nonnull KeyGroupRange targetKeyGroupRange,
             @Nonnull KeyGroupRange currentKeyGroupRange,
-            @Nonnegative int keyGroupPrefixBytes)
+            @Nonnegative int keyGroupPrefixBytes,
+            boolean useDeleteFilesInRange)
             throws RocksDBException {
-
-        final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
-        final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
+        List<byte[]> deletedRanges = new ArrayList<>(4);
 
         if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
+            final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
+            final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
             CompositeKeySerializationUtils.serializeKeyGroup(
                     currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
             CompositeKeySerializationUtils.serializeKeyGroup(
                     targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
-            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
+            deletedRanges.add(beginKeyGroupBytes);
+            deletedRanges.add(endKeyGroupBytes);
         }
 
         if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
+            final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
+            final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
             CompositeKeySerializationUtils.serializeKeyGroup(
                     targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
             CompositeKeySerializationUtils.serializeKeyGroup(
                     currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
-            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
+            deletedRanges.add(beginKeyGroupBytes);
+            deletedRanges.add(endKeyGroupBytes);
         }
+
+        deleteRangeData(db, columnFamilyHandles, deletedRanges, useDeleteFilesInRange);
     }
 
     /**
-     * Delete the record falls into [beginKeyBytes, endKeyBytes) of the db.
+     * Delete the record that falls into the given deleteRanges of the db.
      *
      * @param db the target need to be clipped.
      * @param columnFamilyHandles the column family need to be clipped.
-     * @param beginKeyBytes the begin key bytes
-     * @param endKeyBytes the end key bytes
+     * @param deleteRanges - pairs of deleted ranges (from1, to1, from2, to2, ...). For each pair
+     *     [from, to), the startKey ('from') is inclusive, the endKey ('to') is exclusive.
+     * @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files.
      */
-    private static void deleteRange(
+    private static void deleteRangeData(
             RocksDB db,
             List<ColumnFamilyHandle> columnFamilyHandles,
-            byte[] beginKeyBytes,
-            byte[] endKeyBytes)
+            List<byte[]> deleteRanges,
+            boolean useDeleteFilesInRange)
             throws RocksDBException {
-        List<byte[]> deletedRange = Arrays.asList(beginKeyBytes, endKeyBytes);
+
+        Preconditions.checkArgument(deleteRanges.size() % 2 == 0);
         for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-            // Using RocksDB's deleteRange will take advantage of delete
-            // tombstones, which mark the range as deleted.
-            //
-            // https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377
-            db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes);
-            db.deleteFilesInRanges(columnFamilyHandle, deletedRange, false);
+            // First delete the files in ranges
+            if (useDeleteFilesInRange) {
+                db.deleteFilesInRanges(columnFamilyHandle, deleteRanges, false);
+            }
+
+            // Then put range limiting tombstones in place.
+            for (int i = 0; i < deleteRanges.size() / 2; i++) {
+                // Using RocksDB's deleteRange will take advantage of delete
+                // tombstones, which mark the range as deleted.
+                //
+                // https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377
+                db.deleteRange(
+                        columnFamilyHandle, deleteRanges.get(i * 2), deleteRanges.get(i * 2 + 1));
+            }
         }
     }
 
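For readers less familiar with the RocksDB Java API, here is a minimal, hypothetical sketch of the two-step clipping performed by deleteRangeData above: optionally drop whole SST files that lie entirely inside the obsolete range, then cover whatever remains with a range tombstone. The class and method names are illustrative; the two RocksDB calls are the ones used in the patch:

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    import java.util.Arrays;
    import java.util.List;

    public class ClipSketch {
        // Removes all entries in [from, to): 'from' inclusive, 'to' exclusive.
        static void clip(RocksDB db, ColumnFamilyHandle cf, byte[] from, byte[] to,
                         boolean useDeleteFilesInRange) throws RocksDBException {
            List<byte[]> range = Arrays.asList(from, to);
            if (useDeleteFilesInRange) {
                // Physically deletes SST files fully contained in the range,
                // reclaiming local disk space promptly.
                db.deleteFilesInRanges(cf, range, /* includeEnd */ false);
            }
            // Marks the remaining keys in the range as deleted via a range tombstone.
            db.deleteRange(cf, from, to);
        }
    }

As in deleteRangeData, the file deletion (when enabled) happens before the range tombstones are written.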
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 0d0b8e63992..226ec0ac36a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -80,6 +80,7 @@ import java.util.function.Function;
 
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
+import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -130,6 +131,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
     private RocksDB injectedTestDB; // for testing
     private boolean incrementalRestoreAsyncCompactAfterRescale =
             INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue();
+    private boolean rescalingUseDeleteFilesInRange =
+            USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue();
+
     private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue();
     private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue();
     private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing
@@ -288,6 +292,12 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         return this;
     }
 
+    RocksDBKeyedStateBackendBuilder<K> setRescalingUseDeleteFilesInRange(
+            boolean rescalingUseDeleteFilesInRange) {
+        this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange;
+        return this;
+    }
+
     public static File getInstanceRocksDBPath(File instanceBasePath) {
         return new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
     }
@@ -508,7 +518,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     optionsContainer.getWriteBufferManagerCapacity(),
                     overlapFractionThreshold,
                     useIngestDbRestoreMode,
-                    incrementalRestoreAsyncCompactAfterRescale);
+                    incrementalRestoreAsyncCompactAfterRescale,
+                    rescalingUseDeleteFilesInRange);
         } else if (priorityQueueConfig.getPriorityQueueStateType()
                 == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) {
             return new RocksDBHeapTimersFullRestoreOperation<>(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index afd19c75c7c..b5276abc848 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -126,6 +126,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
     private final boolean asyncCompactAfterRescale;
 
+    private final boolean useDeleteFilesInRange;
+
     public RocksDBIncrementalRestoreOperation(
             String operatorIdentifier,
             KeyGroupRange keyGroupRange,
@@ -148,7 +150,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             Long writeBufferManagerCapacity,
             double overlapFractionThreshold,
             boolean useIngestDbRestoreMode,
-            boolean asyncCompactAfterRescale) {
+            boolean asyncCompactAfterRescale,
+            boolean useDeleteFilesInRange) {
         this.rocksHandle =
                 new RocksDBHandle(
                         kvStateInformation,
@@ -178,6 +181,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         //        this.asyncCompactAfterRescale = asyncCompactAfterRescale;
         this.useIngestDbRestoreMode = false;
         this.asyncCompactAfterRescale = false;
+        this.useDeleteFilesInRange = useDeleteFilesInRange;
     }
 
     /**
@@ -371,7 +375,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
                         this.rocksHandle.getColumnFamilyHandles(),
                         keyGroupRange,
                         stateHandleKeyGroupRange,
-                        keyGroupPrefixBytes);
+                        keyGroupPrefixBytes,
+                        useDeleteFilesInRange);
             } catch (RocksDBException e) {
                 String errMsg = "Failed to clip DB after initialization.";
                 logger.error(errMsg, e);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 1d78eba4fac..9b1df89c2f1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -58,6 +58,8 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
 
         testClipDBWithKeyGroupRangeHelper(new KeyGroupRange(0, 1), new KeyGroupRange(2, 4), 1);
 
+        testClipDBWithKeyGroupRangeHelper(new KeyGroupRange(4, 5), new KeyGroupRange(2, 7), 1);
+
         testClipDBWithKeyGroupRangeHelper(
                 new KeyGroupRange(Byte.MAX_VALUE - 15, Byte.MAX_VALUE),
                 new KeyGroupRange(Byte.MAX_VALUE - 10, Byte.MAX_VALUE),
@@ -179,7 +181,8 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
                     Collections.singletonList(columnFamilyHandle),
                     targetGroupRange,
                     currentGroupRange,
-                    keyGroupPrefixBytes);
+                    keyGroupPrefixBytes,
+                    true);
 
             for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
                 for (int j = 0; j < 100; ++j) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
index 4a233ec06ba..ce4d93ea0be 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
@@ -289,6 +289,7 @@ public class RocksDBRecoveryTest {
                             .setEnableIncrementalCheckpointing(true)
                             .setUseIngestDbRestoreMode(useIngest)
                             .setIncrementalRestoreAsyncCompactAfterRescale(asyncCompactAfterRescale)
+                            .setRescalingUseDeleteFilesInRange(true)
                             .build();
 
             long instanceTime = System.currentTimeMillis() - tInstance;
