This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b048a9cd37d0f202117bfbb74f2991879223c160 Author: Jinzhong Li <lijinzhong2...@gmail.com> AuthorDate: Mon Feb 26 20:08:56 2024 +0800 [FLINK-34050][state] Introduce parameter protection for DeleteFilesInRange --- .../rocksdb_configurable_configuration.html | 6 +++ .../state/EmbeddedRocksDBStateBackend.java | 22 +++++++- .../state/RocksDBConfigurableOptions.java | 7 +++ .../state/RocksDBIncrementalCheckpointUtils.java | 59 ++++++++++++++-------- .../state/RocksDBKeyedStateBackendBuilder.java | 13 ++++- .../RocksDBIncrementalRestoreOperation.java | 9 +++- .../RocksDBIncrementalCheckpointUtilsTest.java | 5 +- .../streaming/state/RocksDBRecoveryTest.java | 1 + 8 files changed, 97 insertions(+), 25 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html index 4ec59a27110..bafad2db137 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html @@ -104,6 +104,12 @@ <td>MemorySize</td> <td>The maximum size of RocksDB's file used for information logging. If the log files becomes larger than this, a new file will be created. If 0, all logs will be written to one log file. The default maximum file size is '25MB'. </td> </tr> + <tr> + <td><h5>state.backend.rocksdb.rescaling.use-delete-files-in-range</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.</td> + </tr> <tr> <td><h5>state.backend.rocksdb.restore-overlap-fraction-threshold</h5></td> <td style="word-wrap: break-word;">0.0</td> diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 0c5e748772e..1a489fad8c8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -72,6 +72,7 @@ import java.util.function.Supplier; import static org.apache.flink.configuration.description.TextElement.text; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM; @@ -184,6 +185,12 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke */ private final TernaryBoolean incrementalRestoreAsyncCompactAfterRescale; + /** + * Whether to leverage deleteFilesInRange API to clean up useless rocksdb files during + * rescaling. + */ + private final TernaryBoolean rescalingUseDeleteFilesInRange; + /** Factory for Write Buffer Manager and Block Cache. */ private RocksDBMemoryFactory rocksDBMemoryFactory; // ------------------------------------------------------------------------ @@ -218,6 +225,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke this.priorityQueueConfig = new RocksDBPriorityQueueConfig(); this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED; this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED; + this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED; } /** @@ -323,6 +331,12 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke ? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE)) : TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode()); + rescalingUseDeleteFilesInRange = + original.rescalingUseDeleteFilesInRange == TernaryBoolean.UNDEFINED + ? TernaryBoolean.fromBoxedBoolean( + config.get(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING)) + : original.rescalingUseDeleteFilesInRange; + this.rocksDBMemoryFactory = original.rocksDBMemoryFactory; } @@ -496,7 +510,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke .setOverlapFractionThreshold(getOverlapFractionThreshold()) .setIncrementalRestoreAsyncCompactAfterRescale( getIncrementalRestoreAsyncCompactAfterRescale()) - .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode()); + .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode()) + .setRescalingUseDeleteFilesInRange(isRescalingUseDeleteFilesInRange()); return builder.build(); } @@ -844,6 +859,11 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke return useIngestDbRestoreMode.getOrDefault(USE_INGEST_DB_RESTORE_MODE.defaultValue()); } + boolean isRescalingUseDeleteFilesInRange() { + return rescalingUseDeleteFilesInRange.getOrDefault( + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue()); + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 8acf8af11fa..47e66f03295 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -324,6 +324,13 @@ public class RocksDBConfigurableOptions implements Serializable { .withDescription( "If true, an async compaction of RocksDB is started after every restore after which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend."); + public static final ConfigOption<Boolean> USE_DELETE_FILES_IN_RANGE_DURING_RESCALING = + key("state.backend.rocksdb.rescaling.use-delete-files-in-range") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription( + "If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly."); + static final ConfigOption<?>[] CANDIDATE_CONFIGS = new ConfigOption<?>[] { // configurable DBOptions diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index 6409840ed4d..bdb94466de9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes; @@ -33,7 +34,7 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; @@ -111,57 +112,75 @@ public class RocksDBIncrementalCheckpointUtils { * @param targetKeyGroupRange the target key group range. * @param currentKeyGroupRange the key group range of the db instance. * @param keyGroupPrefixBytes Number of bytes required to prefix the key groups. + * @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files. */ public static void clipDBWithKeyGroupRange( @Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull KeyGroupRange targetKeyGroupRange, @Nonnull KeyGroupRange currentKeyGroupRange, - @Nonnegative int keyGroupPrefixBytes) + @Nonnegative int keyGroupPrefixBytes, + boolean useDeleteFilesInRange) throws RocksDBException { - - final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; - final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; + List<byte[]> deletedRanges = new ArrayList<>(4); if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) { + final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; + final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; CompositeKeySerializationUtils.serializeKeyGroup( currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes); CompositeKeySerializationUtils.serializeKeyGroup( targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes); - deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes); + deletedRanges.add(beginKeyGroupBytes); + deletedRanges.add(endKeyGroupBytes); } if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) { + final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; + final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; CompositeKeySerializationUtils.serializeKeyGroup( targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes); CompositeKeySerializationUtils.serializeKeyGroup( currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes); - deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes); + deletedRanges.add(beginKeyGroupBytes); + deletedRanges.add(endKeyGroupBytes); } + + deleteRangeData(db, columnFamilyHandles, deletedRanges, useDeleteFilesInRange); } /** - * Delete the record falls into [beginKeyBytes, endKeyBytes) of the db. + * Delete the record that falls into the given deleteRanges of the db. * * @param db the target need to be clipped. * @param columnFamilyHandles the column family need to be clipped. - * @param beginKeyBytes the begin key bytes - * @param endKeyBytes the end key bytes + * @param deleteRanges - pairs of deleted ranges (from1, to1, from2, to2, ...). For each pair + * [from, to), the startKey ('from') is inclusive, the endKey ('to') is exclusive. + * @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files. */ - private static void deleteRange( + private static void deleteRangeData( RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, - byte[] beginKeyBytes, - byte[] endKeyBytes) + List<byte[]> deleteRanges, + boolean useDeleteFilesInRange) throws RocksDBException { - List<byte[]> deletedRange = Arrays.asList(beginKeyBytes, endKeyBytes); + + Preconditions.checkArgument(deleteRanges.size() % 2 == 0); for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - // Using RocksDB's deleteRange will take advantage of delete - // tombstones, which mark the range as deleted. - // - // https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377 - db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes); - db.deleteFilesInRanges(columnFamilyHandle, deletedRange, false); + // First delete the files in ranges + if (useDeleteFilesInRange) { + db.deleteFilesInRanges(columnFamilyHandle, deleteRanges, false); + } + + // Then put range limiting tombstones in place. + for (int i = 0; i < deleteRanges.size() / 2; i++) { + // Using RocksDB's deleteRange will take advantage of delete + // tombstones, which mark the range as deleted. + // + // https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377 + db.deleteRange( + columnFamilyHandle, deleteRanges.get(i * 2), deleteRanges.get(i * 2 + 1)); + } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 0d0b8e63992..226ec0ac36a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -80,6 +80,7 @@ import java.util.function.Function; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.util.Preconditions.checkArgument; @@ -130,6 +131,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken private RocksDB injectedTestDB; // for testing private boolean incrementalRestoreAsyncCompactAfterRescale = INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue(); + private boolean rescalingUseDeleteFilesInRange = + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue(); + private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue(); private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue(); private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing @@ -288,6 +292,12 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken return this; } + RocksDBKeyedStateBackendBuilder<K> setRescalingUseDeleteFilesInRange( + boolean rescalingUseDeleteFilesInRange) { + this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange; + return this; + } + public static File getInstanceRocksDBPath(File instanceBasePath) { return new File(instanceBasePath, DB_INSTANCE_DIR_STRING); } @@ -508,7 +518,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken optionsContainer.getWriteBufferManagerCapacity(), overlapFractionThreshold, useIngestDbRestoreMode, - incrementalRestoreAsyncCompactAfterRescale); + incrementalRestoreAsyncCompactAfterRescale, + rescalingUseDeleteFilesInRange); } else if (priorityQueueConfig.getPriorityQueueStateType() == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) { return new RocksDBHeapTimersFullRestoreOperation<>( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index afd19c75c7c..b5276abc848 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -126,6 +126,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private final boolean asyncCompactAfterRescale; + private final boolean useDeleteFilesInRange; + public RocksDBIncrementalRestoreOperation( String operatorIdentifier, KeyGroupRange keyGroupRange, @@ -148,7 +150,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper Long writeBufferManagerCapacity, double overlapFractionThreshold, boolean useIngestDbRestoreMode, - boolean asyncCompactAfterRescale) { + boolean asyncCompactAfterRescale, + boolean useDeleteFilesInRange) { this.rocksHandle = new RocksDBHandle( kvStateInformation, @@ -178,6 +181,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper // this.asyncCompactAfterRescale = asyncCompactAfterRescale; this.useIngestDbRestoreMode = false; this.asyncCompactAfterRescale = false; + this.useDeleteFilesInRange = useDeleteFilesInRange; } /** @@ -371,7 +375,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.rocksHandle.getColumnFamilyHandles(), keyGroupRange, stateHandleKeyGroupRange, - keyGroupPrefixBytes); + keyGroupPrefixBytes, + useDeleteFilesInRange); } catch (RocksDBException e) { String errMsg = "Failed to clip DB after initialization."; logger.error(errMsg, e); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java index 1d78eba4fac..9b1df89c2f1 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java @@ -58,6 +58,8 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger { testClipDBWithKeyGroupRangeHelper(new KeyGroupRange(0, 1), new KeyGroupRange(2, 4), 1); + testClipDBWithKeyGroupRangeHelper(new KeyGroupRange(4, 5), new KeyGroupRange(2, 7), 1); + testClipDBWithKeyGroupRangeHelper( new KeyGroupRange(Byte.MAX_VALUE - 15, Byte.MAX_VALUE), new KeyGroupRange(Byte.MAX_VALUE - 10, Byte.MAX_VALUE), @@ -179,7 +181,8 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger { Collections.singletonList(columnFamilyHandle), targetGroupRange, currentGroupRange, - keyGroupPrefixBytes); + keyGroupPrefixBytes, + true); for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) { for (int j = 0; j < 100; ++j) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java index 4a233ec06ba..ce4d93ea0be 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java @@ -289,6 +289,7 @@ public class RocksDBRecoveryTest { .setEnableIncrementalCheckpointing(true) .setUseIngestDbRestoreMode(useIngest) .setIncrementalRestoreAsyncCompactAfterRescale(asyncCompactAfterRescale) + .setRescalingUseDeleteFilesInRange(true) .build(); long instanceTime = System.currentTimeMillis() - tInstance;