This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 16eae412a1219fec1aaf43381fb03709e52407bd Author: Stefan Richter <srich...@confluent.io> AuthorDate: Mon Jan 22 15:46:32 2024 +0100 [FLINK-34199] Add tracing for durations of rescaling/restoring (from local and downloaded remote state). --- .../java/org/apache/flink/util/CollectionUtil.java | 42 +++++++ .../org/apache/flink/util/CollectionUtilTest.java | 47 +++++++ .../apache/flink/runtime/metrics/MetricNames.java | 3 + .../runtime/state/AbstractChannelStateHandle.java | 2 + .../flink/runtime/state/DirectoryStateHandle.java | 8 +- .../runtime/state/IncrementalKeyedStateHandle.java | 11 ++ .../apache/flink/runtime/state/KeyGroupRange.java | 4 + .../flink/runtime/state/StateBackendTestBase.java | 27 +++- ...logDelegateEmbeddedRocksDBStateBackendTest.java | 5 + .../state/RocksDBKeyedStateBackendBuilder.java | 6 +- .../streaming/state/RocksDBStateDownloader.java | 14 +-- .../RocksDBIncrementalRestoreOperation.java | 137 ++++++++++++++------- .../state/EmbeddedRocksDBStateBackendTest.java | 40 +++++- .../state/RocksDBStateDownloaderTest.java | 17 +-- 14 files changed, 280 insertions(+), 83 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java index bf1714cb5b8..38adccc2cd9 100644 --- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java @@ -56,10 +56,21 @@ public final class CollectionUtil { throw new AssertionError(); } + /** Returns true if the given collection is null or empty. */ public static boolean isNullOrEmpty(Collection<?> collection) { return collection == null || collection.isEmpty(); } + /** Returns true if the given collection is empty or contains only null elements. */ + public static boolean isEmptyOrAllElementsNull(Collection<?> collection) { + for (Object o : collection) { + if (o != null) { + return false; + } + } + return true; + } + public static boolean isNullOrEmpty(Map<?, ?> map) { return map == null || map.isEmpty(); } @@ -214,4 +225,35 @@ public final class CollectionUtil { ? (int) Math.ceil(expectedSize / loadFactor) : Integer.MAX_VALUE; } + + /** + * Casts the given collection to a subtype. This is an unchecked cast that can lead to runtime + * exceptions. + * + * @param collection the collection to cast. + * @return the collection unchecked-cast to a subtype. + * @param <T> the subtype to cast to. + */ + public static <T> Collection<T> subTypeCast(Collection<? super T> collection) { + @SuppressWarnings("unchecked") + Collection<T> result = (Collection<T>) collection; + return result; + } + + /** + * Casts the given collection to a subtype. This is a checked cast. + * + * @param collection the collection to cast. + * @param subTypeClass the class of the subtype to cast to. + * @return the collection checked and cast to a subtype. + * @param <T> the subtype to cast to. + */ + public static <T> Collection<T> checkedSubTypeCast( + Collection<? super T> collection, Class<T> subTypeClass) { + for (Object o : collection) { + // probe each object, will throw ClassCastException on mismatch. + subTypeClass.cast(o); + } + return subTypeCast(collection); + } } diff --git a/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java index fb2abfec020..50493316e51 100644 --- a/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java @@ -22,12 +22,16 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import static org.apache.flink.util.CollectionUtil.HASH_MAP_DEFAULT_LOAD_FACTOR; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** Tests for java collection utilities. */ @ExtendWith(TestLoggerExtension.class) @@ -107,4 +111,47 @@ public class CollectionUtilTest { } catch (IllegalArgumentException expected) { } } + + @Test + public void testIsEmptyOrAllElementsNull() { + Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Collections.emptyList())); + Assertions.assertTrue( + CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList(null))); + Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, null))); + Assertions.assertFalse( + CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList("test"))); + Assertions.assertFalse( + CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test"))); + Assertions.assertFalse( + CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList("test", null))); + Assertions.assertFalse( + CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test", null))); + } + + @Test + public void testCheckedSubTypeCast() { + List<A> list = new ArrayList<>(); + B b = new B(); + C c = new C(); + list.add(b); + list.add(c); + list.add(null); + Collection<B> castSuccess = CollectionUtil.checkedSubTypeCast(list, B.class); + Iterator<B> iterator = castSuccess.iterator(); + Assertions.assertEquals(b, iterator.next()); + Assertions.assertEquals(c, iterator.next()); + Assertions.assertNull(iterator.next()); + Assertions.assertFalse(iterator.hasNext()); + try { + Collection<C> castFail = CollectionUtil.checkedSubTypeCast(list, C.class); + fail("Expected ClassCastException"); + } catch (ClassCastException expected) { + } + } + + static class A {} + + static class B extends A {} + + static class C extends B {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index 4aa375020c9..392ad847117 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -69,7 +69,10 @@ public class MetricNames { public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs"; public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs"; public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs"; + public static final String RESTORE_STATE_DURATION = "RestoreStateDurationMs"; public static final String RESTORED_STATE_SIZE = "RestoredStateSizeBytes"; + public static final String RESTORE_ASYNC_COMPACTION_DURATION = + "RestoreAsyncCompactionDurationMs"; public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" + SUFFIX_RATE; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java index 250d77c9f6d..f677acedfd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java @@ -129,6 +129,8 @@ public abstract class AbstractChannelStateHandle<Info> implements StateObject { + delegate + ", offsets=" + offsets + + ", size=" + + size + '}'; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java index 4e9ff682184..f9d5d24636d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java @@ -109,6 +109,12 @@ public class DirectoryStateHandle implements StateObject { @Override public String toString() { - return "DirectoryStateHandle{" + "directory=" + directoryString + '}'; + return "DirectoryStateHandle{" + + "directory='" + + directoryString + + '\'' + + ", directorySize=" + + directorySize + + '}'; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index f162efa936b..08e0fba18bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -100,5 +100,16 @@ public interface IncrementalKeyedStateHandle public int hashCode() { return Objects.hash(handle, localPath); } + + @Override + public String toString() { + return "HandleAndLocalPath{" + + "handle=" + + handle + + ", localPath='" + + localPath + + '\'' + + '}'; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java index 8b4be574d28..49f943a7b15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java @@ -135,6 +135,10 @@ public class KeyGroupRange implements KeyGroupsList, Serializable { + '}'; } + public String prettyPrintInterval() { + return "[" + startKeyGroup + ", " + endKeyGroup + "]"; + } + @Override public Iterator<Integer> iterator() { return new KeyGroupIterator(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index d036955d0c8..1cec488d389 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -53,6 +53,7 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; @@ -217,13 +218,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), + getMetricGroup(), + getCustomInitializationMetrics(), Collections.emptyList(), - new CloseableRegistry())); + new CloseableRegistry(), + 1.0d)); return backend; } + protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() { + return (name, value) -> {}; + } + protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend( TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception { return restoreKeyedBackend(keySerializer, state, env); @@ -255,9 +262,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), + getMetricGroup(), + getCustomInitializationMetrics(), state, - new CloseableRegistry())); + new CloseableRegistry(), + 1.0d)); + } + + protected MetricGroup getMetricGroup() { + return new UnregisteredMetricsGroup(); } @TestTemplate @@ -283,9 +296,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { groupRange, kvStateRegistry, TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), + getMetricGroup(), + getCustomInitializationMetrics(), Collections.emptyList(), - cancelStreamRegistry)); + cancelStreamRegistry, + 1.0d)); try { KeyedStateBackend<Integer> nested = keyedStateBackend instanceof TestableKeyedStateBackend diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java index 5211e951e43..909c3c61704 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java @@ -123,4 +123,9 @@ public class ChangelogDelegateEmbeddedRocksDBStateBackendTest ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue( getStateBackend(), env, streamFactory); } + + @Override + protected boolean checkMetrics() { + return false; + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 623f9c497ba..0d0b8e63992 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -471,7 +472,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) { DBOptions dbOptions = optionsContainer.getDbOptions(); - if (restoreStateHandles.isEmpty()) { + if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) { return new RocksDBNoneRestoreOperation<>( kvStateInformation, instanceRocksDBPath, @@ -500,7 +501,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken nativeMetricOptions, metricGroup, customInitializationMetrics, - restoreStateHandles, + CollectionUtil.checkedSubTypeCast( + restoreStateHandles, IncrementalKeyedStateHandle.class), ttlCompactFiltersManager, writeBatchSize, optionsContainer.getWriteBufferManagerCapacity(), diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java index f7ea297d761..850bda6bad7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java @@ -19,13 +19,11 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; -import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; @@ -39,17 +37,11 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION; - /** Help class for downloading RocksDB state files. */ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { - private final CustomInitializationMetrics customInitializationMetrics; - - public RocksDBStateDownloader( - int restoringThreadNum, CustomInitializationMetrics customInitializationMetrics) { + public RocksDBStateDownloader(int restoringThreadNum) { super(restoringThreadNum); - this.customInitializationMetrics = customInitializationMetrics; } /** @@ -68,15 +60,11 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { // Make sure we also react to external close signals. closeableRegistry.registerCloseable(internalCloser); try { - long startTimeMs = SystemClock.getInstance().relativeTimeMillis(); List<CompletableFuture<Void>> futures = transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser) .collect(Collectors.toList()); // Wait until either all futures completed successfully or one failed exceptionally. FutureUtils.completeAll(futures).get(); - customInitializationMetrics.addMetric( - DOWNLOAD_STATE_DURATION, - SystemClock.getInstance().relativeTimeMillis() - startTimeMs); } catch (Exception e) { downloadRequests.stream() .map(StateHandleDownloadSpec::getDownloadDestination) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 4d09b50b44e..c96d5d64224 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics; import org.apache.flink.runtime.state.StateSerializerProvider; @@ -51,6 +50,7 @@ import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; +import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.RunnableWithException; @@ -84,6 +84,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; +import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION; +import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_ASYNC_COMPACTION_DURATION; +import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_STATE_DURATION; import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException; /** Encapsulates the process of restoring a RocksDB instance from an incremental snapshot. */ @@ -92,10 +95,18 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private static final Logger logger = LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class); + @SuppressWarnings("unchecked") + private static final Class<? extends IncrementalKeyedStateHandle>[] + EXPECTED_STATE_HANDLE_CLASSES = + new Class[] { + IncrementalRemoteKeyedStateHandle.class, + IncrementalLocalKeyedStateHandle.class + }; + private final String operatorIdentifier; private final SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles; private final RocksDBHandle rocksHandle; - private final Collection<KeyedStateHandle> restoreStateHandles; + private final Collection<IncrementalKeyedStateHandle> restoreStateHandles; private final CloseableRegistry cancelStreamRegistry; private final KeyGroupRange keyGroupRange; private final File instanceBasePath; @@ -131,7 +142,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, CustomInitializationMetrics customInitializationMetrics, - @Nonnull Collection<KeyedStateHandle> restoreStateHandles, + @Nonnull Collection<IncrementalKeyedStateHandle> restoreStateHandles, @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nonnegative long writeBatchSize, Long writeBufferManagerCapacity, @@ -169,7 +180,10 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.asyncCompactAfterRescale = false; } - /** Root method that branches for different implementations of {@link KeyedStateHandle}. */ + /** + * Root method that branches for different implementations of {@link + * IncrementalKeyedStateHandle}. + */ @Override public RocksDBRestoreResult restore() throws Exception { @@ -177,6 +191,15 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper return null; } + logger.info( + "Starting RocksDB incremental recovery in operator {} " + + "target key-group range {}. Use IngestDB={}, Use AsyncCompaction={}, State Handles={}", + operatorIdentifier, + keyGroupRange.prettyPrintInterval(), + useIngestDbRestoreMode, + asyncCompactAfterRescale, + restoreStateHandles); + final List<StateHandleDownloadSpec> allDownloadSpecs = new ArrayList<>(restoreStateHandles.size()); @@ -186,16 +209,16 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); try { - prepareStateHandleDownloadsToLocal( - absolutInstanceBasePath, localKeyedStateHandles, allDownloadSpecs); + runAndReportDuration( + () -> + makeAllStateHandlesLocal( + absolutInstanceBasePath, + localKeyedStateHandles, + allDownloadSpecs), + DOWNLOAD_STATE_DURATION); - if (localKeyedStateHandles.size() == 1) { - // This happens if we don't rescale and for some scale out scenarios. - initBaseDBFromSingleStateHandle(localKeyedStateHandles.get(0)); - } else { - // This happens for all scale ins and some scale outs. - restoreFromMultipleStateHandles(localKeyedStateHandles); - } + runAndReportDuration( + () -> restoreFromLocalState(localKeyedStateHandles), RESTORE_STATE_DURATION); CompletableFuture<Void> asyncCompactFuture = null; if (asyncCompactAfterRescale) { @@ -215,7 +238,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper backendUID, operatorIdentifier); try { - run.run(); + runAndReportDuration( + run, + RESTORE_ASYNC_COMPACTION_DURATION); logger.info( "Completed async compaction after restore for backend {} in operator {} after {} ms.", backendUID, @@ -240,6 +265,11 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper return resultFuture; }) .orElse(null); + logger.info( + "Finished RocksDB incremental recovery in operator {} with " + + "target key-group range range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); } return new RocksDBRestoreResult( @@ -258,10 +288,20 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper } } + private void restoreFromLocalState( + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception { + if (localKeyedStateHandles.size() == 1) { + // This happens if we don't rescale and for some scale out scenarios. + initBaseDBFromSingleStateHandle(localKeyedStateHandles.get(0)); + } else { + // This happens for all scale ins and some scale outs. + restoreFromMultipleStateHandles(localKeyedStateHandles); + } + } + /** - * Prepares the download of all {@link IncrementalRemoteKeyedStateHandle}s to {@link - * IncrementalLocalKeyedStateHandle}s by creating the download specs and already converting the - * handle type. + * Downloads and converts all {@link IncrementalRemoteKeyedStateHandle}s to {@link + * IncrementalLocalKeyedStateHandle}s. * * @param absolutInstanceBasePath the base path of the restoring DB instance as absolute path. * @param localKeyedStateHandlesOut the output parameter for the created {@link @@ -269,14 +309,13 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper * @param allDownloadSpecsOut output parameter for the created download specs. * @throws Exception if an unexpected state handle type is passed as argument. */ - @SuppressWarnings("unchecked") - private void prepareStateHandleDownloadsToLocal( + private void makeAllStateHandlesLocal( Path absolutInstanceBasePath, List<IncrementalLocalKeyedStateHandle> localKeyedStateHandlesOut, List<StateHandleDownloadSpec> allDownloadSpecsOut) throws Exception { // Prepare and collect all the download request to pull remote state to a local directory - for (KeyedStateHandle stateHandle : restoreStateHandles) { + for (IncrementalKeyedStateHandle stateHandle : restoreStateHandles) { if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) { StateHandleDownloadSpec downloadRequest = new StateHandleDownloadSpec( @@ -287,11 +326,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper localKeyedStateHandlesOut.add((IncrementalLocalKeyedStateHandle) stateHandle); } else { throw unexpectedStateHandleException( - new Class[] { - IncrementalRemoteKeyedStateHandle.class, - IncrementalLocalKeyedStateHandle.class - }, - stateHandle.getClass()); + EXPECTED_STATE_HANDLE_CLASSES, stateHandle.getClass()); } } @@ -312,9 +347,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper throws Exception { logger.info( - "Starting to restore Base DB in backend with range {} in operator {} from selected state handle {}.", - keyGroupRange, + "Starting opening base RocksDB instance in operator {} with target key-group range {} from state handle {}.", operatorIdentifier, + keyGroupRange.prettyPrintInterval(), stateHandle); // Restore base DB from selected initial handle @@ -344,9 +379,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper } } logger.info( - "Completed restoring backend with range {} in operator {} from selected state handle.", - keyGroupRange, - operatorIdentifier); + "Finished opening base RocksDB instance in operator {} with target key-group range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); } /** @@ -360,7 +395,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.info( "Starting to restore backend with range {} in operator {} from multiple state handles {} with useIngestDbRestoreMode = {}.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier, localKeyedStateHandles, useIngestDbRestoreMode); @@ -385,7 +420,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.info( "Completed restoring backend with range {} in operator {} from multiple state handles with useIngestDbRestoreMode = {}.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier, useIngestDbRestoreMode); } @@ -562,7 +597,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.info( "Starting to merge state for backend with range {} in operator {} from multiple state handles using temporary instances.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); // Choose the best state handle for the initial DB @@ -585,7 +620,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.info( "Completed merging state for backend with range {} in operator {} from multiple state handles using temporary instances.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); } @@ -643,7 +678,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId(); logger.info( "Restored previous incremental files status in backend with range {} in operator {}: backend uuid {}, last checkpoint id {}.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier, backendUID, lastCompletedCheckpointId); @@ -664,11 +699,6 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory(); - logger.debug( - "Restoring keyed backend uid in operator {} from incremental snapshot to {}.", - operatorIdentifier, - backendUID); - this.rocksHandle.openDB( createColumnFamilyDescriptors(stateMetaInfoSnapshots, true), stateMetaInfoSnapshots, @@ -684,11 +714,18 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper */ private void transferRemoteStateToLocalDirectory( Collection<StateHandleDownloadSpec> downloadSpecs) throws Exception { + logger.info( + "Start downloading remote state to local directory in operator {} for target key-group range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); try (RocksDBStateDownloader rocksDBStateDownloader = - new RocksDBStateDownloader( - numberOfTransferringThreads, customInitializationMetrics)) { + new RocksDBStateDownloader(numberOfTransferringThreads)) { rocksDBStateDownloader.transferAllStateDataToDirectory( downloadSpecs, cancelStreamRegistry); + logger.info( + "Finished downloading remote state to local directory in operator {} for target key-group range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); } } @@ -713,7 +750,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.info( "Starting to copy state handles for backend with range {} in operator {} using temporary instances.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); try (RocksDBWriteBatchWrapper writeBatchWrapper = @@ -732,7 +769,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.info( "Competed copying state handles for backend with range {} in operator {} using temporary instances.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); } @@ -755,7 +792,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.debug( "Starting copy of state handle {} for backend with range {} in operator {} to base DB using temporary instance.", tmpRestoreDBInfo.srcStateHandle, - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = @@ -802,7 +839,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.debug( "Finished copy of state handle {} for backend with range {} in operator {} using temporary instance.", tmpRestoreDBInfo.srcStateHandle, - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); } @@ -920,6 +957,14 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper return columnFamilyDescriptors; } + private void runAndReportDuration(RunnableWithException runnable, String metricName) + throws Exception { + final SystemClock clock = SystemClock.getInstance(); + final long startTime = clock.relativeTimeMillis(); + runnable.run(); + customInitializationMetrics.addMetric(metricName, clock.relativeTimeMillis() - startTime); + } + /** Reads Flink's state meta data file from the state handle. */ private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle) throws Exception { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index d41b166e3c9..5feeeaf5608 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -24,22 +24,26 @@ import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryImpl; import org.apache.flink.runtime.state.SharedStateRegistryKey; import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -51,6 +55,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.function.SupplierWithException; @@ -80,6 +85,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -155,6 +161,7 @@ public class EmbeddedRocksDBStateBackendTest private ColumnFamilyHandle defaultCFHandle = null; private RocksDBStateUploader rocksDBStateUploader = null; private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer(); + private final HashMap<String, Long> initMetricBackingMap = new HashMap<>(); public void prepareRocksDB() throws Exception { String dbPath = @@ -180,7 +187,7 @@ public class EmbeddedRocksDBStateBackendTest EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing); Configuration configuration = new Configuration(); - configuration.setBoolean(USE_INGEST_DB_RESTORE_MODE, useIngestDB); + configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB); configuration.set( RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB); @@ -189,6 +196,14 @@ public class EmbeddedRocksDBStateBackendTest return backend; } + @Override + protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() { + return (name, value) -> { + initMetricBackingMap.compute( + name, (key, oldValue) -> oldValue == null ? value : value + oldValue); + }; + } + @Override protected CheckpointStorage getCheckpointStorage() throws Exception { return storageSupplier.get(); @@ -672,6 +687,29 @@ public class EmbeddedRocksDBStateBackendTest } } + protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend( + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + List<KeyedStateHandle> state, + Environment env) + throws Exception { + CheckpointableKeyedStateBackend<K> restoreResult = + super.restoreKeyedBackend( + keySerializer, numberOfKeyGroups, keyGroupRange, state, env); + + // If something was restored, check that all expected metrics are present. + if (checkMetrics() && !CollectionUtil.isEmptyOrAllElementsNull(state)) { + assertThat(initMetricBackingMap.keySet()) + .containsExactlyInAnyOrder("RestoreStateDurationMs", "DownloadStateDurationMs"); + } + return restoreResult; + } + + protected boolean checkMetrics() { + return true; + } + private static class AcceptAllFilter implements IOFileFilter { @Override public boolean accept(File file) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java index 3177f373f6a..dd5026da052 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java @@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -40,14 +39,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Random; -import java.util.Set; import java.util.UUID; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -75,8 +71,7 @@ public class RocksDBStateDownloaderTest extends TestLogger { stateHandles, stateHandle); - try (RocksDBStateDownloader rocksDBStateDownloader = - new RocksDBStateDownloader(5, (key, value) -> {})) { + try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) { rocksDBStateDownloader.transferAllStateDataToDirectory( Collections.singletonList( new StateHandleDownloadSpec( @@ -102,16 +97,11 @@ public class RocksDBStateDownloaderTest extends TestLogger { temporaryFolder.newFolder().toPath(), contents[i], i)); } - Set<String> customMetrics = new HashSet<>(); - - try (RocksDBStateDownloader rocksDBStateDownloader = - new RocksDBStateDownloader(4, (key, value) -> customMetrics.add(key))) { + try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(4)) { rocksDBStateDownloader.transferAllStateDataToDirectory( downloadRequests, new CloseableRegistry()); } - assertThat(customMetrics).containsExactly(MetricNames.DOWNLOAD_STATE_DURATION); - for (int i = 0; i < numRemoteHandles; ++i) { StateHandleDownloadSpec downloadRequest = downloadRequests.get(i); Path dstPath = downloadRequest.getDownloadDestination(); @@ -148,8 +138,7 @@ public class RocksDBStateDownloaderTest extends TestLogger { "error-handle")); CloseableRegistry closeableRegistry = new CloseableRegistry(); - try (RocksDBStateDownloader rocksDBStateDownloader = - new RocksDBStateDownloader(5, (key, value) -> {})) { + try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) { rocksDBStateDownloader.transferAllStateDataToDirectory( downloadRequests, closeableRegistry); fail("Exception is expected");