This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 16eae412a1219fec1aaf43381fb03709e52407bd
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Mon Jan 22 15:46:32 2024 +0100

    [FLINK-34199] Add tracing for durations of rescaling/restoring (from local and downloaded remote state).
---
 .../java/org/apache/flink/util/CollectionUtil.java |  42 +++++++
 .../org/apache/flink/util/CollectionUtilTest.java  |  47 +++++++
 .../apache/flink/runtime/metrics/MetricNames.java  |   3 +
 .../runtime/state/AbstractChannelStateHandle.java  |   2 +
 .../flink/runtime/state/DirectoryStateHandle.java  |   8 +-
 .../runtime/state/IncrementalKeyedStateHandle.java |  11 ++
 .../apache/flink/runtime/state/KeyGroupRange.java  |   4 +
 .../flink/runtime/state/StateBackendTestBase.java  |  27 +++-
 ...logDelegateEmbeddedRocksDBStateBackendTest.java |   5 +
 .../state/RocksDBKeyedStateBackendBuilder.java     |   6 +-
 .../streaming/state/RocksDBStateDownloader.java    |  14 +--
 .../RocksDBIncrementalRestoreOperation.java        | 137 ++++++++++++++-------
 .../state/EmbeddedRocksDBStateBackendTest.java     |  40 +++++-
 .../state/RocksDBStateDownloaderTest.java          |  17 +--
 14 files changed, 280 insertions(+), 83 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
index bf1714cb5b8..38adccc2cd9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
@@ -56,10 +56,21 @@ public final class CollectionUtil {
         throw new AssertionError();
     }
 
+    /** Returns true if the given collection is null or empty. */
     public static boolean isNullOrEmpty(Collection<?> collection) {
         return collection == null || collection.isEmpty();
     }
 
+    /** Returns true if the given collection is empty or contains only null elements. */
+    public static boolean isEmptyOrAllElementsNull(Collection<?> collection) {
+        for (Object o : collection) {
+            if (o != null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     public static boolean isNullOrEmpty(Map<?, ?> map) {
         return map == null || map.isEmpty();
     }
@@ -214,4 +225,35 @@ public final class CollectionUtil {
                 ? (int) Math.ceil(expectedSize / loadFactor)
                 : Integer.MAX_VALUE;
     }
+
+    /**
+     * Casts the given collection to a subtype. This is an unchecked cast that can lead to runtime
+     * exceptions.
+     *
+     * @param collection the collection to cast.
+     * @return the collection unchecked-cast to a subtype.
+     * @param <T> the subtype to cast to.
+     */
+    public static <T> Collection<T> subTypeCast(Collection<? super T> collection) {
+        @SuppressWarnings("unchecked")
+        Collection<T> result = (Collection<T>) collection;
+        return result;
+    }
+
+    /**
+     * Casts the given collection to a subtype. This is a checked cast.
+     *
+     * @param collection the collection to cast.
+     * @param subTypeClass the class of the subtype to cast to.
+     * @return the collection checked and cast to a subtype.
+     * @param <T> the subtype to cast to.
+     */
+    public static <T> Collection<T> checkedSubTypeCast(
+            Collection<? super T> collection, Class<T> subTypeClass) {
+        for (Object o : collection) {
+            // probe each object, will throw ClassCastException on mismatch.
+            subTypeClass.cast(o);
+        }
+        return subTypeCast(collection);
+    }
 }
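
A minimal usage sketch of the new helpers (hypothetical caller code, not part of this commit): checkedSubTypeCast probes every element before narrowing the collection's type, while the unchecked subTypeCast narrows without any runtime probe.

    import java.util.Arrays;
    import java.util.Collection;
    import org.apache.flink.util.CollectionUtil;

    class CollectionUtilExample {
        public static void main(String[] args) {
            Collection<Number> numbers = Arrays.<Number>asList(1, 2, null);
            // Probes each element; nulls pass because Class#cast(null) just returns null.
            Collection<Integer> ints = CollectionUtil.checkedSubTypeCast(numbers, Integer.class);
            System.out.println(ints); // [1, 2, null]
            // Would throw ClassCastException: the Integer elements are not Longs.
            // Collection<Long> longs = CollectionUtil.checkedSubTypeCast(numbers, Long.class);
            System.out.println(CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, null))); // true
        }
    }
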
diff --git a/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java
index fb2abfec020..50493316e51 100644
--- a/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java
@@ -22,12 +22,16 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.flink.util.CollectionUtil.HASH_MAP_DEFAULT_LOAD_FACTOR;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for java collection utilities. */
 @ExtendWith(TestLoggerExtension.class)
@@ -107,4 +111,47 @@ public class CollectionUtilTest {
         } catch (IllegalArgumentException expected) {
         }
     }
+
+    @Test
+    public void testIsEmptyOrAllElementsNull() {
+        Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Collections.emptyList()));
+        Assertions.assertTrue(
+                CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList(null)));
+        Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, null)));
+        Assertions.assertFalse(
+                CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList("test")));
+        Assertions.assertFalse(
+                CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test")));
+        Assertions.assertFalse(
+                CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList("test", null)));
+        Assertions.assertFalse(
+                CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test", null)));
+    }
+
+    @Test
+    public void testCheckedSubTypeCast() {
+        List<A> list = new ArrayList<>();
+        B b = new B();
+        C c = new C();
+        list.add(b);
+        list.add(c);
+        list.add(null);
+        Collection<B> castSuccess = CollectionUtil.checkedSubTypeCast(list, B.class);
+        Iterator<B> iterator = castSuccess.iterator();
+        Assertions.assertEquals(b, iterator.next());
+        Assertions.assertEquals(c, iterator.next());
+        Assertions.assertNull(iterator.next());
+        Assertions.assertFalse(iterator.hasNext());
+        try {
+            Collection<C> castFail = CollectionUtil.checkedSubTypeCast(list, C.class);
+            fail("Expected ClassCastException");
+        } catch (ClassCastException expected) {
+        }
+    }
+
+    static class A {}
+
+    static class B extends A {}
+
+    static class C extends B {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 4aa375020c9..392ad847117 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -69,7 +69,10 @@ public class MetricNames {
     public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs";
     public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs";
     public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs";
+    public static final String RESTORE_STATE_DURATION = "RestoreStateDurationMs";
     public static final String RESTORED_STATE_SIZE = "RestoredStateSizeBytes";
+    public static final String RESTORE_ASYNC_COMPACTION_DURATION =
+            "RestoreAsyncCompactionDurationMs";
 
     public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" + SUFFIX_RATE;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
index 250d77c9f6d..f677acedfd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
@@ -129,6 +129,8 @@ public abstract class AbstractChannelStateHandle<Info> implements StateObject {
                 + delegate
                 + ", offsets="
                 + offsets
+                + ", size="
+                + size
                 + '}';
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
index 4e9ff682184..f9d5d24636d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
@@ -109,6 +109,12 @@ public class DirectoryStateHandle implements StateObject {
 
     @Override
     public String toString() {
-        return "DirectoryStateHandle{" + "directory=" + directoryString + '}';
+        return "DirectoryStateHandle{"
+                + "directory='"
+                + directoryString
+                + '\''
+                + ", directorySize="
+                + directorySize
+                + '}';
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index f162efa936b..08e0fba18bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -100,5 +100,16 @@ public interface IncrementalKeyedStateHandle
         public int hashCode() {
             return Objects.hash(handle, localPath);
         }
+
+        @Override
+        public String toString() {
+            return "HandleAndLocalPath{"
+                    + "handle="
+                    + handle
+                    + ", localPath='"
+                    + localPath
+                    + '\''
+                    + '}';
+        }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
index 8b4be574d28..49f943a7b15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
@@ -135,6 +135,10 @@ public class KeyGroupRange implements KeyGroupsList, Serializable {
                 + '}';
     }
 
+    public String prettyPrintInterval() {
+        return "[" + startKeyGroup + ", " + endKeyGroup + "]";
+    }
+
     @Override
     public Iterator<Integer> iterator() {
         return new KeyGroupIterator();
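
For illustration, a tiny sketch of the new helper (assumes KeyGroupRange's existing public (start, end) constructor; not part of this commit):

    import org.apache.flink.runtime.state.KeyGroupRange;

    class KeyGroupRangeExample {
        public static void main(String[] args) {
            // Renders only the closed interval, keeping the new restore log lines compact.
            KeyGroupRange range = new KeyGroupRange(0, 127);
            System.out.println(range.prettyPrintInterval()); // prints "[0, 127]"
        }
    }
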
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index d036955d0c8..1cec488d389 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -53,6 +53,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -217,13 +218,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
                                         keyGroupRange,
                                         env.getTaskKvStateRegistry(),
                                         TtlTimeProvider.DEFAULT,
-                                        new UnregisteredMetricsGroup(),
+                                        getMetricGroup(),
+                                        getCustomInitializationMetrics(),
                                         Collections.emptyList(),
-                                        new CloseableRegistry()));
+                                        new CloseableRegistry(),
+                                        1.0d));
 
         return backend;
     }
 
+    protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
+        return (name, value) -> {};
+    }
+
     protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(
             TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
         return restoreKeyedBackend(keySerializer, state, env);
@@ -255,9 +262,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
                                 keyGroupRange,
                                 env.getTaskKvStateRegistry(),
                                 TtlTimeProvider.DEFAULT,
-                                new UnregisteredMetricsGroup(),
+                                getMetricGroup(),
+                                getCustomInitializationMetrics(),
                                 state,
-                                new CloseableRegistry()));
+                                new CloseableRegistry(),
+                                1.0d));
+    }
+
+    protected MetricGroup getMetricGroup() {
+        return new UnregisteredMetricsGroup();
     }
 
     @TestTemplate
@@ -283,9 +296,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
                                 groupRange,
                                 kvStateRegistry,
                                 TtlTimeProvider.DEFAULT,
-                                new UnregisteredMetricsGroup(),
+                                getMetricGroup(),
+                                getCustomInitializationMetrics(),
                                 Collections.emptyList(),
-                                cancelStreamRegistry));
+                                cancelStreamRegistry,
+                                1.0d));
         try {
             KeyedStateBackend<Integer> nested =
                     keyedStateBackend instanceof TestableKeyedStateBackend
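
The base test now injects a no-op CustomInitializationMetrics; subclasses can override getCustomInitializationMetrics() to capture what the restore path reports. A minimal recording sketch (mirroring the override EmbeddedRocksDBStateBackendTest adds below; class and field names are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.runtime.state.StateBackend;

    class RecordingInitializationMetrics {
        final Map<String, Long> recorded = new HashMap<>();

        StateBackend.CustomInitializationMetrics callback() {
            // Sums repeated reports under the same metric name.
            return (name, value) -> recorded.merge(name, value, Long::sum);
        }
    }
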
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
index 5211e951e43..909c3c61704 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
@@ -123,4 +123,9 @@ public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
         ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue(
                 getStateBackend(), env, streamFactory);
     }
+
+    @Override
+    protected boolean checkMetrics() {
+        return false;
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 623f9c497ba..0d0b8e63992 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -471,7 +472,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
         DBOptions dbOptions = optionsContainer.getDbOptions();
-        if (restoreStateHandles.isEmpty()) {
+        if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
             return new RocksDBNoneRestoreOperation<>(
                     kvStateInformation,
                     instanceRocksDBPath,
@@ -500,7 +501,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     nativeMetricOptions,
                     metricGroup,
                     customInitializationMetrics,
-                    restoreStateHandles,
+                    CollectionUtil.checkedSubTypeCast(
+                            restoreStateHandles, IncrementalKeyedStateHandle.class),
                     ttlCompactFiltersManager,
                     writeBatchSize,
                     optionsContainer.getWriteBufferManagerCapacity(),
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index f7ea297d761..850bda6bad7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -19,13 +19,11 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
@@ -39,17 +37,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION;
-
 /** Help class for downloading RocksDB state files. */
 public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
 
-    private final CustomInitializationMetrics customInitializationMetrics;
-
-    public RocksDBStateDownloader(
-            int restoringThreadNum, CustomInitializationMetrics customInitializationMetrics) {
+    public RocksDBStateDownloader(int restoringThreadNum) {
         super(restoringThreadNum);
-        this.customInitializationMetrics = customInitializationMetrics;
     }
 
     /**
@@ -68,15 +60,11 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
         // Make sure we also react to external close signals.
         closeableRegistry.registerCloseable(internalCloser);
         try {
-            long startTimeMs = SystemClock.getInstance().relativeTimeMillis();
             List<CompletableFuture<Void>> futures =
                     transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                             .collect(Collectors.toList());
             // Wait until either all futures completed successfully or one failed exceptionally.
             FutureUtils.completeAll(futures).get();
-            customInitializationMetrics.addMetric(
-                    DOWNLOAD_STATE_DURATION,
-                    SystemClock.getInstance().relativeTimeMillis() - startTimeMs);
         } catch (Exception e) {
             downloadRequests.stream()
                     .map(StateHandleDownloadSpec::getDownloadDestination)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 4d09b50b44e..c96d5d64224 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
 import org.apache.flink.runtime.state.StateSerializerProvider;
@@ -51,6 +50,7 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -84,6 +84,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Function;
 
+import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION;
+import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_ASYNC_COMPACTION_DURATION;
+import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_STATE_DURATION;
 import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
 
 /** Encapsulates the process of restoring a RocksDB instance from an incremental snapshot. */
@@ -92,10 +95,18 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
     private static final Logger logger =
             LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
 
+    @SuppressWarnings("unchecked")
+    private static final Class<? extends IncrementalKeyedStateHandle>[]
+            EXPECTED_STATE_HANDLE_CLASSES =
+                    new Class[] {
+                        IncrementalRemoteKeyedStateHandle.class,
+                        IncrementalLocalKeyedStateHandle.class
+                    };
+
     private final String operatorIdentifier;
     private final SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles;
     private final RocksDBHandle rocksHandle;
-    private final Collection<KeyedStateHandle> restoreStateHandles;
+    private final Collection<IncrementalKeyedStateHandle> restoreStateHandles;
     private final CloseableRegistry cancelStreamRegistry;
     private final KeyGroupRange keyGroupRange;
     private final File instanceBasePath;
@@ -131,7 +142,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             RocksDBNativeMetricOptions nativeMetricOptions,
             MetricGroup metricGroup,
             CustomInitializationMetrics customInitializationMetrics,
-            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
+            @Nonnull Collection<IncrementalKeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
             Long writeBufferManagerCapacity,
@@ -169,7 +180,10 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         this.asyncCompactAfterRescale = false;
     }
 
-    /** Root method that branches for different implementations of {@link KeyedStateHandle}. */
+    /**
+     * Root method that branches for different implementations of {@link
+     * IncrementalKeyedStateHandle}.
+     */
     @Override
     public RocksDBRestoreResult restore() throws Exception {
 
@@ -177,6 +191,15 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             return null;
         }
 
+        logger.info(
+                "Starting RocksDB incremental recovery in operator {} "
+                        + "target key-group range {}. Use IngestDB={}, Use 
AsyncCompaction={}, State Handles={}",
+                operatorIdentifier,
+                keyGroupRange.prettyPrintInterval(),
+                useIngestDbRestoreMode,
+                asyncCompactAfterRescale,
+                restoreStateHandles);
+
         final List<StateHandleDownloadSpec> allDownloadSpecs =
                 new ArrayList<>(restoreStateHandles.size());
 
@@ -186,16 +209,16 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
 
         try {
-            prepareStateHandleDownloadsToLocal(
-                    absolutInstanceBasePath, localKeyedStateHandles, allDownloadSpecs);
+            runAndReportDuration(
+                    () ->
+                            makeAllStateHandlesLocal(
+                                    absolutInstanceBasePath,
+                                    localKeyedStateHandles,
+                                    allDownloadSpecs),
+                    DOWNLOAD_STATE_DURATION);
 
-            if (localKeyedStateHandles.size() == 1) {
-                // This happens if we don't rescale and for some scale out scenarios.
-                initBaseDBFromSingleStateHandle(localKeyedStateHandles.get(0));
-            } else {
-                // This happens for all scale ins and some scale outs.
-                restoreFromMultipleStateHandles(localKeyedStateHandles);
-            }
+            runAndReportDuration(
+                    () -> restoreFromLocalState(localKeyedStateHandles), RESTORE_STATE_DURATION);
 
             CompletableFuture<Void> asyncCompactFuture = null;
             if (asyncCompactAfterRescale) {
@@ -215,7 +238,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
                                                                 backendUID,
                                                                 operatorIdentifier);
                                                         try {
-                                                            run.run();
+                                                            runAndReportDuration(
+                                                                    run,
+                                                                    RESTORE_ASYNC_COMPACTION_DURATION);
                                                             logger.info(
                                                                     "Completed 
async compaction after restore for backend {} in operator {} after {} ms.",
                                                                     backendUID,
@@ -240,6 +265,11 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
                                             return resultFuture;
                                         })
                                 .orElse(null);
+                logger.info(
+                        "Finished RocksDB incremental recovery in operator {} 
with "
+                                + "target key-group range range {}.",
+                        operatorIdentifier,
+                        keyGroupRange.prettyPrintInterval());
             }
 
             return new RocksDBRestoreResult(
@@ -258,10 +288,20 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         }
     }
 
+    private void restoreFromLocalState(
+            List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception {
+        if (localKeyedStateHandles.size() == 1) {
+            // This happens if we don't rescale and for some scale out scenarios.
+            initBaseDBFromSingleStateHandle(localKeyedStateHandles.get(0));
+        } else {
+            // This happens for all scale ins and some scale outs.
+            restoreFromMultipleStateHandles(localKeyedStateHandles);
+        }
+    }
+
     /**
-     * Prepares the download of all {@link IncrementalRemoteKeyedStateHandle}s to {@link
-     * IncrementalLocalKeyedStateHandle}s by creating the download specs and already converting the
-     * handle type.
+     * Downloads and converts all {@link IncrementalRemoteKeyedStateHandle}s to {@link
+     * IncrementalLocalKeyedStateHandle}s.
      *
      * @param absolutInstanceBasePath the base path of the restoring DB instance as absolute path.
      * @param localKeyedStateHandlesOut the output parameter for the created {@link
@@ -269,14 +309,13 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
      * @param allDownloadSpecsOut output parameter for the created download specs.
      * @throws Exception if an unexpected state handle type is passed as argument.
      */
-    @SuppressWarnings("unchecked")
-    private void prepareStateHandleDownloadsToLocal(
+    private void makeAllStateHandlesLocal(
             Path absolutInstanceBasePath,
             List<IncrementalLocalKeyedStateHandle> localKeyedStateHandlesOut,
             List<StateHandleDownloadSpec> allDownloadSpecsOut)
             throws Exception {
         // Prepare and collect all the download requests to pull remote state to a local directory
-        for (KeyedStateHandle stateHandle : restoreStateHandles) {
+        for (IncrementalKeyedStateHandle stateHandle : restoreStateHandles) {
             if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
                 StateHandleDownloadSpec downloadRequest =
                         new StateHandleDownloadSpec(
@@ -287,11 +326,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
                 localKeyedStateHandlesOut.add((IncrementalLocalKeyedStateHandle) stateHandle);
             } else {
                 throw unexpectedStateHandleException(
-                        new Class[] {
-                            IncrementalRemoteKeyedStateHandle.class,
-                            IncrementalLocalKeyedStateHandle.class
-                        },
-                        stateHandle.getClass());
+                        EXPECTED_STATE_HANDLE_CLASSES, stateHandle.getClass());
             }
         }
 
@@ -312,9 +347,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             throws Exception {
 
         logger.info(
-                "Starting to restore Base DB in backend with range {} in 
operator {} from selected state handle {}.",
-                keyGroupRange,
+                "Starting opening base RocksDB instance in operator {} with 
target key-group range {} from state handle {}.",
                 operatorIdentifier,
+                keyGroupRange.prettyPrintInterval(),
                 stateHandle);
 
         // Restore base DB from selected initial handle
@@ -344,9 +379,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             }
         }
         logger.info(
-                "Completed restoring backend with range {} in operator {} from 
selected state handle.",
-                keyGroupRange,
-                operatorIdentifier);
+                "Finished opening base RocksDB instance in operator {} with 
target key-group range {}.",
+                operatorIdentifier,
+                keyGroupRange.prettyPrintInterval());
     }
 
     /**
@@ -360,7 +395,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
         logger.info(
                 "Starting to restore backend with range {} in operator {} from 
multiple state handles {} with useIngestDbRestoreMode = {}.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier,
                 localKeyedStateHandles,
                 useIngestDbRestoreMode);
@@ -385,7 +420,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
         logger.info(
                 "Completed restoring backend with range {} in operator {} from 
multiple state handles with useIngestDbRestoreMode = {}.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier,
                 useIngestDbRestoreMode);
     }
@@ -562,7 +597,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
         logger.info(
                 "Starting to merge state for backend with range {} in operator 
{} from multiple state handles using temporary instances.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
 
         // Choose the best state handle for the initial DB
@@ -585,7 +620,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
         logger.info(
                 "Completed merging state for backend with range {} in operator 
{} from multiple state handles using temporary instances.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
     }
 
@@ -643,7 +678,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId();
         logger.info(
                 "Restored previous incremental files status in backend with 
range {} in operator {}: backend uuid {}, last checkpoint id {}.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier,
                 backendUID,
                 lastCompletedCheckpointId);
@@ -664,11 +699,6 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
         Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
 
-        logger.debug(
-                "Restoring keyed backend uid in operator {} from incremental 
snapshot to {}.",
-                operatorIdentifier,
-                backendUID);
-
         this.rocksHandle.openDB(
                 createColumnFamilyDescriptors(stateMetaInfoSnapshots, true),
                 stateMetaInfoSnapshots,
@@ -684,11 +714,18 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
      */
     private void transferRemoteStateToLocalDirectory(
             Collection<StateHandleDownloadSpec> downloadSpecs) throws Exception {
+        logger.info(
+                "Start downloading remote state to local directory in operator 
{} for target key-group range {}.",
+                operatorIdentifier,
+                keyGroupRange.prettyPrintInterval());
         try (RocksDBStateDownloader rocksDBStateDownloader =
-                new RocksDBStateDownloader(
-                        numberOfTransferringThreads, customInitializationMetrics)) {
+                new RocksDBStateDownloader(numberOfTransferringThreads)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     downloadSpecs, cancelStreamRegistry);
+            logger.info(
+                    "Finished downloading remote state to local directory in 
operator {} for target key-group range {}.",
+                    operatorIdentifier,
+                    keyGroupRange.prettyPrintInterval());
         }
     }
 
@@ -713,7 +750,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
         logger.info(
                 "Starting to copy state handles for backend with range {} in 
operator {} using temporary instances.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
 
         try (RocksDBWriteBatchWrapper writeBatchWrapper =
@@ -732,7 +769,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
         logger.info(
                 "Competed copying state handles for backend with range {} in 
operator {} using temporary instances.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
     }
 
@@ -755,7 +792,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         logger.debug(
                 "Starting copy of state handle {} for backend with range {} in 
operator {} to base DB using temporary instance.",
                 tmpRestoreDBInfo.srcStateHandle,
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
 
         List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
@@ -802,7 +839,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         logger.debug(
                 "Finished copy of state handle {} for backend with range {} in 
operator {} using temporary instance.",
                 tmpRestoreDBInfo.srcStateHandle,
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
     }
 
@@ -920,6 +957,14 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         return columnFamilyDescriptors;
     }
 
+    private void runAndReportDuration(RunnableWithException runnable, String metricName)
+            throws Exception {
+        final SystemClock clock = SystemClock.getInstance();
+        final long startTime = clock.relativeTimeMillis();
+        runnable.run();
+        customInitializationMetrics.addMetric(metricName, clock.relativeTimeMillis() - startTime);
+    }
+
     /** Reads Flink's state meta data file from the state handle. */
     private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle)
             throws Exception {
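
runAndReportDuration is the single timing primitive behind the RestoreStateDurationMs, DownloadStateDurationMs, and RestoreAsyncCompactionDurationMs traces. A standalone sketch of the same pattern (hypothetical helper class, not part of this commit):

    import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
    import org.apache.flink.util.clock.SystemClock;
    import org.apache.flink.util.function.RunnableWithException;

    class DurationTracing {
        static void runAndReport(
                RunnableWithException phase, String metricName, CustomInitializationMetrics metrics)
                throws Exception {
            final SystemClock clock = SystemClock.getInstance();
            final long startTime = clock.relativeTimeMillis();
            phase.run();
            // Reports the phase's elapsed wall-clock milliseconds under the given metric name.
            metrics.addMetric(metricName, clock.relativeTimeMillis() - startTime);
        }
    }
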
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index d41b166e3c9..5feeeaf5608 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -24,22 +24,26 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryImpl;
 import org.apache.flink.runtime.state.SharedStateRegistryKey;
 import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -51,6 +55,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.function.SupplierWithException;
@@ -80,6 +85,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -155,6 +161,7 @@ public class EmbeddedRocksDBStateBackendTest
     private ColumnFamilyHandle defaultCFHandle = null;
     private RocksDBStateUploader rocksDBStateUploader = null;
     private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();
+    private final HashMap<String, Long> initMetricBackingMap = new HashMap<>();
 
     public void prepareRocksDB() throws Exception {
         String dbPath =
@@ -180,7 +187,7 @@ public class EmbeddedRocksDBStateBackendTest
         EmbeddedRocksDBStateBackend backend =
                 new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing);
         Configuration configuration = new Configuration();
-        configuration.setBoolean(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
+        configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
         configuration.set(
                 RocksDBOptions.TIMER_SERVICE_FACTORY,
                 EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
@@ -189,6 +196,14 @@ public class EmbeddedRocksDBStateBackendTest
         return backend;
     }
 
+    @Override
+    protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
+        return (name, value) -> {
+            initMetricBackingMap.compute(
+                    name, (key, oldValue) -> oldValue == null ? value : value + oldValue);
+        };
+    }
+
     @Override
     protected CheckpointStorage getCheckpointStorage() throws Exception {
         return storageSupplier.get();
@@ -672,6 +687,29 @@ public class EmbeddedRocksDBStateBackendTest
         }
     }
 
+    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(
+            TypeSerializer<K> keySerializer,
+            int numberOfKeyGroups,
+            KeyGroupRange keyGroupRange,
+            List<KeyedStateHandle> state,
+            Environment env)
+            throws Exception {
+        CheckpointableKeyedStateBackend<K> restoreResult =
+                super.restoreKeyedBackend(
+                        keySerializer, numberOfKeyGroups, keyGroupRange, state, env);
+
+        // If something was restored, check that all expected metrics are present.
+        if (checkMetrics() && !CollectionUtil.isEmptyOrAllElementsNull(state)) {
+            assertThat(initMetricBackingMap.keySet())
+                    .containsExactlyInAnyOrder("RestoreStateDurationMs", "DownloadStateDurationMs");
+        }
+        return restoreResult;
+    }
+
+    protected boolean checkMetrics() {
+        return true;
+    }
+
     private static class AcceptAllFilter implements IOFileFilter {
         @Override
         public boolean accept(File file) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index 3177f373f6a..dd5026da052 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -40,14 +39,11 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
-import java.util.Set;
 import java.util.UUID;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -75,8 +71,7 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                         stateHandles,
                         stateHandle);
 
-        try (RocksDBStateDownloader rocksDBStateDownloader =
-                new RocksDBStateDownloader(5, (key, value) -> {})) {
+        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     Collections.singletonList(
                             new StateHandleDownloadSpec(
@@ -102,16 +97,11 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                             temporaryFolder.newFolder().toPath(), contents[i], i));
         }
 
-        Set<String> customMetrics = new HashSet<>();
-
-        try (RocksDBStateDownloader rocksDBStateDownloader =
-                new RocksDBStateDownloader(4, (key, value) -> customMetrics.add(key))) {
+        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(4)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     downloadRequests, new CloseableRegistry());
         }
 
-        assertThat(customMetrics).containsExactly(MetricNames.DOWNLOAD_STATE_DURATION);
-
         for (int i = 0; i < numRemoteHandles; ++i) {
             StateHandleDownloadSpec downloadRequest = downloadRequests.get(i);
             Path dstPath = downloadRequest.getDownloadDestination();
@@ -148,8 +138,7 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                                 "error-handle"));
 
         CloseableRegistry closeableRegistry = new CloseableRegistry();
-        try (RocksDBStateDownloader rocksDBStateDownloader =
-                new RocksDBStateDownloader(5, (key, value) -> {})) {
+        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     downloadRequests, closeableRegistry);
             fail("Exception is expected");

