Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168753314 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() { * @param streamFactory The factory that we can use for writing our state to streams. * @param checkpointOptions Options for how to perform this checkpoint. * @return Future to the state handle of the snapshot data. - * @throws Exception + * @throws Exception indicating a problem in the synchronous part of the checkpoint. */ @Override - public RunnableFuture<KeyedStateHandle> snapshot( + public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (checkpointOptions.getCheckpointType() != CheckpointType.SAVEPOINT && - enableIncrementalCheckpointing) { - return snapshotIncrementally(checkpointId, timestamp, streamFactory); - } else { - return snapshotFully(checkpointId, timestamp, streamFactory); - } + return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions); } - private RunnableFuture<KeyedStateHandle> snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - - if (db == null) { - throw new IOException("RocksDB closed."); - } + @Override + public void restore(StateObjectCollection<KeyedStateHandle> restoreState) throws Exception { + LOG.info("Initializing RocksDB keyed state backend from snapshot."); - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. 
Returning null.", - checkpointTimestamp); - } - return DoneFuture.nullValue(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } - final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - this, - checkpointStreamFactory, - checkpointId, - checkpointTimestamp); + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); try { - snapshotOperation.takeSnapshot(); - } catch (Exception e) { - snapshotOperation.stop(); - snapshotOperation.releaseResources(true); - throw e; - } - - return new FutureTask<KeyedStateHandle>( - new Callable<KeyedStateHandle>() { - @Override - public KeyedStateHandle call() throws Exception { - return snapshotOperation.materializeSnapshot(); + if (restoreState == null || restoreState.isEmpty()) { + createDB(); + } else { + KeyedStateHandle firstStateHandle = restoreState.iterator().next(); + if (firstStateHandle instanceof IncrementalKeyedStateHandle + || firstStateHandle instanceof IncrementalLocalKeyedStateHandle) { + RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); + restoreOperation.restore(restoreState); + } else { + RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this); + restoreOperation.doRestore(restoreState); } } - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } - - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); - } - }; + } catch (Exception ex) { + dispose(); + throw ex; + } } - private RunnableFuture<KeyedStateHandle> snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { - - long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry(); - - final RocksDBFullSnapshotOperation<K> snapshotOperation; - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp); - } + @Override + public void notifyCheckpointComplete(long completedCheckpointId) { - return DoneFuture.nullValue(); + if (!enableIncrementalCheckpointing) { + return; } - snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); - snapshotOperation.takeDBSnapShot(checkpointId, timestamp); - - // implementation of the async IO operation, based on FutureTask - AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = - new AbstractAsyncCallableWithResources<KeyedStateHandle>() { - - @Override - protected void acquireResources() throws Exception { - cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); - snapshotOperation.openCheckpointStream(); - } + synchronized (materializedSstFiles) { - @Override - protected void releaseResources() throws Exception { - closeLocalRegistry(); - releaseSnapshotOperationResources(); - } + if (completedCheckpointId < lastCompletedCheckpointId) { + return; + } - private void releaseSnapshotOperationResources() { - // hold the db lock while operation on the db to guard us against async db disposal - snapshotOperation.releaseSnapshotResources(); - } + materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); - @Override - protected void stopOperation() throws Exception { - closeLocalRegistry(); - } + lastCompletedCheckpointId = completedCheckpointId; + } + } - private void closeLocalRegistry() { - if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { - try { - snapshotCloseableRegistry.close(); - } catch (Exception ex) { - LOG.warn("Error closing local registry", ex); - } - } - } + private void createDB() throws IOException { + List<ColumnFamilyHandle> 
columnFamilyHandles = new ArrayList<>(1); + this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); + this.defaultColumnFamily = columnFamilyHandles.get(0); + } - @Override - public KeyGroupsStateHandle performOperation() throws Exception { - long startTime = System.currentTimeMillis(); + private RocksDB openDB( + String path, + List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, + List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { - if (isStopped()) { - throw new IOException("RocksDB closed."); - } + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); - snapshotOperation.writeDBSnapshot(); + // we add the required descriptor for the default CF in FIRST position, see + // https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families + columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions)); + columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); - LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + RocksDB dbRef; - return snapshotOperation.getSnapshotResultStateHandle(); - } - }; + try { + dbRef = RocksDB.open( + Preconditions.checkNotNull(dbOptions), + Preconditions.checkNotNull(path), + columnFamilyDescriptors, + stateColumnFamilyHandles); + } catch (RocksDBException e) { + throw new IOException("Error while opening RocksDB instance.", e); + } - LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + // requested + default CF + Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), + "Not all requested column family handles have been created"); - return 
AsyncStoppableTaskWithCallback.from(ioCallable); + return dbRef; } /** - * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. + * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot. */ - static final class RocksDBFullSnapshotOperation<K> { - - static final int FIRST_BIT_IN_BYTE_MASK = 0x80; - static final int END_OF_KEY_GROUP_MARK = 0xFFFF; - - private final RocksDBKeyedStateBackend<K> stateBackend; - private final KeyGroupRangeOffsets keyGroupRangeOffsets; - private final CheckpointStreamFactory checkpointStreamFactory; - private final CloseableRegistry snapshotCloseableRegistry; - private final ResourceGuard.Lease dbLease; - - private long checkpointId; - private long checkpointTimeStamp; - - private Snapshot snapshot; - private ReadOptions readOptions; - private List<Tuple2<RocksIterator, Integer>> kvStateIterators; - - private CheckpointStreamFactory.CheckpointStateOutputStream outStream; - private DataOutputView outputView; + private static final class RocksDBFullRestoreOperation<K> { - RocksDBFullSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - CloseableRegistry registry) throws IOException { + private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); - this.snapshotCloseableRegistry = registry; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); - } + /** Current key-groups state handle from which we restore key-groups. */ + private KeyGroupsStateHandle currentKeyGroupsStateHandle; + /** Current input stream we obtained from currentKeyGroupsStateHandle. */ + private FSDataInputStream currentStateHandleInStream; + /** Current data input view that wraps currentStateHandleInStream. 
*/ + private DataInputView currentStateHandleInView; + /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ + private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; + /** The compression decorator that was used for writing the state, as determined by the meta data. */ + private StreamCompressionDecorator keygroupStreamCompressionDecorator; /** - * 1) Create a snapshot object from RocksDB. + * Creates a restore operation object for the given state backend instance. * - * @param checkpointId id of the checkpoint for which we take the snapshot - * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot + * @param rocksDBKeyedStateBackend the state backend into which we restore */ - public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) { - Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); - this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); - this.checkpointId = checkpointId; - this.checkpointTimeStamp = checkpointTimeStamp; - this.snapshot = stateBackend.db.getSnapshot(); + public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) { + this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); } /** - * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write. + * Restores all key-groups data that is referenced by the passed state handles. * - * @throws Exception + * @param keyedStateHandles List of all key groups state handles that shall be restored. 
*/ - public void openCheckpointStream() throws Exception { - Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); - outStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - snapshotCloseableRegistry.registerCloseable(outStream); - outputView = new DataOutputViewStreamWrapper(outStream); - } + public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) + throws IOException, StateMigrationException, RocksDBException { - /** - * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). - * - * @throws IOException - */ - public void writeDBSnapshot() throws IOException, InterruptedException { + rocksDBKeyedStateBackend.createDB(); - if (null == snapshot) { - throw new IOException("No snapshot available. Might be released due to cancellation."); - } + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + if (keyedStateHandle != null) { - Preconditions.checkNotNull(outStream, "No output stream to write snapshot."); - writeKVStateMetaData(); - writeKVStateData(); + if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; + restoreKeyGroupsInStateHandle(); + } + } } /** - * 4) Returns a state handle to the snapshot after the snapshot procedure is completed and null before. - * - * @return state handle to the completed snapshot + * Restore one key groups state handle. 
*/ - public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException { - - if (snapshotCloseableRegistry.unregisterCloseable(outStream)) { - - StreamStateHandle stateHandle = outStream.closeAndGetHandle(); - outStream = null; - - if (stateHandle != null) { - return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + private void restoreKeyGroupsInStateHandle() + throws IOException, StateMigrationException, RocksDBException { + try { + currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); + rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream); + currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); + restoreKVStateMetaData(); + restoreKVStateData(); + } finally { + if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { + IOUtils.closeQuietly(currentStateHandleInStream); } } - return null; } /** - * 5) Release the snapshot object for RocksDB and clean up. + * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. 
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws RocksDBException */ - public void releaseSnapshotResources() { + private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { - outStream = null; + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); - if (null != kvStateIterators) { - for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) { - IOUtils.closeQuietly(kvStateIterator.f0); - } - kvStateIterators = null; - } + serializationProxy.read(currentStateHandleInView); - if (null != snapshot) { - if (null != stateBackend.db) { - stateBackend.db.releaseSnapshot(snapshot); - } - IOUtils.closeQuietly(snapshot); - snapshot = null; - } + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + rocksDBKeyedStateBackend.keySerializer) + .isRequiresMigration()) { - if (null != readOptions) { - IOUtils.closeQuietly(readOptions); - readOptions = null; + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); } - this.dbLease.close(); - } - - private void writeKVStateMetaData() throws IOException { + this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? 
+ SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; - List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = - new ArrayList<>(stateBackend.kvStateInformation.size()); + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = + serializationProxy.getStateMetaInfoSnapshots(); + currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); + //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); - int kvStateId = 0; - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column : - stateBackend.kvStateInformation.entrySet()) { + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { - metaInfoSnapshots.add(column.getValue().f1.snapshot()); + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn = + rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); - //retrieve iterator for this k/v states - readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); + if (registeredColumn == null) { + byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + nameBytes, + rocksDBKeyedStateBackend.columnOptions); - ++kvStateId; - } + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + restoredMetaInfo.getStateType(), + restoredMetaInfo.getName(), + restoredMetaInfo.getNamespaceSerializer(), + restoredMetaInfo.getStateSerializer()); - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.getKeySerializer(), - metaInfoSnapshots, - 
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator)); + rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - serializationProxy.write(outputView); - } + ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - private void writeKVStateData() throws IOException, InterruptedException { + registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo); + rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); - byte[] previousKey = null; - byte[] previousValue = null; - OutputStream kgOutStream = null; - DataOutputView kgOutView = null; + } else { + // TODO with eager state registration in place, check here for serializer migration strategies + } + currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); + } + } - try { - // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator - try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( - kvStateIterators, stateBackend.keyGroupPrefixBytes)) { + /** + * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. + * + * @throws IOException + * @throws RocksDBException + */ + private void restoreKVStateData() throws IOException, RocksDBException { + //for all key-groups in the current state handle... 
+ for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { + int keyGroup = keyGroupOffset.f0; - // handover complete, null out to prevent double close - kvStateIterators = null; + // Check that restored key groups all belong to the backend + Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), + "The key group must belong to the backend"); - //preamble: setup with first key-group as our lookahead - if (mergeIterator.isValid()) { - //begin first key-group by recording the offset - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the k/v-state id as metadata - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + long offset = keyGroupOffset.f1; + //not empty key-group? + if (0L != offset) { + currentStateHandleInStream.seek(offset); + try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { + DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); + int kvStateId = compressedKgInputView.readShort(); + ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + //insert all k/v pairs into DB + boolean keyGroupHasMoreKeys = true; + while (keyGroupHasMoreKeys) { + byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { + //clear the signal bit in the key to make it ready for insertion again + 
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); + rocksDBKeyedStateBackend.db.put(handle, key, value); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK + & compressedKgInputView.readShort(); + if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + keyGroupHasMoreKeys = false; + } else { + handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + } + } else { + rocksDBKeyedStateBackend.db.put(handle, key, value); + } + } } + } + } + } + } - //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. - while (mergeIterator.isValid()) { + /** + * Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental snapshot. + */ + private static class RocksDBIncrementalRestoreOperation<T> { - assert (!hasMetaDataFollowsFlag(previousKey)); + private final RocksDBKeyedStateBackend<T> stateBackend; - //set signal in first key byte that meta data will follow in the stream after this k/v pair - if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) { + this.stateBackend = stateBackend; + } - //be cooperative and check for interruption from time to time in the hot loop - checkInterrupted(); + /** + * Root method that branches for different implementations of {@link KeyedStateHandle}. 
+ */ + void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { - setMetaDataFollowsFlagInKey(previousKey); - } + boolean hasExtraKeys = (restoreStateHandles.size() > 1 || + !Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), stateBackend.keyGroupRange)); - writeKeyValuePair(previousKey, previousValue, kgOutView); + if (hasExtraKeys) { + stateBackend.createDB(); + } - //write meta data if we have to - if (mergeIterator.isNewKeyGroup()) { - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - //begin new key-group - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the kev-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); - kgOutView.writeShort(mergeIterator.kvStateId()); - } else if (mergeIterator.isNewKeyValueState()) { - //write the k/v-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - } + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { - //request next k/v pair - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); - } + if (rawStateHandle instanceof IncrementalKeyedStateHandle) { + restoreInstance((IncrementalKeyedStateHandle) rawStateHandle, hasExtraKeys); + } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) { + Preconditions.checkState(!hasExtraKeys, "Cannot recover from local state after rescaling."); + restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle); + } else { + throw new IllegalStateException("Unexpected state handle type, " + + 
"expected " + IncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); } + } + } - //epilogue: write last key-group - if (previousKey != null) { - assert (!hasMetaDataFollowsFlag(previousKey)); - setMetaDataFollowsFlagInKey(previousKey); - writeKeyValuePair(previousKey, previousValue, kgOutView); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - kgOutStream = null; - } + /** + * Recovery from remote incremental state. + */ + private void restoreInstance( + IncrementalKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path temporaryRestoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + + transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath); + // read meta data + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = + readMetaData(restoreStateHandle.getMetaStateHandle()); + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); + + if (hasExtraKeys) { + restoreKeyGroupsShardWithTemporaryHelperInstance( + temporaryRestoreInstancePath, + columnFamilyDescriptors, + stateMetaInfoSnapshots); + } else { + + // since we transferred all remote state to a local directory, we can use the same code as for + // local recovery. 
+ IncrementalLocalKeyedStateHandle localKeyedStateHandle = new IncrementalLocalKeyedStateHandle( + restoreStateHandle.getBackendIdentifier(), + restoreStateHandle.getCheckpointId(), + new DirectoryStateHandle(temporaryRestoreInstancePath), + restoreStateHandle.getKeyGroupRange(), + restoreStateHandle.getMetaStateHandle(), + restoreStateHandle.getSharedState().keySet()); + + restoreLocalStateIntoFullInstance( + localKeyedStateHandle, + columnFamilyDescriptors, + stateMetaInfoSnapshots); + } } finally { - // this will just close the outer stream - IOUtils.closeQuietly(kgOutStream); + FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { + restoreFileSystem.delete(temporaryRestoreInstancePath, true); + } } } - private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { - BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); - BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); - } + /** + * Recovery from local incremental state. 
+ */ + private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { + // read meta data + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = + readMetaData(localKeyedStateHandle.getMetaDataState()); - static void setMetaDataFollowsFlagInKey(byte[] key) { - key[0] |= FIRST_BIT_IN_BYTE_MASK; - } + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); - static void clearMetaDataFollowsFlag(byte[] key) { - key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + restoreLocalStateIntoFullInstance( + localKeyedStateHandle, + columnFamilyDescriptors, + stateMetaInfoSnapshots); } - static boolean hasMetaDataFollowsFlag(byte[] key) { - return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - } + /** + * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state meta data snapshot. + */ + private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors( + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { - private static void checkInterrupted() throws InterruptedException { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException("RocksDB snapshot interrupted."); + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateMetaInfoSnapshots.size()); + + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) { + + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), + stateBackend.columnOptions); + + columnFamilyDescriptors.add(columnFamilyDescriptor); + stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot); } + return columnFamilyDescriptors; } - } - private static final class 
RocksDBIncrementalSnapshotOperation<K> { + /** + * This method implements the core of the restore logic that unifies how local and remote state are recovered. + */ + private void restoreLocalStateIntoFullInstance( + IncrementalLocalKeyedStateHandle restoreStateHandle, + List<ColumnFamilyDescriptor> columnFamilyDescriptors, + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception { + // pick up again the old backend id, so the we can reference existing state + stateBackend.backendUID = restoreStateHandle.getBackendIdentifier(); + + LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", + stateBackend.operatorIdentifier, stateBackend.backendUID); + + // create hard links in the instance directory + if (!stateBackend.instanceRocksDBPath.mkdirs()) { + throw new IOException("Could not create RocksDB data directory."); + } - /** The backend which we snapshot. */ - private final RocksDBKeyedStateBackend<K> stateBackend; + Path restoreSourcePath = restoreStateHandle.getDirectoryStateHandle().getDirectory(); + restoreInstanceDirectoryFromPath(restoreSourcePath); - /** Stream factory that creates the outpus streams to DFS. */ - private final CheckpointStreamFactory checkpointStreamFactory; + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); - /** Id for the current checkpoint. */ - private final long checkpointId; + stateBackend.db = stateBackend.openDB( + stateBackend.instanceRocksDBPath.getAbsolutePath(), + columnFamilyDescriptors, columnFamilyHandles); - /** Timestamp for the current checkpoint. */ - private final long checkpointTimestamp; + // extract and store the default column family which is located at the first index + stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0); - /** All sst files that were part of the last previously completed checkpoint. 
*/ - private Set<StateHandleID> baseSstFiles; + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); - /** The state meta data. */ - private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>(); + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); - private FileSystem backupFileSystem; - private Path backupPath; + stateBackend.kvStateInformation.put( + stateMetaInfoSnapshot.getName(), + new Tuple2<>(columnFamilyHandle, stateMetaInfo)); + } - // Registry for all opened i/o streams - private final CloseableRegistry closeableRegistry = new CloseableRegistry(); + // use the restore sst files as the base for succeeding checkpoints + synchronized (stateBackend.materializedSstFiles) { + stateBackend.materializedSstFiles.put( + restoreStateHandle.getCheckpointId(), + restoreStateHandle.getSharedStateHandleIDs()); + } - // new sst files since the last completed checkpoint - private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>(); + stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); + } - // handles to the misc files in the current snapshot - private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>(); + /** + * This recreates the new working directory of the recovered RocksDB instance and links/copies the contents from + * a local state. + */ + private void restoreInstanceDirectoryFromPath(Path source) throws IOException { - // This lease protects from concurrent disposal of the native rocksdb instance. 
- private final ResourceGuard.Lease dbLease; + FileSystem fileSystem = source.getFileSystem(); - private StreamStateHandle metaStateHandle = null; + final FileStatus[] fileStatuses = fileSystem.listStatus(source); - private RocksDBIncrementalSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - long checkpointId, - long checkpointTimestamp) throws IOException { + if (fileStatuses == null) { + throw new IOException("Cannot list file statues. Directory " + source + " does not exist."); + } - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.checkpointId = checkpointId; - this.checkpointTimestamp = checkpointTimestamp; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); + for (FileStatus fileStatus : fileStatuses) { + final Path filePath = fileStatus.getPath(); + final String fileName = filePath.getName(); + File restoreFile = new File(source.getPath(), fileName); + File targetFile = new File(stateBackend.instanceRocksDBPath.getPath(), fileName); + if (fileName.endsWith(SST_FILE_SUFFIX)) { + // hardlink'ing the immutable sst-files. + Files.createLink(targetFile.toPath(), restoreFile.toPath()); + } else { + // true copy for all other files. + Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } } - private StreamStateHandle materializeStateData(Path filePath) throws Exception { + /** + * Reads Flink's state meta data file from the state handle. 
+ */ + private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + FSDataInputStream inputStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; try { - final byte[] buffer = new byte[8 * 1024]; - - FileSystem backupFileSystem = backupPath.getFileSystem(); - inputStream = backupFileSystem.open(filePath); - closeableRegistry.registerCloseable(inputStream); - - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED); - closeableRegistry.registerCloseable(outputStream); - - while (true) { - int numBytes = inputStream.read(buffer); + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); - if (numBytes == -1) { - break; - } + KeyedBackendSerializationProxy<T> serializationProxy = + new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); - outputStream.write(buffer, 0, numBytes); - } + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + stateBackend.keySerializer) + .isRequiresMigration()) { - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new StateMigrationException("The new key serializer is not compatible to read previous keys. 
" + + "Aborting now since state migration is currently not available"); } - return result; + return serializationProxy.getStateMetaInfoSnapshots(); } finally { - if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) { + if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { inputStream.close(); } - - if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } } } - private StreamStateHandle materializeMetaData() throws Exception { - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; - - try { - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - closeableRegistry.registerCloseable(outputStream); + private void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest) throws IOException { - //no need for compression scheme support because sst-files are already compressed - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.keySerializer, - stateMetaInfoSnapshots, - false); + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); - DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + transferAllDataFromStateHandles(sstFiles, dest); + transferAllDataFromStateHandles(miscFiles, dest); + } - serializationProxy.write(out); + /** + * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their + * {@link StateHandleID}. 
+ */ + private void transferAllDataFromStateHandles( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath) throws IOException { - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; - } - return result; - } finally { - if (outputStream != null) { - if (closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } - } + for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle); } } - void takeSnapshot() throws Exception { + /** + * Copies the file from a single state handle to the given path. + */ + private void copyStateDataHandleData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { - final long lastCompletedCheckpoint; + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); - // use the last completed checkpoint as the comparison base. - synchronized (stateBackend.materializedSstFiles) { - lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId; - baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint); - } + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; - LOG.trace("Taking incremental snapshot for checkpoint {}. 
Snapshot is based on last completed checkpoint {} " + - "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); - // save meta data - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry - : stateBackend.kvStateInformation.entrySet()) { - stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot()); - } + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerCloseable(outputStream); - // save state data - backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); + byte[] buffer = new byte[8 * 1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + break; + } - LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupPath); + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { + inputStream.close(); + } - backupFileSystem = backupPath.getFileSystem(); - if (backupFileSystem.exists(backupPath)) { - throw new IllegalStateException("Unexpected existence of the backup directory."); + if (stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) { + outputStream.close(); + } } - - // create hard links of living files in the checkpoint path - Checkpoint checkpoint = Checkpoint.create(stateBackend.db); - checkpoint.createCheckpoint(backupPath.getPath()); } - KeyedStateHandle materializeSnapshot() throws Exception { + /** + * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups shard. All contents + * from the temporary instance are copied into the real restore instance and then the temporary instance is + * discarded. 
+ */ + private void restoreKeyGroupsShardWithTemporaryHelperInstance( + Path restoreInstancePath, + List<ColumnFamilyDescriptor> columnFamilyDescriptors, + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception { - stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry); + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); - // write meta data - metaStateHandle = materializeMetaData(); + try (RocksDB restoreDb = stateBackend.openDB( + restoreInstancePath.getPath(), + columnFamilyDescriptors, + columnFamilyHandles)) { - // write state data - Preconditions.checkState(backupFileSystem.exists(backupPath)); + final ColumnFamilyHandle defaultColumnFamily = columnFamilyHandles.remove(0); - FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); - if (fileStatuses != null) { - for (FileStatus fileStatus : fileStatuses) { - final Path filePath = fileStatus.getPath(); - final String fileName = filePath.getName(); - final StateHandleID stateHandleID = new StateHandleID(fileName); + Preconditions.checkState(columnFamilyHandles.size() == columnFamilyDescriptors.size()); - if (fileName.endsWith(SST_FILE_SUFFIX)) { - final boolean existsAlready = - baseSstFiles != null && baseSstFiles.contains(stateHandleID); + try { + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i); + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); - if (existsAlready) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - sstFiles.put( - stateHandleID, - new PlaceholderStreamStateHandle()); - } else { - sstFiles.put(stateHandleID, materializeStateData(filePath)); - } - } 
else { - StreamStateHandle fileHandle = materializeStateData(filePath); - miscFiles.put(stateHandleID, fileHandle); - } - } - } + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry = + stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName()); - synchronized (stateBackend.materializedSstFiles) { - stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); - } + if (null == registeredStateMetaInfoEntry) { - return new IncrementalKeyedStateHandle( - stateBackend.backendUID, - stateBackend.keyGroupRange, - checkpointId, - sstFiles, - miscFiles, - metaStateHandle); - } + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); - void stop() { + registeredStateMetaInfoEntry = + new Tuple2<>( + stateBackend.db.createColumnFamily(columnFamilyDescriptor), + stateMetaInfo); - if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Could not properly close io streams.", e); - } - } - } + stateBackend.kvStateInformation.put( + stateMetaInfoSnapshot.getName(), + registeredStateMetaInfoEntry); + } - void releaseResources(boolean canceled) { + ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0; - dbLease.close(); + try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) { - if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Exception on closing registry.", e); - } - } + int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < 
stateBackend.keyGroupPrefixBytes; ++j) { + startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } - if (backupPath != null) { - try { - if (backupFileSystem.exists(backupPath)) { + iterator.seek(startKeyGroupPrefixBytes); - LOG.trace("Deleting local RocksDB backup path {}.", backupPath); - backupFileSystem.delete(backupPath, true); - } - } catch (Exception e) { - LOG.warn("Could not properly delete the checkpoint directory.", e); - } - } + while (iterator.isValid()) { - if (canceled) { - Collection<StateObject> statesToDiscard = - new ArrayList<>(1 + miscFiles.size() + sstFiles.size()); + int keyGroup = 0; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; + } - statesToDiscard.add(metaStateHandle); - statesToDiscard.addAll(miscFiles.values()); - statesToDiscard.addAll(sstFiles.values()); + if (stateBackend.keyGroupRange.contains(keyGroup)) { + stateBackend.db.put(targetColumnFamilyHandle, + iterator.key(), iterator.value()); + } - try { - StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard); - } catch (Exception e) { - LOG.warn("Could not properly discard states.", e); + iterator.next(); + } + } // releases native iterator resources + } + } finally { + + //release native tmp db column family resources + IOUtils.closeQuietly(defaultColumnFamily); + + for (ColumnFamilyHandle flinkColumnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(flinkColumnFamilyHandle); + } } - } + } // releases native tmp db resources } } - @Override - public void restore(Collection<KeyedStateHandle> restoreState) throws Exception { - LOG.info("Initializing RocksDB keyed state backend from snapshot."); + // ------------------------------------------------------------------------ + // State factories + // ------------------------------------------------------------------------ - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring snapshot from 
state handles: {}.", restoreState); - } + /** + * Creates a column family handle for use with a k/v state. When restoring from a snapshot + * we don't restore the individual k/v states, just the global RocksDB database and the + * list of column families. When a k/v state is first requested we check here whether we + * already have a column family for that and return it or create a new one if it doesn't exist. + * + * <p>This also checks whether the {@link StateDescriptor} for a state matches the one + * that we checkpointed, i.e. is already in the map of column families. + */ + @SuppressWarnings("rawtypes, unchecked") + protected <N, S> ColumnFamilyHandle getColumnFamily( + StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException { - // clear all meta data - kvStateInformation.clear(); - restoredKvStateMetaInfos.clear(); + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo = + kvStateInformation.get(descriptor.getName()); - try { - if (restoreState == null || restoreState.isEmpty()) { - createDB(); - } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { - RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); - restoreOperation.restore(restoreState); - } else { - RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this); - restoreOperation.doRestore(restoreState); + RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + descriptor.getType(), + descriptor.getName(), + namespaceSerializer, + descriptor.getSerializer()); + + if (stateInfo != null) { + // TODO with eager registration in place, these checks should be moved to restore() + + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo = + (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName()); + + 
Preconditions.checkState( + Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()), + "Incompatible state names. " + + "Was [" + restoredMetaInfo.getName() + "], " + + "registered with [" + newMetaInfo.getName() + "]."); + + if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + newMetaInfo.getStateType() == restoredMetaInfo.getStateType(), + "Incompatible state types. " + + "Was [" + restoredMetaInfo.getStateType() + "], " + + "registered with [" + newMetaInfo.getStateType() + "]."); } - } catch (Exception ex) { - dispose(); - throw ex; + + // check compatibility results to determine if state migration is required + CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getNamespaceSerializer(), + null, + restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), + newMetaInfo.getNamespaceSerializer()); + + CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getStateSerializer(), + UnloadableDummyTypeSerializer.class, + restoredMetaInfo.getStateSerializerConfigSnapshot(), + newMetaInfo.getStateSerializer()); + + if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) { + // TODO state migration currently isn't possible. 
+ throw new StateMigrationException("State migration isn't supported, yet."); + } else { + stateInfo.f1 = newMetaInfo; + return stateInfo.f0; + } + } + + byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), + "The chosen state name 'default' collides with the name of the default column family!"); + + ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions); + + final ColumnFamilyHandle columnFamily; + + try { + columnFamily = db.createColumnFamily(columnDescriptor); + } catch (RocksDBException e) { + throw new IOException("Error creating ColumnFamilyHandle.", e); } + + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple = + new Tuple2<>(columnFamily, newMetaInfo); + Map rawAccess = kvStateInformation; + rawAccess.put(descriptor.getName(), tuple); + return columnFamily; } @Override - public void notifyCheckpointComplete(long completedCheckpointId) { - synchronized (materializedSstFiles) { - if (completedCheckpointId < lastCompletedCheckpointId) { - return; - } + protected <N, T> InternalValueState<N, T> createValueState( + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<T> stateDesc) throws Exception { - materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - lastCompletedCheckpointId = completedCheckpointId; - } + return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this); } - private void createDB() throws IOException { - List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); - this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); - this.defaultColumnFamily = columnFamilyHandles.get(0); + @Override + protected <N, T> InternalListState<N, T> createListState( + 
TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<T> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this); } - private RocksDB openDB( - String path, - List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, - List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { + @Override + protected <N, T> InternalReducingState<N, T> createReducingState( + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<T> stateDesc) throws Exception { - List<ColumnFamilyDescriptor> columnFamilyDescriptors = - new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); + return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } - // we add the required descriptor for the default CF in last position. 
- columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions)); + @Override + protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState( + TypeSerializer<N> namespaceSerializer, + AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception { - RocksDB dbRef; + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } - try { - dbRef = RocksDB.open( - Preconditions.checkNotNull(dbOptions), - Preconditions.checkNotNull(path), - columnFamilyDescriptors, - stateColumnFamilyHandles); - } catch (RocksDBException e) { - throw new IOException("Error while opening RocksDB instance.", e); - } + @Override + protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState( + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { - // requested + default CF - Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), - "Not all requested column family handles have been created"); + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - return dbRef; + return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState( + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this); } /** - * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot. + * Only visible for testing, DO NOT USE. 
*/ - static final class RocksDBFullRestoreOperation<K> { + public File getInstanceBasePath() { + return instanceBasePath; + } - private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; + @Override + public boolean supportsAsynchronousSnapshots() { + return true; + } - /** Current key-groups state handle from which we restore key-groups. */ - private KeyGroupsStateHandle currentKeyGroupsStateHandle; - /** Current input stream we obtained from currentKeyGroupsStateHandle. */ - private FSDataInputStream currentStateHandleInStream; - /** Current data input view that wraps currentStateHandleInStream. */ - private DataInputView currentStateHandleInView; - /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ - private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; - /** The compression decorator that was used for writing the state, as determined by the meta data. */ - private StreamCompressionDecorator keygroupStreamCompressionDecorator; + @VisibleForTesting + @SuppressWarnings("unchecked") + @Override + public int numStateEntries() { + int count = 0; - /** - * Creates a restore operation object for the given state backend instance. - * - * @param rocksDBKeyedStateBackend the state backend into which we restore - */ - public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) { - this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); + for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) { + try (RocksIterator rocksIterator = db.newIterator(column.f0)) { + rocksIterator.seekToFirst(); + + while (rocksIterator.isValid()) { + count++; + rocksIterator.next(); + } + } + } + + return count; + } + + + + /** + * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. 
+ * The resulting iteration sequence is ordered by (key-group, kv-state). + */ + @VisibleForTesting + static final class RocksDBMergeIterator implements AutoCloseable { + + private final PriorityQueue<MergeIterator> heap; + private final int keyGroupPrefixByteCount; + private boolean newKeyGroup; + private boolean newKVState; + private boolean valid; + + private MergeIterator currentSubIterator; + + private static final List<Comparator<MergeIterator>> COMPARATORS; + + static { + int maxBytes = 4; + COMPARATORS = new ArrayList<>(maxBytes); + for (int i = 0; i < maxBytes; ++i) { + final int currentBytes = i; + COMPARATORS.add(new Comparator<MergeIterator>() { + @Override + public int compare(MergeIterator o1, MergeIterator o2) { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.currentKey, o2.currentKey, currentBytes); + return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + } + }); + } + } + + RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) { + Preconditions.checkNotNull(kvStateIterators); + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; + + Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount); + + if (kvStateIterators.size() > 0) { + PriorityQueue<MergeIterator> iteratorPriorityQueue = + new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); + + for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) { + final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0; + rocksIterator.seekToFirst(); + if (rocksIterator.isValid()) { + iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); + } else { + IOUtils.closeQuietly(rocksIterator); + } + } + + kvStateIterators.clear(); + + this.heap = iteratorPriorityQueue; + this.valid = !heap.isEmpty(); + this.currentSubIterator = heap.poll(); + } else { + // creating a PriorityQueue of size 0 results in an 
exception. + this.heap = null; + this.valid = false; + } + + this.newKeyGroup = true; + this.newKVState = true; } /** - * Restores all key-groups data that is referenced by the passed state handles. - * - * @param keyedStateHandles List of all key groups state handles that shall be restored. + * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after + * calls to {@link #next()}. */ - public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) - throws IOException, StateMigrationException, RocksDBException { + public void next() { + newKeyGroup = false; + newKVState = false; - rocksDBKeyedStateBackend.createDB(); + final RocksIterator rocksIterator = currentSubIterator.getIterator(); + rocksIterator.next(); - for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { - if (keyedStateHandle != null) { + byte[] oldKey = currentSubIterator.getCurrentKey(); + if (rocksIterator.isValid()) { + currentSubIterator.currentKey = rocksIterator.key(); - if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { - throw new IllegalStateException("Unexpected state handle type, " + - "expected: " + KeyGroupsStateHandle.class + - ", but found: " + keyedStateHandle.getClass()); - } - this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; - restoreKeyGroupsInStateHandle(); + if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) { + heap.offer(currentSubIterator); + currentSubIterator --- End diff -- Tried, still doesn't match the highlighted line. The correct line is: `1701`.
---