[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-08 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3801




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114703946
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
 		}
 	}
 
+	private static class RocksDBIncrementalRestoreOperation {
+
+		private final RocksDBKeyedStateBackend stateBackend;
+
+		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) {
+			this.stateBackend = stateBackend;
+		}
+
+		private List> readMetaData(
+				StreamStateHandle metaStateHandle) throws Exception {
+
+			FSDataInputStream inputStream = null;
+
+			try {
+				inputStream = metaStateHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+				DataInputView in = new DataInputViewStreamWrapper(inputStream);
+				serializationProxy.read(in);
+
+				return serializationProxy.getNamedStateSerializationProxies();
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+				}
+			}
+		}
+
+		private void readStateData(
+				Path restoreFilePath,
+				StreamStateHandle remoteFileHandle) throws IOException {
+
+			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
+
+			FSDataInputStream inputStream = null;
+			FSDataOutputStream outputStream = null;
+
+			try {
+				inputStream = remoteFileHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+				byte[] buffer = new byte[1024];
+				while (true) {
+					int numBytes = inputStream.read(buffer);
+					if (numBytes == -1) {
+						break;
+					}
+
+					outputStream.write(buffer, 0, numBytes);
+				}
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+				}
+
+				if (outputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+					outputStream.close();
+				}
+			}
+		}
+
+		private void restoreInstance(
+				RocksDBKeyedStateHandle restoreStateHandle,
+				boolean hasExtraKeys) throws Exception {
+
+			// read state data
+			Path restoreInstancePath = new Path(
+				stateBackend.instanceBasePath.getAbsolutePath(),
+				UUID.randomUUID().toString());
+
+			try {
+				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
+				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
+					String fileName = sstFileEntry.getKey();
+					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
+
+					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
+				}
+
+				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114703775
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
 		}
 	}
 
+	private static class RocksDBIncrementalSnapshotOperation {
+
+		private final RocksDBKeyedStateBackend stateBackend;
+
+		private final CheckpointStreamFactory checkpointStreamFactory;
+
+		private final long checkpointId;
+
+		private final long checkpointTimestamp;
+
+		private Map baseSstFiles;
+
+		private List> stateMetaInfos = new ArrayList<>();
+
+		private FileSystem backupFileSystem;
+		private Path backupPath;
+
+		private FSDataInputStream inputStream = null;
+		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+
+		// new sst files since the last completed checkpoint
+		private Set<String> newSstFileNames = new HashSet<>();
+
+		// handles to the sst files in the current snapshot
+		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
+
+		// handles to the misc files in the current snapshot
+		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+
+		private StreamStateHandle metaStateHandle = null;
+
+		private RocksDBIncrementalSnapshotOperation(
+				RocksDBKeyedStateBackend stateBackend,
+				CheckpointStreamFactory checkpointStreamFactory,
+				long checkpointId,
+				long checkpointTimestamp) {
+
+			this.stateBackend = stateBackend;
+			this.checkpointStreamFactory = checkpointStreamFactory;
+			this.checkpointId = checkpointId;
+			this.checkpointTimestamp = checkpointTimestamp;
+		}
+
+		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
+			try {
+				final byte[] buffer = new byte[1024];
+
+				FileSystem backupFileSystem = backupPath.getFileSystem();
+				inputStream = backupFileSystem.open(filePath);
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				outputStream = checkpointStreamFactory
+					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+				while (true) {
+					int numBytes = inputStream.read(buffer);
+
+					if (numBytes == -1) {
+						break;
+					}
+
+					outputStream.write(buffer, 0, numBytes);
+				}
+
+				return outputStream.closeAndGetHandle();
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+					inputStream = null;
+				}
+
+				if (outputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+					outputStream.close();
+					outputStream = null;
+				}
+			}
+		}
+
+		private StreamStateHandle materializeMetaData() throws Exception {
+			try {
+				outputStream = checkpointStreamFactory
+					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+				KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
+
+				serializationProxy.write(out);

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114580123
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
 		cancelables.registerClosable(keyedStateBackend);
 
 		// restore if we have some old state
-		if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) {
-			keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-		}
+		Collection<KeyedStateHandle> restoreKeyedStateHandles =
+			restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState();
+
+		keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --

Ok, then we can also look at this again later. For now it is ok as is.




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114579898
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}.
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+	private static final long serialVersionUID = -8328808513197388231L;
+
+	private final JobID jobId;
+
+	private final String operatorIdentifier;
+
+	private final KeyGroupRange keyGroupRange;
+
+	private final Set<String> newSstFileNames;
+
+	private final Map<String, StreamStateHandle> sstFiles;
+
+	private final Map<String, StreamStateHandle> miscFiles;
+
+	private final StreamStateHandle metaStateHandle;
+
+	private boolean registered;
+
+	RocksDBKeyedStateHandle(
+			JobID jobId,
+			String operatorIdentifier,
+			KeyGroupRange keyGroupRange,
+			Set<String> newSstFileNames,
+			Map<String, StreamStateHandle> sstFiles,
+			Map<String, StreamStateHandle> miscFiles,
+			StreamStateHandle metaStateHandle) {
+
+		this.jobId = jobId;
+		this.operatorIdentifier = operatorIdentifier;
+		this.keyGroupRange = keyGroupRange;
+		this.newSstFileNames = newSstFileNames;
+		this.sstFiles = sstFiles;
+		this.miscFiles = miscFiles;
+		this.metaStateHandle = metaStateHandle;
+		this.registered = false;
+	}
+
+	@Override
+	public KeyGroupRange getKeyGroupRange() {
+		return keyGroupRange;
+	}
+
+	public Map<String, StreamStateHandle> getSstFiles() {
+		return sstFiles;
+	}
+
+	public Map<String, StreamStateHandle> getMiscFiles() {
+		return miscFiles;
+	}
+
+	public StreamStateHandle getMetaStateHandle() {
+		return metaStateHandle;
+	}
+
+	@Override
+	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+			return this;
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public void discardState() throws Exception {
+
+		try {
+			metaStateHandle.discardState();
+		} catch (Exception e) {
+			LOG.warn("Could not properly discard meta data.", e);
+		}
+
+		try {
+			StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+		} catch (Exception e) {
+			LOG.warn("Could not properly discard misc file state.", e);
+		}
+
+		if (!registered) {
+			for (String newSstFileName : newSstFileNames) {
+				StreamStateHandle handle = sstFiles.get(newSstFileName);

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114565991
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
 		cancelables.registerClosable(keyedStateBackend);
 
 		// restore if we have some old state
-		if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) {
-			keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-		}
+		Collection<KeyedStateHandle> restoreKeyedStateHandles =
+			restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState();
+
+		keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --

I attempted to put the restore logic in the constructor as we discussed, but it
turns out to be impossible.

All state backends must be registered with the task so that they can be closed
when the task is canceled. If we move the restore into the constructor of a
backend, its construction may block (e.g., on access to HDFS). Since
construction has not completed at that point, the backend is not yet
registered and hence can never be closed.
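
To make the ordering concrete, here is a minimal, self-contained sketch of the
register-then-restore pattern described above. All names are illustrative toys,
not Flink's actual classes:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class RegisterThenRestore {

        /** Toy registry of resources to close when the task is canceled. */
        static class CancelableRegistry {
            private final Set<Closeable> closeables = ConcurrentHashMap.newKeySet();

            void register(Closeable c) {
                closeables.add(c);
            }

            void cancelAll() throws IOException {
                for (Closeable c : closeables) {
                    c.close();
                }
            }
        }

        /** Toy backend whose restore() may block, e.g. on DFS reads. */
        static class ToyBackend implements Closeable {
            private volatile boolean closed;

            void restore() throws IOException {
                // Stands in for a blocking read; only close() can end it.
                while (!closed) {
                    Thread.onSpinWait();
                }
                throw new IOException("backend closed during restore");
            }

            @Override
            public void close() {
                closed = true;
            }
        }

        public static void main(String[] args) throws Exception {
            CancelableRegistry cancelables = new CancelableRegistry();
            ToyBackend backend = new ToyBackend();   // construction stays cheap
            cancelables.register(backend);           // registered BEFORE restoring
            Thread canceler = new Thread(() -> {
                try {
                    cancelables.cancelAll();         // cancellation can now reach it
                } catch (IOException ignored) {
                }
            });
            canceler.start();
            try {
                backend.restore();                   // the potentially blocking call
            } catch (IOException e) {
                System.out.println("restore aborted by cancellation: " + e.getMessage());
            }
            canceler.join();
        }
    }

If restore() ran inside the ToyBackend constructor instead, register() would
only execute after the constructor returned, so cancelAll() could never
unblock it, which is exactly the problem described above.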




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114565968
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
@@ -0,0 +1,209 @@
+[quoted RocksDBKeyedStateHandle hunk, identical to the code shown in full above; trimmed]

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114504710
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
+[quoted RocksDBIncrementalSnapshotOperation hunk, identical to the code shown in full above; trimmed]

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114500597
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
+[quoted RocksDBIncrementalSnapshotOperation hunk, identical to the code shown in full above; trimmed]

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114380645
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
+[quoted RocksDBIncrementalRestoreOperation hunk, identical to the code shown in full above; trimmed]

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114362074
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
+[quoted RocksDBIncrementalSnapshotOperation hunk, identical to the code shown in full above; trimmed]

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114284380
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
@@ -0,0 +1,209 @@
+[...]
+/**
+ * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}.
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
--- End diff --

I think adding the word `incremental` to the class name would help to
differentiate this from the other state handles that RocksDB creates for full
backups, because incrementality is the distinguishing feature of this handle.




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114360519
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
+[quoted RocksDBIncrementalSnapshotOperation hunk, identical to the code shown in full above; trimmed]

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114318544
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
@@ -0,0 +1,209 @@
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
+[...]
+	private final StreamStateHandle metaStateHandle;
+
+	private boolean registered;
--- End diff --

I would suggest a small comment on the meaning of this field, because it is
actually quite important that it tracks which files can be deleted.
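
For illustration, such a comment could look roughly like this (the wording is a
suggestion, not the PR's code):

    /**
     * Tracks whether this handle has been registered with a shared state
     * registry. Once registered, the sst files may be referenced by later
     * incremental checkpoints, so discardState() must not delete them; only
     * while unregistered may the new sst files be discarded.
     */
    private boolean registered;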




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114287252
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java ---
@@ -36,7 +36,8 @@
 	 */
 	@Test
 	public void testOrdinalsAreConstant() {
-		assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
-		assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
+		assertEquals(0, CheckpointType.INCREMENTAL_CHECKPOINT.ordinal());
+		assertEquals(1, CheckpointType.FULL_CHECKPOINT.ordinal());
+		assertEquals(2, CheckpointType.SAVEPOINT.ordinal());
--- End diff --

The purpose of this test is to ensure that the encoding of checkpoint types
remains stable, which the compatibility of Flink savepoints depends on, and
this change would break that compatibility. In this sense, the test should
enforce "append-only" evolution for new checkpoint types. To fix the problem,
`CheckpointType.INCREMENTAL_CHECKPOINT` should be encoded as value 2 and the
other types must be changed back to their old values.
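
A sketch of the fix described above; the enum layout is inferred from the
comment, not taken from the merged code:

    import static org.junit.Assert.assertEquals;

    import org.junit.Test;

    public class CheckpointTypeTest {

        /** Hypothetical layout: the new constant is appended, so old ordinals stay stable. */
        enum CheckpointType {
            FULL_CHECKPOINT,        // ordinal 0, unchanged
            SAVEPOINT,              // ordinal 1, unchanged
            INCREMENTAL_CHECKPOINT  // ordinal 2, appended
        }

        @Test
        public void testOrdinalsAreConstant() {
            assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
            assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
            assertEquals(2, CheckpointType.INCREMENTAL_CHECKPOINT.ordinal());
        }
    }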




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114320148
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
@@ -0,0 +1,209 @@
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
+[...]
+	private final Set<String> newSstFileNames;
--- End diff --

Overall, I wonder how this class impacts larger deployments, where a lot of
these state handles are sent via RPC. I suggest keeping the serialization
footprint as small as possible. For example, maybe we can eliminate
`newSstFileNames` by introducing two maps, `newSstFiles` and
`previousSstFiles`, instead of checking against a set. This could even
simplify some methods, e.g. the registration or deletion code. We could then
provide a method that delivers a combined iterator over the entries of both
maps.
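
A minimal sketch of that suggestion (class and method names are illustrative,
and the handle type is left generic):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Stream;

    // Two disjoint maps replace Set<String> plus one combined map: whether a
    // file is new is answered by which map it lives in.
    class SstFileHandles<H> {

        private final Map<String, H> newSstFiles = new HashMap<>();
        private final Map<String, H> previousSstFiles = new HashMap<>();

        void addNew(String fileName, H handle) {
            newSstFiles.put(fileName, handle);
        }

        void addPrevious(String fileName, H handle) {
            previousSstFiles.put(fileName, handle);
        }

        /** Combined iteration over all sst files referenced by the checkpoint. */
        Iterable<Map.Entry<String, H>> allSstFiles() {
            return () -> Stream.concat(
                    newSstFiles.entrySet().stream(),
                    previousSstFiles.entrySet().stream())
                .iterator();
        }
    }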




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114363332
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
+[quoted RocksDBIncrementalRestoreOperation hunk, identical to the code shown in full above; trimmed]

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114354639
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
+[quoted RocksDBIncrementalSnapshotOperation hunk, identical to the code shown in full above; trimmed]

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114289498
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
@@ -0,0 +1,209 @@
+[...]
+	@Override
+	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+			return this;
+		} else {
+			return null;
--- End diff --

I wonder if it would make sense to also return something like 
`KeyedStateHandle.EMPTY` here that has a `KeyGroupRange.EMPTY_KEY_GROUP_RANGE` 
to avoid the use of `null` and the corresponding checks in other code parts.
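
A sketch of the null-object idea with simplified stand-ins (these are not
Flink's actual interfaces, and `KeyedStateHandle.EMPTY` does not exist in this
PR):

    // Hypothetical null object: a shared EMPTY handle with an empty
    // key-group range is returned instead of null.
    final class KeyGroupRange {
        static final KeyGroupRange EMPTY_KEY_GROUP_RANGE = new KeyGroupRange();
    }

    interface KeyedStateHandle {

        /** Placeholder for "no intersection", so callers never see null. */
        KeyedStateHandle EMPTY = () -> KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        KeyGroupRange getKeyGroupRange();
    }

With that, `getIntersection` would `return KeyedStateHandle.EMPTY;` instead of
`return null;`, and callers could drop their null checks.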




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114318030
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, 
CompositeStateHandle {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+   private static final long serialVersionUID = -8328808513197388231L;
+
+   private final JobID jobId;
+
+   private final String operatorIdentifier;
+
+   private final KeyGroupRange keyGroupRange;
+
+   private final Set<String> newSstFileNames;
+
+   private final Map<String, StreamStateHandle> sstFiles;
+
+   private final Map<String, StreamStateHandle> miscFiles;
+
+   private final StreamStateHandle metaStateHandle;
+
+   private boolean registered;
+
+   RocksDBKeyedStateHandle(
+   JobID jobId,
+   String operatorIdentifier,
+   KeyGroupRange keyGroupRange,
+   Set<String> newSstFileNames,
+   Map<String, StreamStateHandle> sstFiles,
+   Map<String, StreamStateHandle> miscFiles,
+   StreamStateHandle metaStateHandle) {
+
+   this.jobId = jobId;
--- End diff --

We could add some notNull precondition checks in the ctor.
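
For instance (a sketch; the `Preconditions` utility is already imported in this class):

    this.jobId = Preconditions.checkNotNull(jobId);
    this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
    this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
    this.newSstFileNames = Preconditions.checkNotNull(newSstFileNames);
    this.sstFiles = Preconditions.checkNotNull(sstFiles);
    this.miscFiles = Preconditions.checkNotNull(miscFiles);
    this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);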


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114359825
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
}
}
 
+   private static class RocksDBIncrementalSnapshotOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private final CheckpointStreamFactory checkpointStreamFactory;
+
+   private final long checkpointId;
+
+   private final long checkpointTimestamp;
+
+   private Map baseSstFiles;
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
+
+   private FileSystem backupFileSystem;
+   private Path backupPath;
+
+   private FSDataInputStream inputStream = null;
+   private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+
+   // new sst files since the last completed checkpoint
+   private Set<String> newSstFileNames = new HashSet<>();
+
+   // handles to the sst files in the current snapshot
+   private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
+
+   // handles to the misc files in the current snapshot
+   private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+
+   private StreamStateHandle metaStateHandle = null;
+
+   private RocksDBIncrementalSnapshotOperation(
+   RocksDBKeyedStateBackend stateBackend,
+   CheckpointStreamFactory checkpointStreamFactory,
+   long checkpointId,
+   long checkpointTimestamp) {
+
+   this.stateBackend = stateBackend;
+   this.checkpointStreamFactory = checkpointStreamFactory;
+   this.checkpointId = checkpointId;
+   this.checkpointTimestamp = checkpointTimestamp;
+   }
+
+   private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
+   try {
+   final byte[] buffer = new byte[1024];
+
+   FileSystem backupFileSystem = 
backupPath.getFileSystem();
+   inputStream = backupFileSystem.open(filePath);
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+
+   return outputStream.closeAndGetHandle();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   inputStream = null;
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   outputStream = null;
+   }
+   }
+   }
+
+   private StreamStateHandle materializeMetaData() throws 
Exception {
+   try {
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+   DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
+
+   seria

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114286771
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
cancelables.registerClosable(keyedStateBackend);
 
// restore if we have some old state
-   if (null != restoreStateHandles && null != 
restoreStateHandles.getManagedKeyedState()) {
-   
keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-   }
+   Collection<KeyedStateHandle> restoreKeyedStateHandles =
+   restoreStateHandles == null ? null : 
restoreStateHandles.getManagedKeyedState();
+
+   keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --

I think this would be a good opportunity to switch from lazy restore to 
eagerly passing state handles to the factory method for the keyed state backend 
implementations. The factory, in turn, can eagerly pass the restore state to 
the constructor.
I think this could be done in a followup PR.
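
A sketch of what the eager variant could look like (the factory interface and
method name are hypothetical, assuming the usual backend construction arguments):

    /** Hypothetical eager-restore factory: restore state is passed at construction time. */
    interface KeyedStateBackendFactory {
        <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                TypeSerializer<K> keySerializer,
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
                Collection<KeyedStateHandle> restoreStateHandles) throws Exception;
    }

The backend implementation would then restore from `restoreStateHandles` in its
constructor, and the explicit `restore(...)` call in `StreamTask` would disappear.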


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114368960
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -137,6 +156,14 @@
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
 
+   /** The sst files materialized in pending checkpoints */
+   private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
+
+   /** The identifier of the last completed checkpoint */
+   private final long lastCompletedCheckpointId = -1;
--- End diff --

Currently, the value `lastCompletedCheckpointId` is not maintained at all, and 
the `materializedSstFiles` map is ever-growing. I think the whole feedback loop 
from the checkpoint coordinator about completed checkpoints is still missing. 
Are you planning to add this in another PR?
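
A sketch of the missing maintenance, assuming the backend is notified through
something like `CheckpointListener#notifyCheckpointComplete` and that
`lastCompletedCheckpointId` is made non-final:

    // called when the checkpoint coordinator confirms that a checkpoint completed
    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (materializedSstFiles) {
            // prune the bookkeeping of all older, now-superseded checkpoints
            materializedSstFiles.headMap(completedCheckpointId).clear();
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }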


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114318333
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}
--- End diff --

I think it would be helpful to describe a bit what this handle is composed of, 
for example: what are the misc files?
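
For instance, the class Javadoc could spell out the composition (a sketch; the
concrete roles of the files are my reading of how RocksDB lays out its instance
directory):

    /**
     * The handle to the states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}.
     *
     * <p>The handle is composed of
     * <ul>
     *     <li>metaStateHandle: a snapshot of the backend's meta data, i.e. the
     *         registered states and their serializers,</li>
     *     <li>sstFiles: handles to the RocksDB sst files, which may be shared
     *         with earlier checkpoints, and</li>
     *     <li>miscFiles: handles to the remaining files of the RocksDB instance,
     *         e.g. the MANIFEST, CURRENT and OPTIONS files.</li>
     * </ul>
     */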


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114365154
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws 
IOException, RocksDBException {
}
}
 
+   private static class RocksDBIncrementalRestoreOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) {
+   this.stateBackend = stateBackend;
+   }
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
+   StreamStateHandle metaStateHandle) throws 
Exception {
+
+   FSDataInputStream inputStream = null;
+
+   try {
+   inputStream = metaStateHandle.openInputStream();
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+   DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
+   serializationProxy.read(in);
+
+   return 
serializationProxy.getNamedStateSerializationProxies();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   }
+   }
+   }
+
+   private void readStateData(
+   Path restoreFilePath,
+   StreamStateHandle remoteFileHandle) throws 
IOException {
+
+   FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
+
+   FSDataInputStream inputStream = null;
+   FSDataOutputStream outputStream = null;
+
+   try {
+   inputStream = 
remoteFileHandle.openInputStream();
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   byte[] buffer = new byte[1024];
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   }
+   }
+   }
+
+   private void restoreInstance(
+   RocksDBKeyedStateHandle restoreStateHandle,
+   boolean hasExtraKeys) throws Exception {
+
+   // read state data
+   Path restoreInstancePath = new Path(
+   stateBackend.instanceBasePath.getAbsolutePath(),
+   UUID.randomUUID().toString());
+
+   try {
+   Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
+   for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
+   String fileName = sstFileEntry.getKey();
+   StreamStateHandle remoteFileHandle = 
sstFileEntry.getValue();
+
+   readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
+   }
+
+   Map<String, StreamStateHandle> miscFiles = 
restoreStateHandle.g

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114361888
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
}
}
 
+   private static class RocksDBIncrementalSnapshotOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private final CheckpointStreamFactory checkpointStreamFactory;
+
+   private final long checkpointId;
+
+   private final long checkpointTimestamp;
+
+   private Map baseSstFiles;
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
+
+   private FileSystem backupFileSystem;
+   private Path backupPath;
+
+   private FSDataInputStream inputStream = null;
+   private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+
+   // new sst files since the last completed checkpoint
+   private Set<String> newSstFileNames = new HashSet<>();
+
+   // handles to the sst files in the current snapshot
+   private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
+
+   // handles to the misc files in the current snapshot
+   private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+
+   private StreamStateHandle metaStateHandle = null;
+
+   private RocksDBIncrementalSnapshotOperation(
+   RocksDBKeyedStateBackend stateBackend,
+   CheckpointStreamFactory checkpointStreamFactory,
+   long checkpointId,
+   long checkpointTimestamp) {
+
+   this.stateBackend = stateBackend;
+   this.checkpointStreamFactory = checkpointStreamFactory;
+   this.checkpointId = checkpointId;
+   this.checkpointTimestamp = checkpointTimestamp;
+   }
+
+   private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
+   try {
+   final byte[] buffer = new byte[1024];
+
+   FileSystem backupFileSystem = 
backupPath.getFileSystem();
+   inputStream = backupFileSystem.open(filePath);
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+
+   return outputStream.closeAndGetHandle();
--- End diff --

We could set `outputStream` to `null` before returning, to avoid a second 
call to `close()` in the `finally` clause.
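
For example (a sketch of the return path in `materializeStateData`):

    StreamStateHandle result = outputStream.closeAndGetHandle();
    stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    outputStream = null; // the finally clause now skips the second close()
    return result;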


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114325070
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -265,9 +281,64 @@ private boolean hasRegisteredState() {
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
+   if (checkpointOptions.getCheckpointType() == 
CheckpointOptions.CheckpointType.INCREMENTAL_CHECKPOINT) {
+   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
+   } else {
+   return snapshotFully(checkpointId, timestamp, 
streamFactory);
+   }
+   }
+
+   private RunnableFuture snapshotIncrementally(
+   final long checkpointId,
--- End diff --

This method shares a fair amount of duplicated code with `snapshotFully`. My 
suggestion would be to aim for a common abstract strategy for all checkpointing, 
e.g. `takeSnapshot`, `materialize`, and `releaseResources`. Then we could have a 
factory that, depending on the checkpoint type, instantiates the right 
implementation. This would also help with a second concern: the code in this 
class has grown a lot over time, so this could be the time to move aspects like 
the checkpointing strategies into separate classes (they are already static 
inner classes anyway). By separating the processing logic from the 
snapshot/restore logic, we can keep things modular and maintain a clean 
separation of concerns.

We can also do this cleanup in a follow-up PR.
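
A sketch of the suggested split (all names are hypothetical;
`IncrementalSnapshotStrategy` and `FullSnapshotStrategy` would wrap the two
existing code paths):

    /** Hypothetical strategy interface, one implementation per checkpoint type. */
    interface SnapshotStrategy {
        /** Synchronous part: capture a consistent view of the RocksDB instance. */
        void takeSnapshot(long checkpointId, long timestamp) throws Exception;

        /** Asynchronous part: write the captured state to the checkpoint streams. */
        KeyedStateHandle materialize() throws Exception;

        /** Clean up temporary resources, e.g. local backup directories. */
        void releaseResources(boolean canceled);
    }

    private SnapshotStrategy snapshotStrategyFor(CheckpointOptions options) {
        return options.getCheckpointType() == CheckpointOptions.CheckpointType.INCREMENTAL_CHECKPOINT
                ? new IncrementalSnapshotStrategy()
                : new FullSnapshotStrategy();
    }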


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114364811
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws 
IOException, RocksDBException {
}
}
 
+   private static class RocksDBIncrementalRestoreOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) {
+   this.stateBackend = stateBackend;
+   }
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
+   StreamStateHandle metaStateHandle) throws 
Exception {
+
+   FSDataInputStream inputStream = null;
+
+   try {
+   inputStream = metaStateHandle.openInputStream();
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+   DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
+   serializationProxy.read(in);
+
+   return 
serializationProxy.getNamedStateSerializationProxies();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   }
+   }
+   }
+
+   private void readStateData(
+   Path restoreFilePath,
+   StreamStateHandle remoteFileHandle) throws 
IOException {
+
+   FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
+
+   FSDataInputStream inputStream = null;
+   FSDataOutputStream outputStream = null;
+
+   try {
+   inputStream = 
remoteFileHandle.openInputStream();
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   byte[] buffer = new byte[1024];
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   }
+   }
+   }
+
+   private void restoreInstance(
+   RocksDBKeyedStateHandle restoreStateHandle,
+   boolean hasExtraKeys) throws Exception {
+
+   // read state data
+   Path restoreInstancePath = new Path(
+   stateBackend.instanceBasePath.getAbsolutePath(),
+   UUID.randomUUID().toString());
+
+   try {
+   Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
+   for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
+   String fileName = sstFileEntry.getKey();
+   StreamStateHandle remoteFileHandle = 
sstFileEntry.getValue();
+
+   readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
+   }
+
+   Map<String, StreamStateHandle> miscFiles = 
restoreStateHandle.g

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114288802
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, 
CompositeStateHandle {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+   private static final long serialVersionUID = -8328808513197388231L;
+
+   private final JobID jobId;
+
+   private final String operatorIdentifier;
+
+   private final KeyGroupRange keyGroupRange;
+
+   private final Set<String> newSstFileNames;
+
+   private final Map<String, StreamStateHandle> sstFiles;
+
+   private final Map<String, StreamStateHandle> miscFiles;
+
+   private final StreamStateHandle metaStateHandle;
+
+   private boolean registered;
+
+   RocksDBKeyedStateHandle(
+   JobID jobId,
+   String operatorIdentifier,
+   KeyGroupRange keyGroupRange,
+   Set<String> newSstFileNames,
+   Map<String, StreamStateHandle> sstFiles,
+   Map<String, StreamStateHandle> miscFiles,
+   StreamStateHandle metaStateHandle) {
+
+   this.jobId = jobId;
+   this.operatorIdentifier = operatorIdentifier;
+   this.keyGroupRange = keyGroupRange;
+   this.newSstFileNames = newSstFileNames;
+   this.sstFiles = sstFiles;
+   this.miscFiles = miscFiles;
+   this.metaStateHandle = metaStateHandle;
+   this.registered = false;
+   }
+
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   public Map<String, StreamStateHandle> getSstFiles() {
+   return sstFiles;
+   }
+
+   public Map<String, StreamStateHandle> getMiscFiles() {
+   return miscFiles;
+   }
+
+   public StreamStateHandle getMetaStateHandle() {
+   return metaStateHandle;
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+   return this;
+   } else {
+   return null;
+   }
+   }
+
+   @Override
+   public void discardState() throws Exception {
+
+   try {
+   metaStateHandle.discardState();
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard meta data.", e);
+   }
+
+   try {
+   
StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard misc file state.", 
e);
+   }
+
+   if (!registered) {
+   for (String newSstFileName : newSstFileNames) {
+   StreamStateHandle handle = 
sstFiles.get(newSstFileName);
+ 

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114321075
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -152,6 +179,10 @@ public RocksDBKeyedStateBackend(
) throws IOException {
 
super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
+
+   this.jobId = jobId;
--- End diff --

We could introduce precondition checks against `null`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114283114
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
}
}
 
+   private static class RocksDBIncrementalSnapshotOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private final CheckpointStreamFactory checkpointStreamFactory;
+
+   private final long checkpointId;
+
+   private final long checkpointTimestamp;
+
+   private Map baseSstFiles;
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
+
+   private FileSystem backupFileSystem;
+   private Path backupPath;
+
+   private FSDataInputStream inputStream = null;
+   private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+
+   // new sst files since the last completed checkpoint
+   private Set<String> newSstFileNames = new HashSet<>();
+
+   // handles to the sst files in the current snapshot
+   private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
+
+   // handles to the misc files in the current snapshot
+   private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+
+   private StreamStateHandle metaStateHandle = null;
+
+   private RocksDBIncrementalSnapshotOperation(
+   RocksDBKeyedStateBackend stateBackend,
+   CheckpointStreamFactory checkpointStreamFactory,
+   long checkpointId,
+   long checkpointTimestamp) {
+
+   this.stateBackend = stateBackend;
+   this.checkpointStreamFactory = checkpointStreamFactory;
+   this.checkpointId = checkpointId;
+   this.checkpointTimestamp = checkpointTimestamp;
+   }
+
+   private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
+   try {
+   final byte[] buffer = new byte[1024];
+
+   FileSystem backupFileSystem = 
backupPath.getFileSystem();
+   inputStream = backupFileSystem.open(filePath);
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+
+   return outputStream.closeAndGetHandle();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   inputStream = null;
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   outputStream = null;
+   }
+   }
+   }
+
+   private StreamStateHandle materializeMetaData() throws 
Exception {
+   try {
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+   DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
+
+   seria

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114282104
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
}
}
 
+   private static class RocksDBIncrementalSnapshotOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private final CheckpointStreamFactory checkpointStreamFactory;
+
+   private final long checkpointId;
+
+   private final long checkpointTimestamp;
+
+   private Map baseSstFiles;
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
+
+   private FileSystem backupFileSystem;
+   private Path backupPath;
+
+   private FSDataInputStream inputStream = null;
+   private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+
+   // new sst files since the last completed checkpoint
+   private Set<String> newSstFileNames = new HashSet<>();
+
+   // handles to the sst files in the current snapshot
+   private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
+
+   // handles to the misc files in the current snapshot
+   private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+
+   private StreamStateHandle metaStateHandle = null;
+
+   private RocksDBIncrementalSnapshotOperation(
+   RocksDBKeyedStateBackend stateBackend,
+   CheckpointStreamFactory checkpointStreamFactory,
+   long checkpointId,
+   long checkpointTimestamp) {
+
+   this.stateBackend = stateBackend;
+   this.checkpointStreamFactory = checkpointStreamFactory;
+   this.checkpointId = checkpointId;
+   this.checkpointTimestamp = checkpointTimestamp;
+   }
+
+   private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
+   try {
+   final byte[] buffer = new byte[1024];
+
+   FileSystem backupFileSystem = 
backupPath.getFileSystem();
+   inputStream = backupFileSystem.open(filePath);
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+
+   return outputStream.closeAndGetHandle();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   inputStream = null;
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   outputStream = null;
+   }
+   }
+   }
+
+   private StreamStateHandle materializeMetaData() throws 
Exception {
+   try {
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+   DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
+
+   serialization

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-02 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114270508
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
}
}
 
+   private static class RocksDBIncrementalSnapshotOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private final CheckpointStreamFactory checkpointStreamFactory;
+
+   private final long checkpointId;
+
+   private final long checkpointTimestamp;
+
+   private Map baseSstFiles;
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
+
+   private FileSystem backupFileSystem;
+   private Path backupPath;
+
+   private FSDataInputStream inputStream = null;
+   private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+
+   // new sst files since the last completed checkpoint
+   private Set<String> newSstFileNames = new HashSet<>();
+
+   // handles to the sst files in the current snapshot
+   private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
+
+   // handles to the misc files in the current snapshot
+   private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+
+   private StreamStateHandle metaStateHandle = null;
+
+   private RocksDBIncrementalSnapshotOperation(
+   RocksDBKeyedStateBackend stateBackend,
+   CheckpointStreamFactory checkpointStreamFactory,
+   long checkpointId,
+   long checkpointTimestamp) {
+
+   this.stateBackend = stateBackend;
+   this.checkpointStreamFactory = checkpointStreamFactory;
+   this.checkpointId = checkpointId;
+   this.checkpointTimestamp = checkpointTimestamp;
+   }
+
+   private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
+   try {
+   final byte[] buffer = new byte[1024];
+
+   FileSystem backupFileSystem = 
backupPath.getFileSystem();
+   inputStream = backupFileSystem.open(filePath);
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+
+   return outputStream.closeAndGetHandle();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   inputStream = null;
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   outputStream = null;
+   }
+   }
+   }
+
+   private StreamStateHandle materializeMetaData() throws 
Exception {
+   try {
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+   DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
+
+   serialization

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-04-29 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3801

[FLINK-6364] [checkpoints] Implement incremental checkpointing in 
RocksDBKeyedStateBackend

This is the initial implementation of incremental checkpointing in 
RocksDBKeyedStateBackend. The changes include:
1. Add a new `CheckpointType` for incremental checkpoints (see the usage sketch 
below).
2. Call the `restore()` method for all `KeyedStateBackend`s even if the restore 
state handle is null or empty.
3. Implement `RocksDBIncrementalSnapshotOperation` and 
`RocksDBIncrementalRestoreOperation`, which support incremental snapshotting 
and restoring (with and without parallelism changes), respectively.
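
For reference, a minimal sketch of how a job could opt in to the new checkpoint 
type (the boolean `RocksDBStateBackend` constructor flag is an assumption about 
how the feature is surfaced to users):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10_000); // checkpoint every 10 seconds
    // second argument: enable incremental checkpoints (assumed flag)
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));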

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shixiaogang/flink flink-6364

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3801.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3801


commit c5340a2186ecfbc48c46048db5adb11af83db511
Author: xiaogang.sxg 
Date:   2017-04-29T15:44:36Z

Implement incremental checkpointing in RocksDBKeyedStateBackend




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---