This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 05cdb299e26db931a328bdc4e536dfb7effdb905 Author: Stefan Richter <srich...@confluent.io> AuthorDate: Wed Oct 25 11:50:55 2023 +0200 [FLINK-33341][state] Add support for rescaling from local keyed state to RocksDBIncrementalRestoreOperation. --- .../state/RocksDBIncrementalCheckpointUtils.java | 8 +- .../streaming/state/StateHandleDownloadSpec.java | 12 ++ .../RocksDBIncrementalRestoreOperation.java | 233 ++++++++++----------- 3 files changed, 129 insertions(+), 124 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index 54121709876..a835d10c481 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -175,14 +175,14 @@ public class RocksDBIncrementalCheckpointUtils { * @return The best candidate or null if no candidate was a good fit. 
*/ @Nullable - public static KeyedStateHandle chooseTheBestStateHandleForInitial( - @Nonnull Collection<KeyedStateHandle> restoreStateHandles, + public static <T extends KeyedStateHandle> T chooseTheBestStateHandleForInitial( + @Nonnull Collection<T> restoreStateHandles, @Nonnull KeyGroupRange targetKeyGroupRange, double overlapFractionThreshold) { - KeyedStateHandle bestStateHandle = null; + T bestStateHandle = null; Score bestScore = Score.MIN; - for (KeyedStateHandle rawStateHandle : restoreStateHandles) { + for (T rawStateHandle : restoreStateHandles) { Score handleScore = stateHandleEvaluator( rawStateHandle, targetKeyGroupRange, overlapFractionThreshold); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java index 93a33fdc6fa..5f37f84921f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java @@ -18,6 +18,8 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.runtime.state.DirectoryStateHandle; +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import java.nio.file.Path; @@ -46,4 +48,14 @@ public class StateHandleDownloadSpec { public Path getDownloadDestination() { return downloadDestination; } + + public IncrementalLocalKeyedStateHandle createLocalStateHandleForDownloadedState() { + return new IncrementalLocalKeyedStateHandle( + stateHandle.getBackendIdentifier(), + stateHandle.getCheckpointId(), + new DirectoryStateHandle(downloadDestination), + stateHandle.getKeyGroupRange(), + 
stateHandle.getMetaDataStateHandle(), + stateHandle.getSharedState()); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index a62bbb4a70b..11d4756ae13 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -35,7 +35,6 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.BackendBuildingException; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; -import org.apache.flink.runtime.state.DirectoryStateHandle; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; @@ -44,11 +43,9 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateSerializerProvider; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; -import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -227,35 +224,12 @@ public 
class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper new StateHandleDownloadSpec(stateHandle, tmpRestoreInstancePath); try { transferRemoteStateToLocalDirectory(Collections.singletonList(downloadRequest)); - restoreBaseDBFromDownloadedState(downloadRequest); + restoreBaseDBFromLocalState(downloadRequest.createLocalStateHandleForDownloadedState()); } finally { cleanUpPathQuietly(downloadRequest.getDownloadDestination()); } } - /** - * This helper method creates a {@link IncrementalLocalKeyedStateHandle} for state that was - * previously downloaded for a {@link IncrementalRemoteKeyedStateHandle} and then invokes the - * restore procedure for local state on the downloaded state. - * - * @param downloadedState the specification of a completed state download. - * @throws Exception for restore problems. - */ - private void restoreBaseDBFromDownloadedState(StateHandleDownloadSpec downloadedState) - throws Exception { - // since we transferred all remote state to a local directory, we can use the same code - // as for local recovery. - IncrementalRemoteKeyedStateHandle stateHandle = downloadedState.getStateHandle(); - restoreBaseDBFromLocalState( - new IncrementalLocalKeyedStateHandle( - stateHandle.getBackendIdentifier(), - stateHandle.getCheckpointId(), - new DirectoryStateHandle(downloadedState.getDownloadDestination()), - stateHandle.getKeyGroupRange(), - stateHandle.getMetaDataStateHandle(), - stateHandle.getSharedState())); - } - /** Restores RocksDB instance from local state. 
*/ private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { @@ -304,113 +278,133 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper Preconditions.checkArgument(restoreStateHandles != null && !restoreStateHandles.isEmpty()); - Map<StateHandleID, StateHandleDownloadSpec> allDownloadSpecs = - CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size()); - - // Choose the best state handle for the initial DB - final KeyedStateHandle selectedInitialHandle = - RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial( - restoreStateHandles, keyGroupRange, overlapFractionThreshold); + final List<StateHandleDownloadSpec> allDownloadSpecs = new ArrayList<>(); - Preconditions.checkNotNull(selectedInitialHandle); + final List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles = + new ArrayList<>(restoreStateHandles.size()); final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); // Prepare and collect all the download request to pull remote state to a local directory for (KeyedStateHandle stateHandle : restoreStateHandles) { - if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) { + if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) { + StateHandleDownloadSpec downloadRequest = + new StateHandleDownloadSpec( + (IncrementalRemoteKeyedStateHandle) stateHandle, + absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); + allDownloadSpecs.add(downloadRequest); + } else if (stateHandle instanceof IncrementalLocalKeyedStateHandle) { + localKeyedStateHandles.add((IncrementalLocalKeyedStateHandle) stateHandle); + } else { throw unexpectedStateHandleException( IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass()); } - StateHandleDownloadSpec downloadRequest = - new StateHandleDownloadSpec( - (IncrementalRemoteKeyedStateHandle) stateHandle, - 
absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); - allDownloadSpecs.put(stateHandle.getStateHandleId(), downloadRequest); } - // Process all state downloads - transferRemoteStateToLocalDirectory(allDownloadSpecs.values()); - - // Init the base DB instance with the initial state - initBaseDBForRescaling(allDownloadSpecs.remove(selectedInitialHandle.getStateHandleId())); - - // Transfer remaining key-groups from temporary instance into base DB - byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); - - byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); - - // Insert all remaining state through creating temporary RocksDB instances - for (StateHandleDownloadSpec downloadRequest : allDownloadSpecs.values()) { - logger.info( - "Starting to restore from state handle: {} with rescaling.", - downloadRequest.getStateHandle()); - - try (RestoredDBInstance tmpRestoreDBInfo = - restoreTempDBInstanceFromDownloadedState(downloadRequest); - RocksDBWriteBatchWrapper writeBatchWrapper = - new RocksDBWriteBatchWrapper( - this.rocksHandle.getDb(), writeBatchSize)) { - - List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = - tmpRestoreDBInfo.columnFamilyDescriptors; - List<ColumnFamilyHandle> tmpColumnFamilyHandles = - tmpRestoreDBInfo.columnFamilyHandles; - - // iterating only the requested descriptors automatically skips the default column - // family handle - for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { - ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx); - - ColumnFamilyHandle targetColumnFamilyHandle = - this.rocksHandle.getOrRegisterStateColumnFamilyHandle( - null, - tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx)) - .columnFamilyHandle; - - 
try (RocksIteratorWrapper iterator = - RocksDBOperationUtils.getRocksIterator( - tmpRestoreDBInfo.db, - tmpColumnFamilyHandle, - tmpRestoreDBInfo.readOptions)) { - - iterator.seek(startKeyGroupPrefixBytes); - - while (iterator.isValid()) { - - if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes( - iterator.key(), stopKeyGroupPrefixBytes)) { - writeBatchWrapper.put( - targetColumnFamilyHandle, iterator.key(), iterator.value()); - } else { - // Since the iterator will visit the record according to the sorted - // order, - // we can just break here. - break; - } + allDownloadSpecs.stream() + .map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState) + .forEach(localKeyedStateHandles::add); - iterator.next(); - } - } // releases native iterator resources - } + // Choose the best state handle for the initial DB + final IncrementalLocalKeyedStateHandle selectedInitialHandle = + RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial( + localKeyedStateHandles, keyGroupRange, overlapFractionThreshold); + Preconditions.checkNotNull(selectedInitialHandle); + // Remove the selected handle from the list so that we don't restore it twice. 
+ localKeyedStateHandles.remove(selectedInitialHandle); + + try { + // Process all state downloads + transferRemoteStateToLocalDirectory(allDownloadSpecs); + + // Init the base DB instance with the initial state + initBaseDBForRescaling(selectedInitialHandle); + + // Transfer remaining key-groups from temporary instance into base DB + byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); + + byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); + + // Insert all remaining state through creating temporary RocksDB instances + for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { logger.info( - "Finished restoring from state handle: {} with rescaling.", - downloadRequest.getStateHandle()); - } finally { - cleanUpPathQuietly(downloadRequest.getDownloadDestination()); + "Starting to restore from state handle: {} with rescaling.", stateHandle); + + try (RestoredDBInstance tmpRestoreDBInfo = + restoreTempDBInstanceFromLocalState(stateHandle); + RocksDBWriteBatchWrapper writeBatchWrapper = + new RocksDBWriteBatchWrapper( + this.rocksHandle.getDb(), writeBatchSize)) { + + List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = + tmpRestoreDBInfo.columnFamilyDescriptors; + List<ColumnFamilyHandle> tmpColumnFamilyHandles = + tmpRestoreDBInfo.columnFamilyHandles; + + // iterating only the requested descriptors automatically skips the default + // column + // family handle + for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { + ColumnFamilyHandle tmpColumnFamilyHandle = + tmpColumnFamilyHandles.get(descIdx); + + ColumnFamilyHandle targetColumnFamilyHandle = + this.rocksHandle.getOrRegisterStateColumnFamilyHandle( + null, + tmpRestoreDBInfo.stateMetaInfoSnapshots.get( + 
descIdx)) + .columnFamilyHandle; + + try (RocksIteratorWrapper iterator = + RocksDBOperationUtils.getRocksIterator( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandle, + tmpRestoreDBInfo.readOptions)) { + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes( + iterator.key(), stopKeyGroupPrefixBytes)) { + writeBatchWrapper.put( + targetColumnFamilyHandle, + iterator.key(), + iterator.value()); + } else { + // Since the iterator will visit the record according to the + // sorted + // order, + // we can just break here. + break; + } + + iterator.next(); + } + } // releases native iterator resources + } + logger.info( + "Finished restoring from state handle: {} with rescaling.", + stateHandle); + } } + } finally { + // Cleanup all download directories + allDownloadSpecs.stream() + .map(StateHandleDownloadSpec::getDownloadDestination) + .forEach(this::cleanUpPathQuietly); } } - private void initBaseDBForRescaling(StateHandleDownloadSpec downloadedInitialState) + private void initBaseDBForRescaling(IncrementalLocalKeyedStateHandle stateHandle) throws Exception { // 1. Restore base DB from selected initial handle - restoreBaseDBFromDownloadedState(downloadedInitialState); + restoreBaseDBFromLocalState(stateHandle); // 2. 
Clip the base DB instance try { @@ -418,7 +412,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.rocksHandle.getDb(), this.rocksHandle.getColumnFamilyHandles(), keyGroupRange, - downloadedInitialState.getStateHandle().getKeyGroupRange(), + stateHandle.getKeyGroupRange(), keyGroupPrefixBytes); } catch (RocksDBException e) { String errMsg = "Failed to clip DB after initialization."; @@ -470,11 +464,10 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper } } - private RestoredDBInstance restoreTempDBInstanceFromDownloadedState( - StateHandleDownloadSpec downloadRequest) throws Exception { - + private RestoredDBInstance restoreTempDBInstanceFromLocalState( + IncrementalLocalKeyedStateHandle stateHandle) throws Exception { KeyedBackendSerializationProxy<K> serializationProxy = - readMetaData(downloadRequest.getStateHandle().getMetaDataStateHandle()); + readMetaData(stateHandle.getMetaDataStateHandle()); // read meta data List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots(); @@ -487,7 +480,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper RocksDB restoreDb = RocksDBOperationUtils.openDB( - downloadRequest.getDownloadDestination().toString(), + stateHandle.getDirectoryStateHandle().getDirectory().toString(), columnFamilyDescriptors, columnFamilyHandles, RocksDBOperationUtils.createColumnFamilyOptions(