[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3801 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to have it, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114703946 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException { } } + private static class RocksDBIncrementalRestoreOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + private List> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + return serializationProxy.getNamedStateSerializationProxies(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + byte[] buffer = new byte[1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + break; + 
} + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private void restoreInstance( + RocksDBKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + Map sstFiles = restoreStateHandle.getSstFiles(); + for (Map.Entry sstFileEntry : sstFiles.entrySet()) { + String fileName = sstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = sstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map miscFiles = restoreStateHandle.getM
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114703775 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + serializ
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114580123 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend( cancelables.registerClosable(keyedStateBackend); // restore if we have some old state - if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { - keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState()); - } + Collection restoreKeyedStateHandles = + restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState(); + + keyedStateBackend.restore(restoreKeyedStateHandles); --- End diff -- Ok, then we can also look at this again later. For now it is ok as is. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114579898 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; + + private final Map sstFiles; + + private final Map miscFiles; + + private final StreamStateHandle metaStateHandle; + + private boolean registered; + + RocksDBKeyedStateHandle( + JobID jobId, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + Set newSstFileNames, + Map sstFiles, + Map miscFiles, + StreamStateHandle metaStateHandle) { + + this.jobId = jobId; + this.operatorIdentifier = operatorIdentifier; + this.keyGroupRange = keyGroupRange; + this.newSstFileNames = newSstFileNames; + this.sstFiles = sstFiles; + this.miscFiles = miscFiles; + this.metaStateHandle = metaStateHandle; + this.registered = false; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + public Map getSstFiles() { + return sstFiles; + } + + public Map getMiscFiles() { + return miscFiles; 
+ } + + public StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + return this; + } else { + return null; + } + } + + @Override + public void discardState() throws Exception { + + try { + metaStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard meta data.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard misc file state.", e); + } + + if (!registered) { + for (String newSstFileName : newSstFileNames) { + StreamStateHandle handle = sstFiles.get(newSstFileName); +
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114565991 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend( cancelables.registerClosable(keyedStateBackend); // restore if we have some old state - if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { - keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState()); - } + Collection restoreKeyedStateHandles = + restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState(); + + keyedStateBackend.restore(restoreKeyedStateHandles); --- End diff -- I attempted to move the state restore into the constructor, as we discussed, but it turns out to be impossible. All state backends should be registered in the task so that they can be closed when the task is canceled. If we put the restore logic in the constructor of the backends, the construction of a backend may block (e.g., while accessing HDFS). Since construction has not completed at that point, the backend will not have been registered and hence will not be closed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to have it, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114565968 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; + + private final Map sstFiles; + + private final Map miscFiles; + + private final StreamStateHandle metaStateHandle; + + private boolean registered; + + RocksDBKeyedStateHandle( + JobID jobId, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + Set newSstFileNames, + Map sstFiles, + Map miscFiles, + StreamStateHandle metaStateHandle) { + + this.jobId = jobId; + this.operatorIdentifier = operatorIdentifier; + this.keyGroupRange = keyGroupRange; + this.newSstFileNames = newSstFileNames; + this.sstFiles = sstFiles; + this.miscFiles = miscFiles; + this.metaStateHandle = metaStateHandle; + this.registered = false; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + public Map getSstFiles() { + return sstFiles; + } + + public Map getMiscFiles() { + return miscFiles; 
+ } + + public StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + return this; + } else { + return null; + } + } + + @Override + public void discardState() throws Exception { + + try { + metaStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard meta data.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard misc file state.", e); + } + + if (!registered) { + for (String newSstFileName : newSstFileNames) { + StreamStateHandle handle = sstFiles.get(newSstFileName); +
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114504710 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + seria
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114500597 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + serializ
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114380645 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException { } } + private static class RocksDBIncrementalRestoreOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + private List> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + return serializationProxy.getNamedStateSerializationProxies(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + byte[] buffer = new byte[1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + 
break; + } + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private void restoreInstance( + RocksDBKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + Map sstFiles = restoreStateHandle.getSstFiles(); + for (Map.Entry sstFileEntry : sstFiles.entrySet()) { + String fileName = sstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = sstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map miscFiles = restoreStateHandle.g
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114362074 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + seria
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114284380 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { --- End diff -- I think somehow adding the word `incremental` to the class name would help to differentiate this from other state handles created by RocksDB in full backups, because that is the distinguishing feature of this handle. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114360519 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + seria
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114318544 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; + + private final Map sstFiles; + + private final Map miscFiles; + + private final StreamStateHandle metaStateHandle; + + private boolean registered; --- End diff -- I would suggest a small comment on the meaning of this, because it is actually quite important that this tracks which files can be deleted. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114287252 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java --- @@ -36,7 +36,8 @@ */ @Test public void testOrdinalsAreConstant() { - assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal()); - assertEquals(1, CheckpointType.SAVEPOINT.ordinal()); + assertEquals(0, CheckpointType.INCREMENTAL_CHECKPOINT.ordinal()); + assertEquals(1, CheckpointType.FULL_CHECKPOINT.ordinal()); + assertEquals(2, CheckpointType.SAVEPOINT.ordinal()); --- End diff -- The purpose of this test is to ensure that the encoding of checkpoint types remains stable for compatibility of Flink savepoints, and this change would break that compatibility. In this sense, it should enforce "append-only" for new checkpoint types. To fix this problem, `CheckpointType.INCREMENTAL_CHECKPOINT` should be encoded as value 2 and the other types must be changed back to their old values. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114320148 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; --- End diff -- Overall, I wonder how this class impacts larger deployments, where a lot of those state handles are sent via RPC. I suggest keeping the serialization footprint as small as possible. For example, maybe we can eliminate `newSstFileNames` by introducing two `Map`s: `newSstFiles` and `previousSstFiles`, instead of checking against a set. This could even simplify some methods, e.g. the registration or deletion code. We could then provide a method that delivers a combined iterator over all `Entry` instances from both maps. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114363332 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException { } } + private static class RocksDBIncrementalRestoreOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + private List> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + return serializationProxy.getNamedStateSerializationProxies(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + byte[] buffer = new byte[1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + 
break; + } + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private void restoreInstance( + RocksDBKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + Map sstFiles = restoreStateHandle.getSstFiles(); + for (Map.Entry sstFileEntry : sstFiles.entrySet()) { + String fileName = sstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = sstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map miscFiles = restoreStateHandle.g
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114354639 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + seria
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114289498 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; + + private final Map sstFiles; + + private final Map miscFiles; + + private final StreamStateHandle metaStateHandle; + + private boolean registered; + + RocksDBKeyedStateHandle( + JobID jobId, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + Set newSstFileNames, + Map sstFiles, + Map miscFiles, + StreamStateHandle metaStateHandle) { + + this.jobId = jobId; + this.operatorIdentifier = operatorIdentifier; + this.keyGroupRange = keyGroupRange; + this.newSstFileNames = newSstFileNames; + this.sstFiles = sstFiles; + this.miscFiles = miscFiles; + this.metaStateHandle = metaStateHandle; + this.registered = false; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + public Map getSstFiles() { + return sstFiles; + } + + public Map getMiscFiles() { + return miscFiles; 
+ } + + public StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + return this; + } else { + return null; --- End diff -- I wonder if it would make sense to also return something like `KeyedStateHandle.EMPTY` here that has a `KeyGroupRange.EMPTY_KEY_GROUP_RANGE` to avoid the use of `null` and the corresponding checks in other code parts. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114318030 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; + + private final Map sstFiles; + + private final Map miscFiles; + + private final StreamStateHandle metaStateHandle; + + private boolean registered; + + RocksDBKeyedStateHandle( + JobID jobId, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + Set newSstFileNames, + Map sstFiles, + Map miscFiles, + StreamStateHandle metaStateHandle) { + + this.jobId = jobId; --- End diff -- We could check for some notNull preconditions in the ctor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114359825 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + seria
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114286771 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend( cancelables.registerClosable(keyedStateBackend); // restore if we have some old state - if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { - keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState()); - } + Collection restoreKeyedStateHandles = + restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState(); + + keyedStateBackend.restore(restoreKeyedStateHandles); --- End diff -- I think this would be a good opportunity to switch from lazy restore to eagerly passing state handles to the factory method for the keyed state backend implementations. The factory, in turn, can eagerly pass the restore state to the constructor. I think this could be done in a followup PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114368960 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -137,6 +156,14 @@ /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; + /** The sst files materialized in pending checkpoints */ + private final SortedMap> materializedSstFiles = new TreeMap<>(); + + /** The identifier of the last completed checkpoint */ + private final long lastCompletedCheckpointId = -1; --- End diff -- Currently, this value `lastCompletedCheckpointId` is not maintained at all, and also the `materializedSstFiles` is ever-growing. I think the whole feedback from the checkpoint coordinator about completed checkpoints is still missing. Are you planning to do this in another PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114318333 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} --- End diff -- I think it would be helpful to describe a bit what this is composed of, for example what are the misc files? 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114365154 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException { } } + private static class RocksDBIncrementalRestoreOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + private List> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + return serializationProxy.getNamedStateSerializationProxies(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + byte[] buffer = new byte[1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + 
break; + } + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private void restoreInstance( + RocksDBKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + Map sstFiles = restoreStateHandle.getSstFiles(); + for (Map.Entry sstFileEntry : sstFiles.entrySet()) { + String fileName = sstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = sstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map miscFiles = restoreStateHandle.g
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114361888 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); --- End diff -- We could set `outputStream` to `null` before returning, to avoid a second call to `close()` in the `finally` clause --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114325070 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -265,9 +281,64 @@ private boolean hasRegisteredState() { final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { + if (checkpointOptions.getCheckpointType() == CheckpointOptions.CheckpointType.INCREMENTAL_CHECKPOINT) { + return snapshotIncrementally(checkpointId, timestamp, streamFactory); + } else { + return snapshotFully(checkpointId, timestamp, streamFactory); + } + } + + private RunnableFuture snapshotIncrementally( + final long checkpointId, --- End diff -- This method duplicates a fair amount of code from `snapshotFully`. My suggestion would be to aim for an abstract strategy shared by all checkpointing modes, e.g. with steps such as `takeSnapshot`, `materialize`, and `releaseResources`. Then we could have a factory that, depending on the checkpoint type, instantiates the right implementation. This would also help with a second concern: I can see that the code in this class has grown a lot over time, so this could be the time to move some aspects, such as the checkpointing strategies, into separate classes (they are already static inner classes anyway). By separating the processing logic from the snapshot/restore logic, we can keep things more modular and maintain a clear separation of concerns. We can also do this cleanup in a follow-up PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114364811 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException { } } + private static class RocksDBIncrementalRestoreOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + private List> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + return serializationProxy.getNamedStateSerializationProxies(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + byte[] buffer = new byte[1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + 
break; + } + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private void restoreInstance( + RocksDBKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + Map sstFiles = restoreStateHandle.getSstFiles(); + for (Map.Entry sstFileEntry : sstFiles.entrySet()) { + String fileName = sstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = sstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map miscFiles = restoreStateHandle.g
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114288802 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; + + private final Map sstFiles; + + private final Map miscFiles; + + private final StreamStateHandle metaStateHandle; + + private boolean registered; + + RocksDBKeyedStateHandle( + JobID jobId, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + Set newSstFileNames, + Map sstFiles, + Map miscFiles, + StreamStateHandle metaStateHandle) { + + this.jobId = jobId; + this.operatorIdentifier = operatorIdentifier; + this.keyGroupRange = keyGroupRange; + this.newSstFileNames = newSstFileNames; + this.sstFiles = sstFiles; + this.miscFiles = miscFiles; + this.metaStateHandle = metaStateHandle; + this.registered = false; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + public Map getSstFiles() { + return sstFiles; + } + + public Map getMiscFiles() { + return miscFiles; 
+ } + + public StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + return this; + } else { + return null; + } + } + + @Override + public void discardState() throws Exception { + + try { + metaStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard meta data.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard misc file state.", e); + } + + if (!registered) { + for (String newSstFileName : newSstFileNames) { + StreamStateHandle handle = sstFiles.get(newSstFileName); +
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114321075 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -152,6 +179,10 @@ public RocksDBKeyedStateBackend( ) throws IOException { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); + + this.jobId = jobId; --- End diff -- We could introduce precondition checks against `null`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114283114 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + seria
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114282104 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = checkpointStreamFactory 
+ .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + serialization
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114270508 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = checkpointStreamFactory 
+ .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + serialization
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3801 [FLINK-6364] [checkpoints] Implement incremental checkpointing in RocksDBKeyedStateBackend This is the initial implementation of incremental checkpointing in RocksDBKeyedStateBackend. Changes include: 1. Add a new `CheckpointType` for incremental checkpoints. 2. Call the `restore()` method for all `KeyedStateBackend` if the restore state handle is null or empty. 3. Implement `RocksDBIncrementalSnapshotOperation` and `RocksDBIncrementalRestoreOperation`, which support incremental snapshotting and incremental restoring (with or without parallelism changes), respectively. You can merge this pull request into a Git repository by running: $ git pull https://github.com/shixiaogang/flink flink-6364 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3801.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3801 commit c5340a2186ecfbc48c46048db5adb11af83db511 Author: xiaogang.sxg Date: 2017-04-29T15:44:36Z Implement incremental checkpointing in RocksDBKeyedStateBackend --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---