[FLINK-6690] Fix meta-data restore in RocksDBKeyedStateBackend under rescaling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/353d6004 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/353d6004 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/353d6004 Branch: refs/heads/release-1.3 Commit: 353d60045b40412f4d3715688fb2bc33f2ac30b8 Parents: f1e059b Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Wed May 24 10:15:19 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Wed May 24 14:54:20 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 51 ++++++++++-------- .../state/heap/HeapKeyedStateBackend.java | 2 +- .../test/checkpointing/RescalingITCase.java | 55 +++++++++++++------- 3 files changed, 65 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/353d6004/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 51255ab..053c820 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -155,7 +155,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * Information about the k/v states as we create them. This is used to retrieve the * column family that is used for a state and also for sanity checks when restoring. */ - private Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation; + private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation; /** * Map of state names to their corresponding restored state meta info. @@ -163,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * TODO this map can be removed when eager-state registration is in place. * TODO we currently need this cached to check state migration strategies when new serializers are registered. */ - private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos; + private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos; /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; @@ -172,7 +172,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final boolean enableIncrementalCheckpointing; /** The state handle ids of all sst files materialized in snapshots for previous checkpoints */ - private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>(); + private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles; /** The identifier of the last completed checkpoint */ private long lastCompletedCheckpointId = -1; @@ -221,8 +221,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw new IOException("Error cleaning RocksDB data directory.", e); } - keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; - kvStateInformation = new HashMap<>(); + this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; + this.kvStateInformation = new HashMap<>(); + this.restoredKvStateMetaInfos = new HashMap<>(); + this.materializedSstFiles = new TreeMap<>(); } /** @@ -249,6 +251,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); try { db.close(); @@ -826,7 +829,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { assert (Thread.holdsLock(stateBackend.asyncSnapshotLock)); // use the last completed checkpoint as the comparison base. - baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + synchronized (stateBackend.materializedSstFiles) { + baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + } // save meta data for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry @@ -885,7 +890,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { - synchronized (stateBackend.asyncSnapshotLock) { + synchronized (stateBackend.materializedSstFiles) { stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); } @@ -943,6 +948,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); + try { if (restoreState == null || restoreState.isEmpty()) { createDB(); @@ -964,7 +973,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override public void notifyCheckpointComplete(long completedCheckpointId) { - synchronized (asyncSnapshotLock) { + synchronized (materializedSstFiles) { if (completedCheckpointId < lastCompletedCheckpointId) { return; } @@ -1125,13 +1134,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); - currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); - rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); + //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { - if (!rocksDBKeyedStateBackend.kvStateInformation.containsKey(restoredMetaInfo.getName())) { + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn = + rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); + + if (registeredColumn == null) { ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), rocksDBKeyedStateBackend.columnOptions); @@ -1147,14 +1158,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - rocksDBKeyedStateBackend.kvStateInformation.put( - stateMetaInfo.getName(), - new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo)); + registeredColumn = new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo); + rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); - currentStateHandleKVStateColumnFamilies.add(columnFamily); } else { // TODO with eager state registration in place, check here for serializer migration strategies } + currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); } } @@ -1313,8 +1323,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); - stateBackend.restoredKvStateMetaInfos = new HashMap<>(stateMetaInfoSnapshots.size()); - for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) { ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( @@ -1424,7 +1432,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // use the restore sst files as the base for succeeding checkpoints - stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet()); + synchronized (stateBackend.materializedSstFiles) { + stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet()); + } stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); } @@ -1890,11 +1900,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { Preconditions.checkState(1 == namedStates.size(), "Only one element expected here."); DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader); - // clear k/v state information before filling it - kvStateInformation.clear(); - - restoredKvStateMetaInfos = new HashMap<>(namedStates.size()); - // first get the column family mapping int numColumns = inputView.readInt(); Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new HashMap<>(numColumns); http://git-wip-us.apache.org/repos/asf/flink/blob/353d6004/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index ada6377..d4ba204 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -399,7 +399,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { .isRequiresMigration()) { // TODO replace with state migration; note that key hash codes need to remain the same after migration - throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + + throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " + "Aborting now since state migration is currently not available"); } http://git-wip-us.apache.org/repos/asf/flink/blob/353d6004/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 88dd1dd..9df0d1a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -57,10 +56,12 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; @@ -81,15 +82,23 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -/** - * TODO : parameterize to test all different state backends! - */ +@RunWith(Parameterized.class) public class RescalingITCase extends TestLogger { private static final int numTaskManagers = 2; private static final int slotsPerTaskManager = 2; private static final int numSlots = numTaskManagers * slotsPerTaskManager; + @Parameterized.Parameters + public static Object[] data() { + return new Object[]{"filesystem", "rocksdb"}; + } + + @Parameterized.Parameter + public String backend; + + private String currentBackend = null; + enum OperatorCheckpointMethod { NON_PARTITIONED, CHECKPOINTED_FUNCTION, CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED } @@ -99,25 +108,32 @@ public class RescalingITCase extends TestLogger { @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - @BeforeClass - public static void setup() throws Exception { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager); + @Before + public void setup() throws Exception { + // detect parameter change + if (currentBackend != backend) { + shutDownExistingCluster(); + + currentBackend = backend; - final File checkpointDir = temporaryFolder.newFolder(); - final File savepointDir = temporaryFolder.newFolder(); + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager); - config.setString(CoreOptions.STATE_BACKEND, "filesystem"); - config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); - config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); + final File checkpointDir = temporaryFolder.newFolder(); + final File savepointDir = temporaryFolder.newFolder(); - cluster = new TestingCluster(config); - cluster.start(); + config.setString(CoreOptions.STATE_BACKEND, currentBackend); + config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); + + cluster = new TestingCluster(config); + cluster.start(); + } } @AfterClass - public static void teardown() { + public static void shutDownExistingCluster() { if (cluster != null) { cluster.shutdown(); cluster.awaitTermination(); @@ -867,6 +883,7 @@ public class RescalingITCase extends TestLogger { private static class StateSourceBase extends RichParallelSourceFunction<Integer> { + private static final long serialVersionUID = 7512206069681177940L; private static volatile CountDownLatch workStartedLatch = new CountDownLatch(1); protected volatile int counter = 0; @@ -959,7 +976,7 @@ public class RescalingITCase extends TestLogger { private static final long serialVersionUID = -359715965103593462L; private static final int NUM_PARTITIONS = 7; - private ListState<Integer> counterPartitions; + private transient ListState<Integer> counterPartitions; private boolean broadcast; private static int[] CHECK_CORRECT_SNAPSHOT;