This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93c726bc90609d31f07494344ccaaed4d2c7e7c5
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Jan 13 17:20:29 2021 +0100

    [FLINK-20978] Extract common restoring logic from RocksDB
---
 .../restore/FullSnapshotRestoreOperation.java      | 323 +++++++++++++++++++++
 .../flink/runtime/state/restore/KeyGroup.java      |  41 +++
 .../flink/runtime/state/restore/KeyGroupEntry.java |  47 +++
 .../state/restore/SavepointRestoreResult.java      |  44 +++
 .../runtime/state/restore/ThrowingIterator.java    |  52 ++++
 .../state/restore/RocksDBFullRestoreOperation.java | 175 +++--------
 6 files changed, 549 insertions(+), 133 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/FullSnapshotRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/FullSnapshotRestoreOperation.java
new file mode 100644
index 0000000..8147ea5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/FullSnapshotRestoreOperation.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.restore;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.RestoreOperation;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.runtime.state.FullSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static 
org.apache.flink.runtime.state.FullSnapshotUtil.clearMetaDataFollowsFlag;
+import static 
org.apache.flink.runtime.state.FullSnapshotUtil.hasMetaDataFollowsFlag;
+import static 
org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
+
+/**
+ * The procedure of restoring state from a savepoint written with the unified 
binary format. All
+ * state backends should support restoring from that format.
+ *
+ * <p>The format was adopted from the RocksDB state backend. It is as follows:
+ *
+ * <pre>
+ *                    
+-------------------------------------------------------------+
+ *                    |                Keyed Backend Meta Information          
     |
+ *  Meta Information  
+--------------------+-----+----------------------------------+
+ *                    |    State Meta 0    | ... |           State Meta M      
     |
+ *                    
+-------------------------------------------------------------+
+ *                    |                       State ID (short)                 
     |
+ *  State 0           
+--------------------+-----+----------------------------------+
+ *                    | State (K,V) pair 0 | ... | State (K,V) pair X (flipped 
MSB) |
+ *                    
+--------------------+-----+----------------------------------+
+ *                    |                       State ID (short)                 
     |
+ *  State 1           
+--------------------+-----+----------------------------------+
+ *                    | State (K,V) pair 0 | ... | State (K,V) pair X (flipped 
MSB) |
+ *                    
+--------------------+-----+----------------------------------+
+ *                    |                       State ID (short)                 
     |
+ *                    
+--------------------+-----+----------------------------------+
+ *  State M           | State (K,V) pair 0 | ... | State (K,V) pair X (flipped 
MSB) |
+ *                    
+--------------------+-----+----------------------------------+
+ *                    |                END_OF_KEY_GROUP_MARK (0xFFFF)          
     |
+ *                    
+-------------------------------------------------------------+
+ * </pre>
+ *
+ * <p>Additionally the format of the (K,V) pairs differs slightly depending on 
the type of the state
+ * object:
+ *
+ * <pre>
+ * +------------------+-------------------------------------+
+ * |    ValueState    |    [CompositeKey(KG, K, NS), SV]    |
+ * |------------------+-------------------------------------+
+ * |    ListState     |    [CompositeKey(KG, K, NS), SV]    |
+ * +------------------+-------------------------------------+
+ * |     MapState     | [CompositeKey(KG, K, NS) :: UK, UV] |
+ * +------------------+-------------------------------------+
+ * | AggregatingState |    [CompositeKey(KG, K, NS), SV]    |
+ * +------------------+-------------------------------------+
+ * |   ReducingState  |    [CompositeKey(KG, K, NS), SV]    |
+ * |                  +-------------------------------------+
+ * |                  |    [CompositeKey(KG, K, NS), SV]    |
+ * +------------------+-------------------------------------+
+ * |      Timers      |    [KG :: TS :: K :: NS, (empty)]   |
+ * +------------------+-------------------------------------+
+ * </pre>
+ *
+ * <p>For detailed information see FLIP-41: 
https://cwiki.apache.org/confluence/x/VJDiBg
+ *
+ * @param <K> The data type of the key.
+ */
+@Internal
+public class FullSnapshotRestoreOperation<K>
+        implements RestoreOperation<ThrowingIterator<SavepointRestoreResult>> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FullSnapshotRestoreOperation.class);
+
+    private final KeyGroupRange keyGroupRange;
+    private final ClassLoader userCodeClassLoader;
+    private final Collection<KeyedStateHandle> restoreStateHandles;
+    private final StateSerializerProvider<K> keySerializerProvider;
+
+    private boolean isKeySerializerCompatibilityChecked;
+
+    public FullSnapshotRestoreOperation(
+            KeyGroupRange keyGroupRange,
+            ClassLoader userCodeClassLoader,
+            Collection<KeyedStateHandle> restoreStateHandles,
+            StateSerializerProvider<K> keySerializerProvider) {
+        this.keyGroupRange = keyGroupRange;
+        this.userCodeClassLoader = userCodeClassLoader;
+        this.restoreStateHandles =
+                
restoreStateHandles.stream().filter(Objects::nonNull).collect(Collectors.toList());
+        this.keySerializerProvider = keySerializerProvider;
+    }
+
+    @Override
+    public ThrowingIterator<SavepointRestoreResult> restore()
+            throws IOException, StateMigrationException {
+        return new ThrowingIterator<SavepointRestoreResult>() {
+
+            private final Iterator<KeyedStateHandle> keyedStateHandlesIterator 
=
+                    restoreStateHandles.iterator();
+
+            @Override
+            public boolean hasNext() {
+                return keyedStateHandlesIterator.hasNext();
+            }
+
+            @Override
+            public SavepointRestoreResult next() throws IOException, 
StateMigrationException {
+                KeyedStateHandle keyedStateHandle = 
keyedStateHandlesIterator.next();
+                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
+                    throw unexpectedStateHandleException(
+                            KeyGroupsStateHandle.class, 
keyedStateHandle.getClass());
+                }
+                KeyGroupsStateHandle groupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
+                return restoreKeyGroupsInStateHandle(groupsStateHandle);
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+
+    private SavepointRestoreResult restoreKeyGroupsInStateHandle(
+            @Nonnull KeyGroupsStateHandle keyedStateHandle)
+            throws IOException, StateMigrationException {
+        FSDataInputStream currentStateHandleInStream = 
keyedStateHandle.openInputStream();
+        KeyedBackendSerializationProxy<K> serializationProxy =
+                readMetaData(new 
DataInputViewStreamWrapper(currentStateHandleInStream));
+        KeyGroupsIterator groupsIterator =
+                new KeyGroupsIterator(
+                        keyGroupRange,
+                        keyedStateHandle,
+                        currentStateHandleInStream,
+                        serializationProxy.isUsingKeyGroupCompression()
+                                ? SnappyStreamCompressionDecorator.INSTANCE
+                                : 
UncompressedStreamCompressionDecorator.INSTANCE);
+
+        return new SavepointRestoreResult(
+                serializationProxy.getStateMetaInfoSnapshots(), 
groupsIterator);
+    }
+
+    private KeyedBackendSerializationProxy<K> readMetaData(DataInputView 
dataInputView)
+            throws IOException, StateMigrationException {
+        // isSerializerPresenceRequired flag is set to false, since for the 
RocksDB state backend,
+        // deserialization of state happens lazily during runtime; we depend 
on the fact
+        // that the new serializer for states could be compatible, and 
therefore the restore can
+        // continue
+        // without old serializers required to be present.
+        KeyedBackendSerializationProxy<K> serializationProxy =
+                new KeyedBackendSerializationProxy<>(userCodeClassLoader);
+        serializationProxy.read(dataInputView);
+        if (!isKeySerializerCompatibilityChecked) {
+            // fetch current serializer now because if it is incompatible, we 
can't access
+            // it anymore to improve the error message
+            TypeSerializer<K> currentSerializer = 
keySerializerProvider.currentSchemaSerializer();
+            // check for key serializer compatibility; this also reconfigures 
the
+            // key serializer to be compatible, if it is required and is 
possible
+            TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
+                    
keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(
+                            serializationProxy.getKeySerializerSnapshot());
+            if (keySerializerSchemaCompat.isCompatibleAfterMigration()
+                    || keySerializerSchemaCompat.isIncompatible()) {
+                throw new StateMigrationException(
+                        "The new key serializer ("
+                                + currentSerializer
+                                + ") must be compatible with the previous key 
serializer ("
+                                + 
keySerializerProvider.previousSchemaSerializer()
+                                + ").");
+            }
+
+            isKeySerializerCompatibilityChecked = true;
+        }
+
+        return serializationProxy;
+    }
+
+    private static class KeyGroupsIterator implements 
ThrowingIterator<KeyGroup> {
+        @Nonnull private final KeyGroupRange keyGroupRange;
+        @Nonnull private final Iterator<Tuple2<Integer, Long>> keyGroups;
+        @Nonnull private final FSDataInputStream currentStateHandleInStream;
+        @Nonnull private final StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
+        @Nonnull private final KeyGroupsStateHandle 
currentKeyGroupsStateHandle;
+
+        private KeyGroupsIterator(
+                @Nonnull KeyGroupRange keyGroupRange,
+                @Nonnull KeyGroupsStateHandle currentKeyGroupsStateHandle,
+                @Nonnull FSDataInputStream currentStateHandleInStream,
+                @Nonnull StreamCompressionDecorator 
keygroupStreamCompressionDecorator) {
+            this.keyGroupRange = keyGroupRange;
+            this.keyGroups = 
currentKeyGroupsStateHandle.getGroupRangeOffsets().iterator();
+            this.currentStateHandleInStream = currentStateHandleInStream;
+            this.keygroupStreamCompressionDecorator = 
keygroupStreamCompressionDecorator;
+            this.currentKeyGroupsStateHandle = currentKeyGroupsStateHandle;
+            LOG.info("Starting to restore from state handle: {}.", 
currentKeyGroupsStateHandle);
+        }
+
+        public boolean hasNext() {
+            return keyGroups.hasNext();
+        }
+
+        public KeyGroup next() throws IOException {
+            Tuple2<Integer, Long> keyGroupOffset = keyGroups.next();
+            int keyGroup = keyGroupOffset.f0;
+
+            // Check that restored key groups all belong to the backend
+            Preconditions.checkState(
+                    keyGroupRange.contains(keyGroup), "The key group must 
belong to the backend");
+
+            long offset = keyGroupOffset.f1;
+            if (0L != offset) {
+                currentStateHandleInStream.seek(offset);
+                InputStream compressedKgIn =
+                        
keygroupStreamCompressionDecorator.decorateWithCompression(
+                                currentStateHandleInStream);
+                DataInputViewStreamWrapper compressedKgInputView =
+                        new DataInputViewStreamWrapper(compressedKgIn);
+                return new KeyGroup(keyGroup, new 
KeyGroupEntriesIterator(compressedKgInputView));
+            } else {
+                return new KeyGroup(keyGroup, new KeyGroupEntriesIterator());
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            currentStateHandleInStream.close();
+            LOG.info("Finished restoring from state handle: {}.", 
currentKeyGroupsStateHandle);
+        }
+    }
+
+    private static class KeyGroupEntriesIterator implements 
ThrowingIterator<KeyGroupEntry> {
+        private final DataInputViewStreamWrapper kgInputView;
+        private Integer currentKvStateId;
+
+        private KeyGroupEntriesIterator(@Nonnull DataInputViewStreamWrapper 
kgInputView)
+                throws IOException {
+            this.kgInputView = kgInputView;
+            this.currentKvStateId = END_OF_KEY_GROUP_MARK & 
kgInputView.readShort();
+        }
+
+        // creates an empty iterator
+        private KeyGroupEntriesIterator() {
+            this.kgInputView = null;
+            this.currentKvStateId = null;
+        }
+
+        public boolean hasNext() {
+            return currentKvStateId != null;
+        }
+
+        public KeyGroupEntry next() throws IOException {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(kgInputView);
+            byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(kgInputView);
+            final int entryStateId = currentKvStateId;
+            if (hasMetaDataFollowsFlag(key)) {
+                // clear the signal bit in the key to make it ready for 
insertion
+                // again
+                clearMetaDataFollowsFlag(key);
+                // TODO this could be aware of keyGroupPrefixBytes and write 
only
+                // one byte if possible
+                currentKvStateId = END_OF_KEY_GROUP_MARK & 
kgInputView.readShort();
+                if (END_OF_KEY_GROUP_MARK == currentKvStateId) {
+                    currentKvStateId = null;
+                }
+            }
+
+            return new KeyGroupEntry(entryStateId, key, value);
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (kgInputView != null) {
+                kgInputView.close();
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/KeyGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/KeyGroup.java
new file mode 100644
index 0000000..f321ca5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/KeyGroup.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.restore;
+
+import org.apache.flink.annotation.Internal;
+
+/** Part of a savepoint representing data for a single key group. */
+@Internal
+public class KeyGroup {
+    private final int keyGroupId;
+    private final ThrowingIterator<KeyGroupEntry> keyGroupEntries;
+
+    KeyGroup(int keyGroupId, ThrowingIterator<KeyGroupEntry> keyGroupEntries) {
+        this.keyGroupId = keyGroupId;
+        this.keyGroupEntries = keyGroupEntries;
+    }
+
+    public int getKeyGroupId() {
+        return keyGroupId;
+    }
+
+    public ThrowingIterator<KeyGroupEntry> getKeyGroupEntries() {
+        return keyGroupEntries;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/KeyGroupEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/KeyGroupEntry.java
new file mode 100644
index 0000000..afc2c72
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/KeyGroupEntry.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.restore;
+
+import org.apache.flink.annotation.Internal;
+
+/** Part of a savepoint representing data for a single state entry in a key 
group. */
+@Internal
+public class KeyGroupEntry {
+    private final int kvStateId;
+    private final byte[] key;
+    private final byte[] value;
+
+    KeyGroupEntry(int kvStateId, byte[] key, byte[] value) {
+        this.kvStateId = kvStateId;
+        this.key = key;
+        this.value = value;
+    }
+
+    public int getKvStateId() {
+        return kvStateId;
+    }
+
+    public byte[] getKey() {
+        return key;
+    }
+
+    public byte[] getValue() {
+        return value;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/SavepointRestoreResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/SavepointRestoreResult.java
new file mode 100644
index 0000000..2f46ee5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/SavepointRestoreResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.restore;
+
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;

import java.util.List;
+
+/** A result from restoring a single {@link 
org.apache.flink.runtime.state.KeyedStateHandle}. */
+public class SavepointRestoreResult {
+    private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+    private final ThrowingIterator<KeyGroup> restoredKeyGroups;
+
+    public SavepointRestoreResult(
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            ThrowingIterator<KeyGroup> groupsIterator) {
+        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+        this.restoredKeyGroups = groupsIterator;
+    }
+
+    public List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots() {
+        return stateMetaInfoSnapshots;
+    }
+
+    public ThrowingIterator<KeyGroup> getRestoredKeyGroups() {
+        return restoredKeyGroups;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/ThrowingIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/ThrowingIterator.java
new file mode 100644
index 0000000..f2bd986
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/restore/ThrowingIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.restore;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.StateMigrationException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Modified {@link java.util.Iterator} interface that lets the implementation 
throw an {@link
+ * IOException}.
+ *
+ * @param <E> the type of elements returned by this iterator
+ * @see java.util.Iterator
+ */
+@Internal
+public interface ThrowingIterator<E> extends Closeable {
+    /**
+     * Returns {@code true} if the iteration has more elements. (In other 
words, returns {@code
+     * true} if {@link #next} would return an element rather than throwing an 
exception.)
+     *
+     * @return {@code true} if the iteration has more elements
+     */
+    boolean hasNext();
+
+    /**
+     * Returns the next element in the iteration.
+     *
+     * @return the next element in the iteration
+     * @throws NoSuchElementException if the iteration has no more elements
+     */
+    E next() throws IOException, StateMigrationException;
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index d6c066b..6c98d9b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -18,28 +18,21 @@
 
 package org.apache.flink.contrib.streaming.state.restore;
 
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
 import 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.StateSerializerProvider;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation;
+import org.apache.flink.runtime.state.restore.KeyGroup;
+import org.apache.flink.runtime.state.restore.KeyGroupEntry;
+import org.apache.flink.runtime.state.restore.SavepointRestoreResult;
+import org.apache.flink.runtime.state.restore.ThrowingIterator;
 import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -52,38 +45,17 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import static 
org.apache.flink.runtime.state.FullSnapshotUtil.END_OF_KEY_GROUP_MARK;
-import static 
org.apache.flink.runtime.state.FullSnapshotUtil.clearMetaDataFollowsFlag;
-import static 
org.apache.flink.runtime.state.FullSnapshotUtil.hasMetaDataFollowsFlag;
-import static 
org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Encapsulates the process of restoring a RocksDB instance from a full 
snapshot. */
 public class RocksDBFullRestoreOperation<K> extends 
AbstractRocksDBRestoreOperation<K> {
-    /** Current key-groups state handle from which we restore key-groups. */
-    private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-    /** Current input stream we obtained from currentKeyGroupsStateHandle. */
-    private FSDataInputStream currentStateHandleInStream;
-    /** Current data input view that wraps currentStateHandleInStream. */
-    private DataInputView currentStateHandleInView;
-    /**
-     * Current list of ColumnFamilyHandles for all column families we restore 
from
-     * currentKeyGroupsStateHandle.
-     */
-    private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
-    /**
-     * The compression decorator that was used for writing the state, as 
determined by the meta
-     * data.
-     */
-    private StreamCompressionDecorator keygroupStreamCompressionDecorator;
-
+    private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;
     /** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
     private final long writeBatchSize;
 
@@ -124,6 +96,12 @@ public class RocksDBFullRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
                 writeBufferManagerCapacity);
         checkArgument(writeBatchSize >= 0, "Write batch size have to be no 
negative.");
         this.writeBatchSize = writeBatchSize;
+        this.savepointRestoreOperation =
+                new FullSnapshotRestoreOperation<>(
+                        keyGroupRange,
+                        userCodeClassLoader,
+                        restoreStateHandles,
+                        keySerializerProvider);
     }
 
     /** Restores all key-groups data that is referenced by the passed state 
handles. */
@@ -131,60 +109,32 @@ public class RocksDBFullRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
     public RocksDBRestoreResult restore()
             throws IOException, StateMigrationException, RocksDBException {
         openDB();
-        for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
-            if (keyedStateHandle != null) {
-
-                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
-                    throw unexpectedStateHandleException(
-                            KeyGroupsStateHandle.class, 
keyedStateHandle.getClass());
-                }
-                this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) 
keyedStateHandle;
-                restoreKeyGroupsInStateHandle();
+        try (ThrowingIterator<SavepointRestoreResult> restore =
+                savepointRestoreOperation.restore()) {
+            while (restore.hasNext()) {
+                applyRestoreResult(restore.next());
             }
         }
         return new RocksDBRestoreResult(
                 this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, 
null, null);
     }
 
-    /** Restore one key groups state handle. */
-    private void restoreKeyGroupsInStateHandle()
-            throws IOException, StateMigrationException, RocksDBException {
-        try {
-            logger.info("Starting to restore from state handle: {}.", 
currentKeyGroupsStateHandle);
-            currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
-            cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-            currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
-            restoreKVStateMetaData();
-            restoreKVStateData();
-            logger.info("Finished restoring from state handle: {}.", 
currentKeyGroupsStateHandle);
-        } finally {
-            if 
(cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
-                IOUtils.closeQuietly(currentStateHandleInStream);
-            }
-        }
-    }
-
-    /**
-     * Restore the KV-state / ColumnFamily meta data for all key-groups 
referenced by the current
-     * state handle.
-     */
-    private void restoreKVStateMetaData() throws IOException, 
StateMigrationException {
-        KeyedBackendSerializationProxy<K> serializationProxy =
-                readMetaData(currentStateHandleInView);
-
-        this.keygroupStreamCompressionDecorator =
-                serializationProxy.isUsingKeyGroupCompression()
-                        ? SnappyStreamCompressionDecorator.INSTANCE
-                        : UncompressedStreamCompressionDecorator.INSTANCE;
-
+    private void applyRestoreResult(SavepointRestoreResult 
savepointRestoreResult)
+            throws IOException, RocksDBException, StateMigrationException {
         List<StateMetaInfoSnapshot> restoredMetaInfos =
-                serializationProxy.getStateMetaInfoSnapshots();
-        currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
-
-        for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
-            RocksDbKvStateInfo registeredStateCFHandle =
-                    getOrRegisterStateColumnFamilyHandle(null, 
restoredMetaInfo);
-            
currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
+                savepointRestoreResult.getStateMetaInfoSnapshots();
+        List<ColumnFamilyHandle> columnFamilyHandles =
+                restoredMetaInfos.stream()
+                        .map(
+                                stateMetaInfoSnapshot -> {
+                                    RocksDbKvStateInfo registeredStateCFHandle 
=
+                                            
getOrRegisterStateColumnFamilyHandle(
+                                                    null, 
stateMetaInfoSnapshot);
+                                    return 
registeredStateCFHandle.columnFamilyHandle;
+                                })
+                        .collect(Collectors.toList());
+        try (ThrowingIterator<KeyGroup> keyGroups = 
savepointRestoreResult.getRestoredKeyGroups()) {
+            restoreKVStateData(keyGroups, columnFamilyHandles);
         }
     }
 
@@ -192,60 +142,19 @@ public class RocksDBFullRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
      * Restore the KV-state / ColumnFamily data for all key-groups referenced 
by the current state
      * handle.
      */
-    private void restoreKVStateData() throws IOException, RocksDBException {
+    private void restoreKVStateData(
+            ThrowingIterator<KeyGroup> keyGroups, List<ColumnFamilyHandle> 
columnFamilies)
+            throws IOException, RocksDBException, StateMigrationException {
         // for all key-groups in the current state handle...
         try (RocksDBWriteBatchWrapper writeBatchWrapper =
                 new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
-            for (Tuple2<Integer, Long> keyGroupOffset :
-                    currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-                int keyGroup = keyGroupOffset.f0;
-
-                // Check that restored key groups all belong to the backend
-                Preconditions.checkState(
-                        keyGroupRange.contains(keyGroup),
-                        "The key group must belong to the backend");
-
-                long offset = keyGroupOffset.f1;
-                // not empty key-group?
-                if (0L != offset) {
-                    currentStateHandleInStream.seek(offset);
-                    try (InputStream compressedKgIn =
-                            
keygroupStreamCompressionDecorator.decorateWithCompression(
-                                    currentStateHandleInStream)) {
-                        DataInputViewStreamWrapper compressedKgInputView =
-                                new DataInputViewStreamWrapper(compressedKgIn);
-                        // TODO this could be aware of keyGroupPrefixBytes and 
write only one byte
-                        // if possible
-                        int kvStateId = compressedKgInputView.readShort();
-                        ColumnFamilyHandle handle =
-                                
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                        // insert all k/v pairs into DB
-                        boolean keyGroupHasMoreKeys = true;
-                        while (keyGroupHasMoreKeys) {
-                            byte[] key =
-                                    
BytePrimitiveArraySerializer.INSTANCE.deserialize(
-                                            compressedKgInputView);
-                            byte[] value =
-                                    
BytePrimitiveArraySerializer.INSTANCE.deserialize(
-                                            compressedKgInputView);
-                            if (hasMetaDataFollowsFlag(key)) {
-                                // clear the signal bit in the key to make it 
ready for insertion
-                                // again
-                                clearMetaDataFollowsFlag(key);
-                                writeBatchWrapper.put(handle, key, value);
-                                // TODO this could be aware of 
keyGroupPrefixBytes and write only
-                                // one byte if possible
-                                kvStateId =
-                                        END_OF_KEY_GROUP_MARK & 
compressedKgInputView.readShort();
-                                if (END_OF_KEY_GROUP_MARK == kvStateId) {
-                                    keyGroupHasMoreKeys = false;
-                                } else {
-                                    handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                }
-                            } else {
-                                writeBatchWrapper.put(handle, key, value);
-                            }
-                        }
+            while (keyGroups.hasNext()) {
+                KeyGroup keyGroup = keyGroups.next();
+                try (ThrowingIterator<KeyGroupEntry> groupEntries = 
keyGroup.getKeyGroupEntries()) {
+                    while (groupEntries.hasNext()) {
+                        KeyGroupEntry groupEntry = groupEntries.next();
+                        ColumnFamilyHandle handle = 
columnFamilies.get(groupEntry.getKvStateId());
+                        writeBatchWrapper.put(handle, groupEntry.getKey(), 
groupEntry.getValue());
                     }
                 }
             }

Reply via email to