[FLINK-6690] Fix meta-data restore in RocksDBKeyedStateBackend under rescaling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/353d6004
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/353d6004
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/353d6004

Branch: refs/heads/release-1.3
Commit: 353d60045b40412f4d3715688fb2bc33f2ac30b8
Parents: f1e059b
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Wed May 24 10:15:19 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Wed May 24 14:54:20 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 51 ++++++++++--------
 .../state/heap/HeapKeyedStateBackend.java       |  2 +-
 .../test/checkpointing/RescalingITCase.java     | 55 +++++++++++++-------
 3 files changed, 65 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/353d6004/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 51255ab..053c820 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -155,7 +155,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * Information about the k/v states as we create them. This is used to 
retrieve the
         * column family that is used for a state and also for sanity checks 
when restoring.
         */
-       private Map<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+       private final Map<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
 
        /**
         * Map of state names to their corresponding restored state meta info.
@@ -163,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * TODO this map can be removed when eager-state registration is in 
place.
         * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
         */
-       private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
restoredKvStateMetaInfos;
+       private final Map<String, 
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
 
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
@@ -172,7 +172,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final boolean enableIncrementalCheckpointing;
 
        /** The state handle ids of all sst files materialized in snapshots for 
previous checkpoints */
-       private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles 
= new TreeMap<>();
+       private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
 
        /** The identifier of the last completed checkpoint */
        private long lastCompletedCheckpointId = -1;
@@ -221,8 +221,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        throw new IOException("Error cleaning RocksDB data 
directory.", e);
                }
 
