[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

2018-05-10 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3359
  
Because the old code was quite outdated, I have updated the PR, 
reimplementing RocksDBInternalTimerService from scratch. Some problems 
mentioned in the previous comments may still exist, but I think we can start a 
new round of review.


---


[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

2018-05-02 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3359
  
@StefanRRichter Sorry for the delayed response. I am working on it and 
shall update the PR by this weekend.


---


[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

2017-11-29 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3359
  
Very sorry for the delay. I was occupied with work in the past months, 
making Flink capable of handling the enormous data flows of Singles' Day. 

RocksDBInternalTimerService is among the improvements we made, but we 
adopted a very different implementation, since the initial implementation 
presented here has several problems:
* The initial implementation requires RocksDB instances other than the one 
used in RocksDBKeyedStateBackend, which makes resource configuration very 
difficult. 
* The snapshotting of RocksDBInternalTimerService here is very inefficient. 
Though an asynchronous and incremental implementation is possible, it would 
duplicate much code in RocksDBKeyedStateBackend.

We address these problems by introducing `SecondaryKeyedState`s, which 
provide non-keyed access methods to the data inside a key group. Similar to 
normal keyed states, secondary keyed states are partitioned into key groups and 
are also stored in the backends. Hence these secondary states can also benefit 
from the asynchronous and incremental snapshotting in `RocksDBKeyedStateBackend`. 
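
As a rough illustration of the idea (the interface below is a sketch 
inferred from this description, not the actual code in the PR):

    public interface SecondaryKeyedState<T> {

        // Non-keyed access: iterate over all entries stored in a given
        // key group, without requiring a current key to be set.
        Iterable<T> getAll(int keyGroupIndex) throws Exception;

        // Insert an entry into a key group; because the state lives in
        // the keyed backend, it is covered by the backend's asynchronous
        // and incremental snapshots.
        void add(int keyGroupIndex, T value) throws Exception;
    }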

What do you think of the changes? @StefanRRichter 

 


---


[GitHub] flink pull request #3859: [FLINK-6504] [FLINK-6467] [checkpoints] Add needed...

2017-05-12 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3859


---


[GitHub] flink issue #3859: [FLINK-6504] [FLINK-6467] [checkpoints] Add needed synchr...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3859
  
I noticed that FLINK-6504 is also fixed in 
https://github.com/apache/flink/pull/3870 , so I am closing this PR; let us 
address all the problems of incremental checkpointing there.


---


[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116172093
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 ---
@@ -18,91 +18,137 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
 
 /**
  * A {@code SharedStateRegistry} will be deployed in the 
- * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to 
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
  * maintain the reference count of {@link SharedStateHandle}s which are 
shared
- * among different checkpoints.
- *
+ * among different incremental checkpoints.
  */
 public class SharedStateRegistry {
 
private static final Logger LOG = 
LoggerFactory.getLogger(SharedStateRegistry.class);
 
/** All registered state objects by an artificial key */
-   private final Map<String, SharedStateRegistry.SharedStateEntry> 
registeredStates;
+   private final Map<SharedStateRegistryKey, 
SharedStateRegistry.SharedStateEntry> registeredStates;
+
+   /** Executor for async state deletion */
+   private final Executor asyncDisposalExecutor;
 
public SharedStateRegistry() {
this.registeredStates = new HashMap<>();
+   this.asyncDisposalExecutor = Executors.directExecutor(); 
//TODO: FLINK-6534
--- End diff --

I prefer not to use another asynchronous executor here.

In my initial implementation of `SharedStateRegistry`, unreferenced shared 
states are not discarded immediately but are returned in a list. These 
unreferenced shared states are then discarded outside the synchronization 
scope. Given that completed checkpoints are already discarded in an 
asynchronous thread in the `ZooKeeperCompletedCheckpointStore` (the store more 
commonly used in practice), we can avoid another asynchronous executor 
here. 
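
A minimal sketch of that pattern (the helper name is hypothetical, not the 
actual PR code):

    // Collect unreferenced states while holding the lock ...
    List<StateObject> statesToDiscard;
    synchronized (registeredStates) {
        statesToDiscard = releaseUnreferencedStates();  // hypothetical helper
    }

    // ... and discard them outside the critical section, since discarding
    // may involve slow I/O such as deleting files on a distributed FS.
    for (StateObject state : statesToDiscard) {
        try {
            state.discardState();
        } catch (Exception e) {
            LOG.warn("Could not discard unreferenced shared state.", e);
        }
    }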

What do you think?


---


[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116161754
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -922,6 +940,39 @@ void releaseResources(boolean canceled) {
}
}
}
+
+   /**
+* A placeholder state handle for shared state that will 
replaced by an original that was
+* created in a previous checkpoint. So we don't have to send 
the handle twice, e.g. in
+* case of {@link ByteStreamStateHandle}.
+*/
+   private static final class PlaceholderStreamStateHandle 
implements StreamStateHandle {
--- End diff --

I think we can move `PlaceholderStreamStateHandle` out so that it can be 
used by other state backends. What do you think?


---


[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116161318
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
 ---
@@ -180,69 +176,66 @@ public long getStateSize() {
 
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has 
already registered its shared states.");
 
-   for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
newSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new 
SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+   for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
+   SharedStateRegistryKey registryKey =
+   
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
 
-   int referenceCount = 
stateRegistry.register(stateHandle);
-   Preconditions.checkState(referenceCount == 1);
+   SharedStateRegistry.Result result =
+   stateRegistry.registerNewReference(registryKey, 
newSstFileEntry.getValue());
+
+   // We update our reference with the result from the 
registry, to prevent the following
+   // problem:
+   // A previous checkpoint n has already registered the 
state. This can happen if a
+   // following checkpoint (n + x) wants to reference the 
same state before the backend got
+   // notified that checkpoint n completed. In this case, 
the shared registry did
+   // deduplication and returns the previous reference.
+   newSstFileEntry.setValue(result.getReference());
}
 
-   for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : 
oldSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new 
SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+   for (Map.Entry<String, StreamStateHandle> oldSstFileName : 
registeredSstFiles.entrySet()) {
--- End diff --

Similar to the previous comment, `oldSstFileName` can be renamed to 
`registeredSstFileEntry` here.


---


[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116161230
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
 ---
@@ -180,69 +176,66 @@ public long getStateSize() {
 
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has 
already registered its shared states.");
 
-   for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
newSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new 
SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+   for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
--- End diff --

I think it's better to rename `newSstFileEntry` to 
`unregisteredSstFileEntry`. What do you think?


---


[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116161117
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
 ---
@@ -180,69 +176,66 @@ public long getStateSize() {
 
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has 
already registered its shared states.");
 
-   for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
newSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new 
SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+   for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
+   SharedStateRegistryKey registryKey =
+   
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
 
-   int referenceCount = 
stateRegistry.register(stateHandle);
-   Preconditions.checkState(referenceCount == 1);
+   SharedStateRegistry.Result result =
+   stateRegistry.registerNewReference(registryKey, 
newSstFileEntry.getValue());
+
+   // We update our reference with the result from the 
registry, to prevent the following
+   // problem:
+   // A previous checkpoint n has already registered the 
state. This can happen if a
+   // following checkpoint (n + x) wants to reference the 
same state before the backend got
+   // notified that checkpoint n completed. In this case, 
the shared registry did
+   // deduplication and returns the previous reference.
+   newSstFileEntry.setValue(result.getReference());
}
 
-   for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : 
oldSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new 
SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+   for (Map.Entry<String, StreamStateHandle> oldSstFileName : 
registeredSstFiles.entrySet()) {
+   SharedStateRegistryKey registryKey =
+   
createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
+
+   SharedStateRegistry.Result result = 
stateRegistry.obtainReference(registryKey);
 
-   int referenceCount = 
stateRegistry.register(stateHandle);
-   Preconditions.checkState(referenceCount > 1);
+   // Again we update our state handle with the result 
from the registry, thus replacing
+   // placeholder state handles with the originals.
+   oldSstFileName.setValue(result.getReference());
}
 
+   // Migrate state from unregistered to registered, so that it 
will not count as private state
+   // for #discardState() from now.
+   registeredSstFiles.putAll(unregisteredSstFiles);
+   unregisteredSstFiles.clear();
+
registered = true;
}
 
@Override
public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(registered, "The state handle has not 
registered its shared states yet.");
 
-   for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
newSstFiles.entrySet()) {
-   stateRegistry.unregister(new 
SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
+   for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
--- End diff --

We should not unregister those sst files that were not registered before.


---


[GitHub] flink pull request #3859: [FLINK-6504] [FLINK-6467] [checkpoints] Add needed...

2017-05-11 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3859#discussion_r116148624
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -911,9 +915,11 @@ void releaseResources(boolean canceled) {
if (canceled) {
List statesToDiscard = new 
ArrayList<>();
 
-   statesToDiscard.add(metaStateHandle);
-   statesToDiscard.addAll(miscFiles.values());
-   statesToDiscard.addAll(newSstFiles.values());
+   synchronized (this) {
--- End diff --

Yes, I agree. The key point here is to make sure that the materialization 
thread has stopped; synchronization is of little help here. So I prefer to 
remove the synchronization. What do you think?


---


[GitHub] flink pull request #3859: [FLINK-6504] [FLINK-6467] [checkpoints] Add needed...

2017-05-09 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3859

[FLINK-6504] [FLINK-6467] [checkpoints] Add needed synchronization for 
RocksDBIncrementalSnapshotOperation

This pull request adds the missing synchronization for the access to the 
following variables:
1. `materializedSstFiles` in `RocksDBKeyedStateBackend`: The variable may 
be accessed simultaneously by the processing thread (read) and the 
materialization threads (write). Now we use `asynchronousSnapshotLock` to 
prevent concurrent access (see the sketch after this list).
2. `newSstFiles`, `oldSstFiles` and `metaStateHandle` in 
`RocksDBIncrementalSnapshotOperation`: These variables may be accessed by both 
the cancel thread and the materialization thread. Though the materialization 
thread is supposed to be stopped when `releaseResources()` is executed, we add 
synchronization here to prevent potential conflicts.
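
A minimal sketch of the locking pattern for the first point (the field names 
follow the description above; the method bodies are illustrative only):

    private final Object asynchronousSnapshotLock = new Object();
    private final Map<Long, Set<String>> materializedSstFiles = new HashMap<>();

    // Write path: the materialization thread records the sst files of a
    // completed checkpoint.
    void rememberMaterializedSstFiles(long checkpointId, Set<String> sstFiles) {
        synchronized (asynchronousSnapshotLock) {
            materializedSstFiles.put(checkpointId, sstFiles);
        }
    }

    // Read path: the processing thread looks up the base files when a new
    // incremental snapshot starts.
    Set<String> getMaterializedSstFiles(long checkpointId) {
        synchronized (asynchronousSnapshotLock) {
            return materializedSstFiles.get(checkpointId);
        }
    }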

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shixiaogang/flink flink-6504

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3859.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3859


commit 1c1a08e0f7db5ec8663a46c495468583983c7291
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-05-10T01:55:34Z

Add needed synchronization for RocksDBIncrementalSnapshotOperation




---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-08 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3801


---


[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

2017-05-05 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3801
  
Hi @StefanRRichter Thanks a lot for pointing out the problem and for the 
suggested fix. I have updated the PR as suggested. A `CloseableRegistry` is 
now used to track opened I/O streams, and the opened I/O streams are closed 
first upon cancellation. 
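
The pattern looks roughly like this (a sketch following the registry calls 
visible in the diffs of this PR):

    CloseableRegistry cancelStreamRegistry = new CloseableRegistry();

    FSDataInputStream in = stateHandle.openInputStream();
    cancelStreamRegistry.registerClosable(in);
    try {
        // ... copy the state data ...
    } finally {
        cancelStreamRegistry.unregisterClosable(in);
        in.close();
    }

    // On cancellation, the registry is closed first, which closes all
    // registered streams and unblocks any thread stuck in I/O.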


---


[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

2017-05-04 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3801
  
@StefanRRichter  Thanks a lot for your review. I have updated the pull 
request as suggested. The following changes were made:

1. Remove the checkpoint type for incremental checkpoints. Now the support 
for incremental checkpointing is a configurable feature of 
`RocksDBKeyedStateBackend`, just like asynchronous checkpointing in 
`HeapKeyedStateBackend` (see the configuration sketch at the end of this 
message). Incremental checkpointing is performed if the feature is enabled and 
the checkpoint to perform is not a savepoint.

2. Rename `RocksDBKeyedStateHandle` to `RocksDBIncrementalKeyedStateHandle` 
and do some refactoring.

3. Allow `KeyedStateHandle` to register shared states.

4. Maintain the information of the last completed checkpoint with the 
notification of `AbstractStreamOperator`.

5. Parameterize `RocksDBStateBackendTest` to test the cleanup of resources 
in both full and incremental checkpointing.

6. Parameterize `PartitionedStateCheckpointingITCase` to test snapshotting 
and restoring with different backend settings.

I would appreciate it if you could take a look at these changes. Any 
comments are welcome.
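
For reference, enabling the feature would look roughly as follows (assuming 
the constructor flag described in point 1; the exact API may differ in the 
final version):

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // The second argument enables incremental checkpoints; savepoints are
    // still taken in the full format.
    env.setStateBackend(
        new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    env.enableCheckpointing(60_000);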


---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-04 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114703946
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws 
IOException, RocksDBException {
}
}
 
+   private static class RocksDBIncrementalRestoreOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) {
+   this.stateBackend = stateBackend;
+   }
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo> readMetaData(
+   StreamStateHandle metaStateHandle) throws 
Exception {
+
+   FSDataInputStream inputStream = null;
+
+   try {
+   inputStream = metaStateHandle.openInputStream();
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+   DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
+   serializationProxy.read(in);
+
+   return 
serializationProxy.getNamedStateSerializationProxies();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   }
+   }
+   }
+
+   private void readStateData(
+   Path restoreFilePath,
+   StreamStateHandle remoteFileHandle) throws 
IOException {
+
+   FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
+
+   FSDataInputStream inputStream = null;
+   FSDataOutputStream outputStream = null;
+
+   try {
+   inputStream = 
remoteFileHandle.openInputStream();
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   byte[] buffer = new byte[1024];
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   }
+   }
+   }
+
+   private void restoreInstance(
+   RocksDBKeyedStateHandle restoreStateHandle,
+   boolean hasExtraKeys) throws Exception {
+
+   // read state data
+   Path restoreInstancePath = new Path(
+   stateBackend.instanceBasePath.getAbsolutePath(),
+   UUID.randomUUID().toString());
+
+   try {
+   Map<String, StreamStateHandle> sstFiles = 
restoreStateHandle.getSstFiles();
+   for (Map.Entry<String, StreamStateHandle> 
sstFileEntry : sstFiles.entrySet()) {
+   String fileName = sstFileEntry.getKey();
+   StreamStateHandle remoteFileHandle = 
sstFileEntry.getValue();
+
+   readStateData(new 
Path(restoreInstancePath, fileName)

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114703775
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
}
}
 
+   private static class RocksDBIncrementalSnapshotOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private final CheckpointStreamFactory checkpointStreamFactory;
+
+   private final long checkpointId;
+
+   private final long checkpointTimestamp;
+
+   private Map<String, StreamStateHandle> baseSstFiles;
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo> stateMetaInfos = new ArrayList<>();
+
+   private FileSystem backupFileSystem;
+   private Path backupPath;
+
+   private FSDataInputStream inputStream = null;
+   private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+
+   // new sst files since the last completed checkpoint
+   private Set newSstFileNames = new HashSet<>();
+
+   // handles to the sst files in the current snapshot
+   private Map<String, StreamStateHandle> sstFiles = new 
HashMap<>();
+
+   // handles to the misc files in the current snapshot
+   private Map<String, StreamStateHandle> miscFiles = new 
HashMap<>();
+
+   private StreamStateHandle metaStateHandle = null;
+
+   private RocksDBIncrementalSnapshotOperation(
+   RocksDBKeyedStateBackend stateBackend,
+   CheckpointStreamFactory checkpointStreamFactory,
+   long checkpointId,
+   long checkpointTimestamp) {
+
+   this.stateBackend = stateBackend;
+   this.checkpointStreamFactory = checkpointStreamFactory;
+   this.checkpointId = checkpointId;
+   this.checkpointTimestamp = checkpointTimestamp;
+   }
+
+   private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
+   try {
+   final byte[] buffer = new byte[1024];
+
+   FileSystem backupFileSystem = 
backupPath.getFileSystem();
+   inputStream = backupFileSystem.open(filePath);
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+
+   return outputStream.closeAndGetHandle();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   inputStream = null;
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   outputStream = null;
+   }
+   }
+   }
+
+   private StreamStateHandle materializeMetaData() throws 
Exception {
+   try {
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerialization

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114565991
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
cancelables.registerClosable(keyedStateBackend);
 
// restore if we have some old state
-   if (null != restoreStateHandles && null != 
restoreStateHandles.getManagedKeyedState()) {
-   
keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-   }
+   Collection restoreKeyedStateHandles =
+   restoreStateHandles == null ? null : 
restoreStateHandles.getManagedKeyedState();
+
+   keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --

I attempted to put the restore logic in the constructor as we discussed, 
but it turns out to be impossible. 

All state backends should be registered in the task so that they can be 
closed when the task is canceled. If we put the restoring in the constructor 
of a backend, the construction of the backend may block (e.g., due to access 
to HDFS). Since the construction has not completed yet, the backend will not 
be registered and hence will not be closed.
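
The ordering constraint can be sketched as follows (simplified; constructor 
arguments omitted):

    // Construction stays cheap and non-blocking.
    AbstractKeyedStateBackend<K> backend = createKeyedStateBackend(/* ... */);

    // Register the backend first, so that cancellation can close it.
    cancelables.registerClosable(backend);

    // Only now perform the potentially blocking restore (e.g. reading
    // state files from HDFS); a cancel can interrupt it by closing the
    // registered backend.
    backend.restore(restoreKeyedStateHandles);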


---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114565968
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, 
CompositeStateHandle {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+   private static final long serialVersionUID = -8328808513197388231L;
+
+   private final JobID jobId;
+
+   private final String operatorIdentifier;
+
+   private final KeyGroupRange keyGroupRange;
+
+   private final Set newSstFileNames;
+
+   private final Map<String, StreamStateHandle> sstFiles;
+
+   private final Map<String, StreamStateHandle> miscFiles;
+
+   private final StreamStateHandle metaStateHandle;
+
+   private boolean registered;
+
+   RocksDBKeyedStateHandle(
+   JobID jobId,
+   String operatorIdentifier,
+   KeyGroupRange keyGroupRange,
+   Set newSstFileNames,
+   Map<String, StreamStateHandle> sstFiles,
+   Map<String, StreamStateHandle> miscFiles,
+   StreamStateHandle metaStateHandle) {
+
+   this.jobId = jobId;
+   this.operatorIdentifier = operatorIdentifier;
+   this.keyGroupRange = keyGroupRange;
+   this.newSstFileNames = newSstFileNames;
+   this.sstFiles = sstFiles;
+   this.miscFiles = miscFiles;
+   this.metaStateHandle = metaStateHandle;
+   this.registered = false;
+   }
+
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   public Map<String, StreamStateHandle> getSstFiles() {
+   return sstFiles;
+   }
+
+   public Map<String, StreamStateHandle> getMiscFiles() {
+   return miscFiles;
+   }
+
+   public StreamStateHandle getMetaStateHandle() {
+   return metaStateHandle;
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+   return this;
+   } else {
+   return null;
+   }
+   }
+
+   @Override
+   public void discardState() throws Exception {
+
+   try {
+   metaStateHandle.discardState();
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard meta data.", e);
+   }
+
+   try {
+   
StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard misc fil

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114500597
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
}
}
 
+   private static class RocksDBIncrementalSnapshotOperation {
+
+   private final RocksDBKeyedStateBackend stateBackend;
+
+   private final CheckpointStreamFactory checkpointStreamFactory;
+
+   private final long checkpointId;
+
+   private final long checkpointTimestamp;
+
+   private Map<String, StreamStateHandle> baseSstFiles;
+
+   private List<KeyedBackendSerializationProxy.StateMetaInfo> stateMetaInfos = new ArrayList<>();
+
+   private FileSystem backupFileSystem;
+   private Path backupPath;
+
+   private FSDataInputStream inputStream = null;
+   private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+
+   // new sst files since the last completed checkpoint
+   private Set newSstFileNames = new HashSet<>();
+
+   // handles to the sst files in the current snapshot
+   private Map<String, StreamStateHandle> sstFiles = new 
HashMap<>();
+
+   // handles to the misc files in the current snapshot
+   private Map<String, StreamStateHandle> miscFiles = new 
HashMap<>();
+
+   private StreamStateHandle metaStateHandle = null;
+
+   private RocksDBIncrementalSnapshotOperation(
+   RocksDBKeyedStateBackend stateBackend,
+   CheckpointStreamFactory checkpointStreamFactory,
+   long checkpointId,
+   long checkpointTimestamp) {
+
+   this.stateBackend = stateBackend;
+   this.checkpointStreamFactory = checkpointStreamFactory;
+   this.checkpointId = checkpointId;
+   this.checkpointTimestamp = checkpointTimestamp;
+   }
+
+   private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
+   try {
+   final byte[] buffer = new byte[1024];
+
+   FileSystem backupFileSystem = 
backupPath.getFileSystem();
+   inputStream = backupFileSystem.open(filePath);
+   
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   while (true) {
+   int numBytes = inputStream.read(buffer);
+
+   if (numBytes == -1) {
+   break;
+   }
+
+   outputStream.write(buffer, 0, numBytes);
+   }
+
+   return outputStream.closeAndGetHandle();
+   } finally {
+   if (inputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+   inputStream.close();
+   inputStream = null;
+   }
+
+   if (outputStream != null) {
+   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+   outputStream.close();
+   outputStream = null;
+   }
+   }
+   }
+
+   private StreamStateHandle materializeMetaData() throws 
Exception {
+   try {
+   outputStream = checkpointStreamFactory
+   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+   
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+   KeyedBackendSerializationProxy 
serializationProxy =
+   new 
KeyedBackendSerialization

[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

2017-05-02 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3801
  
Hi @gyfora I am very happy to hear from you. The following are the answers 
to your questions. Kindly let me know if you have any thoughts on them.

1. Incremental checkpoints support rescaling. It's true that the 
implementation checkpoints whole files directly, and a file may contain data 
for multiple key groups. But in the cases where the degree of parallelism 
changes, the files will be passed to all the state backends whose key groups 
are in the files. Then the backends will iterate over all the key-value pairs 
in the files and pick up those kv pairs that belong to them (a restore sketch 
follows this list).

2. In the cases where we restore from a full snapshot (which is formatted as 
key-value pairs), the next incremental checkpoint will contain all the files. 
It may seem a little inefficient, because I intend to make each checkpoint 
self-contained. Given that full snapshots and incremental snapshots are in 
different formats, we have to take a "full" incremental snapshot as the base 
for the following checkpoints.

3. That is a very good question. It would be more flexible if users could 
choose the checkpointing scheme (say, one full checkpoint after every n 
incremental checkpoints). But I think making every checkpoint incremental is 
acceptable, because incremental checkpoints are more efficient in most cases. 
Those backends which do not support incremental checkpointing can still take 
full snapshots.
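
A simplified sketch of the restore path in answer 1 (helper names are 
hypothetical; error handling omitted):

    // Each backend scans the restored RocksDB instance and keeps only the
    // kv pairs whose key group falls into its own key-group range.
    try (RocksIterator iterator = restoredDb.newIterator()) {
        for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
            int keyGroup = extractKeyGroup(iterator.key());  // hypothetical
            if (keyGroupRange.contains(keyGroup)) {
                targetDb.put(iterator.key(), iterator.value());
            }
        }
    }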


---


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-04-29 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3801

[FLINK-6364] [checkpoints] Implement incremental checkpointing in 
RocksDBKeyedStateBackend

This is the initial implementation of incremental checkpointing in 
RocksDBKeyedStateBackend. Changes include 
1. Add a new `CheckpointType` for incremental checkpoints.
2. Call the `restore()` method for all `KeyedStateBackend` if the restore 
state handle is null or empty.
3. Implement `RocksDBIncrementalSnapshotOperation` and 
`RocksDBIncrementalRestoreOperation` which supports incremental snapshotting 
and restoring with/without parallelism changes, respectively.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shixiaogang/flink flink-6364

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3801.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3801


commit c5340a2186ecfbc48c46048db5adb11af83db511
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-04-29T15:44:36Z

Implement incremental checkpointing in RocksDBKeyedStateBackend




---


[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-04-23 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3524


---


[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

2017-04-11 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3359
  
@vpernin Thanks very much for your attention. The PR is supposed to work on 
1.3-SNAPSHOT, but it's not testable now due to some known bugs. 

Besides, I want to add support for asynchronous snapshots of timers in this 
pull request. Currently, the snapshots of timers are taken synchronously: no 
stream record can be processed while the snapshot is being taken. In our tests 
with millions of timers, it takes several seconds to complete the 
snapshotting. Performance hence degrades significantly when the checkpoint 
frequency is high.

To allow asynchronous snapshotting, we need some refactoring of how 
internal timer services are restored and snapshotted. With this refactoring, 
`InternalTimerService`s, similar to keyed states, are stored in the 
`KeyedStateBackend`. That way, we can benefit from the optimizations made on 
the snapshotting of keyed states, taking snapshots asynchronously (and 
incrementally in the near future).
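
A hypothetical sketch of that direction (the names are illustrative only, 
not the actual interfaces of this PR): timers are kept per key group inside 
the keyed backend, so the backend's asynchronous snapshots cover them 
together with the keyed state.

    class TimerServiceState<K, N> {

        private final Map<Integer, PriorityQueue<InternalTimer<K, N>>>
            timersByKeyGroup = new HashMap<>();

        void registerTimer(int keyGroup, InternalTimer<K, N> timer) {
            timersByKeyGroup
                .computeIfAbsent(keyGroup, kg -> new PriorityQueue<>(
                    (a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp())))
                .add(timer);
            // no separate synchronous timer snapshot is needed anymore
        }
    }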

I am working on this right now. It would be appreciated if you could help 
test the feature when it is done. 


---


[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-04-07 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
@StephanEwen I have updated the PR, making the following changes:
1. Add a method called `discardSharedStatesOnFail()` to 
`CompositeStateHandle`. This method is called when a pending checkpoint fails 
to complete. That way, we do not need to register shared states once an 
acknowledgment message is received; all shared states are registered only when 
the pending checkpoint completes (a lifecycle sketch follows this list).
2. Add `SharedStateHandle` and refactor `SharedStateRegistry` as suggested. 
3. Both the registration and the unregistration of shared states now take 
place in `CompletedCheckpoint`.
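
A sketch of the resulting lifecycle (the accessor names are hypothetical):

    // On completion: register all shared states with the registry.
    void onCheckpointCompleted(
            CompletedCheckpoint completed, SharedStateRegistry registry) {
        for (CompositeStateHandle handle : completed.getStateHandles()) {
            handle.registerSharedStates(registry);
        }
    }

    // On failure: discard the shared states that were never registered.
    void onCheckpointFailed(PendingCheckpoint pending) throws Exception {
        for (CompositeStateHandle handle : pending.getStateHandles()) {
            handle.discardSharedStatesOnFail();
        }
    }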

What do you think of these changes?


---


[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...

2017-03-30 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3652#discussion_r108870133
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
 ---
@@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception 
{
 
final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());
 
-   final String key = "key";
-   final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+   try {
+   final String key = "key";
+   final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
 
-   final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-   final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+   final byte[] keyBytes = 
key.getBytes(StandardCharsets.UTF_8);
+   final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
 
-   final int num = 5;
+   final int num = 5;
 
-   // - insert -
-   System.out.println("begin insert");
+   // - insert -
+   System.out.println("begin insert");
 
-   final long beginInsert = System.nanoTime();
-   for (int i = 0; i < num; i++) {
-   rocksDB.merge(write_options, keyBytes, valueBytes);
-   }
-   final long endInsert = System.nanoTime();
-   System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");
+   final long beginInsert = System.nanoTime();
+   for (int i = 0; i < num; i++) {
+   rocksDB.merge(write_options, keyBytes, 
valueBytes);
+   }
+   final long endInsert = System.nanoTime();
+   System.out.println("end insert - duration: " + 
((endInsert - beginInsert) / 1_000_000) + " ms");
 
-   // - read (attempt 1) -
+   // - read (attempt 1) -
 
-   final byte[] resultHolder = new byte[num * (valueBytes.length + 
2)];
-   final long beginGet1 = System.nanoTime();
-   rocksDB.get(keyBytes, resultHolder);
-   final long endGet1 = System.nanoTime();
+   final byte[] resultHolder = new byte[num * 
(valueBytes.length + 2)];
+   final long beginGet1 = System.nanoTime();
+   rocksDB.get(keyBytes, resultHolder);
+   final long endGet1 = System.nanoTime();
 
-   System.out.println("end get - duration: " + ((endGet1 - 
beginGet1) / 1_000_000) + " ms");
+   System.out.println("end get - duration: " + ((endGet1 - 
beginGet1) / 1_000_000) + " ms");
 
-   // - read (attempt 2) -
+   // - read (attempt 2) -
 
-   final long beginGet2 = System.nanoTime();
-   rocksDB.get(keyBytes, resultHolder);
-   final long endGet2 = System.nanoTime();
+   final long beginGet2 = System.nanoTime();
+   rocksDB.get(keyBytes, resultHolder);
+   final long endGet2 = System.nanoTime();
 
-   System.out.println("end get - duration: " + ((endGet2 - 
beginGet2) / 1_000_000) + " ms");
+   System.out.println("end get - duration: " + ((endGet2 - 
beginGet2) / 1_000_000) + " ms");
 
-   // - compact -
-   System.out.println("compacting...");
-   final long beginCompact = System.nanoTime();
-   rocksDB.compactRange();
-   final long endCompact = System.nanoTime();
+   // - compact -
+   System.out.println("compacting...");
+   final long beginCompact = System.nanoTime();
+   rocksDB.compactRange();
+   final long endCompact = System.nanoTime();
 
-   System.out.println("end compaction - duration: " + ((endCompact 
- beginCompact) / 1_000_000) + " ms");
+   System.out.println("end compaction - duration: " + 
((endCompact - beginCompact) / 1_000_000) + " ms");
 
-   // - read (attempt 3) -
+   // 

[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...

2017-03-30 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3652#discussion_r108869236
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
 ---
@@ -54,59 +54,62 @@ public static void main(String[] args) throws Exception 
{
 
final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());
 
-   final String key = "key";
-   final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+   try {
+   final String key = "key";
+   final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
 
-   final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-   final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+   final byte[] keyBytes = 
key.getBytes(StandardCharsets.UTF_8);
+   final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
 
-   final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
+   final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
 
-   final Unsafe unsafe = MemoryUtils.UNSAFE;
-   final long offset = unsafe.arrayBaseOffset(byte[].class) + 
keyTemplate.length - 4;
+   final Unsafe unsafe = MemoryUtils.UNSAFE;
+   final long offset = 
unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
 
-   final int num = 5;
-   System.out.println("begin insert");
+   final int num = 5;
+   System.out.println("begin insert");
 
-   final long beginInsert = System.nanoTime();
-   for (int i = 0; i < num; i++) {
-   unsafe.putInt(keyTemplate, offset, i);
-   rocksDB.put(write_options, keyTemplate, valueBytes);
-   }
-   final long endInsert = System.nanoTime();
-   System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");
-
-   final byte[] resultHolder = new byte[num * valueBytes.length];
-
-   final long beginGet = System.nanoTime();
-
-   final RocksIterator iterator = rocksDB.newIterator();
-   int pos = 0;
-
-   // seek to start
-   unsafe.putInt(keyTemplate, offset, 0);
-   iterator.seek(keyTemplate);
-
-   // mark end
-   unsafe.putInt(keyTemplate, offset, -1);
-
-   // iterate
-   while (iterator.isValid()) {
-   byte[] currKey = iterator.key();
-   if (samePrefix(keyBytes, currKey)) {
-   byte[] currValue = iterator.value();
-   System.arraycopy(currValue, 0, resultHolder, 
pos, currValue.length);
-   pos += currValue.length;
-   iterator.next();
+   final long beginInsert = System.nanoTime();
+   for (int i = 0; i < num; i++) {
+   unsafe.putInt(keyTemplate, offset, i);
+   rocksDB.put(write_options, keyTemplate, 
valueBytes);
}
-   else {
-   break;
+   final long endInsert = System.nanoTime();
+   System.out.println("end insert - duration: " + 
((endInsert - beginInsert) / 1_000_000) + " ms");
+
+   final byte[] resultHolder = new byte[num * 
valueBytes.length];
+
+   final long beginGet = System.nanoTime();
+
+   final RocksIterator iterator = rocksDB.newIterator();
--- End diff --

The iterator should be closed once it is no longer used, so it's better to 
use try-with-resources here.
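
A minimal sketch, assuming a RocksDB version in which RocksIterator 
implements AutoCloseable (the handler is hypothetical):

    try (RocksIterator iterator = rocksDB.newIterator()) {
        for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
            process(iterator.key(), iterator.value());  // hypothetical handler
        }
    }  // the iterator is closed automatically, even if an exception is thrown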


---


[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...

2017-03-30 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3652#discussion_r108868849
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
 ---
@@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception 
{
 
final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());
 
-   final String key = "key";
-   final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+   try {
+   final String key = "key";
+   final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
 
-   final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-   final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+   final byte[] keyBytes = 
key.getBytes(StandardCharsets.UTF_8);
+   final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
 
-   final int num = 5;
+   final int num = 5;
 
-   // - insert -
-   System.out.println("begin insert");
+   // - insert -
+   System.out.println("begin insert");
 
-   final long beginInsert = System.nanoTime();
-   for (int i = 0; i < num; i++) {
-   rocksDB.merge(write_options, keyBytes, valueBytes);
-   }
-   final long endInsert = System.nanoTime();
-   System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");
+   final long beginInsert = System.nanoTime();
+   for (int i = 0; i < num; i++) {
+   rocksDB.merge(write_options, keyBytes, 
valueBytes);
+   }
+   final long endInsert = System.nanoTime();
+   System.out.println("end insert - duration: " + 
((endInsert - beginInsert) / 1_000_000) + " ms");
 
-   // - read (attempt 1) -
+   // - read (attempt 1) -
 
-   final byte[] resultHolder = new byte[num * (valueBytes.length + 
2)];
-   final long beginGet1 = System.nanoTime();
-   rocksDB.get(keyBytes, resultHolder);
-   final long endGet1 = System.nanoTime();
+   final byte[] resultHolder = new byte[num * 
(valueBytes.length + 2)];
+   final long beginGet1 = System.nanoTime();
+   rocksDB.get(keyBytes, resultHolder);
+   final long endGet1 = System.nanoTime();
 
-   System.out.println("end get - duration: " + ((endGet1 - 
beginGet1) / 1_000_000) + " ms");
+   System.out.println("end get - duration: " + ((endGet1 - 
beginGet1) / 1_000_000) + " ms");
 
-   // - read (attempt 2) -
+   // - read (attempt 2) -
 
-   final long beginGet2 = System.nanoTime();
-   rocksDB.get(keyBytes, resultHolder);
-   final long endGet2 = System.nanoTime();
+   final long beginGet2 = System.nanoTime();
+   rocksDB.get(keyBytes, resultHolder);
+   final long endGet2 = System.nanoTime();
 
-   System.out.println("end get - duration: " + ((endGet2 - 
beginGet2) / 1_000_000) + " ms");
+   System.out.println("end get - duration: " + ((endGet2 - 
beginGet2) / 1_000_000) + " ms");
 
-   // - compact -
-   System.out.println("compacting...");
-   final long beginCompact = System.nanoTime();
-   rocksDB.compactRange();
-   final long endCompact = System.nanoTime();
+   // - compact -
+   System.out.println("compacting...");
+   final long beginCompact = System.nanoTime();
+   rocksDB.compactRange();
+   final long endCompact = System.nanoTime();
 
-   System.out.println("end compaction - duration: " + ((endCompact 
- beginCompact) / 1_000_000) + " ms");
+   System.out.println("end compaction - duration: " + 
((endCompact - beginCompact) / 1_000_000) + " ms");
 
-   // - read (attempt 3) -
+   // 

[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...

2017-03-30 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3652#discussion_r108869459
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java ---
@@ -54,59 +54,62 @@ public static void main(String[] args) throws Exception {
 
	final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
 
-   final String key = "key";
-   final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+   try {
+   final String key = "key";
+   final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
 
-   final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-   final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+   final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+   final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
 
-   final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+   final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
 
-   final Unsafe unsafe = MemoryUtils.UNSAFE;
-   final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+   final Unsafe unsafe = MemoryUtils.UNSAFE;
+   final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
 
-   final int num = 5;
-   System.out.println("begin insert");
+   final int num = 5;
+   System.out.println("begin insert");
 
-   final long beginInsert = System.nanoTime();
-   for (int i = 0; i < num; i++) {
-   unsafe.putInt(keyTemplate, offset, i);
-   rocksDB.put(write_options, keyTemplate, valueBytes);
-   }
-   final long endInsert = System.nanoTime();
-   System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
-
-   final byte[] resultHolder = new byte[num * valueBytes.length];
-
-   final long beginGet = System.nanoTime();
-
-   final RocksIterator iterator = rocksDB.newIterator();
-   int pos = 0;
-
-   // seek to start
-   unsafe.putInt(keyTemplate, offset, 0);
-   iterator.seek(keyTemplate);
-
-   // mark end
-   unsafe.putInt(keyTemplate, offset, -1);
-
-   // iterate
-   while (iterator.isValid()) {
-   byte[] currKey = iterator.key();
-   if (samePrefix(keyBytes, currKey)) {
-   byte[] currValue = iterator.value();
-   System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
-   pos += currValue.length;
-   iterator.next();
+   final long beginInsert = System.nanoTime();
+   for (int i = 0; i < num; i++) {
+   unsafe.putInt(keyTemplate, offset, i);
+   rocksDB.put(write_options, keyTemplate, valueBytes);
	}
-   else {
-   break;
+   final long endInsert = System.nanoTime();
+   System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+   final byte[] resultHolder = new byte[num * valueBytes.length];
+
+   final long beginGet = System.nanoTime();
+
+   final RocksIterator iterator = rocksDB.newIterator();
+   int pos = 0;
+
+   // seek to start
+   unsafe.putInt(keyTemplate, offset, 0);
+   iterator.seek(keyTemplate);
+
+   // mark end
+   unsafe.putInt(keyTemplate, offset, -1);
+
+   // iterate
+   while (iterator.isValid()) {
+   byte[] currKey = iterator.key();
+   if (samePrefix(keyBytes, currKey)) {
+   byte[] currValue = iterator.value();
+   System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+   

[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...

2017-03-30 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3652#discussion_r108868469
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java ---
@@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception {
 
	final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
 
-   final String key = "key";
-   final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+   try {
--- End diff --

I think it's better to use try-with-resources here.


---


[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-29 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
@StephanEwen Thanks very much for your valuable comments. The following 
are some of my thoughts.

* Right now the registration of shared states sits in `CheckpointCoordinator` 
because it's needed both when a `PendingCheckpoint` receives a state handle 
and when a `CompletedCheckpoint` is recovered. But I agree it makes sense to 
put the registration and unregistration of shared states in the same place. 
I will update the PR so that the logic lives in `PendingCheckpoint`s and 
`CompletedCheckpoint`s.

* When a `SubtaskState` is not successfully added to the 
`PendingCheckpoint`, the state objects in the `SubtaskState` should be 
correctly deleted. How these `SubtaskState`s are discarded varies from case 
to case. When the `PendingCheckpoint` fails, the `SubtaskState` should delete 
both its private states and its shared states. But when the 
`CompletedCheckpoint` is subsumed, the `SubtaskState` should delete the 
unreferenced shared states (possibly created by others) rather than its own 
shared states.

  By registering the shared states first, we can unify the implementation 
of the two cases. The shared states of a failed `PendingCheckpoint` are never 
referenced by other checkpoints, so they can be correctly discarded by the 
registry when the `PendingCheckpoint` unregisters its shared states, just 
like a subsumed `CompletedCheckpoint` does.

  Another choice is to refactor the interface of `CompositeStateHandle` to 
provide three methods, namely `onComplete()`, `onFail()` and `onSubsume()`. 
A `CompositeStateHandle` can implement these methods to correctly deal with 
its states in each of these cases (see the sketch after this list). What do 
you think?

* It's a good idea to introduce a `SharedStateHandle` for shared states. It 
can improve performance and allow safety checks. I will add it in the 
update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108826346
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
+
+   private static final long serialVersionUID = -8357254413007773970L;
+
+   /** All registered state objects */
+   private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>();
+
+   /** All state objects that are not referenced any more */
+   private transient final List<StateObject> discardedStates = new ArrayList<>();
+
+   /**
+* Register the state in the registry
+*
+* @param key The key of the state to register
+* @param state The state to register
+*/
+   public void register(String key, StateObject state) {
+   Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key);
--- End diff --

This method is always called by `registerAll()`, so I omitted the 
synchronization here. I will add the missing synchronization so that the 
method still works correctly when that assumption does not hold (see the 
sketch below).


---


[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-29 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3524#discussion_r108826017
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@code SharedStateRegistry} will be deployed in the
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of those state objects shared among different
+ * checkpoints. Each shared state object must be identified by a unique key.
+ */
+public class SharedStateRegistry implements Serializable {
--- End diff --

I made it serializable due to serialization issues with PowerMock. In 
some tests for `CompletedCheckpointStore`, I mock some `SubtaskState`s and 
attempt to verify that they can correctly register their shared states when 
the `CompletedCheckpointStore` is recovered. For the mock storage used in 
these tests to work, all classes related to the mocked objects must be 
serializable.

With your changes to the storage of `CompletedCheckpoint`, I think we can 
make the tests work without requiring `SharedStateRegistry` to be 
serializable. But prior to that, I prefer to keep `SharedStateRegistry` 
serializable so that the tests keep working.


---


[GitHub] flink pull request #3558: [FLINK-6096][checkpoint] Refactor the migration of...

2017-03-29 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3558#discussion_r108615377
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/migration/v0/SavepointV0.java ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.v0;
+
+import org.apache.flink.migration.v0.runtime.TaskStateV0;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Savepoint version 0.
+ *
+ * This format was introduced with Flink 1.1.0.
+ * 
+ * checkpointId: long
+ * numTaskStates: int
+ * |jobVertexID: long[2]
+ * |parallelism: int
+ * |numSubtaskStates: int
+ * ||subtaskIndex: int
+ * ||serializedValueLength: int
+ * ||serializedValue: byte[] (null if serializedValueLength is -1)
+ * ||subtaskStateSize: long
+ * ||subtaskStateDuration: long
+ * |numKeyGroupStates: int
+ * ||subtaskIndex: int
+ * ||serializedValueLength: int
+ * ||serializedValue: byte[] (null if serializedValueLength is -1)
+ * ||keyGroupStateSize: long
+ * ||keyGroupStateDuration: long
+ * 
+ */
+@Deprecated
+@SuppressWarnings("deprecation")
+public class SavepointV0 implements Savepoint {
+
+   /** The classes that are migrated in SavepointV0 */
+   public static final Map<String, String> MigrationMapping = new HashMap<String, String>() {{
+
+   /* migrated state descriptors */
+   put("org.apache.flink.api.common.state.StateDescriptor",
--- End diff --

@StefanRRichter +1 for option b)

I planned to use a tool to automatically create the mapping, but I found it 
very difficult to obtain the changes in the savepoints. It seems we must find 
all migrated classes manually.

I agree with you that the maintenance of the mapping is error-prone. The 
use of a naming-scheme convention will help us avoid typos in new class 
names. But we still have to carefully find all classes that changed in the 
savepoints. I think that is necessary for us: we must keep in mind what has 
changed in the savepoint format.

In my opinion, the number of migrated classes will be very small in the 
future. Most changes should happen in the serialization formats of the actual 
states (e.g., the snapshots of keyed states and timer services) rather than 
in the state handles. Therefore, little effort will be needed to maintain the 
mapping.
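
For illustration, deserializing with such a mapping usually boils down to 
overriding `resolveClass` in an `ObjectInputStream`. This is only a sketch of 
the mechanism; the class name and mapping entries below are assumptions, not 
the actual Flink `MigrationInstantiationUtil`:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectStreamClass;
    import java.util.HashMap;
    import java.util.Map;

    public class MigrationObjectInputStream extends ObjectInputStream {

        /** Old class name -> migrated class name (entries are illustrative). */
        private static final Map<String, String> MIGRATION_MAPPING = new HashMap<>();
        static {
            MIGRATION_MAPPING.put(
                "org.apache.flink.example.OldStateHandle",        // hypothetical old name
                "org.apache.flink.migration.v0.OldStateHandleV0"  // hypothetical new name
            );
        }

        public MigrationObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Redirect deprecated class names to their migration packages.
            String name = MIGRATION_MAPPING.getOrDefault(desc.getName(), desc.getName());
            return Class.forName(name, false, getClass().getClassLoader());
        }
    }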


---


[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...

2017-03-28 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3531


---


[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...

2017-03-28 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3531
  
@StefanRRichter Thanks for your work. I will close the PR.


---


[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...

2017-03-28 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3531
  
@StefanRRichter I updated the PR as suggested. Thanks very much for your 
hard work.


---


[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-24 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
@StephanEwen I have updated the PR as suggested. Changes include:
1. Turn `StateRegistry` into `SharedStateRegistry`, where only shared 
states are registered. The `discardState()` method is now supposed to delete 
only the private states of the checkpoint.
2. `SharedStateRegistry` is now deployed by the `CheckpointCoordinator`. 
A state handle registers its shared states once it is received by the 
coordinator. In other words, all shared states in a completed checkpoint are 
registered when the checkpoint is added to the `CompletedCheckpointStore`. 
All checkpoints (including savepoints) unregister their shared states when 
they are removed from the store. Since savepoints should not contain any 
shared state, their unregistration will not discard any state.
3. When recovering from failures or restarting from a savepoint, the 
`CheckpointCoordinator` rebuilds the registry from the checkpoints recovered 
in the `CompletedCheckpointStore`.
4. Related tests are added to ensure correctness.
5. The conflicts with the master branch are resolved.




---


[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...

2017-03-23 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107704681
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---
@@ -91,10 +98,10 @@ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange)
 
/**
 *
-* @return the internal key-group range to offsets metadata
+* @return the start key group in the key-group range of this handle
 */
-   public KeyGroupRangeOffsets getGroupRangeOffsets() {
-   return groupRangeOffsets;
+   public int getStartKeyGroup() {
--- End diff --

I have removed all pass-through methods except `getGroupRangeOffsets()` 
because `StateInitializationContextImpl$KeyGroupStreamIterator` is using it to 
get the iterator for key groups and their offsets.


---


[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...

2017-03-23 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3531
  
@StefanRRichter Thanks a lot for your comments. I have updated the pull 
request as suggested, making the type of raw keyed states to be 
`KeyedStateHandle` as well.


---


[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...

2017-03-23 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107646547
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
	}
 
	/**
+* Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
+* key group index for the given subtask {@link KeyGroupRange}.
+* 
+* This is publicly visible to be used in tests.
+*/
+   public static List<KeyedStateHandle> getKeyedStateHandles(
--- End diff --

+1. Will update the PR as suggested.


---


[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...

2017-03-23 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107646429
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
	}
 
	/**
+* Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
+* key group index for the given subtask {@link KeyGroupRange}.
+* 
+* This is publicly visible to be used in tests.
+*/
+   public static List<KeyedStateHandle> getKeyedStateHandles(
+   Collection<KeyedStateHandle> keyedStateHandles,
+   KeyGroupRange subtaskKeyGroupRange) {
+
+   List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
+
+   for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+   KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
--- End diff --

The idea is great! It does make sense to allow a `KeyedStateHandle` to 
create a new `KeyedStateHandle` for the states of a sub-range. I will update 
the PR as suggested.
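
For readers following along, the assignment logic under discussion is 
essentially a range intersection. A minimal, self-contained sketch; this 
simplified `KeyGroupRangeSketch` is a stand-in for Flink's `KeyGroupRange`, 
not the real class:

    public final class KeyGroupRangeSketch {

        final int startKeyGroup; // inclusive
        final int endKeyGroup;   // inclusive

        KeyGroupRangeSketch(int startKeyGroup, int endKeyGroup) {
            this.startKeyGroup = startKeyGroup;
            this.endKeyGroup = endKeyGroup;
        }

        /** Returns the overlap of the two ranges, or null if they are disjoint. */
        KeyGroupRangeSketch getIntersection(KeyGroupRangeSketch other) {
            int start = Math.max(startKeyGroup, other.startKeyGroup);
            int end = Math.min(endKeyGroup, other.endKeyGroup);
            return start <= end ? new KeyGroupRangeSketch(start, end) : null;
        }

        public static void main(String[] args) {
            KeyGroupRangeSketch handleRange = new KeyGroupRangeSketch(0, 63);
            KeyGroupRangeSketch subtaskRange = new KeyGroupRangeSketch(32, 95);
            KeyGroupRangeSketch overlap = handleRange.getIntersection(subtaskRange);
            // A handle is assigned to the subtask only if the overlap is non-null.
            System.out.println(overlap.startKeyGroup + ".." + overlap.endKeyGroup); // 32..63
        }
    }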


---


[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-20 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
Hi @StephanEwen The main reason is that we must have methods to delete 
the unshared objects in failed `PendingCheckpoint`s. The `discardState()` 
method is called either when the `PendingCheckpoint` fails or when the 
`CompletedCheckpoint` is subsumed. Under the current design, `discardState()` 
is supposed to delete only the unshared objects, while the shared objects are 
deleted by the `StateRegistry`. Hence, we must register state handles as soon 
as they are received so that their shared objects can be correctly deleted.


---


[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...

2017-03-20 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3521
  
@StephanEwen I added two tests to ensure that the checkpoints are not in 
the store when exceptions are thrown. The way the tests mock the exceptions 
thrown during subsuming may be a little tricky. Do you have any suggestions?


---


[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-19 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
@StephanEwen Thanks a lot for your valuable comments. I will update the PR 
as suggested.
* I think it's a good idea to turn the `StateRegistry` into a 
`SharedStateRegistry`. That requires that (1) the state handle must not 
discard registered objects in the `discardState` method, and (2) the state 
handle has to register its shared objects once it is received by the 
coordinator (currently, state handles do not register their objects before 
their checkpoint completes).
* Now that state handles have to register their objects once they are 
received by the coordinator, we should move the `SharedStateRegistry` from 
`CompletedCheckpointStore` to `CheckpointCoordinator`.
* It's better for the `StateRegistry` to discard an object directly once 
its reference count drops to 0. I used a list to collect discarded objects 
because I wanted to keep the changes to the discarding of completed 
checkpoints as small as possible.


---


[GitHub] flink pull request #3558: [FLINK-6096][checkpoint] Refactor the migration of...

2017-03-17 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3558

[FLINK-6096][checkpoint] Refactor the migration of old-versioned savepoints

1. The migrated classes in `SavepointV0` are moved to the package 
`org.apache.flink.migration.v0`. In the future, each deprecated savepoint 
version will have its own migration package.
2. A mapping is deployed in `SavepointV0` to record all deprecated classes. 
`MigrationInstantiationUtil` will now use the mapping to correctly deserialize 
those deprecated classes.
3. Unused methods in migrated classes are removed.
4. The formats of the old snapshots are documented in the comments to help 
readers understand the restore process.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shixiaogang/flink flink-6096

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3558.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3558


commit b5f9d73703372c0d71da2c84cdd11dce16641b28
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-03-17T08:56:00Z

Refactor the migration of old-versioned savepoints




---


[GitHub] flink issue #3380: [FLINK-5865][state] Throw original exception in the state...

2017-03-15 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3380
  
I prefer to throw more detailed exceptions, e.g. 
`IncompatibleTypeSerializerException`, `StateAccessException` and 
`StateNotFoundException`. They all extend `FlinkRuntimeException`. Users can 
get more information from these exceptions if they catch them.


---


[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...

2017-03-14 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3531

[FLINK-6034][checkpoint] Add KeyedStateHandle for the snapshots in keyed 
streams

## Changes
- Add `KeyedStateHandle` for the snapshots in keyed streams. 
`KeyGroupsStateHandle` is now one of its implementations.
- Distribute `KeyedStateHandle`s to subtasks according to their key group 
ranges. A `KeyedStateHandle` will be assigned to every subtask whose key 
group range overlaps with its range.
 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shixiaogang/flink flink-6034

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3531.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3531


commit 9637dcc40d66a2702f5227b7bbe3ae66fca89adf
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-03-14T11:04:37Z

Add KeyedStateHandle for the snapshots in keyed streams




---


[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...

2017-03-13 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3524

[FLINK-6014][checkpoint] Allow the registration of state objects in 
checkpoints

- Introduce `CompositeStateHandle` which is composed of a collection of 
`StateObject`s and can register these `StateObject`s in `StateRegistry`.
- Use `StateRegistry` to maintain the reference count of the `StateObject`s 
in completed checkpoints. A `StateObject` is deleted only if its reference 
count is zero.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shixiaogang/flink flink-6014

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3524.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3524


commit a1796f2a0538c02a2154459fef8f931e14e18fca
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-03-13T11:23:47Z

Allow registration of state objects in checkpoints

commit de272a1f78356b940a3b9421dad70937596c3a94
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-03-13T11:29:21Z

remove unnecessary synchronization in StateRegistry




---


[GitHub] flink pull request #3521: [FLINK-6027][checkpoint] Ignore the exception thro...

2017-03-13 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3521

[FLINK-6027][checkpoint] Ignore the exception thrown by the subsuming of 
completed checkpoints

The exception thrown during the subsuming of old checkpoints will now be 
ignored. `CompletedCheckpointStore#addCheckpoint` will throw exceptions only 
when the completed checkpoint was not written to the store. In such cases, 
the coordinator is safe to delete the states in the checkpoint because it is 
impossible to recover from that checkpoint.
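
A toy sketch of that policy (not the actual `CompletedCheckpointStore` code): 
only a failure to store the new checkpoint propagates, while failures during 
subsumption are logged and swallowed.

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class SubsumingStoreSketch {

        /** Stand-in for a completed checkpoint; only discarding matters here. */
        public interface Checkpoint {
            void discardState() throws Exception;
        }

        private final Deque<Checkpoint> checkpoints = new ArrayDeque<>();
        private final int maxRetained;

        public SubsumingStoreSketch(int maxRetained) {
            this.maxRetained = maxRetained;
        }

        public void addCheckpoint(Checkpoint checkpoint) {
            // The only step whose failure must reach the caller.
            checkpoints.addLast(checkpoint);

            // Subsume older checkpoints; ignore any failure while doing so.
            while (checkpoints.size() > maxRetained) {
                Checkpoint subsumed = checkpoints.removeFirst();
                try {
                    subsumed.discardState();
                } catch (Exception e) {
                    System.err.println("WARN: failed to subsume old checkpoint: " + e);
                }
            }
        }
    }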

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shixiaogang/flink flink-6027

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3521.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3521


commit e89e947e74693ef1d5fdcfaebdc1818b138f2fd1
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-03-13T09:03:42Z

Ignore the exception thrown by the subsuming of completed checkppoints

commit 9ba89c42ed4751c68cf9520032e40dc6e857212c
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-03-13T10:04:30Z

Change the log level to WARNING




---


[GitHub] flink pull request #3462: [FLINK-5917][state] Remove size() method from MapS...

2017-03-02 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3462

[FLINK-5917][state] Remove size() method from MapState

The `size()` method is removed from `MapState` because its implementation 
is costly in the backends.
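
To see why `size()` is costly: without a maintained counter, a backend such 
as RocksDB would have to scan every entry of the map just to count it, 
roughly as in this sketch (a generic illustration, not backend code):

    import java.util.Map;

    public final class MapStateSizeSketch {

        /** Counting by full iteration -- O(n) with one store access per entry. */
        public static <UK, UV> int sizeByIteration(Iterable<Map.Entry<UK, UV>> entries) {
            int count = 0;
            for (Map.Entry<UK, UV> ignored : entries) {
                count++;
            }
            return count;
        }
    }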

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5917

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3462.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3462


commit 6906b15ff593f46e106348aa1f5772e6b78efe74
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-03-03T02:27:11Z

Remove size() method from MapState




---


[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...

2017-02-28 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3334#discussion_r103612613
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -428,6 +450,9 @@ CheckpointTriggerResult triggerCheckpoint(
	catch (Throwable t) {
	int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
	LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+   if(numUnsuccessful > maxUnsuccessfulCheckpoints) {
--- End diff --

You are right. I missed it. Sorry for that.


---


[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...

2017-02-28 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3334#discussion_r103605788
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -428,6 +450,9 @@ CheckpointTriggerResult triggerCheckpoint(
	catch (Throwable t) {
	int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
	LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+   if(numUnsuccessful > maxUnsuccessfulCheckpoints) {
--- End diff --

Here the counter records the total number of failed attempts. Since a 
streaming job is intended to run for quite a long time, the number of failed 
attempts will eventually exceed the limit. We should use a different counter 
here, one that is reset once a pending checkpoint successfully completes.
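
A small sketch of the counter suggested here: it counts consecutive failures 
and is reset when a checkpoint completes (the names and threshold handling 
are assumptions, not PR code):

    import java.util.concurrent.atomic.AtomicInteger;

    public class ConsecutiveFailureCounter {

        private final AtomicInteger consecutiveFailures = new AtomicInteger();
        private final int maxFailedAttempts;

        public ConsecutiveFailureCounter(int maxFailedAttempts) {
            this.maxFailedAttempts = maxFailedAttempts;
        }

        /** Called when triggering a checkpoint fails; true means "fail the job". */
        public boolean onTriggerFailure() {
            return consecutiveFailures.incrementAndGet() > maxFailedAttempts;
        }

        /** Called when a pending checkpoint completes successfully. */
        public void onCheckpointCompleted() {
            consecutiveFailures.set(0);
        }
    }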


---


[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...

2017-02-28 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3334#discussion_r103605271
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -537,12 +562,27 @@ else if (!props.forceCheckpoint()) {
	if (!checkpoint.isDiscarded()) {
	checkpoint.abortError(new Exception("Failed to trigger checkpoint"));
	}
+   if(numUnsuccessful > maxUnsuccessfulCheckpoints) {
+   return failExecution(executions);
+   }
	return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
	}
 
	} // end trigger lock
	}
 
+   private CheckpointTriggerResult failExecution(Execution[] executions) {
+   if (currentPeriodicTrigger != null) {
+   currentPeriodicTrigger.cancel();
+   currentPeriodicTrigger = null;
+   }
+   for (Execution execution : executions) {
+   // fail the graph
+   execution.fail(new Throwable("The number of max unsuccessful checkpoints attempts exhausted"));
--- End diff --

I think it's not good here to fail the executions one by one. We should 
call `ExecutionGraph#fail` to fail the execution graph.


---


[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...

2017-02-28 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3334#discussion_r103604470
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -121,6 +121,8 @@
 
/** The maximum number of checkpoints that may be in progress at the 
same time */
private final int maxConcurrentCheckpointAttempts;
+   /** The maximum number of unsuccessful checkpoints */
+   private final int maxUnsuccessfulCheckpoints;
--- End diff --

I think `failed` is a better word than `unsuccessful`.


---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-23 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3336


---


[GitHub] flink issue #3380: [FLINK-5865][state] Throw original exception in the state...

2017-02-22 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3380
  
I think we may borrow some ideas from Java. For example, the methods of 
`Map` do not declare any exception in their signatures, but the interface 
documents a set of specific `RuntimeException`s that implementations may 
throw, such as `ClassCastException`. Maybe we can do something similar.

What do you think? @StephanEwen 


---


[GitHub] flink pull request #3387: [FLINK-5863][queryable state] Add the serializatio...

2017-02-22 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3387


---


[GitHub] flink issue #3387: [FLINK-5863][queryable state] Add the serialization of li...

2017-02-22 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3387
  
The PR is just some cleaning of the code. Now that we are planning to 
refactor the implementation, I agree to close the PR and I am very willing to 
contribute to the FLIP.


---


[GitHub] flink issue #3380: [FLINK-5865][state] Throw original exception in the state...

2017-02-22 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3380
  
I like the idea of defining some more "specific" exceptions. Flink can 
define specific exceptions like `StateAccessException`. That may help readers 
better understand the code.

I am also thinking that it's better not to put any exception in the 
signatures of user-facing interfaces like `State`. All exceptions thrown by 
these methods would be `RuntimeException`s, which are caught and handled by 
Flink. These methods, as provided by Flink, are supposed to work properly and 
not throw any exception; in practice, users can do little even if they catch 
the exceptions.


---


[GitHub] flink issue #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-22 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3336
  
@aljoscha Thanks a lot for your hard work. I have fixed the typos in the 
documentation.


---


[GitHub] flink pull request #3387: [FLINK-5863][queryable state] Add the serializatio...

2017-02-22 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3387

[FLINK-5863][queryable state] Add the serialization of list states in 
KvStateRequestSerializer

1. Add `serializeList()` in `KvStateRequestSerialization`.
2. Modify the unit tests of `KvStateRequestSerialization` so that they no 
longer need access to protected methods.
3. Move `KvStateRequestSerializationRocksDBTest` from the package 
`org.apache.flink.test.query` to `org.apache.flink.contrib.streaming.state`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5863

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3387


commit 5fc51b5f233ec62e143c1938b11c677b2260575b
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-02-22T03:56:24Z

Add the serialization of list states in KvStateRequestSerializer




---


[GitHub] flink pull request #3380: [FLINK-5865][state] Throw original exception in th...

2017-02-21 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3380

[FLINK-5865][state] Throw original exception in the states

The wrapping in `RuntimeException` is removed so that we can avoid 
redundant stack traces printed in the log.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5865

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3380.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3380


commit 0928c44063d5c893b24c24f21050ee643159fb36
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-02-21T16:20:25Z

Throw original exception in RocksDB states




---


[GitHub] flink issue #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-21 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3336
  
@StefanRRichter Thanks very much for your work. I have rebased the pull 
request and resolved the conflicts.


---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-21 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102225285
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---
@@ -382,11 +342,26 @@ private UV deserializeUserValue(byte[] rawValueBytes) {
	this.rawValueBytes = rawValueBytes;
	this.deleted = false;
	}
-   
+
+   public void remove() {
+   deleted = true;
+   rawValueBytes = null;
+
+   try {
+   db.remove(columnFamily, writeOptions, rawKeyBytes);
+   } catch (RocksDBException e) {
+   throw new RuntimeException("Error while removing data from RocksDB.", e);
--- End diff --

I modified the method signature because I found that, except for 
`ValueState`, the methods of the other states all throw `Exception`. I think 
that's okay because `MapState` is a common interface that has no knowledge of 
the implementation. The implementations of these methods, however, should 
throw specific exceptions like `IOException` or `RocksDBException`.

I think it's reasonable. What do you think?
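
In other words, the convention under discussion looks roughly like this (a 
sketch of the shape of the interface, not the exact Flink signatures):

    public interface MapStateSketch<UK, UV> {
        // The interface declares the broad checked exception ...
        UV get(UK key) throws Exception;
        void put(UK key, UV value) throws Exception;
        void remove(UK key) throws Exception;
        // ... while an implementation would throw something specific,
        // e.g. IOException or RocksDBException for the RocksDB backend.
    }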


---


[GitHub] flink issue #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-21 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3336
  
I have added the documentation for `MapState`. You may take a look to see 
if it's properly written.


---


[GitHub] flink issue #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-21 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3336
  
@StefanRRichter I have updated the pull request as suggested. Now the map 
serializer supports the serialization of null values.


---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-21 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102153318
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer<UK> userKeySerializer;
+   private final TypeSerializer<UV> userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer<N> namespaceSerializer,
+   MapStateDescriptor<UK, UV> stateDesc,
+   RocksDBKeyedStateBackend<K> backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MapState Implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102138099
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---
@@ -484,6 +487,71 @@ public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException
	return null;
	}
	}
+   
+   /**
+* Serializes all values of the Iterable with the given serializer.
+*
+* @param entries Key-value pairs to serialize
+* @param keySerializer   Serializer for UK
+* @param valueSerializer Serializer for UV
+* @param <UK> Type of the keys
+* @param <UV> Type of the values
+* @return Serialized values or null if values null or empty
+* @throws IOException On failure during serialization
+*/
+   public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
+   if (entries != null) {
+   Iterator<Map.Entry<UK, UV>> it = entries.iterator();
+
+   if (it.hasNext()) {
+   // Serialize
+   DataOutputSerializer dos = new DataOutputSerializer(32);
+
+   while (it.hasNext()) {
+   Map.Entry<UK, UV> entry = it.next();
+
+   keySerializer.serialize(entry.getKey(), dos);
+   valueSerializer.serialize(entry.getValue(), dos);
+   }
+
+   return dos.getCopyOfBuffer();
+   } else {
+   return null;
--- End diff --

The function is unused now. I will delete it in the update.


---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102135289
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 ---
@@ -410,6 +415,124 @@ public void testDeserializeListTooShort2() throws 
Exception {
KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 
1, 1, 1, 1, 1, 2, 3},
LongSerializer.INSTANCE);
}
+   
+   /**
+* Tests map serialization utils.
+*/
+   @Test
+   public void testMapSerialization() throws Exception {
+   final long key = 0L;
+
+   // objects for heap state list serialisation
+   final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
+   new HeapKeyedStateBackend<>(
+   mock(TaskKvStateRegistry.class),
+   LongSerializer.INSTANCE,
+   ClassLoader.getSystemClassLoader(),
+   1, new KeyGroupRange(0, 0)
+   );
+   longHeapKeyedStateBackend.setCurrentKey(key);
+
+   final InternalMapState<VoidNamespace, Long, String> mapState = 
longHeapKeyedStateBackend.createMapState(
+   VoidNamespaceSerializer.INSTANCE,
+   new MapStateDescriptor<>("test", 
LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+   testMapSerialization(key, mapState);
+   }
+
+   /**
+* Verifies that the serialization of a map using the given map state
+* matches the deserialization with {@link 
KvStateRequestSerializer#deserializeList}.
+*
+* @param key
+*  key of the map state
+* @param mapState
+*  map state using the {@link VoidNamespace}, must also be 
a {@link InternalKvState} instance
+*
+* @throws Exception
+*/
+   public static void testMapSerialization(
+   final long key,
+   final InternalMapState<VoidNamespace, Long, String> 
mapState) throws Exception {
+
+   TypeSerializer<Long> userKeySerializer = 
LongSerializer.INSTANCE;
+   TypeSerializer<String> userValueSerializer = 
StringSerializer.INSTANCE;
+   mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+   // List
+   final int numElements = 10;
+
+   final Map<Long, String> expectedValues = new HashMap<>();
+   for (int i = 0; i < numElements; i++) {
+   final long value = 
ThreadLocalRandom.current().nextLong();
--- End diff --

I prefer to use `ThreadLocalRandom.current()`, which is also used in other 
tests in this file. Though it makes it difficult to reproduce a failing case, it 
may help to find corner cases.
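    
    If reproducibility ever becomes a concern, the usual middle ground is a 
    seeded `Random` whose seed is logged, so a failing run can be replayed. A 
    sketch of that pattern (not part of the PR):
    
        import java.util.Random;
    
        public class SeededRandomValues {
            public static void main(String[] args) {
                // Log the seed so a failure can be reproduced with the same data.
                final long seed = System.nanoTime();
                System.out.println("test seed: " + seed);
                final Random random = new Random(seed);
    
                for (int i = 0; i < 10; i++) {
                    final long value = random.nextLong();
                    System.out.println(value); // stand-in for feeding the state under test
                }
            }
        }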


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102129362
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -834,7 +836,7 @@ private void restoreKVStateData() throws IOException, 
RocksDBException {
}
 
@Override
-   protected <N, T> InternalValueState<N, T> createValueState(
+   public <N, T> InternalValueState<N, T> createValueState(
--- End diff --

It is mainly due to the unit tests in `KvStateRequestSerializerTest`, which 
need access to `InternalKvState`. A better choice would be to use 
`getPartitionedState()` to obtain a user-facing state and convert it to an 
internal state. What do you think?
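    
    A sketch of that alternative, with generics and method names approximated 
    from the Flink APIs of that time:
    
        import org.apache.flink.api.common.state.ValueState;
        import org.apache.flink.api.common.state.ValueStateDescriptor;
        import org.apache.flink.api.common.typeutils.base.LongSerializer;
        import org.apache.flink.runtime.state.KeyedStateBackend;
        import org.apache.flink.runtime.state.VoidNamespace;
        import org.apache.flink.runtime.state.VoidNamespaceSerializer;
        import org.apache.flink.runtime.state.internal.InternalKvState;
    
        public final class InternalStateAccessSketch {
    
            // Obtain a user-facing state and view it through the internal interface.
            @SuppressWarnings("unchecked")
            public static InternalKvState<VoidNamespace> internalValueState(
                    KeyedStateBackend<Long> backend) throws Exception {
    
                ValueState<Long> state = backend.getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        new ValueStateDescriptor<>("test", LongSerializer.INSTANCE));
    
                // Backend states of that era implemented InternalKvState, so the
                // cast gives the tests the internal accessors they need.
                return (InternalKvState<VoidNamespace>) state;
            }
        }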


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102128355
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
 ---
@@ -93,6 +95,18 @@ public DefaultKeyedStateStore(KeyedStateBackend 
keyedStateBackend, ExecutionC
}
}
 
+   @Override
+   public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties) {
+   requireNonNull(stateProperties, "The state properties must not 
be null");
+   try {
+   
stateProperties.initializeSerializerUnlessSet(executionConfig);
+   MapState<UK, UV> originalState = 
getPartitionedState(stateProperties);
+   return new UserFacingMapState<>(originalState);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
--- End diff --

Currently, `KeyedStateStore#getState()` does not declare any thrown exceptions 
in its signature, so `RuntimeException` is the only exception that can be 
thrown. Since changing the interface would affect user code (users would have 
to handle the declared exceptions), I am not sure it's okay to modify the 
method declarations in `KeyedStateStore`.
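    
    For context, this is roughly how user code consumes the new accessor; any 
    backend failure surfaces as the unchecked `RuntimeException` discussed above. 
    The word-count logic is purely illustrative:
    
        import org.apache.flink.api.common.functions.RichFlatMapFunction;
        import org.apache.flink.api.common.state.MapState;
        import org.apache.flink.api.common.state.MapStateDescriptor;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.util.Collector;
    
        public class WordCountFunction extends RichFlatMapFunction<String, Long> {
    
            private transient MapState<String, Long> counts;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                counts = getRuntimeContext().getMapState(
                        new MapStateDescriptor<>("counts", String.class, Long.class));
            }
    
            @Override
            public void flatMap(String word, Collector<Long> out) throws Exception {
                // get() returns null for an absent key, like java.util.Map.
                Long current = counts.get(word);
                long updated = (current == null) ? 1L : current + 1L;
                counts.put(word, updated);
                out.collect(updated);
            }
        }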


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102127867
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param   The type of the key.
+ * @param   The type of the namespace.
+ * @param  The type of the keys in the map state.
+ * @param  The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor<UK, UV> stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102127767
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param   The type of the key.
+ * @param   The type of the namespace.
+ * @param  The type of the keys in the map state.
+ * @param  The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor<UK, UV> stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102126863
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param   The type of the key.
+ * @param   The type of the namespace.
+ * @param  The type of the keys in the map state.
+ * @param  The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor<UK, UV> stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102125445
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param   The type of the key.
+ * @param   The type of the namespace.
+ * @param  The type of the keys in the map state.
+ * @param  The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor<UK, UV> stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102125062
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer<UK> userKeySerializer;
+   private final TypeSerializer<UV> userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
--- End diff --

To be honest, I have no idea why we can't put `writeOptions` in the base class. 
We put it in `AbstractRocksDBState` and have not come across any problems in 
our production environment. 

Maybe @aljoscha is more familiar with the problem.
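    
    For reference, a sketch of the pattern as it stands: one `WriteOptions` 
    instance per state object, reused for every write. Class and method names are 
    illustrative:
    
        import org.rocksdb.ColumnFamilyHandle;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;
        import org.rocksdb.WriteOptions;
    
        public final class WalDisabledState {
    
            // Created once per state object, as in the PR.
            private final WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);
    
            public void put(RocksDB db, ColumnFamilyHandle columnFamily,
                    byte[] rawKeyBytes, byte[] rawValueBytes) throws RocksDBException {
                // Without the WAL, writes skip RocksDB's redo log; durability comes
                // from Flink's checkpoints instead.
                db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
            }
        }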


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r101987352
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link State} interface for partitioned key-value state. The key-value 
pair can be
+ * added, updated and retrieved.
+ *
+ * The state is accessed and modified by user functions, and 
checkpointed consistently
+ * by the system as part of the distributed snapshots.
+ *
+ * The state is only accessible by functions applied on a 
KeyedDataStream. The key is
+ * automatically supplied by the system, so the function always sees the 
value mapped to the
+ * key of the current element. That way, the system can handle stream and 
state partitioning
+ * consistently together.
+ *
+ * @param <UK> Type of the keys in the state.
+ * @param <UV> Type of the values in the state.
+ */
+@PublicEvolving
+public interface MapState<UK, UV> extends AppendingState<Map<UK, UV>, 
Iterable<Map.Entry<UK, UV>>> {
--- End diff --

I agree that it's `MultiMapState`, instead of `MapState`, that is supposed 
to be an `AppendingState`.  

I will update the interface hierarchy, making `MapState` not an 
`AppendingState`.
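    
    A sketch of where that leads, modeled on the shape `MapState` eventually took 
    in Flink; the exact method set here is an assumption:
    
        import org.apache.flink.api.common.state.State;
    
        import java.util.Iterator;
        import java.util.Map;
    
        // MapState extends State directly rather than AppendingState, since
        // random-access updates do not fit append-only semantics.
        public interface MapStateSketch<UK, UV> extends State {
            UV get(UK key) throws Exception;
            void put(UK key, UV value) throws Exception;
            void remove(UK key) throws Exception;
            boolean contains(UK key) throws Exception;
            Iterable<Map.Entry<UK, UV>> entries() throws Exception;
            Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
        }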


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-02-20 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3359

[FLINK-5544][streaming] Add InternalTimerService implemented in RocksDB

- Refactor the methods defined in `InternalTimerService`. Some common 
implementation in `HeapInternalTimerService` is now moved into 
`InternalTimerService`.
- Implement `RocksDBInternalTimerService`, which stores timers in RocksDB 
and sorts them with an in-memory heap (a conceptual sketch follows this list).
- Implement `InternalTimerServiceTestBase` to verify implementations of 
`InternalTimerService`.
- Update `AbstractStreamOperator` to allow the usage of customized 
`InternalTimerService`s.
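    
    A conceptual sketch of the heap-over-RocksDB idea from the second bullet 
    (this is not the PR's code; all names are illustrative). Timers are persisted 
    in RocksDB ordered by their serialized keys, and an in-memory heap tracks only 
    the earliest timer of each key group, so finding the next timer to fire is 
    cheap:
    
        import java.util.PriorityQueue;
    
        public final class TimerHeapSketch {
    
            static final class GroupHead implements Comparable<GroupHead> {
                final int keyGroup;       // key group whose head timer this is
                final long headTimestamp; // earliest timer of the group, read from RocksDB
    
                GroupHead(int keyGroup, long headTimestamp) {
                    this.keyGroup = keyGroup;
                    this.headTimestamp = headTimestamp;
                }
    
                @Override
                public int compareTo(GroupHead other) {
                    return Long.compare(headTimestamp, other.headTimestamp);
                }
            }
    
            private final PriorityQueue<GroupHead> groupHeads = new PriorityQueue<>();
    
            /** The next timer to fire is the minimum over all key-group heads. */
            public Long nextTimerTimestamp() {
                GroupHead head = groupHeads.peek();
                return head == null ? null : head.headTimestamp;
            }
        }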

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5544

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3359.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3359


commit 341fd97c47336d4f87cea997e134af68f8ef5265
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-02-20T09:55:40Z

Add InternalTimerService implemented in RocksDB




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-19 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r101936792
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link State} interface for partitioned key-value state. The key-value 
pair can be
+ * added, updated and retrieved.
+ *
+ * The state is accessed and modified by user functions, and 
checkpointed consistently
+ * by the system as part of the distributed snapshots.
+ *
+ * The state is only accessible by functions applied on a 
KeyedDataStream. The key is
+ * automatically supplied by the system, so the function always sees the 
value mapped to the
+ * key of the current element. That way, the system can handle stream and 
state partitioning
+ * consistently together.
+ *
+ * @param <UK> Type of the keys in the state.
+ * @param <UV> Type of the values in the state.
+ */
+@PublicEvolving
+public interface MapState<UK, UV> extends AppendingState<Map<UK, UV>, 
Iterable<Map.Entry<UK, UV>>> {
--- End diff --

`MapState` provides the `add` method, which puts a collection of key-value 
pairs into the state. Though the semantics may be a little different from those 
of the existing `AppendingState`s, I think it's okay for `MapState` to be an 
`AppendingState` because the interface does not enforce any restriction on the 
modification of previous data.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-16 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3336

[FLINK-4856][state] Add MapState in KeyedState

1. Add `MapState` and `MapStateDescriptor`
2. Implementation of `MapState` in `HeapKeyedStateBackend` and 
`RocksDBKeyedStateBackend`.
3. Add accessors to `MapState` in `RuntimeContext`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-4856

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3336.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3336


commit 430b4f596acbff0a9dfdc20fbb2430a8fad819f9
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-02-17T03:19:18Z

Add MapState in KeyedState




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3305: [FLINK-5790][StateBackend] Use list types when ListStateD...

2017-02-15 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3305
  
@tillrohrmann Both `org.apache.flink.api.common.typeutils.base` and 
`org.apache.flink.api.common.typeinfo` are in the module `flink-core`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3305: [FLINK-5790][StateBackend] Use list types when ListStateD...

2017-02-14 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3305
  
@StephanEwen @tillrohrmann I found a problem with the packages in which 
`ListTypeInfo` and `ListSerializer` are located. Now `ListTypeInfo` is put in 
the package `org.apache.flink.api.java.typeutils` and `ListSerializer` is put 
in the package `org.apache.flink.api.common.typeutils.base`. But I think it's 
better to put `ListTypeInfo` into the package 
`org.apache.flink.api.common.typeinfo`. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3305: [FLINK-5790][StateBackend] Use list types when ListStateD...

2017-02-14 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3305
  
@tillrohrmann Thanks for your review. 

Sorry for the reformatted code. It seems that my IDE will automatically 
reformat all the files I've edited. I will revert the reformatted code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3305: [FLINK-5790][StateBackend] Use list types when ListStateD...

2017-02-14 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3305
  
@StephanEwen Thanks a lot for your comments. I have updated the code as 
suggested. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor...

2017-02-13 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/2768


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2017-02-13 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
Closing the pull request because the state descriptors are now refactored with 
the introduction of composite serializers (see 
[FLINK-5790](https://issues.apache.org/jira/browse/FLINK-5790)).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3305: [FLINK-5790][StateBackend] Use list types when Lis...

2017-02-13 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3305#discussion_r100967108
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
 ---
@@ -47,9 +47,16 @@ public ClassLoaderObjectInputStream(InputStream in, 
ClassLoader classLoader) thr
 
// the flink package may be at position 0 (regular 
class) or position 2 (array)
final int flinkPackagePos;
-   if ((flinkPackagePos = 
className.indexOf(FLINK_BASE_PACKAGE)) == 0 ||
-   (flinkPackagePos == 2 && 
className.startsWith(ARRAY_PREFIX)))
-   {
+   if 
(className.contains("org.apache.flink.runtime.state.ArrayListSerializer")) {
--- End diff --

The code here is a little tricky. I think we should use a Map to record all 
modified classes and their corresponding backups.
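    
    A sketch of that suggestion; the mapping entries are illustrative:
    
        import java.io.ObjectStreamClass;
        import java.util.HashMap;
        import java.util.Map;
    
        public final class MigrationClassMapSketch {
    
            // One lookup table from current class names to their migration backups,
            // instead of ad-hoc string checks scattered through resolveClass().
            private static final Map<String, String> MIGRATED_CLASSES = new HashMap<>();
            static {
                MIGRATED_CLASSES.put(
                    "org.apache.flink.runtime.state.ArrayListSerializer",
                    "org.apache.flink.migration.runtime.state.ArrayListSerializer");
                // ... one entry per class moved or renamed between versions ...
            }
    
            public static String resolveName(ObjectStreamClass desc) {
                String migrated = MIGRATED_CLASSES.get(desc.getName());
                return migrated != null ? migrated : desc.getName();
            }
        }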


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3305: [FLINK-5790][StateBackend] Use list types when Lis...

2017-02-13 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3305

[FLINK-5790][StateBackend] Use list types when ListStateDescriptor extends 
StateDescriptor

1. Now the state serializer, instead of the element serializer, is stored 
in `ListStateDescriptor` (see the sketch after this list). 
2. `ArrayListTypeInfo` is introduced to help create serializers with the 
element type.
3. `ArrayListSerializer` is moved to the package 
`org.apache.flink.api.common.typeutils.base` to avoid cyclic dependencies.
4. The old implementation of `ListStateDescriptor` is kept in the migration 
package for backward compatibility.
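    
    A sketch of what change 1 means for callers, assuming `getSerializer()` now 
    yields the serializer of the whole list:
    
        import org.apache.flink.api.common.state.ListStateDescriptor;
        import org.apache.flink.api.common.typeutils.TypeSerializer;
        import org.apache.flink.api.common.typeutils.base.LongSerializer;
    
        import java.util.List;
    
        public class ListDescriptorExample {
            public static void main(String[] args) {
                // The descriptor is still created from the element serializer...
                ListStateDescriptor<Long> descriptor =
                        new ListStateDescriptor<>("values", LongSerializer.INSTANCE);
    
                // ...but the state serializer is now typed to the list as a whole.
                TypeSerializer<List<Long>> listSerializer = descriptor.getSerializer();
                System.out.println(listSerializer);
            }
        }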



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5790

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3305.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3305


commit e8e11b7965365178453ab6eab78c6d5ac98f3537
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-02-14T05:39:30Z

Use list types when ListStateDescriptor extends StateDescriptor

commit ba8cdc919fc2e66b3e81f6f8566140bef53a9b96
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2017-02-14T06:22:25Z

Support back compatibility for ListStateDescriptor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2017-02-06 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@aljoscha That way, it's very confusing that a `ReadableState` is not a 
`State`. Hence I made `State` read-only and introduced the `UpdatableState` 
interface, which extends `State` with the method `clear()`. 

These changes (mainly the introduction of the `get()` method) are intended 
to remove the duplicated code. As they have little relationship with the 
implementation of map states, I think it's okay not to change these interfaces 
now.

But I prefer to rethink the state hierarchy in the near future because 
there exists too much duplicated code now. 
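    
    A sketch of the hierarchy described above; the exact method signatures are an 
    assumption:
    
        // The base interface is read-only, so broadcast states can share it.
        interface State<T> {
            T get() throws Exception;
        }
    
        // Mutation is confined to the updatable sub-interface.
        interface UpdatableState<T> extends State<T> {
            void clear();
        }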


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2017-02-03 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@StephanEwen  Thanks a lot for your comments. 

**Removing `clear()` from `State`**

This change was suggested by @aljoscha, who wants to let broadcast states 
share the same interface (see the discussion in 
[FLINK-5023](https://issues.apache.org/jira/browse/FLINK-5023)). As mentioned, 
the broadcast states are read-only in some cases. Hence it was suggested not to 
provide the `clear()` method in the base `State` interface.

**Changing the `State` interface**

The `State` interface is typed because I want to provide the `get()` method 
for all states so that we can retrieve the data in the state (under the current 
key for keyed states). The functionality is already provided by all states 
except `ValueState`, which provides the same functionality with its `value()` 
method. Providing the method for all states can help reduce some duplicated 
code in the implementation. It also makes sense for read-only states mentioned 
above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2017-01-17 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
Despite the changes in the state descriptors, Flink jobs can now restore 
from snapshots taken with old versions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3053: [FLINK-5400] Add accessor to folding states in Run...

2016-12-29 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3053

[FLINK-5400] Add accessor to folding states in RuntimeContext

- Add accessors in RuntimeContext and KeyedStateStore
- Fix errors in the comments for reducing states in RuntimeContext and 
KeyedStateStore

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5400

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3053.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3053


commit f9d733a60f4809049e778045144528e8aff4a951
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2016-12-30T03:25:05Z

Add accessor to folding states in RuntimeContext




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2016-12-01 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
I rebased the branch to resolve the conflicts with the master branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2016-11-28 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
I moved the default value from `SimpleStateDescriptor` to 
`ValueStateDescriptor`. Now only `ValueStateDescriptor`s have default values. 
The serialization methods may contain some duplicated code, but I think it's 
acceptable.

I also modified the implementation of `HeapReducingState`. The first value 
will be copied before being put into the heap (see the sketch below).

@aljoscha What do you think of these changes?
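    
    A sketch of the copy-on-first-add behavior mentioned above; class and field 
    names are illustrative:
    
        import org.apache.flink.api.common.functions.ReduceFunction;
        import org.apache.flink.api.common.typeutils.TypeSerializer;
    
        public final class ReducingAddSketch<V> {
    
            private final TypeSerializer<V> serializer;
            private final ReduceFunction<V> reduceFunction;
            private V current; // value held on the heap
    
            public ReducingAddSketch(TypeSerializer<V> serializer,
                    ReduceFunction<V> reduceFunction) {
                this.serializer = serializer;
                this.reduceFunction = reduceFunction;
            }
    
            public void add(V value) throws Exception {
                if (current == null) {
                    // Copy before storing: the caller may mutate or reuse the instance.
                    current = serializer.copy(value);
                } else {
                    current = reduceFunction.reduce(current, value);
                }
            }
        }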






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2016-11-24 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
Oh... I added another field to make the code clearer, but I did not 
notice the serialization problem. Thanks very much for the reminder.

Your solution does work, though the concept of a "defaultValue" in folding 
states is a little confusing. Another solution is to let `FoldingStateDescriptor` 
implement its own serialization methods, and I prefer this one.

What do you think? @aljoscha 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2016-11-22 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@aljoscha Thanks for your review. I have updated the PR according to your 
suggestion. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2768: [FLINK-5023 & FLINK-5024] Add SimpleStateDescripto...

2016-11-07 Thread shixiaogang
GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/2768

[FLINK-5023 & FLINK-5024] Add SimpleStateDescriptor to clarify the concepts

Changes in the definition of `State` and `StateDescriptor`:
- Add `get()` in the `State` interface.
- Remove type serializers of state values from `StateDescriptor`s.
- Add `SimpleStateDescriptor` to simplify the construction of 
`ValueStateDescriptor`, `ReducingStateDescriptor` and `FoldingStateDescriptor`.
- Changes the definition of `KeyedStateBackend` and 
`AbstractKeyedStateBackend` accordingly.
- Modify the implementation of `ListStateDescriptor` accordingly.

Changes in HeapStateBackend:
- Let `AbstractHeapState` not implement the `State` interface. The 
`clear()` method is now removed from `AbstractHeapState`.
- Add `HeapSimpleState` to simplify the implementation of `HeapValueState`, 
`HeapReducingState` and `HeapFoldingState`.
- Change the implementation of `HeapValueState`, `HeapReducingState` and 
`HeapFoldingState` accordingly.

Changes in RocksDBStateBackend:
- Let `AbstractRocksDBState` not implement the `State` interface, removing 
the `clear()` method. Now, `AbstractRocksDBState` does not depend on the types 
of `State` and `StateDescriptor` any more.
- Add `RocksDBSimpleState` to simplify the implementation of 
`RocksDBValueState`, `RocksDBReducingState` and `RocksDBFoldingState`.
- Change the implementation of `RocksDBValueState`, `RocksDBReducingState` 
and `RocksDBFoldingState` accordingly.

Others:
- Update the usage of `State`s in the implementation of window operators.
- Update the usage of `State`s in unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5023

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2768.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2768


commit 007a93b454e693bc3662f540ac5f33e899ce9058
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date:   2016-11-08T02:38:22Z

Refactor the interface of State and StateDescriptor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

2016-08-22 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/2377


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

2016-08-19 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/2377#discussion_r75481109
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
 ---
@@ -248,4 +374,40 @@ void 
finishResourceManagerRegistration(ResourceManagerGateway resourceManager, I
public boolean isConnected() {
return resourceManager != null;
}
+
+
+   /**
+* Cancel the current job and notify all listeners the job's 
cancellation.
+*
+* @param cause Cause for the cancelling.
+*/
+   private void cancelAndClearEverything(Throwable cause) {
+   // currently, nothing to do here
+   }
+
+   // 

+   //  Utility classes
+   // 

+   private class JobMasterLeaderContender implements LeaderContender {
--- End diff --

I used to make JobMaster implement the LeaderContender interface. But after 
checking Stephan's implementation of TaskExecutor, I modified my implementation 
:)

JobMaster is not only a contender for the JM's leadership, but also a listener 
for the RM's leader. If we let JM directly implement the `LeaderContender` 
interface, then we should also make JM implement the `LeaderRetrieval` 
interface. Note that the two interfaces both have a method called 
`handleError`. Its implementation would be very difficult because we would have 
to check the cause of the exception: if the JM's contention for leadership 
fails, the JM should kill itself, but in the case where the listener of the RM 
fails, the JM can just wait for the RM's recovery. 
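    
    A sketch of the inner-class split described above; the methods are 
    approximations of that era's `LeaderContender` interface:
    
        import org.apache.flink.runtime.leaderelection.LeaderContender;
    
        import java.util.UUID;
    
        class JobMasterSketch {
    
            // Contends only for the JobMaster's own leadership, so its handleError()
            // can be fatal without conflicting with the ResourceManager listener.
            private class JobMasterLeaderContender implements LeaderContender {
    
                @Override
                public void grantLeadership(UUID leaderSessionID) {
                    // start running the job under the new leader session
                }
    
                @Override
                public void revokeLeadership() {
                    // suspend the job until leadership is regained
                }
    
                @Override
                public String getAddress() {
                    return "akka://flink/user/jobmaster"; // illustrative address
                }
    
                @Override
                public void handleError(Exception exception) {
                    // losing the election service is fatal for the JobMaster itself
                    throw new RuntimeException("Leader election failed", exception);
                }
            }
        }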


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

