This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7564c810e55f952ea7014a707c9487b777131c2d
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Jan 27 12:34:27 2021 +0100

    [FLINK-21206] Write savepoints in unified format from HeapStateBackend
    
    This closes #14925
---
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  19 ++-
 .../state/heap/HeapKeyedStateBackendBuilder.java   |  16 +-
 .../runtime/state/heap/HeapSavepointStrategy.java  |  95 +++++++++++
 .../runtime/state/heap/HeapSnapshotResources.java  | 190 +++++++++++++++++++++
 .../runtime/state/heap/HeapSnapshotStrategy.java   | 116 ++-----------
 flink-tests/pom.xml                                |   9 +
 .../state/HeapSavepointStateBackendSwitchTest.java |   4 +-
 7 files changed, 338 insertions(+), 111 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index c81bb68..0b42a32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -106,7 +106,9 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
      * The snapshot strategy for this backend. This determines, e.g., if 
snapshots are synchronous
      * or asynchronous.
      */
-    private final SnapshotStrategyRunner<KeyedStateHandle, ?> 
snapshotStrategyRunner;
+    private final SnapshotStrategyRunner<KeyedStateHandle, ?> 
checkpointStrategyRunner;
+
+    private final SnapshotStrategyRunner<KeyedStateHandle, ?> 
savepointStrategyRunner;
 
     private final StateTableFactory<K> stateTableFactory;
 
@@ -125,7 +127,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
             Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates,
             LocalRecoveryConfig localRecoveryConfig,
             HeapPriorityQueueSetFactory priorityQueueSetFactory,
-            SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner,
+            SnapshotStrategyRunner<KeyedStateHandle, ?> 
checkpointStrategyRunner,
+            SnapshotStrategyRunner<KeyedStateHandle, ?> 
savepointStrategyRunner,
             StateTableFactory<K> stateTableFactory,
             InternalKeyContext<K> keyContext) {
         super(
@@ -141,7 +144,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         this.registeredPQStates = registeredPQStates;
         this.localRecoveryConfig = localRecoveryConfig;
         this.priorityQueueSetFactory = priorityQueueSetFactory;
-        this.snapshotStrategyRunner = snapshotStrategyRunner;
+        this.checkpointStrategyRunner = checkpointStrategyRunner;
+        this.savepointStrategyRunner = savepointStrategyRunner;
         this.stateTableFactory = stateTableFactory;
         LOG.info("Initializing heap keyed state backend with stream factory.");
     }
@@ -356,8 +360,13 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
             @Nonnull CheckpointOptions checkpointOptions)
             throws Exception {
 
-        return snapshotStrategyRunner.snapshot(
-                checkpointId, timestamp, streamFactory, checkpointOptions);
+        if (checkpointOptions.getCheckpointType().isSavepoint()) {
+            return savepointStrategyRunner.snapshot(
+                    checkpointId, timestamp, streamFactory, checkpointOptions);
+        } else {
+            return checkpointStrategyRunner.snapshot(
+                    checkpointId, timestamp, streamFactory, checkpointOptions);
+        }
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 6519461..3991ba0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -96,6 +96,14 @@ public class HeapKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBu
         CloseableRegistry cancelStreamRegistryForBackend = new 
CloseableRegistry();
         HeapSnapshotStrategy<K> snapshotStrategy =
                 initSnapshotStrategy(registeredKVStates, registeredPQStates);
+        HeapSavepointStrategy<K> savepointStrategy =
+                new HeapSavepointStrategy<>(
+                        registeredKVStates,
+                        registeredPQStates,
+                        keyGroupCompressionDecorator,
+                        keyGroupRange,
+                        keySerializerProvider,
+                        numberOfKeyGroups);
         InternalKeyContext<K> keyContext =
                 new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
 
@@ -124,6 +132,11 @@ public class HeapKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBu
                         snapshotStrategy,
                         cancelStreamRegistryForBackend,
                         asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
+                new SnapshotStrategyRunner<>(
+                        "Heap backend savepoint",
+                        savepointStrategy,
+                        cancelStreamRegistryForBackend,
+                        asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
                 stateTableFactory,
                 keyContext);
     }
@@ -187,6 +200,7 @@ public class HeapKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBu
                 keyGroupCompressionDecorator,
                 localRecoveryConfig,
                 keyGroupRange,