-               keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE 
+ 1) ? 2 : 1;
-               kvStateInformation = new HashMap<>();
+               this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
+               this.kvStateInformation = new HashMap<>();
+               this.restoredKvStateMetaInfos = new HashMap<>();
+               this.materializedSstFiles = new TreeMap<>();
        }
 
        /**
@@ -249,6 +251,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                }
 
                                kvStateInformation.clear();
+                               restoredKvStateMetaInfos.clear();
 
                                try {
                                        db.close();
@@ -826,7 +829,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        assert 
(Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
                        // use the last completed checkpoint as the comparison 
base.
-                       baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+                       synchronized (stateBackend.materializedSstFiles) {
+                               baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+                       }
 
                        // save meta data
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
@@ -885,7 +890,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
 
 
-                       synchronized (stateBackend.asyncSnapshotLock) {
+                       synchronized (stateBackend.materializedSstFiles) {
                                
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
                        }
 
@@ -943,6 +948,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
                }
 
+               // clear all meta data
+               kvStateInformation.clear();
+               restoredKvStateMetaInfos.clear();
+
                try {
                        if (restoreState == null || restoreState.isEmpty()) {
                                createDB();
@@ -964,7 +973,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        @Override
        public void notifyCheckpointComplete(long completedCheckpointId) {
-               synchronized (asyncSnapshotLock) {
+               synchronized (materializedSstFiles) {
                        if (completedCheckpointId < lastCompletedCheckpointId) {
                                return;
                        }
@@ -1125,13 +1134,15 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
                                        
serializationProxy.getStateMetaInfoSnapshots();
-
                        currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
-                       rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new 
HashMap<>(restoredMetaInfos.size());
+                       //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
 
                        for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
 
-                               if 
(!rocksDBKeyedStateBackend.kvStateInformation.containsKey(restoredMetaInfo.getName()))
 {
+                               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+                                       
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
+
+                               if (registeredColumn == null) {
                                        ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
                                                
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
                                                
rocksDBKeyedStateBackend.columnOptions);
@@ -1147,14 +1158,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                        ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
-                                       
rocksDBKeyedStateBackend.kvStateInformation.put(
-                                               stateMetaInfo.getName(),
-                                               new Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo));
+                                       registeredColumn = new 
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, 
?>>(columnFamily, stateMetaInfo);
+                                       
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
 
-                                       
currentStateHandleKVStateColumnFamilies.add(columnFamily);
                                } else {
                                        // TODO with eager state registration 
in place, check here for serializer migration strategies
                                }
+                               
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
                        }
                }
 
@@ -1313,8 +1323,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                List<ColumnFamilyDescriptor> 
columnFamilyDescriptors = new ArrayList<>();
 
-                               stateBackend.restoredKvStateMetaInfos = new 
HashMap<>(stateMetaInfoSnapshots.size());
-
                                for 
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
 
                                        ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
@@ -1424,7 +1432,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
 
                                        // use the restore sst files as the 
base for succeeding checkpoints
-                                       
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles.keySet());
+                                       synchronized 
(stateBackend.materializedSstFiles) {
+                                               
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles.keySet());
+                                       }
 
                                        stateBackend.lastCompletedCheckpointId 
= restoreStateHandle.getCheckpointId();
                                }
@@ -1890,11 +1900,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                Preconditions.checkState(1 == namedStates.size(), "Only one 
element expected here.");
                DataInputView inputView = 
namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
 
-               // clear k/v state information before filling it
-               kvStateInformation.clear();
-
-               restoredKvStateMetaInfos = new HashMap<>(namedStates.size());
-
                // first get the column family mapping
                int numColumns = inputView.readInt();
                Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new 
HashMap<>(numColumns);

http://git-wip-us.apache.org/repos/asf/flink/blob/353d6004/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index ada6377..d4ba204 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -399,7 +399,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                .isRequiresMigration()) {
 
                                                // TODO replace with state 
migration; note that key hash codes need to remain the same after migration
-                                               throw new RuntimeException("The 
new key serializer is not compatible to read previous keys. " +
+                                               throw new 
IllegalStateException("The new key serializer is not compatible to read 
previous keys. " +
                                                        "Aborting now since 
state migration is currently not available");
                                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/353d6004/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 88dd1dd..9df0d1a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -57,10 +56,12 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -81,15 +82,23 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-/**
- * TODO : parameterize to test all different state backends!
- */
+@RunWith(Parameterized.class)
 public class RescalingITCase extends TestLogger {
 
        private static final int numTaskManagers = 2;
        private static final int slotsPerTaskManager = 2;
        private static final int numSlots = numTaskManagers * 
slotsPerTaskManager;
 
+       @Parameterized.Parameters
+       public static Object[] data() {
+               return new Object[]{"filesystem", "rocksdb"};
+       }
+
+       @Parameterized.Parameter
+       public String backend;
+
+       private String currentBackend = null;
+
        enum OperatorCheckpointMethod {
                NON_PARTITIONED, CHECKPOINTED_FUNCTION, 
CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED
        }
@@ -99,25 +108,32 @@ public class RescalingITCase extends TestLogger {
        @ClassRule
        public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-       @BeforeClass
-       public static void setup() throws Exception {
-               Configuration config = new Configuration();
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskManagers);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
slotsPerTaskManager);
+       @Before
+       public void setup() throws Exception {
+               // detect parameter change
+               if (currentBackend != backend) {
+                       shutDownExistingCluster();
+
+                       currentBackend = backend;
 
-               final File checkpointDir = temporaryFolder.newFolder();
-               final File savepointDir = temporaryFolder.newFolder();
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
slotsPerTaskManager);
 
-               config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-               
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, 
checkpointDir.toURI().toString());
-               config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, 
savepointDir.toURI().toString());
+                       final File checkpointDir = temporaryFolder.newFolder();
+                       final File savepointDir = temporaryFolder.newFolder();
 
-               cluster = new TestingCluster(config);
-               cluster.start();
+                       config.setString(CoreOptions.STATE_BACKEND, 
currentBackend);
+                       
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, 
checkpointDir.toURI().toString());
+                       
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, 
savepointDir.toURI().toString());
+
+                       cluster = new TestingCluster(config);
+                       cluster.start();
+               }
        }
 
        @AfterClass
-       public static void teardown() {
+       public static void shutDownExistingCluster() {
                if (cluster != null) {
                        cluster.shutdown();
                        cluster.awaitTermination();
@@ -867,6 +883,7 @@ public class RescalingITCase extends TestLogger {
 
        private static class StateSourceBase extends 
RichParallelSourceFunction<Integer> {
 
+               private static final long serialVersionUID = 
7512206069681177940L;
                private static volatile CountDownLatch workStartedLatch = new 
CountDownLatch(1);
 
                protected volatile int counter = 0;
@@ -959,7 +976,7 @@ public class RescalingITCase extends TestLogger {
                private static final long serialVersionUID = 
-359715965103593462L;
                private static final int NUM_PARTITIONS = 7;
 
-               private ListState<Integer> counterPartitions;
+               private transient ListState<Integer> counterPartitions;
                private boolean broadcast;
 
                private static int[] CHECK_CORRECT_SNAPSHOT;

Reply via email to the mailing list.