-                keySerializerProvider);
+                keySerializerProvider,
+                numberOfKeyGroups);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
new file mode 100644
index 0000000..ed70ed7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
+import org.apache.flink.runtime.state.FullSnapshotResources;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+
+/** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
+class HeapSavepointStrategy<K>
+        implements SnapshotStrategy<KeyedStateHandle, 
FullSnapshotResources<K>> {
+
+    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates;
+    private final StreamCompressionDecorator keyGroupCompressionDecorator;
+    private final KeyGroupRange keyGroupRange;
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final int totalKeyGroups;
+
+    HeapSavepointStrategy(
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates,
+            StreamCompressionDecorator keyGroupCompressionDecorator,
+            KeyGroupRange keyGroupRange,
+            StateSerializerProvider<K> keySerializerProvider,
+            int totalKeyGroups) {
+        this.registeredKVStates = registeredKVStates;
+        this.registeredPQStates = registeredPQStates;
+        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
+        this.keyGroupRange = keyGroupRange;
+        this.keySerializerProvider = keySerializerProvider;
+        this.totalKeyGroups = totalKeyGroups;
+    }
+
+    @Override
+    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) {
+        return HeapSnapshotResources.create(
+                registeredKVStates,
+                registeredPQStates,
+                keyGroupCompressionDecorator,
+                keyGroupRange,
+                getKeySerializer(),
+                totalKeyGroups);
+    }
+
+    @Override
+    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
+            FullSnapshotResources<K> syncPartResource,
+            long checkpointId,
+            long timestamp,
+            @Nonnull CheckpointStreamFactory streamFactory,
+            @Nonnull CheckpointOptions checkpointOptions) {
+
+        assert checkpointOptions.getCheckpointType().isSavepoint();
+        return new FullSnapshotAsyncWriter<>(
+                checkpointOptions.getCheckpointType(),
+                () ->
+                        CheckpointStreamWithResultProvider.createSimpleStream(
+                                CheckpointedStateScope.EXCLUSIVE, 
streamFactory),
+                syncPartResource);
+    }
+
+    public TypeSerializer<K> getKeySerializer() {
+        return keySerializerProvider.currentSchemaSerializer();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
new file mode 100644
index 0000000..aa2c0af
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FullSnapshotResources;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A set of resources required to take a checkpoint or savepoint from a {@link
+ * HeapKeyedStateBackend}.
+ */
+@Internal
+final class HeapSnapshotResources<K> implements FullSnapshotResources<K> {
+    private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
+    private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
+    private final StreamCompressionDecorator streamCompressionDecorator;
+    private final Map<StateUID, Integer> stateNamesToId;
+    private final KeyGroupRange keyGroupRange;
+    private final TypeSerializer<K> keySerializer;
+    private final int totalKeyGroups;
+
+    private HeapSnapshotResources(
+            List<StateMetaInfoSnapshot> metaInfoSnapshots,
+            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
+            StreamCompressionDecorator streamCompressionDecorator,
+            Map<StateUID, Integer> stateNamesToId,
+            KeyGroupRange keyGroupRange,
+            TypeSerializer<K> keySerializer,
+            int totalKeyGroups) {
+        this.metaInfoSnapshots = metaInfoSnapshots;
+        this.cowStateStableSnapshots = cowStateStableSnapshots;
+        this.streamCompressionDecorator = streamCompressionDecorator;
+        this.stateNamesToId = stateNamesToId;
+        this.keyGroupRange = keyGroupRange;
+        this.keySerializer = keySerializer;
+        this.totalKeyGroups = totalKeyGroups;
+    }
+
+    public static <K> HeapSnapshotResources<K> create(
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates,
+            StreamCompressionDecorator streamCompressionDecorator,
+            KeyGroupRange keyGroupRange,
+            TypeSerializer<K> keySerializer,
+            int totalKeyGroups) {
+
+        if (registeredKVStates.isEmpty() && registeredPQStates.isEmpty()) {
+            return new HeapSnapshotResources<>(
+                    Collections.emptyList(),
+                    Collections.emptyMap(),
+                    streamCompressionDecorator,
+                    Collections.emptyMap(),
+                    keyGroupRange,
+                    keySerializer,
+                    totalKeyGroups);
+        }
+
+        int numStates = registeredKVStates.size() + registeredPQStates.size();
+
+        Preconditions.checkState(
+                numStates <= Short.MAX_VALUE,
+                "Too many states: "
+                        + numStates
+                        + ". Currently at most "
+                        + Short.MAX_VALUE
+                        + " states are supported");
+
+        final List<StateMetaInfoSnapshot> metaInfoSnapshots = new 
ArrayList<>(numStates);
+        final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
+        final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new 
HashMap<>(numStates);
+
+        processSnapshotMetaInfoForAllStates(
+                metaInfoSnapshots,
+                cowStateStableSnapshots,
+                stateNamesToId,
+                registeredKVStates,
+                StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
+
+        processSnapshotMetaInfoForAllStates(
+                metaInfoSnapshots,
+                cowStateStableSnapshots,
+                stateNamesToId,
+                registeredPQStates,
+                StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+
+        return new HeapSnapshotResources<>(
+                metaInfoSnapshots,
+                cowStateStableSnapshots,
+                streamCompressionDecorator,
+                stateNamesToId,
+                keyGroupRange,
+                keySerializer,
+                totalKeyGroups);
+    }
+
+    private static void processSnapshotMetaInfoForAllStates(
+            List<StateMetaInfoSnapshot> metaInfoSnapshots,
+            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
+            Map<StateUID, Integer> stateNamesToId,
+            Map<String, ? extends StateSnapshotRestore> registeredStates,
+            StateMetaInfoSnapshot.BackendStateType stateType) {
+
+        for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
+                registeredStates.entrySet()) {
+            final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
+            stateNamesToId.put(stateUid, stateNamesToId.size());
+            StateSnapshotRestore state = kvState.getValue();
+            if (null != state) {
+                final StateSnapshot stateSnapshot = state.stateSnapshot();
+                metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
+                cowStateStableSnapshots.put(stateUid, stateSnapshot);
+            }
+        }
+    }
+
+    @Override
+    public void release() {
+        for (StateSnapshot stateSnapshot : cowStateStableSnapshots.values()) {
+            stateSnapshot.release();
+        }
+    }
+
+    public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
+        return metaInfoSnapshots;
+    }
+
+    @Override
+    public KeyValueStateIterator createKVStateIterator() throws IOException {
+        return new HeapKeyValueStateIterator(
+                keyGroupRange,
+                keySerializer,
+                totalKeyGroups,
+                stateNamesToId,
+                cowStateStableSnapshots);
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyGroupRange;
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keySerializer;
+    }
+
+    @Override
+    public StreamCompressionDecorator getStreamCompressionDecorator() {
+        return streamCompressionDecorator;
+    }
+
+    public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
+        return cowStateStableSnapshots;
+    }
+
+    public Map<StateUID, Integer> getStateNamesToId() {
+        return stateNamesToId;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index fe48c90..78b96e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -30,26 +30,20 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StateSnapshot;
-import org.apache.flink.runtime.state.StateSnapshotRestore;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -60,7 +54,7 @@ import static 
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.
 
 /** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
 class HeapSnapshotStrategy<K>
-        implements SnapshotStrategy<KeyedStateHandle, 
HeapSnapshotStrategy.HeapSnapshotResources> {
+        implements SnapshotStrategy<KeyedStateHandle, 
HeapSnapshotResources<K>> {
 
     private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
     private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates;
@@ -68,6 +62,7 @@ class HeapSnapshotStrategy<K>
     private final LocalRecoveryConfig localRecoveryConfig;
     private final KeyGroupRange keyGroupRange;
     private final StateSerializerProvider<K> keySerializerProvider;
+    private final int totalKeyGroups;
 
     HeapSnapshotStrategy(
             Map<String, StateTable<K, ?, ?>> registeredKVStates,
@@ -75,58 +70,31 @@ class HeapSnapshotStrategy<K>
             StreamCompressionDecorator keyGroupCompressionDecorator,
             LocalRecoveryConfig localRecoveryConfig,
             KeyGroupRange keyGroupRange,
-            StateSerializerProvider<K> keySerializerProvider) {
+            StateSerializerProvider<K> keySerializerProvider,
+            int totalKeyGroups) {
         this.registeredKVStates = registeredKVStates;
         this.registeredPQStates = registeredPQStates;
         this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
         this.localRecoveryConfig = localRecoveryConfig;
         this.keyGroupRange = keyGroupRange;
         this.keySerializerProvider = keySerializerProvider;
+        this.totalKeyGroups = totalKeyGroups;
     }
 
     @Override
-    public HeapSnapshotResources syncPrepareResources(long checkpointId) {
-
-        if (!hasRegisteredState()) {
-            return new HeapSnapshotResources(
-                    Collections.emptyList(), Collections.emptyMap(), 
Collections.emptyMap());
-        }
-
-        int numStates = registeredKVStates.size() + registeredPQStates.size();
-
-        Preconditions.checkState(
-                numStates <= Short.MAX_VALUE,
-                "Too many states: "
-                        + numStates
-                        + ". Currently at most "
-                        + Short.MAX_VALUE
-                        + " states are supported");
-
-        final List<StateMetaInfoSnapshot> metaInfoSnapshots = new 
ArrayList<>(numStates);
-        final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
-        final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new 
HashMap<>(numStates);
-
-        processSnapshotMetaInfoForAllStates(
-                metaInfoSnapshots,
-                cowStateStableSnapshots,
-                stateNamesToId,
+    public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
+        return HeapSnapshotResources.create(
                 registeredKVStates,
-                StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
-
-        processSnapshotMetaInfoForAllStates(
-                metaInfoSnapshots,
-                cowStateStableSnapshots,
-                stateNamesToId,
                 registeredPQStates,
-                StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
-
-        return new HeapSnapshotResources(
-                metaInfoSnapshots, cowStateStableSnapshots, stateNamesToId);
+                keyGroupCompressionDecorator,
+                keyGroupRange,
+                getKeySerializer(),
+                totalKeyGroups);
     }
 
     @Override
     public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
-            HeapSnapshotResources syncPartResource,
+            HeapSnapshotResources<K> syncPartResource,
             long checkpointId,
             long timestamp,
             @Nonnull CheckpointStreamFactory streamFactory,
@@ -142,7 +110,7 @@ class HeapSnapshotStrategy<K>
                         // TODO: this code assumes that writing a serializer 
is threadsafe, we
                         // should support to
                         // get a serialized form already at state registration 
time in the future
-                        getKeySerializer(),
+                        syncPartResource.getKeySerializer(),
                         metaInfoSnapshots,
                         !Objects.equals(
                                 
UncompressedStreamCompressionDecorator.INSTANCE,
@@ -214,65 +182,7 @@ class HeapSnapshotStrategy<K>
         };
     }
 
-    private void processSnapshotMetaInfoForAllStates(
-            List<StateMetaInfoSnapshot> metaInfoSnapshots,
-            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
-            Map<StateUID, Integer> stateNamesToId,
-            Map<String, ? extends StateSnapshotRestore> registeredStates,
-            StateMetaInfoSnapshot.BackendStateType stateType) {
-
-        for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
-                registeredStates.entrySet()) {
-            final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
-            stateNamesToId.put(stateUid, stateNamesToId.size());
-            StateSnapshotRestore state = kvState.getValue();
-            if (null != state) {
-                final StateSnapshot stateSnapshot = state.stateSnapshot();
-                metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
-                cowStateStableSnapshots.put(stateUid, stateSnapshot);
-            }
-        }
-    }
-
-    private boolean hasRegisteredState() {
-        return !(registeredKVStates.isEmpty() && registeredPQStates.isEmpty());
-    }
-
     public TypeSerializer<K> getKeySerializer() {
         return keySerializerProvider.currentSchemaSerializer();
     }
-
-    static class HeapSnapshotResources implements SnapshotResources {
-        private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
-        private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
-        private final Map<StateUID, Integer> stateNamesToId;
-
-        HeapSnapshotResources(
-                @Nonnull List<StateMetaInfoSnapshot> metaInfoSnapshots,
-                @Nonnull Map<StateUID, StateSnapshot> cowStateStableSnapshots,
-                @Nonnull Map<StateUID, Integer> stateNamesToId) {
-            this.metaInfoSnapshots = metaInfoSnapshots;
-            this.cowStateStableSnapshots = cowStateStableSnapshots;
-            this.stateNamesToId = stateNamesToId;
-        }
-
-        @Override
-        public void release() {
-            for (StateSnapshot stateSnapshot : 
cowStateStableSnapshots.values()) {
-                stateSnapshot.release();
-            }
-        }
-
-        public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
-            return metaInfoSnapshots;
-        }
-
-        public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
-            return cowStateStableSnapshots;
-        }
-
-        public Map<StateUID, Integer> getStateNamesToId() {
-            return stateNamesToId;
-        }
-    }
 }
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 7031344..fb30a1d 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -231,6 +231,15 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>com.github.oshi</groupId>
                        <artifactId>oshi-core</artifactId>
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
index d5708a6..5138e03 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
@@ -23,8 +23,8 @@ import 
org.apache.flink.test.state.BackendSwitchSpecs.BackendSwitchSpec;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 
 /** Tests for switching a HEAP state backend to a different one. */
 @RunWith(Parameterized.class)
@@ -35,6 +35,6 @@ public class HeapSavepointStateBackendSwitchTest extends 
SavepointStateBackendSw
 
     @Parameterized.Parameters
     public static Collection<BackendSwitchSpec> targetBackends() {
-        return Collections.singletonList(BackendSwitchSpecs.HEAP);
+        return Arrays.asList(BackendSwitchSpecs.HEAP, 
BackendSwitchSpecs.ROCKS);
     }
 }

Reply via email to