[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3359 Because the old code was badly outdated, I have updated the PR and reimplemented RocksDBInternalTimerService from scratch. Some problems mentioned in the previous comments may still exist, but I think we can start a new round of review. ---
[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3359 @StefanRRichter Sorry for the delayed response. I am working on it and will update the PR by this weekend. ---
[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3359 Very sorry for the delay. I have been busy with work over the past months, making Flink capable of handling the enormous data flows of Singles' Day. RocksDBInternalTimerService is among the improvements we made, but we adopted a very different implementation, because the initial implementation presented here has several problems:
* The initial implementation requires RocksDB instances other than the one used in RocksDBKeyedStateBackend, which makes resource configuration very difficult.
* The snapshotting of RocksDBInternalTimerService here is very inefficient. Although an asynchronous and incremental implementation is possible, it would duplicate much of the code in RocksDBKeyedStateBackend.
We address these problems by introducing `SecondaryKeyedState`s, which provide non-keyed access methods to the data inside a key group. Like normal keyed state, secondary keyed states are partitioned into key groups and stored in the backends. Hence, these secondary states also benefit from the asynchronous and incremental snapshotting in `RocksDBKeyedStateBackend`. What do you think of the changes, @StefanRRichter? ---
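A minimal sketch of the idea, assuming a simplified in-memory model: `SecondaryKeyedState`, `InMemorySecondaryKeyedState` and their methods are hypothetical names for illustration, not the API in the PR. Entries are grouped by key group and can be read back without setting a current key, which is the kind of access a timer service needs (for example, timers ordered by timestamp within one key group).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical interface: state partitioned by key group but read without a current key. */
interface SecondaryKeyedState<K, V> {

    /** Adds or overwrites an entry inside the given key group. */
    void put(int keyGroup, K key, V value);

    /** Non-keyed access: all entries of one key group, sorted by key. */
    Map<K, V> entriesInKeyGroup(int keyGroup);
}

/** In-memory sketch; a RocksDB-based version might, e.g., prefix each key with its key group. */
final class InMemorySecondaryKeyedState<K extends Comparable<K>, V> implements SecondaryKeyedState<K, V> {

    private final Map<Integer, TreeMap<K, V>> keyGroups = new HashMap<>();

    @Override
    public void put(int keyGroup, K key, V value) {
        keyGroups.computeIfAbsent(keyGroup, group -> new TreeMap<>()).put(key, value);
    }

    @Override
    public Map<K, V> entriesInKeyGroup(int keyGroup) {
        TreeMap<K, V> entries = keyGroups.get(keyGroup);
        if (entries == null) {
            return Collections.emptyMap();
        }
        // Read-only view; iteration order follows the TreeMap, i.e. sorted by key.
        return Collections.unmodifiableMap(entries);
    }

    /** Copies one key group, as a backend would when snapshotting that key group. */
    Map<K, V> snapshotKeyGroup(int keyGroup) {
        return new TreeMap<>(entriesInKeyGroup(keyGroup));
    }
}
```

Because every entry lives in exactly one key group, a backend could snapshot and redistribute such state with the same per-key-group machinery it already uses for normal keyed state.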
[GitHub] flink pull request #3859: [FLINK-6504] [FLINK-6467] [checkpoints] Add needed...
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3859 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3859: [FLINK-6504] [FLINK-6467] [checkpoints] Add needed synchr...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3859 I noticed that FLINK-6504 is also fixed in https://github.com/apache/flink/pull/3870, so I am closing this PR; let us address all the problems of incremental checkpointing there. ---
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3870#discussion_r116172093
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -18,91 +18,137 @@
 package org.apache.flink.runtime.state;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
 /**
  * A {@code SharedStateRegistry} will be deployed in the
- * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
  * maintain the reference count of {@link SharedStateHandle}s which are shared
- * among different checkpoints.
- *
+ * among different incremental checkpoints.
  */
 public class SharedStateRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
     /** All registered state objects by an artificial key */
-    private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates;
+    private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
+
+    /** Executor for async state deletion */
+    private final Executor asyncDisposalExecutor;
     public SharedStateRegistry() {
         this.registeredStates = new HashMap<>();
+        this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
--- End diff --
I prefer not to use another asynchronous executor here. In my initial implementation of `SharedStateRegistry`, unreferenced shared states were not discarded immediately but returned in a list, and were then discarded outside the synchronization scope.
Given that completed checkpoints are already discarded in an asynchronous thread in `ZooKeeperCompletedCheckpointStore` (which is the store more widely used in practice), we can avoid introducing another asynchronous executor here. What do you think? ---
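The approach described above (collect unreferenced states under the lock, discard them afterwards) can be sketched as follows. `RefCountingRegistry` and `DiscardableHandle` are hypothetical simplifications keyed by plain strings; they are not Flink's `SharedStateRegistry` API. `register` also returns the first handle registered under a key, mirroring the deduplication discussed in this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a shared state handle that can delete its underlying file. */
interface DiscardableHandle {
    void discardState() throws Exception;
}

/**
 * Sketch: reference counts are updated under the registry lock, but unreferenced
 * handles are only returned, so the caller can discard them outside that lock.
 */
final class RefCountingRegistry {

    private static final class Entry {
        final DiscardableHandle handle;
        int refCount;

        Entry(DiscardableHandle handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Adds a reference and returns the handle registered first under this key (deduplication). */
    synchronized DiscardableHandle register(String key, DiscardableHandle handle) {
        Entry entry = entries.computeIfAbsent(key, k -> new Entry(handle));
        entry.refCount++;
        return entry.handle;
    }

    /** Drops one reference per key and returns the handles that are no longer referenced. */
    synchronized List<DiscardableHandle> unregister(List<String> keys) {
        List<DiscardableHandle> unreferenced = new ArrayList<>();
        for (String key : keys) {
            Entry entry = entries.get(key);
            if (entry != null && --entry.refCount == 0) {
                entries.remove(key);
                unreferenced.add(entry.handle);
            }
        }
        return unreferenced;
    }

    /** Best-effort disposal, meant to run without holding the registry lock; returns failure count. */
    static int discardAll(List<DiscardableHandle> handles) {
        int failures = 0;
        for (DiscardableHandle handle : handles) {
            try {
                handle.discardState();
            } catch (Exception e) {
                failures++; // keep going so one failing file does not block the others
            }
        }
        return failures;
    }
}
```

The caller would hand the returned list to whichever thread already performs checkpoint cleanup, so the registry itself needs no executor.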
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3870#discussion_r116161754
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -922,6 +940,39 @@ void releaseResources(boolean canceled) {
                 }
             }
         }
+
+    /**
+    * A placeholder state handle for shared state that will replaced by an original that was
+    * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in
+    * case of {@link ByteStreamStateHandle}.
+    */
+    private static final class PlaceholderStreamStateHandle implements StreamStateHandle {
--- End diff --
I think we can move `PlaceholderStreamStateHandle` out of this class so that other state backends can use it as well. What do you think? ---
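To illustrate why the placeholder need not live inside `RocksDBKeyedStateBackend`, here is a sketch of a backend-agnostic placeholder written against a hypothetical minimal `StreamHandle` interface rather than Flink's `StreamStateHandle`. Nothing in it depends on RocksDB, so any backend that reuses previously uploaded files could share it. Throwing on `openInputStream()` and reporting a size of 0 are assumptions of this sketch.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical minimal handle interface (not Flink's StreamStateHandle). */
interface StreamHandle {
    InputStream openInputStream() throws IOException;

    long getStateSize();
}

/**
 * Placeholder for a file that an earlier checkpoint already uploaded. It carries no data
 * and must be swapped for the registered original before anyone reads from it.
 */
final class PlaceholderHandle implements StreamHandle {

    @Override
    public InputStream openInputStream() {
        throw new UnsupportedOperationException(
            "Placeholder handles must be replaced by the original handle before reading.");
    }

    @Override
    public long getStateSize() {
        return 0L; // no new bytes are sent for this file
    }
}
```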
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3870#discussion_r116161318
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
@@ -180,69 +176,66 @@ public long getStateSize() {
     @Override
     public void registerSharedStates(SharedStateRegistry stateRegistry) {
+        Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
-        for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
-            SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+        for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
+            SharedStateRegistryKey registryKey =
+                createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
-            int referenceCount = stateRegistry.register(stateHandle);
-            Preconditions.checkState(referenceCount == 1);
+            SharedStateRegistry.Result result =
+                stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
+
+            // We update our reference with the result from the registry, to prevent the following
+            // problem:
+            // A previous checkpoint n has already registered the state. This can happen if a
+            // following checkpoint (n + x) wants to reference the same state before the backend got
+            // notified that checkpoint n completed. In this case, the shared registry did
+            // deduplication and returns the previous reference.
+            newSstFileEntry.setValue(result.getReference());
         }
-        for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
-            SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+        for (Map.Entry<String, StreamStateHandle> oldSstFileName : registeredSstFiles.entrySet()) {
--- End diff --
Similar to the previous comment, `oldSstFileName` can be renamed to `registeredSstFileEntry` here, since this loop iterates over `registeredSstFiles`. ---
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3870#discussion_r116161230
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
@@ -180,69 +176,66 @@ public long getStateSize() {
     @Override
     public void registerSharedStates(SharedStateRegistry stateRegistry) {
+        Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
-        for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
-            SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+        for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
--- End diff --
I think it's better to rename `newSstFileEntry` to `unregisteredSstFileEntry`. What do you think? ---
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3870#discussion_r116161117
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
@@ -180,69 +176,66 @@ public long getStateSize() {
     @Override
     public void registerSharedStates(SharedStateRegistry stateRegistry) {
+        Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
-        for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
-            SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+        for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
+            SharedStateRegistryKey registryKey =
+                createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
-            int referenceCount = stateRegistry.register(stateHandle);
-            Preconditions.checkState(referenceCount == 1);
+            SharedStateRegistry.Result result =
+                stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
+
+            // We update our reference with the result from the registry, to prevent the following
+            // problem:
+            // A previous checkpoint n has already registered the state. This can happen if a
+            // following checkpoint (n + x) wants to reference the same state before the backend got
+            // notified that checkpoint n completed. In this case, the shared registry did
+            // deduplication and returns the previous reference.
+            newSstFileEntry.setValue(result.getReference());
         }
-        for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
-            SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+        for (Map.Entry<String, StreamStateHandle> oldSstFileName : registeredSstFiles.entrySet()) {
+            SharedStateRegistryKey registryKey =
+                createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
+
+            SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
-            int referenceCount = stateRegistry.register(stateHandle);
-            Preconditions.checkState(referenceCount > 1);
+            // Again we update our state handle with the result from the registry, thus replacing
+            // placeholder state handles with the originals.
+            oldSstFileName.setValue(result.getReference());
         }
+        // Migrate state from unregistered to registered, so that it will not count as private state
+        // for #discardState() from now.
+        registeredSstFiles.putAll(unregisteredSstFiles);
+        unregisteredSstFiles.clear();
+        registered = true;
     }
     @Override
     public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+        Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
-        for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
-            stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
+        for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
--- End diff --
We should not unregister SST files that have not been registered before. ---
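A simplified sketch of the register/unregister symmetry the comment asks for, assuming a plain `Map<String, Integer>` of reference counts in place of `SharedStateRegistry`; the class `SstFileHandleSketch` and its members are hypothetical. `unregisterSharedStates` releases only the files that `registerSharedStates` actually registered.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified handle: files are identified by name, counts live in a plain map. */
final class SstFileHandleSketch {

    private final Set<String> unregisteredSstFiles; // new files, not yet known to the registry
    private final Set<String> registeredSstFiles;   // files referenced through the registry
    private boolean registered;

    SstFileHandleSketch(Set<String> newFiles, Set<String> filesFromEarlierCheckpoints) {
        this.unregisteredSstFiles = new HashSet<>(newFiles);
        this.registeredSstFiles = new HashSet<>(filesFromEarlierCheckpoints);
    }

    void registerSharedStates(Map<String, Integer> refCounts) {
        if (registered) {
            throw new IllegalStateException("Shared states are already registered.");
        }
        for (String file : unregisteredSstFiles) {
            refCounts.merge(file, 1, Integer::sum); // first reference to a new file
        }
        for (String file : registeredSstFiles) {
            refCounts.merge(file, 1, Integer::sum); // one more reference to a reused file
        }
        // From now on every file is referenced through the registry.
        registeredSstFiles.addAll(unregisteredSstFiles);
        unregisteredSstFiles.clear();
        registered = true;
    }

    void unregisterSharedStates(Map<String, Integer> refCounts) {
        if (!registered) {
            throw new IllegalStateException("Shared states are not registered yet.");
        }
        // Release only what registerSharedStates() registered; a count reaching 0 removes the entry.
        for (String file : registeredSstFiles) {
            refCounts.computeIfPresent(file, (name, count) -> count == 1 ? null : count - 1);
        }
        registered = false;
    }
}
```

With this shape, calling unregister without a prior register fails fast instead of decrementing counts for files the registry never saw.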
[GitHub] flink pull request #3859: [FLINK-6504] [FLINK-6467] [checkpoints] Add needed...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3859#discussion_r116148624
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -911,9 +915,11 @@ void releaseResources(boolean canceled) {
             if (canceled) {
                 List statesToDiscard = new ArrayList<>();
-                statesToDiscard.add(metaStateHandle);
-                statesToDiscard.addAll(miscFiles.values());
-                statesToDiscard.addAll(newSstFiles.values());
+                synchronized (this) {
--- End diff --
Yes, I agree. The key point here is to make sure that the materialization thread has stopped; synchronization does little to help here. So I prefer to remove the synchronization here. What do you think? ---
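A sketch of the point made in the comment, under the assumption that cancellation can stop the worker (here via interruption; in the backend, closing the registered streams would play that role). Once the materialization thread has been joined, the cancel path can read what that thread produced without any lock, because `Thread.join()` establishes a happens-before relation with everything the joined thread did. `MaterializationSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: the cancel path stops and joins the materialization thread
 * before reading the list that thread wrote, so no extra lock is needed.
 */
final class MaterializationSketch {

    private final List<String> uploadedFiles = new ArrayList<>(); // written only by the worker
    private final Thread materializer;

    MaterializationSketch(int fileCount) {
        this.materializer = new Thread(() -> {
            for (int i = 0; i < fileCount && !Thread.currentThread().isInterrupted(); i++) {
                uploadedFiles.add("file-" + i); // stands in for uploading one file
            }
        });
    }

    void start() {
        materializer.start();
    }

    /** Cancellation: interrupt, wait for the worker to finish, then collect what it produced. */
    List<String> cancelAndCollectStatesToDiscard() throws InterruptedException {
        materializer.interrupt();
        // join() returns only after the worker terminated; all its writes are visible afterwards.
        materializer.join();
        return new ArrayList<>(uploadedFiles);
    }
}
```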
[GitHub] flink pull request #3859: [FLINK-6504] [FLINK-6467] [checkpoints] Add needed...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3859
[FLINK-6504] [FLINK-6467] [checkpoints] Add needed synchronization for RocksDBIncrementalSnapshotOperation
This pull request adds missing synchronization for access to the following variables:
1. `materializedSstFiles` in `RocksDBKeyedStateBackend`: The variable may be accessed simultaneously by the processing thread (read) and the materialization threads (write). We now use `asynchronousSnapshotLock` to prevent concurrent access.
2. `newSstFiles`, `oldSstFiles` and `metaStateHandle` in `RocksDBIncrementalSnapshotOperation`: These variables may be accessed by both the cancel thread and the materialization thread. Although the materialization thread is supposed to have stopped when `releaseResources()` is executed, we add synchronization here to prevent potential conflicts.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/shixiaogang/flink flink-6504
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3859.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3859
commit 1c1a08e0f7db5ec8663a46c495468583983c7291
Author: xiaogang.sxg <xiaogang@alibaba-inc.com>
Date: 2017-05-10T01:55:34Z
Add needed synchronization for RocksDBIncrementalSnapshotOperation ---
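For item 1, a minimal sketch of guarding a map shared by the processing thread (reader) and a materialization thread (writer) with one lock object. The field names follow the description above; the class, its methods and the `Map<Long, Set<String>>` layout are assumptions for illustration.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical sketch: the processing thread reads and the materialization thread writes
 * materializedSstFiles, so both go through the same lock object.
 */
final class MaterializedFilesSketch {

    private final Object asynchronousSnapshotLock = new Object();

    // checkpoint id -> names of the sst files materialized for that checkpoint
    private final Map<Long, Set<String>> materializedSstFiles = new TreeMap<>();

    /** Called from the materialization thread once a checkpoint's files are uploaded. */
    void recordMaterialized(long checkpointId, Set<String> fileNames) {
        synchronized (asynchronousSnapshotLock) {
            materializedSstFiles.put(checkpointId, new HashSet<>(fileNames));
        }
    }

    /** Called from the processing thread when it starts the next snapshot. */
    Set<String> filesOfCheckpoint(long checkpointId) {
        synchronized (asynchronousSnapshotLock) {
            Set<String> files = materializedSstFiles.get(checkpointId);
            // Return a copy so the caller can iterate it after the lock is released.
            return files == null ? new HashSet<String>() : new HashSet<>(files);
        }
    }
}
```

Returning a copy keeps the critical section short: the processing thread can iterate the file names after releasing the lock without racing against later writes.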
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3801 ---
[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3801 Hi @StefanRRichter, thanks a lot for pointing out the problem and suggesting the fix. I have updated the PR as suggested. A `CloseableRegistry` is now used to track opened I/O streams, and these streams are closed first upon cancellation. ---
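A minimal sketch of the mechanism described here, using a hypothetical `SimpleCloseableRegistry` rather than Flink's `CloseableRegistry` (whose exact method names are not reproduced): streams are registered when opened, unregistered when closed normally, and closing the registry on cancellation closes whatever is still open. Closing a stream that is registered after cancellation is an extra safeguard assumed by this sketch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical minimal registry (not Flink's CloseableRegistry API). */
final class SimpleCloseableRegistry implements Closeable {

    private final Set<Closeable> registered = new LinkedHashSet<>();
    private boolean closed;

    /** Tracks an open stream; if cancellation already happened, closes it right away. */
    void register(Closeable closeable) throws IOException {
        synchronized (this) {
            if (!closed) {
                registered.add(closeable);
                return;
            }
        }
        closeable.close();
        throw new IOException("Cannot register a stream after the registry was closed.");
    }

    /** Called after a stream was closed normally. */
    synchronized void unregister(Closeable closeable) {
        registered.remove(closeable);
    }

    /** Cancellation: close every stream that is still open, collecting failures. */
    @Override
    public void close() throws IOException {
        List<Closeable> toClose;
        synchronized (this) {
            closed = true;
            toClose = new ArrayList<>(registered);
            registered.clear();
        }
        // Close outside the lock so a slow close() does not block register()/unregister().
        IOException firstFailure = null;
        for (Closeable closeable : toClose) {
            try {
                closeable.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
```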
[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3801 @StefanRRichter Thanks a lot for your review. I have updated the pull request as suggested. The following changes were made:
1. Remove the checkpoint type for incremental checkpoints. Support for incremental checkpointing is now a configurable feature of `RocksDBKeyedStateBackend`, just like asynchronous checkpointing in `HeapKeyedStateBackend`. An incremental checkpoint is performed if the feature is enabled and the checkpoint to perform is not a savepoint.
2. Rename `RocksDBKeyedStateHandle` to `RocksDBIncrementalKeyedStateHandle` and do some refactoring.
3. Allow `KeyedStateHandle` to register shared states.
4. Maintain information about the last completed checkpoint using the notifications from `AbstractStreamOperator`.
5. Parameterize `RocksDBStateBackendTest` to test the cleanup of resources in both full and incremental checkpointing.
6. Parameterize `PartitionedStateCheckpointingITCase` to test snapshotting and restoring with different backend settings.
I would appreciate it if you could take a look at these changes. Any comments are welcome. ---
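Items 1 and 4 above can be illustrated with two small helpers, assuming sst files are identified by name. `IncrementalCheckpointSketch` and its methods are hypothetical. Because RocksDB sst files are immutable once written, a file already contained in the last completed checkpoint can be referenced again, and only the difference needs to be uploaded.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helpers illustrating the snapshot-type decision and the "new files" computation. */
final class IncrementalCheckpointSketch {

    /** Incremental only if the feature is enabled and this is not a savepoint. */
    static boolean takeIncrementalSnapshot(boolean incrementalEnabled, boolean isSavepoint) {
        return incrementalEnabled && !isSavepoint;
    }

    /** Files of the current snapshot that the last completed checkpoint does not already contain. */
    static Set<String> sstFilesToUpload(Set<String> currentSstFiles, Set<String> lastCompletedSstFiles) {
        Set<String> toUpload = new HashSet<>(currentSstFiles);
        toUpload.removeAll(lastCompletedSstFiles);
        return toUpload;
    }
}
```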
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114703946
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
         }
     }
+    private static class RocksDBIncrementalRestoreOperation {
+
+        private final RocksDBKeyedStateBackend stateBackend;
+
+        private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) {
+            this.stateBackend = stateBackend;
+        }
+
+        private List<KeyedBackendSerializationProxy.StateMetaInfo> readMetaData(
+                StreamStateHandle metaStateHandle) throws Exception {
+
+            FSDataInputStream inputStream = null;
+
+            try {
+                inputStream = metaStateHandle.openInputStream();
+                stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+                KeyedBackendSerializationProxy serializationProxy =
+                    new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+                DataInputView in = new DataInputViewStreamWrapper(inputStream);
+                serializationProxy.read(in);
+
+                return serializationProxy.getNamedStateSerializationProxies();
+            } finally {
+                if (inputStream != null) {
+                    stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+                    inputStream.close();
+                }
+            }
+        }
+
+        private void readStateData(
+                Path restoreFilePath,
+                StreamStateHandle remoteFileHandle) throws IOException {
+
+            FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
+
+            FSDataInputStream inputStream = null;
+            FSDataOutputStream outputStream = null;
+
+            try {
+                inputStream = remoteFileHandle.openInputStream();
+                stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+                outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+                stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+                byte[] buffer = new byte[1024];
+                while (true) {
+                    int numBytes = inputStream.read(buffer);
+                    if (numBytes == -1) {
+                        break;
+                    }
+
+                    outputStream.write(buffer, 0, numBytes);
+                }
+            } finally {
+                if (inputStream != null) {
+                    stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+                    inputStream.close();
+                }
+
+                if (outputStream != null) {
+                    stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+                    outputStream.close();
+                }
+            }
+        }
+
+        private void restoreInstance(
+                RocksDBKeyedStateHandle restoreStateHandle,
+                boolean hasExtraKeys) throws Exception {
+
+            // read state data
+            Path restoreInstancePath = new Path(
+                stateBackend.instanceBasePath.getAbsolutePath(),
+                UUID.randomUUID().toString());
+
+            try {
+                Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
+                for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
+                    String fileName = sstFileEntry.getKey();
+                    StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
+
+                    readStateData(new Path(restoreInstancePath, fileName)
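The copy loop in `readStateData` can be written more compactly with try-with-resources, which closes both streams even when the copy throws. This is a sketch in plain `java.io` types; `StreamCopySketch` is a hypothetical helper, and the registration with `cancelStreamRegistry` from the diff is deliberately omitted.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical helper showing the copy loop with try-with-resources. */
final class StreamCopySketch {

    /** Copies the remaining bytes of in to out and returns how many bytes were copied. */
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8 * 1024]; // arbitrary size; the diff above uses 1024
        long total = 0;
        int numBytes;
        while ((numBytes = in.read(buffer)) != -1) {
            out.write(buffer, 0, numBytes);
            total += numBytes;
        }
        return total;
    }

    /** Resources are closed in reverse declaration order (output first), also on failure. */
    static long copyAndClose(InputStream source, OutputStream target) throws IOException {
        try (InputStream in = source; OutputStream out = target) {
            return copy(in, out);
        }
    }
}
```

In the backend, each stream would still have to be registered with the cancel registry right after opening and unregistered before closing, so that cancellation can interrupt a blocked copy.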
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114703775
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
         }
     }
+    private static class RocksDBIncrementalSnapshotOperation {
+
+        private final RocksDBKeyedStateBackend stateBackend;
+
+        private final CheckpointStreamFactory checkpointStreamFactory;
+
+        private final long checkpointId;
+
+        private final long checkpointTimestamp;
+
+        private Map<String, StreamStateHandle> baseSstFiles;
+
+        private List<KeyedBackendSerializationProxy.StateMetaInfo> stateMetaInfos = new ArrayList<>();
+
+        private FileSystem backupFileSystem;
+        private Path backupPath;
+
+        private FSDataInputStream inputStream = null;
+        private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+
+        // new sst files since the last completed checkpoint
+        private Set newSstFileNames = new HashSet<>();
+
+        // handles to the sst files in the current snapshot
+        private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
+
+        // handles to the misc files in the current snapshot
+        private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+
+        private StreamStateHandle metaStateHandle = null;
+
+        private RocksDBIncrementalSnapshotOperation(
+                RocksDBKeyedStateBackend stateBackend,
+                CheckpointStreamFactory checkpointStreamFactory,
+                long checkpointId,
+                long checkpointTimestamp) {
+
+            this.stateBackend = stateBackend;
+            this.checkpointStreamFactory = checkpointStreamFactory;
+            this.checkpointId = checkpointId;
+            this.checkpointTimestamp = checkpointTimestamp;
+        }
+
+        private StreamStateHandle materializeStateData(Path filePath) throws Exception {
+            try {
+                final byte[] buffer = new byte[1024];
+
+                FileSystem backupFileSystem = backupPath.getFileSystem();
+                inputStream = backupFileSystem.open(filePath);
+                stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+                outputStream = checkpointStreamFactory
+                    .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+                stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+                while (true) {
+                    int numBytes = inputStream.read(buffer);
+
+                    if (numBytes == -1) {
+                        break;
+                    }
+
+                    outputStream.write(buffer, 0, numBytes);
+                }
+
+                return outputStream.closeAndGetHandle();
+            } finally {
+                if (inputStream != null) {
+                    stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+                    inputStream.close();
+                    inputStream = null;
+                }
+
+                if (outputStream != null) {
+                    stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+                    outputStream.close();
+                    outputStream = null;
+                }
+            }
+        }
+
+        private StreamStateHandle materializeMetaData() throws Exception {
+            try {
+                outputStream = checkpointStreamFactory
+                    .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+                stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+                KeyedBackendSerializationProxy serializationProxy =
+                    new KeyedBackendSerialization
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114565991
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
         cancelables.registerClosable(keyedStateBackend);
         // restore if we have some old state
-        if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) {
-            keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-        }
+        Collection restoreKeyedStateHandles =
+            restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState();
+
+        keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --
I attempted to move the state restore into the constructor, as we discussed, but it turns out to be impossible. All state backends must be registered with the task so that they can be closed when the task is canceled. If the restore happens in the backend's constructor, construction may block (e.g., while accessing HDFS). Since construction has not completed yet, the backend has not been registered and therefore will not be closed on cancellation. ---
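A sketch of the ordering argued for above, with hypothetical `SlowRestoreBackend` and `TaskCancelSketch` classes: the backend is constructed without I/O, registered for cancellation, and only then restored. If `restore()` blocks, `cancel()` can still reach the backend and close it, which in this sketch unblocks the restore.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Hypothetical backend: cheap constructor, restore() may block on remote I/O. */
final class SlowRestoreBackend implements Closeable {

    private final CountDownLatch closed = new CountDownLatch(1);

    void restore() throws InterruptedException {
        // Stands in for a blocking read from e.g. HDFS; returns once close() is called.
        closed.await();
    }

    @Override
    public void close() {
        closed.countDown();
    }
}

/** Hypothetical task: registers the backend for cancellation BEFORE the blocking restore. */
final class TaskCancelSketch {

    private final List<Closeable> cancelables = new ArrayList<>();

    void createAndRestoreBackend() throws InterruptedException {
        SlowRestoreBackend backend = new SlowRestoreBackend(); // constructor does no I/O
        synchronized (cancelables) {
            cancelables.add(backend);
        }
        backend.restore(); // may block, but cancel() can now reach the backend
    }

    int registeredCount() {
        synchronized (cancelables) {
            return cancelables.size();
        }
    }

    void cancel() throws IOException {
        synchronized (cancelables) {
            for (Closeable closeable : cancelables) {
                closeable.close();
            }
        }
    }
}
```

If the restore ran inside the constructor, the registration step could never execute while the constructor was blocked, which is exactly the problem described in the comment.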
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114565968
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+    private static final long serialVersionUID = -8328808513197388231L;
+
+    private final JobID jobId;
+
+    private final String operatorIdentifier;
+
+    private final KeyGroupRange keyGroupRange;
+
+    private final Set newSstFileNames;
+
+    private final Map<String, StreamStateHandle> sstFiles;
+
+    private final Map<String, StreamStateHandle> miscFiles;
+
+    private final StreamStateHandle metaStateHandle;
+
+    private boolean registered;
+
+    RocksDBKeyedStateHandle(
+            JobID jobId,
+            String operatorIdentifier,
+            KeyGroupRange keyGroupRange,
+            Set newSstFileNames,
+            Map<String, StreamStateHandle> sstFiles,
+            Map<String, StreamStateHandle> miscFiles,
+            StreamStateHandle metaStateHandle) {
+
+        this.jobId = jobId;
+        this.operatorIdentifier = operatorIdentifier;
+        this.keyGroupRange = keyGroupRange;
+        this.newSstFileNames = newSstFileNames;
+        this.sstFiles = sstFiles;
+        this.miscFiles = miscFiles;
+        this.metaStateHandle = metaStateHandle;
+        this.registered = false;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyGroupRange;
+    }
+
+    public Map<String, StreamStateHandle> getSstFiles() {
+        return sstFiles;
+    }
+
+    public Map<String, StreamStateHandle> getMiscFiles() {
+        return miscFiles;
+    }
+
+    public StreamStateHandle getMetaStateHandle() {
+        return metaStateHandle;
+    }
+
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+            return this;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void discardState() throws Exception {
+
+        try {
+            metaStateHandle.discardState();
+        } catch (Exception e) {
+            LOG.warn("Could not properly discard meta data.", e);
+        }
+
+        try {
+            StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+        } catch (Exception e) {
+            LOG.warn("Could not properly discard misc fil
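The `getIntersection()` logic in this diff returns the whole handle whenever its key-group range overlaps the requested one, presumably because the sst files cannot be split by key group; the `hasExtraKeys` flag in the restore code suggests that keys outside the range are then dropped on restore. A sketch with a hypothetical inclusive-range class `KeyGroupRangeSketch` (not Flink's `KeyGroupRange`):

```java
/** Hypothetical stand-in for an inclusive key-group range [startKeyGroup, endKeyGroup]. */
final class KeyGroupRangeSketch {

    static final KeyGroupRangeSketch EMPTY = new KeyGroupRangeSketch(0, -1);

    final int startKeyGroup;
    final int endKeyGroup;

    KeyGroupRangeSketch(int startKeyGroup, int endKeyGroup) {
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
    }

    boolean isEmpty() {
        return endKeyGroup < startKeyGroup;
    }

    /** Overlap of two inclusive ranges, or EMPTY if they are disjoint. */
    KeyGroupRangeSketch intersect(KeyGroupRangeSketch other) {
        int start = Math.max(startKeyGroup, other.startKeyGroup);
        int end = Math.min(endKeyGroup, other.endKeyGroup);
        return end >= start ? new KeyGroupRangeSketch(start, end) : EMPTY;
    }

    /** Returns the whole handle if its range overlaps the requested one at all, else null. */
    static <H> H selectHandle(H handle, KeyGroupRangeSketch handleRange, KeyGroupRangeSketch requested) {
        return handleRange.intersect(requested).isEmpty() ? null : handle;
    }
}
```

Testing `isEmpty()` instead of comparing against a shared empty instance by reference keeps the check correct even if an intersection method ever returns a fresh empty range object.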
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114500597 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map<String, StreamStateHandle> baseSstFiles; + + private List<KeyedBackendSerializationProxy.StateMetaInfo> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set<String> newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map<String, StreamStateHandle> sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map<String, StreamStateHandle> miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = 
backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerialization
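The copy loop in `materializeStateData()` above can be sketched in plain `java.io` terms with try-with-resources instead of manual cleanup in a `finally` block. This is a minimal sketch under stated assumptions: `ByteArrayInputStream` and `ByteArrayOutputStream` stand in for the backup `FSDataInputStream` and the `CheckpointStateOutputStream`, the class name `StreamCopySketch` is hypothetical, and the registration with `cancelStreamRegistry` is omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper; not part of the PR.
final class StreamCopySketch {

    // Copies all bytes from in to out using a fixed-size buffer and
    // returns the number of bytes copied.
    static long copy(InputStream in, OutputStream out) throws IOException {
        final byte[] buffer = new byte[1024];
        long total = 0;
        int numBytes;
        // read() returns -1 once the end of the stream is reached
        while ((numBytes = in.read(buffer)) != -1) {
            out.write(buffer, 0, numBytes);
            total += numBytes;
        }
        return total;
    }

    // Stand-in for materializing one local file into checkpoint storage:
    // both streams are closed automatically, even if copy() throws.
    static byte[] materialize(byte[] localFileContents) throws IOException {
        try (InputStream in = new ByteArrayInputStream(localFileContents);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            copy(in, out);
            return out.toByteArray();
        }
    }
}
```

With a real `CheckpointStateOutputStream`, the handle would be obtained via `closeAndGetHandle()` inside the `try` block, before the automatic `close()` runs.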
[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3801 Hi @gyfora, I am very happy to hear from you. The following are the answers to your questions. Kindly let me know if you have any thoughts on them. 1. Incremental checkpoints support rescaling. It's true that the implementation checkpoints files directly, with multiple key groups stored together in each file. But when the degree of parallelism changes, the files are passed to all the state backends whose key groups are contained in them. The backends then iterate over all the key-value pairs in the files and pick up the pairs that belong to them. 2. When we restore from a full snapshot (which is formatted as key-value pairs), the next incremental checkpoint will contain all the files. This may seem a little inefficient, but it is because I intend to make each checkpoint self-contained. Given that full snapshots and incremental snapshots are in different formats, we have to take a "full" incremental snapshot as the base for the following checkpoints. 3. That is a very good question. It would be more flexible to let users choose the checkpointing scheme (say, one full checkpoint after every n incremental checkpoints). But I think making every checkpoint incremental is acceptable because incremental checkpoints are more efficient in most cases. Backends that do not support incremental checkpointing can still take full snapshots. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
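The filtering step in point 1 can be illustrated with a small sketch. It assumes, purely for illustration, that every key starts with a 2-byte big-endian key-group prefix; the class `KeyGroupFilterSketch` and its methods are hypothetical, and a real backend would iterate over a RocksDB instance rather than a `List`. Values are omitted for brevity.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "pick up the pairs that belong to this backend".
final class KeyGroupFilterSketch {

    // Assumed (illustrative) key layout: 2-byte big-endian key-group id,
    // followed by the serialized user key.
    static byte[] prefixedKey(int keyGroup, byte[] userKey) {
        return ByteBuffer.allocate(2 + userKey.length)
                .putShort((short) keyGroup)
                .put(userKey)
                .array();
    }

    // Reads the key-group id back from the 2-byte prefix as an unsigned value.
    static int keyGroupOf(byte[] key) {
        return ((key[0] & 0xFF) << 8) | (key[1] & 0xFF);
    }

    // After rescaling, a restored file may cover more key groups than this
    // backend owns; keep only keys whose key group lies in [start, end].
    static List<byte[]> pickOwnKeys(List<byte[]> restoredKeys, int startKeyGroup, int endKeyGroup) {
        List<byte[]> own = new ArrayList<>();
        for (byte[] key : restoredKeys) {
            int keyGroup = keyGroupOf(key);
            if (keyGroup >= startKeyGroup && keyGroup <= endKeyGroup) {
                own.add(key);
            }
        }
        return own;
    }
}
```

The cost of this approach is that every backend scans all pairs in the files it receives, including those it discards.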
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3801 [FLINK-6364] [checkpoints] Implement incremental checkpointing in RocksDBKeyedStateBackend This is the initial implementation of incremental checkpointing in RocksDBKeyedStateBackend. Changes include: 1. Add a new `CheckpointType` for incremental checkpoints. 2. Call the `restore()` method of every `KeyedStateBackend`, even if the restore state handle is null or empty. 3. Implement `RocksDBIncrementalSnapshotOperation` and `RocksDBIncrementalRestoreOperation`, which support incremental snapshotting and incremental restoring (with or without parallelism changes), respectively. You can merge this pull request into a Git repository by running: $ git pull https://github.com/shixiaogang/flink flink-6364 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3801.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3801 commit c5340a2186ecfbc48c46048db5adb11af83db511 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-04-29T15:44:36Z Implement incremental checkpointing in RocksDBKeyedStateBackend ---
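The core decision of an incremental snapshot (which SST files are new since the last completed checkpoint, tracked in `newSstFileNames`, and which already have handles in `baseSstFiles`) can be sketched as below. This is a simplified illustration, not the PR's code: `IncrementalSstSketch` and `Plan` are hypothetical names, handles are modeled as plain strings, and the sketch relies on the assumption that RocksDB SST files are immutable once written, so a file with a known name does not need to be uploaded again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of planning one incremental snapshot.
final class IncrementalSstSketch {

    static final class Plan {
        // Files that must be copied to checkpoint storage.
        final Set<String> newSstFileNames = new TreeSet<>();
        // Files whose handles can be reused from the base checkpoint.
        final Map<String, String> reusedHandles = new HashMap<>();
    }

    // localSstFileNames: SST files in the current local RocksDB checkpoint.
    // baseSstFiles: file name -> handle from the last completed checkpoint.
    static Plan plan(Set<String> localSstFileNames, Map<String, String> baseSstFiles) {
        Plan plan = new Plan();
        for (String name : localSstFileNames) {
            String existingHandle = baseSstFiles.get(name);
            if (existingHandle != null) {
                plan.reusedHandles.put(name, existingHandle);
            } else {
                plan.newSstFileNames.add(name);
            }
        }
        return plan;
    }
}
```

Files in the base checkpoint that are no longer present locally (for example, after compaction) simply do not appear in the plan; deleting them from storage would be the job of shared-state reference counting.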
[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3524 ---
[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3359 @vpernin Thanks very much for your attention. The PR is supposed to work on 1.3-SNAPSHOT, but it's not testable right now due to some known bugs. Besides, I want to add support for asynchronous snapshots of timers in this pull request. Currently, the snapshots of timers are taken synchronously: no stream record can be processed until the snapshots are taken. In our tests with millions of timers, snapshotting takes several seconds to complete, so performance is significantly degraded when the checkpoint frequency is high. To allow asynchronous snapshotting, we need to refactor how internal timer services are restored and snapshotted. With this refactoring, `InternalTimerService`s are stored in the `KeyedStateBackend`, similar to keyed states. That way, we can benefit from the optimizations made for snapshotting keyed states, taking snapshots asynchronously (and incrementally in the near future). I am working on this right now. I would appreciate it if you could help test the feature when it is done. ---
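The difference between synchronous and asynchronous snapshotting can be sketched with a toy timer service. This is only an illustration under simplifying assumptions: timers are plain `long` timestamps, `AsyncTimerSnapshotSketch` and `write()` are hypothetical, and the synchronous copy is still linear in the number of timers. The approach described in the comment instead moves timers into the `KeyedStateBackend` to reuse its asynchronous snapshot machinery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical, simplified timer service: timers are plain timestamps.
final class AsyncTimerSnapshotSketch {

    // Registered event-time timers, kept sorted by timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Synchronous part (on the task thread, blocks record processing):
    // copy the current timers. Asynchronous part: write the copy out on
    // another thread while the task continues processing records.
    CompletableFuture<List<Long>> snapshot(Executor asyncExecutor) {
        final List<Long> copy = new ArrayList<>(timers);
        return CompletableFuture.supplyAsync(() -> write(copy), asyncExecutor);
    }

    // Placeholder for serializing the timers into a checkpoint stream.
    private static List<Long> write(List<Long> timersCopy) {
        return timersCopy;
    }
}
```

Timers registered after `snapshot()` returns do not appear in that snapshot, which is what makes it consistent.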
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524 @StephanEwen I have updated the PR with the following changes: 1. Add a method called `discardSharedStatesOnFail()` in `CompositeStateHandle`. This method is called when the pending checkpoint fails to complete. That way, we no longer need to register shared states as soon as an acknowledgement message is received; all shared states are registered only when the pending checkpoint completes. 2. Add `SharedStateHandle` and refactor `SharedStateRegistry` as suggested. 3. Both the registration and unregistration of shared states now take place in `CompletedCheckpoint`. What do you think of these changes? ---
[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3652#discussion_r108870133 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java --- @@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception { final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); - final String key = "key"; - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + try { + final String key = "key"; + final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); - final int num = 5; + final int num = 5; - // - insert - - System.out.println("begin insert"); + // - insert - + System.out.println("begin insert"); - final long beginInsert = System.nanoTime(); - for (int i = 0; i < num; i++) { - rocksDB.merge(write_options, keyBytes, valueBytes); - } - final long endInsert = System.nanoTime(); - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + rocksDB.merge(write_options, keyBytes, valueBytes); + } + final long endInsert = System.nanoTime(); + System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); - // - read (attempt 1) - + // - read (attempt 1) - - final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; - final long beginGet1 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet1 = System.nanoTime(); + final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; + 
final long beginGet1 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet1 = System.nanoTime(); - System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms"); + System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms"); - // - read (attempt 2) - + // - read (attempt 2) - - final long beginGet2 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet2 = System.nanoTime(); + final long beginGet2 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet2 = System.nanoTime(); - System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms"); + System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms"); - // - compact - - System.out.println("compacting..."); - final long beginCompact = System.nanoTime(); - rocksDB.compactRange(); - final long endCompact = System.nanoTime(); + // - compact - + System.out.println("compacting..."); + final long beginCompact = System.nanoTime(); + rocksDB.compactRange(); + final long endCompact = System.nanoTime(); - System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms"); + System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms"); - // - read (attempt 3) - + //
[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3652#discussion_r108869236 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java --- @@ -54,59 +54,62 @@ public static void main(String[] args) throws Exception { final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); - final String key = "key"; - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + try { + final String key = "key"; + final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); - final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); + final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); - final Unsafe unsafe = MemoryUtils.UNSAFE; - final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; + final Unsafe unsafe = MemoryUtils.UNSAFE; + final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; - final int num = 5; - System.out.println("begin insert"); + final int num = 5; + System.out.println("begin insert"); - final long beginInsert = System.nanoTime(); - for (int i = 0; i < num; i++) { - unsafe.putInt(keyTemplate, offset, i); - rocksDB.put(write_options, keyTemplate, valueBytes); - } - final long endInsert = System.nanoTime(); - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); - - final byte[] resultHolder = new byte[num * valueBytes.length]; - - final long beginGet = System.nanoTime(); - - final RocksIterator iterator = rocksDB.newIterator(); - int pos = 0; - 
- // seek to start - unsafe.putInt(keyTemplate, offset, 0); - iterator.seek(keyTemplate); - - // mark end - unsafe.putInt(keyTemplate, offset, -1); - - // iterate - while (iterator.isValid()) { - byte[] currKey = iterator.key(); - if (samePrefix(keyBytes, currKey)) { - byte[] currValue = iterator.value(); - System.arraycopy(currValue, 0, resultHolder, pos, currValue.length); - pos += currValue.length; - iterator.next(); + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + unsafe.putInt(keyTemplate, offset, i); + rocksDB.put(write_options, keyTemplate, valueBytes); } - else { - break; + final long endInsert = System.nanoTime(); + System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); + + final byte[] resultHolder = new byte[num * valueBytes.length]; + + final long beginGet = System.nanoTime(); + + final RocksIterator iterator = rocksDB.newIterator(); --- End diff -- The iterator should be closed once it is no longer used, so it's better to use try-with-resources here. ---
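The review suggestion can be illustrated with a self-contained sketch of nested try-with-resources. To stay runnable without the RocksDB dependency, it uses a hypothetical `Resource` class in place of `RocksDB` and `RocksIterator`. The assumption is that in the RocksJava version in use these classes can be closed via `AutoCloseable`; the sketch records the order in which resources are closed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; Resource stands in for native handles such as
// a database instance and an iterator over it.
final class TryWithResourcesSketch {

    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        Resource(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name);
        }
    }

    // Resources are closed in reverse declaration order (iterator before
    // database), and before any catch clause runs, even when the body throws.
    static List<String> run(boolean failInBody) {
        List<String> closeLog = new ArrayList<>();
        try (Resource db = new Resource("db", closeLog);
             Resource iterator = new Resource("iterator", closeLog)) {
            if (failInBody) {
                throw new IllegalStateException("benchmark failed");
            }
        } catch (IllegalStateException expected) {
            closeLog.add("caught");
        }
        return closeLog;
    }
}
```

Declaring the iterator after the database ensures that the iterator is released before the database it reads from.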
[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3652#discussion_r108868849 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java --- @@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception { final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); - final String key = "key"; - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + try { + final String key = "key"; + final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); - final int num = 5; + final int num = 5; - // - insert - - System.out.println("begin insert"); + // - insert - + System.out.println("begin insert"); - final long beginInsert = System.nanoTime(); - for (int i = 0; i < num; i++) { - rocksDB.merge(write_options, keyBytes, valueBytes); - } - final long endInsert = System.nanoTime(); - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + rocksDB.merge(write_options, keyBytes, valueBytes); + } + final long endInsert = System.nanoTime(); + System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); - // - read (attempt 1) - + // - read (attempt 1) - - final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; - final long beginGet1 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet1 = System.nanoTime(); + final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; + 
final long beginGet1 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet1 = System.nanoTime(); - System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms"); + System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms"); - // - read (attempt 2) - + // - read (attempt 2) - - final long beginGet2 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet2 = System.nanoTime(); + final long beginGet2 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet2 = System.nanoTime(); - System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms"); + System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms"); - // - compact - - System.out.println("compacting..."); - final long beginCompact = System.nanoTime(); - rocksDB.compactRange(); - final long endCompact = System.nanoTime(); + // - compact - + System.out.println("compacting..."); + final long beginCompact = System.nanoTime(); + rocksDB.compactRange(); + final long endCompact = System.nanoTime(); - System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms"); + System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms"); - // - read (attempt 3) - + //
[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3652#discussion_r108869459 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java --- @@ -54,59 +54,62 @@ public static void main(String[] args) throws Exception { final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); - final String key = "key"; - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + try { + final String key = "key"; + final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); - final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); + final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); - final Unsafe unsafe = MemoryUtils.UNSAFE; - final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; + final Unsafe unsafe = MemoryUtils.UNSAFE; + final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; - final int num = 5; - System.out.println("begin insert"); + final int num = 5; + System.out.println("begin insert"); - final long beginInsert = System.nanoTime(); - for (int i = 0; i < num; i++) { - unsafe.putInt(keyTemplate, offset, i); - rocksDB.put(write_options, keyTemplate, valueBytes); - } - final long endInsert = System.nanoTime(); - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); - - final byte[] resultHolder = new byte[num * valueBytes.length]; - - final long beginGet = System.nanoTime(); - - final RocksIterator iterator = rocksDB.newIterator(); - int pos = 0; - 
- // seek to start - unsafe.putInt(keyTemplate, offset, 0); - iterator.seek(keyTemplate); - - // mark end - unsafe.putInt(keyTemplate, offset, -1); - - // iterate - while (iterator.isValid()) { - byte[] currKey = iterator.key(); - if (samePrefix(keyBytes, currKey)) { - byte[] currValue = iterator.value(); - System.arraycopy(currValue, 0, resultHolder, pos, currValue.length); - pos += currValue.length; - iterator.next(); + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + unsafe.putInt(keyTemplate, offset, i); + rocksDB.put(write_options, keyTemplate, valueBytes); } - else { - break; + final long endInsert = System.nanoTime(); + System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); + + final byte[] resultHolder = new byte[num * valueBytes.length]; + + final long beginGet = System.nanoTime(); + + final RocksIterator iterator = rocksDB.newIterator(); + int pos = 0; + + // seek to start + unsafe.putInt(keyTemplate, offset, 0); + iterator.seek(keyTemplate); + + // mark end + unsafe.putInt(keyTemplate, offset, -1); + + // iterate + while (iterator.isValid()) { + byte[] currKey = iterator.key(); + if (samePrefix(keyBytes, currKey)) { + byte[] currValue = iterator.value(); + System.arraycopy(currValue, 0, resultHolder, pos, currValue.length); +
[GitHub] flink pull request #3652: [FLINK-6210] close RocksDB in ListViaMergeSpeedMin...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3652#discussion_r108868469 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java --- @@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception { final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); - final String key = "key"; - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + try { --- End diff -- I think it's better to use try-with-resources here. ---
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524 @StephanEwen Thanks very much for your valuable comments. The following are some of my thoughts. * Currently, the registration of shared states is done in `CheckpointCoordinator` because it's needed whenever a `PendingCheckpoint` receives a state handle or a `CompletedCheckpoint` is recovered. But I think it does make sense to put both the registration and unregistration of shared states in the same place. I will update the PR so that this logic lives in `PendingCheckpoint`s and `CompletedCheckpoint`s. * When a `SubtaskState` is not successfully added to the `PendingCheckpoint`, the state objects in the `SubtaskState` should be correctly deleted. How these `SubtaskState`s are discarded differs from case to case. In the case where the `PendingCheckpoint` fails, the `SubtaskState` should delete both its private states and its shared states. But in the case where the `CompletedCheckpoint` is subsumed, the `SubtaskState` should delete the unreferenced shared states (possibly created by others) instead of its own shared states. By registering the shared states first, we can unify the implementation for the two cases. The shared states in a failed `PendingCheckpoint` are never referenced by other checkpoints, so they are correctly discarded by the registry when the `PendingCheckpoint` unregisters its shared states, just as with a subsumed `CompletedCheckpoint`. Another option is to refactor the interface of `CompositeStateHandle` to provide three methods, namely `onComplete()`, `onFail()` and `onSubsume()`. A `CompositeStateHandle` can implement these methods to handle its states correctly in each of these cases. What do you think? * It's a good idea to introduce `SharedStateHandle` for shared states. It can improve performance and allows safety checks. I will add it in the update.
---
[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3524#discussion_r108826346 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@code SharedStateRegistry} will be deployed in the + * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to + * maintain the reference count of those state objects shared among different + * checkpoints. Each shared state object must be identified by a unique key. 
+ */ +public class SharedStateRegistry implements Serializable { + + private static final long serialVersionUID = -8357254413007773970L; + + /** All registered state objects */ + private final Map<String, Tuple2<StateObject, Integer>> registeredStates = new HashMap<>(); + + /** All state objects that are not referenced any more */ + private transient final List<StateObject> discardedStates = new ArrayList<>(); + + /** +* Register the state in the registry +* +* @param key The key of the state to register +* @param state The state to register +*/ + public void register(String key, StateObject state) { + Tuple2<StateObject, Integer> stateAndRefCnt = registeredStates.get(key); --- End diff -- This method is always called by `registerAll()`, so I omitted the synchronization here. I will add the missing synchronization so that the method also works correctly when this assumption does not hold. ---
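The reference counting performed by `SharedStateRegistry`, including the synchronization discussed above, can be sketched as follows. This is a simplified illustration rather than the PR's class: the name `RefCountingRegistrySketch` is hypothetical, and instead of collecting a `discardedStates` list, `unregister()` returns a state once it is no longer referenced, leaving the discarding to the caller. A failed pending checkpoint and a subsumed completed checkpoint both just call `unregister()`, which is the unification argued for earlier in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reference-counting registry for shared state objects.
final class RefCountingRegistrySketch<S> {

    private static final class Entry<T> {
        final T state;
        int refCount;

        Entry(T state) {
            this.state = state;
        }
    }

    private final Map<String, Entry<S>> registered = new HashMap<>();

    // Increments the reference count of the state under key,
    // registering it on first use.
    synchronized void register(String key, S state) {
        Entry<S> entry = registered.get(key);
        if (entry == null) {
            entry = new Entry<>(state);
            registered.put(key, entry);
        }
        entry.refCount++;
    }

    // Decrements the reference count. Returns the state if no checkpoint
    // references it any more (the caller may then discard it), else null.
    synchronized S unregister(String key) {
        Entry<S> entry = registered.get(key);
        if (entry == null) {
            throw new IllegalStateException("Unknown shared state: " + key);
        }
        if (--entry.refCount == 0) {
            registered.remove(key);
            return entry.state;
        }
        return null;
    }

    synchronized int refCount(String key) {
        Entry<S> entry = registered.get(key);
        return entry == null ? 0 : entry.refCount;
    }
}
```

Making every public method `synchronized` keeps the registry correct even if it is called from outside `registerAll()`, at the cost of a lock per call.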
[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3524#discussion_r108826017 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@code SharedStateRegistry} will be deployed in the + * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to + * maintain the reference count of those state objects shared among different + * checkpoints. Each shared state object must be identified by a unique key. + */ +public class SharedStateRegistry implements Serializable { --- End diff -- I made it serializable due to the serialization issues in powermock. 
In some tests for `CompletedCheckpointStore`, I mock some `SubtaskState`s and verify that they correctly register their shared states when the `CompletedCheckpointStore` is recovered. For the tests to be able to store these mocks, all classes related to the mocked objects must be serializable. With your changes to the storage of `CompletedCheckpoint`, I think we can make the tests work without requiring `SharedStateRegistry` to be serializable. Until then, I prefer to keep `SharedStateRegistry` serializable so that the tests keep working. ---
[GitHub] flink pull request #3558: [FLINK-6096][checkpoint] Refactor the migration of...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3558#discussion_r108615377 --- Diff: flink-runtime/src/main/java/org/apache/flink/migration/v0/SavepointV0.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.v0; + +import org.apache.flink.migration.v0.runtime.TaskStateV0; +import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Savepoint version 0. + * + * This format was introduced with Flink 1.1.0. 
+ * + * checkpointId: long + * numTaskStates: int + * |jobVertexID: long[2] + * |parallelism: int + * |numSubtaskStates: int + * ||subtaskIndex: int + * ||serializedValueLength: int + * ||serializedValue: byte[] (null if serializedValueLength is -1) + * ||subtaskStateSize: long + * ||subtaskStateDuration: long + * |numKeyGroupStates: int + * ||subtaskIndex: int + * ||serializedValueLength: int + * ||serializedValue: byte[] (null if serializedValueLength is -1) + * ||keyGroupStateSize: long + * ||keyGroupStateDuration: long + * + */ +@Deprecated +@SuppressWarnings("deprecation") +public class SavepointV0 implements Savepoint { + + /** The classes that are migrated in SavepointV0 */ + public static final Map<String, String> MigrationMapping = new HashMap<String, String>() {{ + + /* migrated state descriptors */ + put("org.apache.flink.api.common.state.StateDescriptor", --- End diff -- @StefanRRichter +1 for option b). I planned to use a tool to create the mapping automatically, but I found it very difficult to detect what changed in the savepoints. It seems we must find all migrated classes manually. I agree with you that maintaining the mapping is error-prone. A naming-scheme convention will help us avoid typos in new class names, but we still have to carefully find all classes that changed in the savepoints. I think that is necessary anyway: we must keep track of what changes in the savepoint format. In my opinion, the number of migrated classes will be very small in the future. Most changes should happen in the serialization formats of the actual states (e.g., the snapshots of keyed states and timer services) rather than in the state handles. Therefore, little effort will be needed to maintain the mapping. ---
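The V0 layout documented in the Javadoc above can be read field by field with a `DataInputStream`. Below is a minimal sketch. It assumes big-endian encoding as written by `DataOutputStream` and treats each serialized value as opaque bytes. `SavepointV0FormatSketch` and its nested classes are hypothetical names, not Flink's actual savepoint deserializer.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader that walks the documented SavepointV0 layout.
// Serialized values are kept as raw bytes; nothing is deserialized.
class SavepointV0FormatSketch {

    static final class Entry {
        final int index;      // "subtaskIndex" as written in the format
        final byte[] value;   // null if serializedValueLength was -1
        final long stateSize;
        final long duration;

        Entry(int index, byte[] value, long stateSize, long duration) {
            this.index = index;
            this.value = value;
            this.stateSize = stateSize;
            this.duration = duration;
        }
    }

    static final class TaskStateV0Info {
        long jobVertexIdUpper; // jobVertexID is stored as long[2]
        long jobVertexIdLower;
        int parallelism;
        final List<Entry> subtaskStates = new ArrayList<>();
        final List<Entry> keyGroupStates = new ArrayList<>();
    }

    static final class SavepointV0Info {
        long checkpointId;
        final List<TaskStateV0Info> taskStates = new ArrayList<>();
    }

    static SavepointV0Info read(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            SavepointV0Info savepoint = new SavepointV0Info();
            savepoint.checkpointId = in.readLong();
            int numTaskStates = in.readInt();
            for (int i = 0; i < numTaskStates; i++) {
                TaskStateV0Info task = new TaskStateV0Info();
                task.jobVertexIdUpper = in.readLong();
                task.jobVertexIdLower = in.readLong();
                task.parallelism = in.readInt();
                int numSubtaskStates = in.readInt();
                for (int j = 0; j < numSubtaskStates; j++) {
                    task.subtaskStates.add(readEntry(in));
                }
                int numKeyGroupStates = in.readInt();
                for (int j = 0; j < numKeyGroupStates; j++) {
                    task.keyGroupStates.add(readEntry(in));
                }
                savepoint.taskStates.add(task);
            }
            return savepoint;
        }
    }

    // Subtask entries and key-group entries have the same shape in the format.
    private static Entry readEntry(DataInputStream in) throws IOException {
        int index = in.readInt();
        int length = in.readInt();
        byte[] value = null;
        if (length != -1) {
            value = new byte[length];
            in.readFully(value);
        }
        long size = in.readLong();
        long duration = in.readLong();
        return new Entry(index, value, size, duration);
    }
}
```

Because subtask entries and key-group entries are laid out identically, one `readEntry` helper serves both loops.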
[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3531 ---
[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3531 @StefanRRichter Thanks for your work. I will close the PR. ---
[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3531 @StefanRRichter I updated the PR as suggested. Many thanks for your hard work. ---
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524 @StephanEwen I have updated the PR as suggested. Changes include: 1. Rename `StateRegistry` to `SharedStateRegistry`, in which only shared states are registered. The `discardState()` method is now supposed to delete only the private states of the checkpoint. 2. `SharedStateRegistry` is now deployed in the `CheckpointCoordinator`. A state handle registers its shared states as soon as the coordinator receives it. In other words, all shared states of a completed checkpoint are already registered by the time the checkpoint is added to the `CompletedCheckpointStore`. All checkpoints (including savepoints) unregister their shared states when they are removed from the store. Savepoints should not contain any shared state, so unregistering a savepoint will not discard any of its states. 3. When recovering from failures or restarting from a savepoint, the `CheckpointCoordinator` rebuilds the registry from the checkpoints recovered in the `CompletedCheckpointStore`. 4. Related tests are added to ensure correctness. 5. Conflicts with the master branch are resolved. ---
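The registration scheme in points 1 and 2 is essentially reference counting. Below is a minimal single-threaded sketch. It assumes shared states are identified by a `String` key and that discarding is a callback. `RefCountingRegistry` and `SharedState` are hypothetical names; they do not mirror the actual `SharedStateRegistry` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical shared state object, e.g. a file reused by several checkpoints.
interface SharedState {
    void discard();
}

// Minimal reference-counting registry (not thread-safe). A shared state is
// discarded as soon as the last checkpoint referencing it unregisters it.
class RefCountingRegistry {

    private static final class Entry {
        final SharedState state;
        int refCount;

        Entry(SharedState state) {
            this.state = state;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    // Called when the coordinator receives a state handle. If the key is
    // already known, the first registered object is kept.
    void register(String key, SharedState state) {
        Entry entry = entries.computeIfAbsent(key, k -> new Entry(state));
        entry.refCount++;
    }

    // Called when a checkpoint is removed from the store or fails.
    void unregister(String key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            throw new IllegalStateException("Unknown shared state: " + key);
        }
        if (--entry.refCount == 0) {
            entries.remove(key);
            entry.state.discard(); // discard directly instead of collecting into a list
        }
    }

    int referenceCount(String key) {
        Entry entry = entries.get(key);
        return entry == null ? 0 : entry.refCount;
    }
}
```

Discarding inside `unregister` keeps callers simple. The trade-off is that any deletion I/O runs on the caller's thread.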
[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3531#discussion_r107704681 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java --- @@ -91,10 +98,10 @@ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange) /** * -* @return the internal key-group range to offsets metadata +* @return the start key group in the key-group range of this handle */ - public KeyGroupRangeOffsets getGroupRangeOffsets() { - return groupRangeOffsets; + public int getStartKeyGroup() { --- End diff -- I have removed all pass-through methods except `getGroupRangeOffsets()`, because `StateInitializationContextImpl$KeyGroupStreamIterator` uses it to obtain an iterator over the key groups and their offsets. ---
[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3531 @StefanRRichter Thanks a lot for your comments. I have updated the pull request as suggested, changing the type of raw keyed states to `KeyedStateHandle` as well. ---
[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3531#discussion_r107646547 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java --- @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances( } /** +* Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct +* key group index for the given subtask {@link KeyGroupRange}. +* +* This is publicly visible to be used in tests. +*/ + public static List getKeyedStateHandles( --- End diff -- +1. Will update the PR as suggested. ---
[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3531#discussion_r107646429 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java --- @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances( } /** +* Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct +* key group index for the given subtask {@link KeyGroupRange}. +* +* This is publicly visible to be used in tests. +*/ + public static List getKeyedStateHandles( + Collection keyedStateHandles, + KeyGroupRange subtaskKeyGroupRange) { + + List subtaskKeyedStateHandles = new ArrayList<>(); + + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange); --- End diff -- The idea is great! It does make sense to allow a `KeyedStateHandle` to create a new `KeyedStateHandle` for the states in a sub-range. I will update the PR as suggested. ---
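A handle that can narrow itself to a sub-range might look roughly like the sketch below. `SimpleKeyGroupRange`, `RangeHandle` and `SubtaskAssignment` are simplified, hypothetical stand-ins that assume inclusive `[start, end]` ranges. They are not the real Flink classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified inclusive key-group range [start, end]; empty when start > end.
final class SimpleKeyGroupRange {
    final int start;
    final int end;

    SimpleKeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    SimpleKeyGroupRange intersect(SimpleKeyGroupRange other) {
        return new SimpleKeyGroupRange(Math.max(start, other.start), Math.min(end, other.end));
    }

    boolean isEmpty() {
        return start > end;
    }
}

// Hypothetical keyed state handle that can create a handle for a sub-range.
final class RangeHandle {
    final String name;
    final SimpleKeyGroupRange range;

    RangeHandle(String name, SimpleKeyGroupRange range) {
        this.name = name;
        this.range = range;
    }

    // Returns a handle restricted to the overlapping key groups, or null if none overlap.
    RangeHandle getIntersection(SimpleKeyGroupRange subRange) {
        SimpleKeyGroupRange overlap = range.intersect(subRange);
        return overlap.isEmpty() ? null : new RangeHandle(name, overlap);
    }
}

final class SubtaskAssignment {
    // Collects, for one subtask, the narrowed handles that overlap its range.
    static List<RangeHandle> selectForSubtask(Collection<RangeHandle> handles, SimpleKeyGroupRange subtaskRange) {
        List<RangeHandle> result = new ArrayList<>();
        for (RangeHandle handle : handles) {
            RangeHandle narrowed = handle.getIntersection(subtaskRange);
            if (narrowed != null) {
                result.add(narrowed);
            }
        }
        return result;
    }
}
```

Putting the narrowing logic on the handle lets each handle type decide how to restrict its own data to a sub-range, so the assignment loop stays generic.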
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524 Hi @StephanEwen, the main reason is that we need a way to delete the unshared objects of failed `PendingCheckpoint`s. The `discardState()` method is called either when a `PendingCheckpoint` fails or when a `CompletedCheckpoint` is subsumed. In the current design, `discardState()` is supposed to delete only the unshared objects, while the shared objects are deleted by the `StateRegistry`. Hence, we must register the state handles as soon as they are received so that their shared objects can be deleted correctly. ---
[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3521 @StephanEwen I added two tests to ensure that checkpoints are not in the store when exceptions are thrown. The way I mock exceptions during subsumption may be a little tricky. Do you have any suggestions? ---
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524 @StephanEwen Thanks a lot for your valuable comments. I will update the PR as suggested. * I think it's a good idea to turn `StateRegistry` into `SharedStateRegistry`. That requires that (1) the state handle does not discard registered objects in its `discardState` method, and (2) the state handle registers its shared objects as soon as the coordinator receives it (currently, a state handle does not register its objects before its checkpoint completes). * Since state handles have to register their objects as soon as the coordinator receives them, we should move `SharedStateRegistry` from `CompletedCheckpointStore` to `CheckpointCoordinator`. * It's better for `StateRegistry` to discard an object directly once its reference count drops to 0. I used a list to collect the discarded objects because I wanted to change the discarding of completed checkpoints as little as possible. ---
[GitHub] flink pull request #3558: [FLINK-6096][checkpoint] Refactor the migration of...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3558 [FLINK-6096][checkpoint] Refactor the migration of old-versioned savepoints 1. The migrated classes in `SavepointV0` are moved to the package `org.apache.flink.migration.v0`. In the future, each deprecated savepoint version will have its own migration package. 2. A mapping is added to `SavepointV0` to record all deprecated classes. `MigrationInstantiationUtil` now uses the mapping to deserialize those deprecated classes correctly. 3. Unused methods in the migrated classes are removed. 4. The formats of old snapshots are documented in comments to help readers understand the restore logic. You can merge this pull request into a Git repository by running: $ git pull https://github.com/shixiaogang/flink flink-6096 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3558.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3558 commit b5f9d73703372c0d71da2c84cdd11dce16641b28 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-03-17T08:56:00Z Refactor the migration of old-versioned savepoints ---
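A common way to apply a class-name mapping during Java deserialization is to override `ObjectInputStream#resolveClass`. The sketch below illustrates that general technique, under the assumption that this is the kind of mechanism involved. It is not the code of `MigrationInstantiationUtil`, and `MappingObjectInputStream` is a hypothetical name.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.util.Map;

// Hypothetical ObjectInputStream that redirects deprecated class names to
// their migrated replacements before loading them.
class MappingObjectInputStream extends ObjectInputStream {

    private final Map<String, String> migrationMapping;
    private final ClassLoader classLoader;

    MappingObjectInputStream(InputStream in, Map<String, String> migrationMapping, ClassLoader classLoader)
            throws IOException {
        super(in);
        this.migrationMapping = migrationMapping;
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        String name = desc.getName();
        // Classes that were not migrated keep their original name.
        String target = migrationMapping.getOrDefault(name, name);
        return Class.forName(target, false, classLoader);
    }
}
```

Java serialization still checks the resolved class against the stream descriptor, comparing both the `serialVersionUID` and the simple class name. Remapping alone is therefore only enough for classes that moved to another package without being renamed or otherwise changed; renamed classes need additional handling.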
[GitHub] flink issue #3380: [FLINK-5865][state] Throw original exception in the state...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3380 I prefer to throw more detailed exceptions, e.g. `IncompatibleTypeSerializerException`, `StateAccessException` and `StateNotFoundException`, all extending `FlinkRuntimeException`. Users who catch these exceptions can get more information from them. ---
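A hierarchy along these lines might look like the sketch below. The three exception names come from the comment above, but their constructors and messages are assumptions. `FlinkRuntimeException` is declared locally only so that the example compiles on its own. `StateAccessExample` is a hypothetical caller showing that an underlying checked exception stays reachable through `getCause()`.

```java
import java.io.IOException;
import java.util.Map;

// Local stand-in so the sketch is self-contained.
class FlinkRuntimeException extends RuntimeException {
    FlinkRuntimeException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical specific exceptions, each carrying context a user can act on.
class StateNotFoundException extends FlinkRuntimeException {
    StateNotFoundException(String stateName) {
        super("State '" + stateName + "' is not registered.", null);
    }
}

class StateAccessException extends FlinkRuntimeException {
    StateAccessException(String stateName, Throwable cause) {
        super("Failed to access state '" + stateName + "'.", cause);
    }
}

class IncompatibleTypeSerializerException extends FlinkRuntimeException {
    IncompatibleTypeSerializerException(String stateName, String expected, String actual) {
        super("State '" + stateName + "' expects serializer " + expected + " but got " + actual + ".", null);
    }
}

// Hypothetical caller: the checked backend error is wrapped exactly once.
final class StateAccessExample {
    static String readValue(String stateName, Map<String, String> store) {
        if (!store.containsKey(stateName)) {
            throw new StateNotFoundException(stateName);
        }
        try {
            return load(store, stateName);
        } catch (IOException e) {
            throw new StateAccessException(stateName, e);
        }
    }

    // Simulates a backend read; a null value stands for a corrupted entry.
    private static String load(Map<String, String> store, String key) throws IOException {
        String value = store.get(key);
        if (value == null) {
            throw new IOException("corrupted entry for " + key);
        }
        return value;
    }
}
```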
[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3531 [FLINK-6034][checkpoint] Add KeyedStateHandle for the snapshots in keyed streams ## Changes - Add `KeyedStateHandle` for the snapshots in keyed streams. `KeyGroupsStateHandle` is now one of its implementations. - Distribute `KeyedStateHandle`s to subtasks by their key group ranges. A `KeyedStateHandle` is assigned to all subtasks whose key group ranges overlap with its range. You can merge this pull request into a Git repository by running: $ git pull https://github.com/shixiaogang/flink flink-6034 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3531.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3531 commit 9637dcc40d66a2702f5227b7bbe3ae66fca89adf Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-03-14T11:04:37Z Add KeyedStateHandle for the snapshots in keyed streams ---
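Deciding which subtasks receive a handle requires each subtask's key-group range. The sketch below assumes the contiguous split used by Flink's `KeyGroupRangeAssignment#computeKeyGroupRangeForOperatorIndex`. That formula is reproduced from memory, so treat it as an assumption. The sketch lists every subtask whose range overlaps a handle's range; `OverlapAssignment` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class OverlapAssignment {

    // Returns {start, end} (inclusive) of the key groups owned by a subtask,
    // splitting [0, maxParallelism) into contiguous, near-equal ranges.
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Indices of all subtasks whose range overlaps [handleStart, handleEnd].
    static List<Integer> subtasksFor(int handleStart, int handleEnd, int maxParallelism, int parallelism) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            int[] range = rangeForSubtask(maxParallelism, parallelism, i);
            // Two inclusive ranges overlap iff neither ends before the other starts.
            if (range[0] <= handleEnd && handleStart <= range[1]) {
                result.add(i);
            }
        }
        return result;
    }
}
```

For example, with a maximum parallelism of 128 and a new parallelism of 4, a handle covering key groups 0 to 63 would be assigned to subtasks 0 and 1.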
[GitHub] flink pull request #3524: [FLINK-6014][checkpoint] Allow the registration of...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3524 [FLINK-6014][checkpoint] Allow the registration of state objects in checkpoints - Introduce `CompositeStateHandle` which is composed of a collection of `StateObject`s and can register these `StateObject`s in `StateRegistry`. - Use `StateRegistry` to maintain the reference count of the `StateObject`s in completed checkpoints. A `StateObject` is deleted only if its reference count is zero. You can merge this pull request into a Git repository by running: $ git pull https://github.com/shixiaogang/flink flink-6014 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3524.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3524 commit a1796f2a0538c02a2154459fef8f931e14e18fca Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-03-13T11:23:47Z Allow registration of state objects in checkpoints commit de272a1f78356b940a3b9421dad70937596c3a94 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-03-13T11:29:21Z remove unnecessary synchronization in StateRegistry ---
[GitHub] flink pull request #3521: [FLINK-6027][checkpoint] Ignore the exception thro...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3521 [FLINK-6027][checkpoint] Ignore the exception thrown by the subsuming of completed checkppoints Exceptions thrown while subsuming old checkpoints are now ignored. `CompletedCheckpointStore#addCheckpoint` now throws exceptions only when the completed checkpoint could not be written to the store. In such cases, the coordinator can safely delete the states of the checkpoint, because it is impossible to recover from that checkpoint. You can merge this pull request into a Git repository by running: $ git pull https://github.com/shixiaogang/flink flink-6027 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3521.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3521 commit e89e947e74693ef1d5fdcfaebdc1818b138f2fd1 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-03-13T09:03:42Z Ignore the exception thrown by the subsuming of completed checkppoints commit 9ba89c42ed4751c68cf9520032e40dc6e857212c Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-03-13T10:04:30Z Change the log level to WARNING ---
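The contract above, where `addCheckpoint` fails only if the new checkpoint was not written, can be sketched with a bounded in-memory store. The `Checkpoint` interface, `BoundedCheckpointStore` and their `persist`/`discard` steps are hypothetical simplifications, not Flink's `CompletedCheckpointStore`. `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical completed checkpoint.
interface Checkpoint {
    long id();
    void persist() throws Exception;  // write to durable storage
    void discard() throws Exception;  // delete the checkpoint's states
}

final class BoundedCheckpointStore {
    private static final Logger LOG = Logger.getLogger(BoundedCheckpointStore.class.getName());

    private final int maxRetained;
    private final Deque<Checkpoint> checkpoints = new ArrayDeque<>();

    BoundedCheckpointStore(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    // Throws only if the new checkpoint was not written. The caller may then
    // delete its states safely, since it can never be used for recovery.
    void addCheckpoint(Checkpoint checkpoint) throws Exception {
        checkpoint.persist();
        checkpoints.addLast(checkpoint);

        // Subsume old checkpoints; failures here are logged and ignored.
        while (checkpoints.size() > maxRetained) {
            Checkpoint oldest = checkpoints.removeFirst();
            try {
                oldest.discard();
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Failed to subsume checkpoint " + oldest.id(), e);
            }
        }
    }

    int size() {
        return checkpoints.size();
    }
}
```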
[GitHub] flink pull request #3462: [FLINK-5917][state] Remove size() method from MapS...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3462 [FLINK-5917][state] Remove size() method from MapState The `size()` method is removed from `MapState` because its implementation is costly in the backends. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5917 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3462.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3462 commit 6906b15ff593f46e106348aa1f5772e6b78efe74 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-03-03T02:27:11Z Remove size() method from MapState ---
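To illustrate why `size()` is costly on a sorted key-value backend, the sketch below models a RocksDB-like layout with a `TreeMap` whose keys are `"<stateKey>/<userKey>"` strings. That encoding is an assumption made for readability; the real backend uses serialized byte keys. `PrefixScanSketch` is a hypothetical name. Counting the entries of one map requires scanning its whole key range.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class PrefixScanSketch {
    // Sorted store standing in for RocksDB; keys are "<stateKey>/<userKey>",
    // assuming state keys never contain '/'.
    private final NavigableMap<String, String> store = new TreeMap<>();

    void put(String stateKey, String userKey, String value) {
        store.put(stateKey + "/" + userKey, value);
    }

    // Linear in the number of user keys: there is no stored counter, so every
    // entry in the prefix range must be visited.
    int size(String stateKey) {
        String from = stateKey + "/";
        String to = stateKey + "0"; // '0' is the character right after '/' in ASCII
        int count = 0;
        for (String ignored : store.subMap(from, true, to, false).keySet()) {
            count++;
        }
        return count;
    }
}
```

Maintaining an exact counter instead would add a read-modify-write to every `put` and `remove`. That is one plausible reason for dropping the method rather than emulating it.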
[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3334#discussion_r103612613 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -428,6 +450,9 @@ CheckpointTriggerResult triggerCheckpoint( catch (Throwable t) { int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); + if(numUnsuccessful > maxUnsuccessfulCheckpoints) { --- End diff -- You are right. I missed it. Sorry for that. ---
[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3334#discussion_r103605788 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -428,6 +450,9 @@ CheckpointTriggerResult triggerCheckpoint( catch (Throwable t) { int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); + if(numUnsuccessful > maxUnsuccessfulCheckpoints) { --- End diff -- Here the counter records the total number of failed attempts. Since a streaming job is intended to run for a long time, the number of failed attempts will eventually exceed the limit. We should use a different counter here, one that is reset once a pending checkpoint completes successfully. ---
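The suggested counter tracks consecutive failures and resets on success. It can be sketched as follows. `ConsecutiveFailureTracker` is a hypothetical name; the limit check mirrors `numUnsuccessful > maxUnsuccessfulCheckpoints` from the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ConsecutiveFailureTracker {
    private final int maxFailedCheckpoints;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();

    ConsecutiveFailureTracker(int maxFailedCheckpoints) {
        this.maxFailedCheckpoints = maxFailedCheckpoints;
    }

    // Records a failed attempt; returns true once the limit is exceeded.
    boolean onCheckpointFailed() {
        return consecutiveFailures.incrementAndGet() > maxFailedCheckpoints;
    }

    // A completed checkpoint resets the streak, so sporadic failures spread
    // over a long-running job never add up to the limit.
    void onCheckpointCompleted() {
        consecutiveFailures.set(0);
    }
}
```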
[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3334#discussion_r103605271 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -537,12 +562,27 @@ else if (!props.forceCheckpoint()) { if (!checkpoint.isDiscarded()) { checkpoint.abortError(new Exception("Failed to trigger checkpoint")); } + if(numUnsuccessful > maxUnsuccessfulCheckpoints) { + return failExecution(executions); + } return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } } // end trigger lock } + private CheckpointTriggerResult failExecution(Execution[] executions) { + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } + for (Execution execution : executions) { + // fail the graph + execution.fail(new Throwable("The number of max unsuccessful checkpoints attempts exhausted")); --- End diff -- I don't think it's a good idea to fail the executions one by one here. We should call `ExecutionGraph#fail` to fail the whole execution graph. ---
[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3334#discussion_r103604470 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -121,6 +121,8 @@ /** The maximum number of checkpoints that may be in progress at the same time */ private final int maxConcurrentCheckpointAttempts; + /** The maximum number of unsuccessful checkpoints */ + private final int maxUnsuccessfulCheckpoints; --- End diff -- I think `failed` is a better word than `unsuccessful`. ---
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3336 ---
[GitHub] flink issue #3380: [FLINK-5865][state] Throw original exception in the state...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3380 I think we can borrow some ideas from Java. For example, the methods of `Map` do not declare any exceptions in their signatures, but the interface documents a set of specific `RuntimeException`s that implementations may throw, such as `ClassCastException`. Maybe we can do something similar. What do you think? @StephanEwen ---
[GitHub] flink pull request #3387: [FLINK-5863][queryable state] Add the serializatio...
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3387 ---
[GitHub] flink issue #3387: [FLINK-5863][queryable state] Add the serialization of li...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3387 The PR is just some code cleanup. Since we are planning to refactor the implementation, I agree to close the PR, and I am very willing to contribute to the FLIP. ---
[GitHub] flink issue #3380: [FLINK-5865][state] Throw original exception in the state...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3380 I like the idea of finding more "specific" exceptions. Flink can define specific exceptions like `StateAccessException`, which may make the code easier to understand. I also think it's better not to declare any exceptions in the signatures of user-facing interfaces like `State`. All exceptions thrown by these methods would be `RuntimeException`s, which are caught and handled by Flink. The reason is that these methods are provided by Flink and are supposed to work properly without throwing exceptions. In practice, users can do little even if they catch the exceptions. ---
[GitHub] flink issue #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3336 @aljoscha Thanks a lot for your hard work. I have fixed the typos in the documentation. ---
[GitHub] flink pull request #3387: [FLINK-5863][queryable state] Add the serializatio...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3387 [FLINK-5863][queryable state] Add the serialization of list states in KvStateRequestSerializer 1. Add `serializeList()` in `KvStateRequestSerialization`. 2. Modify the unit tests of `KvStateRequestSerialization` so that they no longer access protected methods. 3. Move `KvStateRequestSerializationRocksDBTest` from package `org.apache.flink.test.query` to `org.apache.flink.contrib.streaming.state`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5863 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3387.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3387 commit 5fc51b5f233ec62e143c1938b11c677b2260575b Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-02-22T03:56:24Z Add the serialization of list states in KvStateRequestSerializer ---
[GitHub] flink pull request #3380: [FLINK-5865][state] Throw original exception in th...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3380 [FLINK-5865][state] Throw original exception in the states The wrapping in `RuntimeException` is removed so that redundant stack traces are no longer printed in the log. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5865 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3380.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3380 commit 0928c44063d5c893b24c24f21050ee643159fb36 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-02-21T16:20:25Z Throw original exception in RocksDB states ---
[GitHub] flink issue #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3336 @StefanRRichter Many thanks for your work. I have rebased the pull request and resolved the conflicts. ---
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102225285 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -382,11 +342,26 @@ private UV deserializeUserValue(byte[] rawValueBytes) { this.rawValueBytes = rawValueBytes; this.deleted = false; } - + + public void remove() { + deleted = true; + rawValueBytes = null; + + try { + db.remove(columnFamily, writeOptions, rawKeyBytes); + } catch (RocksDBException e) { + throw new RuntimeException("Error while removing data from RocksDB.", e); --- End diff -- I modified the method signatures because I found that, except for `ValueState`, the methods of all other states throw `Exception`. This seems reasonable to me: `MapState` is a general interface that knows nothing about the implementation, while the implementations of these methods should throw specific exceptions such as `IOException` or `RocksDBException`. What do you think? ---
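The argument above rests on a Java rule: a method implementing an interface method may declare fewer or narrower checked exceptions than the interface method does. A minimal sketch with hypothetical types (`SimpleMapState` and `StorageMapState` are not Flink classes) showing that code written against the interface must handle `Exception`, while code holding the concrete type only has to handle `IOException`:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: an implementation narrowing the checked exceptions of its interface. */
public class NarrowedThrowsSketch {

    /** Backend-agnostic interface: it cannot know what its implementations throw. */
    interface SimpleMapState<K, V> {
        V get(K key) throws Exception;

        void remove(K key) throws Exception;
    }

    /** Implementation that declares only the specific IOException. */
    static class StorageMapState<K, V> implements SimpleMapState<K, V> {
        private final Map<K, V> data = new HashMap<>();
        boolean failNextCall = false; // lets the example simulate a storage error

        void put(K key, V value) {
            data.put(key, value);
        }

        @Override
        public V get(K key) throws IOException {
            maybeFail();
            return data.get(key);
        }

        @Override
        public void remove(K key) throws IOException {
            maybeFail();
            data.remove(key);
        }

        private void maybeFail() throws IOException {
            if (failNextCall) {
                failNextCall = false;
                throw new IOException("simulated storage failure");
            }
        }
    }

    /** Code written against the interface must handle the general Exception. */
    static <K, V> V getViaInterface(SimpleMapState<K, V> state, K key) {
        try {
            return state.get(key);
        } catch (Exception e) {
            return null;
        }
    }

    /** Code written against the implementation only has to handle IOException. */
    static String getViaImplementation(StorageMapState<String, String> state, String key) {
        try {
            return state.get(key);
        } catch (IOException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        StorageMapState<String, String> state = new StorageMapState<>();
        state.put("a", "1");
        System.out.println(getViaInterface(state, "a"));      // 1
        state.failNextCall = true;
        System.out.println(getViaImplementation(state, "a")); // error: simulated storage failure
    }
}
```

This is the trade-off under discussion: the general interface stays implementation-agnostic, and the specific exception type is visible only to callers that know the implementation.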
[GitHub] flink issue #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3336 I have added the documentation for `MapState`. Please take a look to see whether it is properly written. ---
[GitHub] flink issue #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3336 @StefanRRichter I have updated the pull request as suggested. Now the map serializer supports the serialization of null values. ---
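One common way for a map serializer to support `null` values is to write a flag before each value that tells the reader whether a value follows. A minimal sketch of that technique with plain `java.io` streams, assuming `String` keys, `Long` values and a hypothetical layout (entry count, then key, null flag and optional value per entry). The serializer in the pull request may lay out its bytes differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: map serialization with a per-entry null flag for values. */
public class NullableMapSerializationSketch {

    static byte[] serialize(Map<String, Long> map) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(map.size());
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                out.writeUTF(entry.getKey()); // keys are assumed to be non-null
                Long value = entry.getValue();
                out.writeBoolean(value == null); // null flag
                if (value != null) {
                    out.writeLong(value);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            // In-memory streams do not fail in practice; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
    }

    static Map<String, Long> deserialize(byte[] serialized) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
            int size = in.readInt();
            Map<String, Long> map = new HashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                boolean isNull = in.readBoolean();
                Long value = null;
                if (!isNull) {
                    value = in.readLong(); // only present when the flag says "not null"
                }
                map.put(key, value);
            }
            return map;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> input = new HashMap<>();
        input.put("present", 42L);
        input.put("missing", null);
        Map<String, Long> copy = deserialize(serialize(input));
        System.out.println(copy.equals(input));          // true
        System.out.println(copy.containsKey("missing")); // true
    }
}
```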
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102153318 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link MapState} implementation that stores state in RocksDB. + * + * {@link RocksDBStateBackend} must ensure that we set the + * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since + * we use the {@code merge()} call. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <UK> The type of the keys in the map state. + * @param <UV> The type of the values in the map state.
+ */ +public class RocksDBMapState<K, N, UK, UV> + extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>> + implements InternalMapState<N, UK, UV> { + + /** Serializer for the keys and values */ + private final TypeSerializer<UK> userKeySerializer; + private final TypeSerializer<UV> userValueSerializer; + + /** +* We disable writes to the write-ahead-log here. We can't have these in the base class +* because JNI segfaults for some reason if they are. +*/ + private final WriteOptions writeOptions; + + /** +* Creates a new {@code RocksDBMapState}. +* +* @param namespaceSerializer The serializer for the namespace. +* @param stateDesc The state identifier for the state. +*/ + public RocksDBMapState(ColumnFamilyHandle columnFamily, + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc, + RocksDBKeyedStateBackend<K> backend) { + + super(columnFamily, namespaceSerializer, stateDesc, backend); + + this.userKeySerializer = stateDesc.getKeySerializer(); + this.userValueSerializer = stateDesc.getValueSerializer(); + + writeOptions = new WriteOptions(); + writeOptions.setDisableWAL(true); + } + + // + // MapState Implementation + // + + @Override + public UV get(UK userKey) throws IOException { + try { + byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey); + byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
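In `get()`, the RocksDB key comes from `serializeUserKeyWithCurrentKeyAndNamespace(userKey)`, i.e. the current key, the namespace and the user key are combined into one byte key. A useful consequence is that all entries of one map share a byte prefix and can be enumerated with a prefix scan over sorted keys. The sketch below illustrates this under stated assumptions: the byte layout (8-byte key, then length-prefixed namespace and user key) is hypothetical and may differ from the real one, and a `TreeMap` with unsigned byte ordering stands in for a RocksDB column family (requires Java 9+ for the `Arrays` methods used).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: composite (key, namespace, userKey) byte keys plus a prefix scan. */
public class CompositeKeySketch {

    // Stand-in for a RocksDB column family: keys ordered by unsigned lexicographic byte order.
    private final TreeMap<byte[], Long> store = new TreeMap<byte[], Long>(Arrays::compareUnsigned);

    /** Serializes (key, namespace): the common prefix of every entry of one map. */
    static byte[] prefix(long key, String namespace) {
        return encode(key, namespace, null);
    }

    /** Serializes (key, namespace, userKey): the full lookup key of one map entry. */
    static byte[] compositeKey(long key, String namespace, String userKey) {
        return encode(key, namespace, userKey);
    }

    private static byte[] encode(long key, String namespace, String userKey) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(key);      // fixed-length key
            out.writeUTF(namespace); // length-prefixed, so no namespace is a prefix of another
            if (userKey != null) {
                out.writeUTF(userKey);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // in-memory streams do not fail in practice
        }
    }

    void put(long key, String namespace, String userKey, long value) {
        store.put(compositeKey(key, namespace, userKey), value);
    }

    Long get(long key, String namespace, String userKey) {
        return store.get(compositeKey(key, namespace, userKey));
    }

    /** Returns all values of one (key, namespace) map, in user-key byte order. */
    List<Long> values(long key, String namespace) {
        byte[] prefix = prefix(key, namespace);
        List<Long> result = new ArrayList<>();
        for (Map.Entry<byte[], Long> entry : store.tailMap(prefix, true).entrySet()) {
            byte[] k = entry.getKey();
            boolean samePrefix = k.length >= prefix.length
                    && Arrays.equals(k, 0, prefix.length, prefix, 0, prefix.length);
            if (!samePrefix) {
                break; // sorted order: once outside the prefix range, no later entry can match
            }
            result.add(entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        CompositeKeySketch state = new CompositeKeySketch();
        state.put(1L, "window-1", "a", 10L);
        state.put(1L, "window-1", "b", 20L);
        state.put(1L, "window-2", "a", 30L);
        state.put(2L, "window-1", "a", 40L);
        System.out.println(state.get(1L, "window-1", "b")); // 20
        System.out.println(state.values(1L, "window-1"));   // [10, 20]
    }
}
```

The length prefix on the namespace is what keeps the prefix scan correct: without it, the entries of namespace `w` could be mixed up with those of namespace `w1`.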
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102138099 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java --- @@ -484,6 +487,71 @@ public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException return null; } } + + /** +* Serializes all values of the Iterable with the given serializer. +* +* @param entries Key-value pairs to serialize +* @param keySerializer Serializer for UK +* @param valueSerializer Serializer for UV +* @param <UK> Type of the keys +* @param <UV> Type of the values +* @return Serialized values or null if values null or empty +* @throws IOException On failure during serialization +*/ + public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { + if (entries != null) { + Iterator<Map.Entry<UK, UV>> it = entries.iterator(); + + if (it.hasNext()) { + // Serialize + DataOutputSerializer dos = new DataOutputSerializer(32); + + while (it.hasNext()) { + Map.Entry<UK, UV> entry = it.next(); + + keySerializer.serialize(entry.getKey(), dos); + valueSerializer.serialize(entry.getValue(), dos); + } + + return dos.getCopyOfBuffer(); + } else { + return null; --- End diff -- This function is no longer used. I will delete it in the next update. ---
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102135289 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java --- @@ -410,6 +415,124 @@ public void testDeserializeListTooShort2() throws Exception { KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, LongSerializer.INSTANCE); } + + /** +* Tests map serialization utils. +*/ + @Test + public void testMapSerialization() throws Exception { + final long key = 0L; + + // objects for heap state list serialisation + final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = + new HeapKeyedStateBackend<>( + mock(TaskKvStateRegistry.class), + LongSerializer.INSTANCE, + ClassLoader.getSystemClassLoader(), + 1, new KeyGroupRange(0, 0) + ); + longHeapKeyedStateBackend.setCurrentKey(key); + + final InternalMapState<VoidNamespace, Long, String> mapState = longHeapKeyedStateBackend.createMapState( + VoidNamespaceSerializer.INSTANCE, + new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); + + testMapSerialization(key, mapState); + } + + /** +* Verifies that the serialization of a map using the given map state +* matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
+* +* @param key +* key of the map state +* @param mapState +* map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance +* +* @throws Exception +*/ + public static void testMapSerialization( + final long key, + final InternalMapState<VoidNamespace, Long, String> mapState) throws Exception { + + TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE; + TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE; + mapState.setCurrentNamespace(VoidNamespace.INSTANCE); + + // List + final int numElements = 10; + + final Map<Long, String> expectedValues = new HashMap<>(); + for (int i = 0; i < numElements; i++) { + final long value = ThreadLocalRandom.current().nextLong(); --- End diff -- I prefer to use `ThreadLocalRandom.current()`, which is also used by other tests in this file. Although it makes failures harder to reproduce, it may help uncover corner cases. ---
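A common compromise between random test inputs and reproducibility is to draw the data from a seeded generator and print the seed, so that a failing run can be replayed exactly. A minimal sketch in plain Java; the class name and the `test.seed` system property are hypothetical, not part of Flink's test utilities:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch: random test data that can be replayed from a printed seed. */
public class ReproducibleRandomSketch {

    /** Uses -Dtest.seed=... if given, otherwise a fresh seed; the seed is printed either way. */
    static long chooseSeed() {
        Long fixed = Long.getLong("test.seed");
        long seed = (fixed != null) ? fixed : System.nanoTime();
        System.out.println("Using random seed " + seed + " (rerun with -Dtest.seed=" + seed + ")");
        return seed;
    }

    /** Builds a map of random entries from a seeded Random instead of ThreadLocalRandom. */
    static Map<Long, String> randomEntries(long seed, int numElements) {
        Random random = new Random(seed);
        Map<Long, String> entries = new HashMap<>();
        for (int i = 0; i < numElements; i++) {
            long value = random.nextLong();
            entries.put(value, Long.toString(value));
        }
        return entries;
    }

    public static void main(String[] args) {
        long seed = chooseSeed();
        // Same seed, same data: a failure seen once can be reproduced exactly.
        System.out.println(randomEntries(seed, 10).equals(randomEntries(seed, 10))); // true
    }
}
```

The inputs stay as varied as with `ThreadLocalRandom`; the only addition is one log line per run that turns a flaky-looking failure into a deterministic one.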
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102129362 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -834,7 +836,7 @@ private void restoreKVStateData() throws IOException, RocksDBException { } @Override - protected <N, T> InternalValueState<N, T> createValueState( + public <N, T> InternalValueState<N, T> createValueState( --- End diff -- This is mainly because the unit tests in `KvStateRequestSerializerTest` need access to `InternalKvState`. A better choice may be to use `getPartitionedState()` to obtain a user-facing state and convert it to an internal state. What do you think? ---
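The alternative proposed above can be sketched as follows: the factory method stays `protected`, the test obtains the state through a public accessor that returns the user-facing type, and then casts it to the internal interface. All types and methods here (`UserState`, `InternalUserState`, `SketchBackend`, `getUserFacingState`) are hypothetical stand-ins, not Flink classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: reaching an internal interface without widening a factory's visibility. */
public class InternalStateAccessSketch {

    /** User-facing state interface. */
    interface UserState {
        String value();
    }

    /** Internal interface with extra methods needed by tests or queries. */
    interface InternalUserState extends UserState {
        byte[] serializedValue();
    }

    /** Backend whose factory stays protected; only the accessor is public. */
    static class SketchBackend {
        private final Map<String, InternalUserState> states = new HashMap<>();

        protected InternalUserState createState(String name) {
            return new InternalUserState() {
                @Override
                public String value() {
                    return name;
                }

                @Override
                public byte[] serializedValue() {
                    return name.getBytes(StandardCharsets.UTF_8);
                }
            };
        }

        /** Public accessor returning the user-facing type; creates the state on first access. */
        public UserState getUserFacingState(String name) {
            return states.computeIfAbsent(name, this::createState);
        }
    }

    public static void main(String[] args) {
        SketchBackend backend = new SketchBackend();
        UserState userState = backend.getUserFacingState("count");
        // The test converts the user-facing state into the internal one with a checked cast.
        if (userState instanceof InternalUserState) {
            InternalUserState internal = (InternalUserState) userState;
            System.out.println(internal.serializedValue().length); // 5
        }
    }
}
```

The cast only works if the accessor hands out the internal object itself. If it returned a wrapper, as `getMapState()` does with `UserFacingMapState` in the `DefaultKeyedStateStore` diff, the test would need a different route.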
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102128355 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java --- @@ -93,6 +95,18 @@ public DefaultKeyedStateStore(KeyedStateBackend keyedStateBackend, ExecutionC } } + @Override + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(executionConfig); + MapState<UK, UV> originalState = getPartitionedState(stateProperties); + return new UserFacingMapState<>(originalState); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); --- End diff -- Currently, `KeyedStateStore#getState()` does not declare any checked exception, so a `RuntimeException` is the only kind of exception it can throw. Since changing the interface would affect user code (users would have to handle the thrown exceptions), I am not sure it's okay to modify the method declarations in `KeyedStateStore`. ---
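The constraint described above fits in a few lines: when an interface method declares no checked exceptions, an implementation that calls code throwing checked exceptions has to catch them and rethrow an unchecked exception, keeping the original as the cause. A minimal sketch mirroring the `getMapState()` pattern in the diff, with hypothetical names (`StateStore`, `BackendStateStore`, `lookup`):

```java
import java.util.Objects;

/** Hypothetical sketch: wrapping checked exceptions behind an interface that declares none. */
public class UncheckedWrappingSketch {

    /** User-facing interface that declares no checked exceptions. */
    interface StateStore {
        String getState(String name);
    }

    /** Internal call that may fail with a checked exception. */
    static String lookup(String name) throws Exception {
        if (name.isEmpty()) {
            throw new Exception("no state registered under an empty name");
        }
        return "state:" + name;
    }

    static class BackendStateStore implements StateStore {
        @Override
        public String getState(String name) {
            Objects.requireNonNull(name, "The state name must not be null");
            try {
                return lookup(name);
            } catch (Exception e) {
                // The interface cannot declare checked exceptions, so wrap and keep the cause.
                throw new RuntimeException("Error while getting state", e);
            }
        }
    }

    public static void main(String[] args) {
        StateStore store = new BackendStateStore();
        System.out.println(store.getState("counts")); // state:counts
        try {
            store.getState("");
        } catch (RuntimeException e) {
            System.out.println(e.getCause().getMessage()); // no state registered under an empty name
        }
    }
}
```

Callers of `getState()` need no `try`/`catch`, so existing user code keeps compiling; the cost is that the underlying failure is only visible at runtime through `getCause()`.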
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102127867 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102127767 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102126863 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102125445 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102125062 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link MapState} implementation that stores state in RocksDB. + * + * {@link RocksDBStateBackend} must ensure that we set the + * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since + * we use the {@code merge()} call. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <UK> The type of the keys in the map state. + * @param <UV> The type of the values in the map state.
+ */ +public class RocksDBMapState<K, N, UK, UV> + extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>> + implements InternalMapState<N, UK, UV> { + + /** Serializer for the keys and values */ + private final TypeSerializer<UK> userKeySerializer; + private final TypeSerializer<UV> userValueSerializer; + + /** +* We disable writes to the write-ahead-log here. We can't have these in the base class +* because JNI segfaults for some reason if they are. +*/ + private final WriteOptions writeOptions; --- End diff -- To be honest, I have no idea why we can't put `writeOptions` in the base class. We put it in `AbstractRocksDBState` and have not come across any problems in our production environment. Maybe @aljoscha is more familiar with the problem. ---
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r101987352 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link State} interface for partitioned key-value state. The key-value pair can be + * added, updated and retrieved. + * + * The state is accessed and modified by user functions, and checkpointed consistently + * by the system as part of the distributed snapshots. + * + * The state is only accessible by functions applied on a KeyedDataStream. The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + * @param <UK> Type of the keys in the state. + * @param <UV> Type of the values in the state.
+ */ +@PublicEvolving +public interface MapState<UK, UV> extends AppendingState<Map<UK, UV>, Iterable<Map.Entry<UK, UV>>> { --- End diff -- I agree that it's `MultiMapState`, instead of `MapState`, that is supposed to be an `AppendingState`. I will update the interface hierarchy, making `MapState` not an `AppendingState`. ---
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3359 [FLINK-5544][streaming] Add InternalTimerService implemented in RocksDB - Refactor the methods defined in `InternalTimerService`. Some of the common implementation in `HeapInternalTimerService` is now moved into `InternalTimerService`. - Implement `RocksDBInternalTimerService`, which stores timers in RocksDB and sorts them with an in-memory heap. - Implement `InternalTimerServiceTestBase` to verify the implementations of `InternalTimerService`. - Update `AbstractStreamOperator` to allow the usage of customized `InternalTimerService`s. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5544 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3359.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3359 commit 341fd97c47336d4f87cea997e134af68f8ef5265 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-02-20T09:55:40Z Add InternalTimerService implemented in RocksDB ---
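The split described in this PR, with timers persisted in RocksDB and ordered by an in-memory heap, can be sketched roughly as below. This is a hypothetical illustration, not the PR's code: a `HashSet` stands in for the RocksDB store, timers are reduced to bare timestamps (no key or namespace), and the class `SketchTimerService` and its methods are made up. The real implementation may, for example, cache only part of the timers in the heap.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical sketch: a persistent timer store plus an in-memory heap for ordering. */
final class SketchTimerService {

    /** Stand-in for the RocksDB-backed store that holds every registered timestamp. */
    private final Set<Long> store = new HashSet<>();

    /** In-memory min-heap used to find the earliest timer quickly. */
    private final PriorityQueue<Long> heap = new PriorityQueue<>();

    /** Registers a timer; registering the same timestamp twice has no effect. */
    void registerTimer(long timestamp) {
        if (store.add(timestamp)) {
            heap.add(timestamp);
        }
    }

    /** Removes a timer from both the store and the heap. */
    void deleteTimer(long timestamp) {
        if (store.remove(timestamp)) {
            heap.remove(timestamp);
        }
    }

    /** Removes and returns, in ascending order, all timers with timestamp <= watermark. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek() <= watermark) {
            long ts = heap.poll();
            store.remove(ts);
            fired.add(ts);
        }
        return fired;
    }

    /** Number of timers currently registered. */
    int size() {
        return store.size();
    }
}
```

Note that `PriorityQueue.remove(Object)` takes linear time, so a production timer service would probably avoid deleting arbitrary timers from the heap this way.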
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r101936792 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link State} interface for partitioned key-value state. The key-value pair can be + * added, updated and retrieved. + * + * The state is accessed and modified by user functions, and checkpointed consistently + * by the system as part of the distributed snapshots. + * + * The state is only accessible by functions applied on a KeyedDataStream. The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + * @param <UK> Type of the keys in the state. + * @param <UV> Type of the values in the state.
+ */ +@PublicEvolving +public interface MapState<UK, UV> extends AppendingState<Map<UK, UV>, Iterable<Map.Entry<UK, UV>>> { --- End diff -- `MapState` provides the `add` method, which puts a collection of key-value pairs into the state. Though the semantics may be a little different from those of the existing `AppendingState`s, I think it's okay for `MapState` to be an `AppendingState` because the interface does not enforce any restriction on the modification of previous data. ---
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3336 [FLINK-4856][state] Add MapState in KeyedState 1. Add `MapState` and `MapStateDescriptor` 2. Implementation of `MapState` in `HeapKeyedStateBackend` and `RocksDBKeyedStateBackend`. 3. Add accessors to `MapState` in `RuntimeContext` You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-4856 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3336.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3336 commit 430b4f596acbff0a9dfdc20fbb2430a8fad819f9 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-02-17T03:19:18Z Add MapState in KeyedState ---
[GitHub] flink issue #3305: [FLINK-5790][StateBackend] Use list types when ListStateD...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3305 @tillrohrmann Both `org.apache.flink.api.common.typeutils.base` and `org.apache.flink.api.common.typeinfo` are in the module `flink-core`. ---
[GitHub] flink issue #3305: [FLINK-5790][StateBackend] Use list types when ListStateD...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3305 @StephanEwen @tillrohrmann I found a problem with the packages in which `ListTypeInfo` and `ListSerializer` are located. Currently `ListTypeInfo` is in the package "org.apache.flink.api.java.typeutils" and `ListSerializer` is in the package "org.apache.flink.api.common.typeutils.base". But I think it's better to put `ListTypeInfo` into the package "org.apache.flink.api.common.typeinfo". What do you think? ---
[GitHub] flink issue #3305: [FLINK-5790][StateBackend] Use list types when ListStateD...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3305 @tillrohrmann Thanks for your review. Sorry for the reformatted code. It seems that my IDE automatically reformats all the files I edit. I will revert the reformatted code. ---
[GitHub] flink issue #3305: [FLINK-5790][StateBackend] Use list types when ListStateD...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3305 @StephanEwen Thanks a lot for your comments. I have updated the code as suggested. ---
[GitHub] flink pull request #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor...
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/2768 ---
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 Closing the pull request because the state descriptors are now refactored with the introduction of composite serializers (see [FLINK-5790](https://issues.apache.org/jira/browse/FLINK-5790)). ---
[GitHub] flink pull request #3305: [FLINK-5790][StateBackend] Use list types when Lis...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3305#discussion_r100967108 --- Diff: flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java --- @@ -47,9 +47,16 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr // the flink package may be at position 0 (regular class) or position 2 (array) final int flinkPackagePos; - if ((flinkPackagePos = className.indexOf(FLINK_BASE_PACKAGE)) == 0 || - (flinkPackagePos == 2 && className.startsWith(ARRAY_PREFIX))) - { + if (className.contains("org.apache.flink.runtime.state.ArrayListSerializer")) { --- End diff -- The code here is a little tricky. I think we should use a Map to record all modified classes and their corresponding backups. ---
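The map-based approach suggested in this comment could look roughly like the sketch below: a lookup table consulted in `ObjectInputStream.resolveClass` replaces the special-cased `className.contains(...)` checks, and array descriptors (where the class name starts at position 2, as the diff's comment notes) are handled in the same place. Only `org.apache.flink.runtime.state.ArrayListSerializer` comes from the diff; the class `MigrationObjectInputStream`, the helper `remap`, and the backup name `org.example.migration.ArrayListSerializer` are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: resolves moved classes to their backup copies via a lookup table. */
final class MigrationObjectInputStream extends ObjectInputStream {

    /** Old class name -> backup class kept for migration (the target name is hypothetical). */
    private static final Map<String, String> MIGRATED_CLASSES;

    static {
        Map<String, String> m = new HashMap<>();
        m.put("org.apache.flink.runtime.state.ArrayListSerializer",
                "org.example.migration.ArrayListSerializer");
        MIGRATED_CLASSES = Collections.unmodifiableMap(m);
    }

    private final ClassLoader classLoader;

    MigrationObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    /** Maps a plain class name or an array descriptor such as "[Lfoo.Bar;" to its backup. */
    static String remap(String className) {
        int dims = 0;
        while (dims < className.length() && className.charAt(dims) == '[') {
            dims++;
        }
        if (dims == 0) {
            return MIGRATED_CLASSES.getOrDefault(className, className);
        }
        String element = className.substring(dims);
        if (element.startsWith("L") && element.endsWith(";")) {
            String mapped = MIGRATED_CLASSES.get(element.substring(1, element.length() - 1));
            if (mapped != null) {
                return className.substring(0, dims) + "L" + mapped + ";";
            }
        }
        return className;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        String original = desc.getName();
        String target = remap(original);
        try {
            return Class.forName(target, false, classLoader);
        } catch (ClassNotFoundException e) {
            if (!target.equals(original)) {
                throw e; // a migrated class must resolve to its backup copy
            }
            return super.resolveClass(desc); // e.g. primitive type names such as "int"
        }
    }
}
```

Adding another moved class then means adding one entry to the table instead of another branch in `resolveClass`.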
[GitHub] flink pull request #3305: [FLINK-5790][StateBackend] Use list types when Lis...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3305 [FLINK-5790][StateBackend] Use list types when ListStateDescriptor extends StateDescriptor 1. The state serializer, instead of the element serializer, is now stored in `ListStateDescriptor`. 2. `ArrayListTypeInfo` is introduced to help create serializers from the element type. 3. `ArrayListSerializer` is moved to the package org.apache.flink.api.common.typeutils.base to avoid cyclic dependencies. 4. The old implementation of `ListStateDescriptor` is kept in the migration package for backward compatibility. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5790 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3305.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3305 commit e8e11b7965365178453ab6eab78c6d5ac98f3537 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-02-14T05:39:30Z Use list types when ListStateDescriptor extends StateDescriptor commit ba8cdc919fc2e66b3e81f6f8566140bef53a9b96 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2017-02-14T06:22:25Z Support back compatibility for ListStateDescriptor ---
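Storing a serializer for the whole list, rather than for its elements, amounts to composing a list serializer out of the element serializer. A self-contained sketch of that composition is below. `SketchSerializer`, `SketchStringSerializer`, `SketchListSerializer`, and `SketchSerializers` are hypothetical minimal stand-ins, not Flink's `TypeSerializer` or `ArrayListSerializer` API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal serializer abstraction (not Flink's TypeSerializer). */
interface SketchSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;

    T deserialize(DataInput in) throws IOException;
}

/** Element serializer for strings. */
final class SketchStringSerializer implements SketchSerializer<String> {
    @Override
    public void serialize(String value, DataOutput out) throws IOException {
        out.writeUTF(value);
    }

    @Override
    public String deserialize(DataInput in) throws IOException {
        return in.readUTF();
    }
}

/** Composite serializer: writes the list size, then each element with the element serializer. */
final class SketchListSerializer<T> implements SketchSerializer<List<T>> {
    private final SketchSerializer<T> elementSerializer;

    SketchListSerializer(SketchSerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    /** Still exposed, so code that handles single elements can use it. */
    SketchSerializer<T> getElementSerializer() {
        return elementSerializer;
    }

    @Override
    public void serialize(List<T> list, DataOutput out) throws IOException {
        out.writeInt(list.size());
        for (T element : list) {
            elementSerializer.serialize(element, out);
        }
    }

    @Override
    public List<T> deserialize(DataInput in) throws IOException {
        int size = in.readInt();
        List<T> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            list.add(elementSerializer.deserialize(in));
        }
        return list;
    }
}

final class SketchSerializers {
    /** Serializes and deserializes a value in memory. */
    static <T> T roundTrip(SketchSerializer<T> serializer, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serializer.serialize(value, new DataOutputStream(bytes));
            return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape, a descriptor can hold `SketchListSerializer<String>` as its state serializer, while a backend that appends elements one at a time (like the merge-based RocksDB list state) could still reach the element serializer through `getElementSerializer()`.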
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 @aljoscha That way, it's very confusing that a `ReadableState` is not a `State`. Hence I made `State` read-only and introduced the `UpdatableState` interface, which extends `State` with the method `clear()`. These changes (mainly the introduction of the `get()` method) are intended to remove duplicated code. As they have little to do with the implementation of map states, I think it's okay not to change these interfaces now. But I would prefer to rethink the state hierarchy in the near future because there is too much duplicated code now. ---
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 @StephanEwen Thanks a lot for your comments. **Removing `clear()` from `State`** This change was suggested by @aljoscha, who wants broadcast states to share the same interface (see the discussion in [FLINK-5023](https://issues.apache.org/jira/browse/FLINK-5023)). As mentioned, broadcast states are read-only in some cases. Hence it's suggested not to provide the `clear()` method in the base `State` interface. **Changing the `State` interface** The `State` interface is typed because I want to provide the `get()` method for all states so that we can retrieve the data in the state (under the current key for keyed states). The functionality is already provided by all states except `ValueState`, which provides the same functionality with the `value()` method. Providing the method for all states can help reduce some duplicated code in the implementation. It also makes sense for the read-only states mentioned above. ---
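The proposed hierarchy, a read-only typed `State` that exposes `get()` and an `UpdatableState` that adds `clear()`, might look like the sketch below. It is a hypothetical illustration of the proposal, not the interfaces Flink actually released: `ReadOnlyListState`, `SimpleValueState`, and `StateUtil` are made up, and keyed scoping is reduced to a single value per state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Read-only base: every state can return its current content (hypothetical proposal). */
interface State<T> {
    T get();
}

/** Writable states additionally support clear(). */
interface UpdatableState<T> extends State<T> {
    void clear();
}

/** A read-only state, e.g. a broadcast state that user code may only read. */
final class ReadOnlyListState<E> implements State<List<E>> {
    private final List<E> elements;

    ReadOnlyListState(List<E> elements) {
        this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
    }

    @Override
    public List<E> get() {
        return elements;
    }
}

/** A writable value state: get() takes over the role of ValueState#value(). */
final class SimpleValueState<V> implements UpdatableState<V> {
    private V value;

    void update(V newValue) {
        value = newValue;
    }

    @Override
    public V get() {
        return value;
    }

    @Override
    public void clear() {
        value = null;
    }
}

/** Generic code can read any state through the shared get() method. */
final class StateUtil {
    static String describe(State<?> state) {
        return String.valueOf(state.get());
    }
}
```

Because `ReadOnlyListState` has no `clear()`, the compiler prevents user code from clearing a read-only state, which is the reason given for moving `clear()` out of the base interface.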
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 Despite the changes in the state descriptors, Flink jobs can now restore from snapshots taken with older versions. ---
[GitHub] flink pull request #3053: [FLINK-5400] Add accessor to folding states in Run...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/3053 [FLINK-5400] Add accessor to folding states in RuntimeContext - Add accessors in RuntimeContext and KeyedStateStore - Fix errors in the comments for reducing states in RuntimeContext and KeyedStateStore You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5400 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3053.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3053 commit f9d733a60f4809049e778045144528e8aff4a951 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2016-12-30T03:25:05Z Add accessor to folding states in RuntimeContext ---
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 I rebased the branch to resolve the conflicts with the master branch. ---
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 I moved the default value from `SimpleStateDescriptor` to `ValueStateDescriptor`. Now only `ValueStateDescriptor`s have default values. The serialization methods may contain some duplicated code, but I think it's acceptable. I also modified the implementation of `HeapReducingState`: the first value is now copied before being put into the heap. @aljoscha What do you think of these changes? ---
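Copying the first value matters when the caller reuses its input objects. If the heap state stored the caller's reference as-is, a later mutation of that object would silently change the stored state. The sketch below shows a heap-backed reducing state that copies the first value. `CopyingReducingState` and its `copier` function (a stand-in for a serializer's copy method) are hypothetical, as is the `int[]` accumulator used in the test.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

/** Hypothetical heap-backed reducing state with one accumulated value per key. */
final class CopyingReducingState<K, V> {
    private final Map<K, V> heap = new HashMap<>();
    private final BinaryOperator<V> reducer;
    private final UnaryOperator<V> copier; // stand-in for a serializer's copy method

    CopyingReducingState(BinaryOperator<V> reducer, UnaryOperator<V> copier) {
        this.reducer = reducer;
        this.copier = copier;
    }

    void add(K key, V value) {
        V current = heap.get(key);
        if (current == null) {
            // First value: store a copy, so that if the caller later mutates 'value'
            // (object reuse), the stored state is not affected.
            heap.put(key, copier.apply(value));
        } else {
            heap.put(key, reducer.apply(current, value));
        }
    }

    V get(K key) {
        return heap.get(key);
    }
}
```

In the test below, the caller adds `{1}`, then overwrites the same array with `{10}` and adds it again. With the copy the sum is 11; without it, the stored array would be the caller's own object and the result would be 20.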
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 Oh... I added another field to make the code clearer, but I did not notice the serialization problem. Thanks very much for the reminder. Your solution does work, though the concept of "defaultValue" in folding states is a little confusing. Another solution is to let `FoldingStateDescriptor` implement its own serialization methods, which I prefer. What do you think? @aljoscha ---
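Letting a descriptor implement its own serialization could use Java's private `writeObject`/`readObject` hooks. The initial accumulator, which may not be `java.io.Serializable`, is marked `transient` and written explicitly. A minimal sketch under that assumption is below. `SketchFoldingDescriptor` and `CountAccumulator` are hypothetical, and writing a single `long` stands in for serializing the value with a type serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Accumulator type that is deliberately NOT Serializable. */
final class CountAccumulator {
    final long count;

    CountAccumulator(long count) {
        this.count = count;
    }
}

/** Hypothetical descriptor that serializes its non-serializable initial value itself. */
final class SketchFoldingDescriptor implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    private transient CountAccumulator initialValue; // skipped by default serialization

    SketchFoldingDescriptor(String name, CountAccumulator initialValue) {
        this.name = Objects.requireNonNull(name);
        this.initialValue = Objects.requireNonNull(initialValue);
    }

    String getName() {
        return name;
    }

    CountAccumulator getInitialValue() {
        return initialValue;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();          // writes the non-transient field 'name'
        out.writeLong(initialValue.count); // writes the initial value explicitly
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        initialValue = new CountAccumulator(in.readLong());
    }

    /** Java-serialization round trip in memory. */
    static SketchFoldingDescriptor roundTrip(SketchFoldingDescriptor descriptor) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(descriptor);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchFoldingDescriptor) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Keeping the hooks inside the folding descriptor means the generic base descriptor does not need a "defaultValue" concept for folding states.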
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 @aljoscha Thanks for your review. I have updated the PR according to your suggestion. ---
[GitHub] flink pull request #2768: [FLINK-5023 & FLINK-5024] Add SimpleStateDescripto...
GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/2768 [FLINK-5023 & FLINK-5024] Add SimpleStateDescriptor to clarify the concepts Changes in the definition of `State` and `StateDescriptor`: - Add `get()` in the `State` interface. - Remove type serializers of state values from `StateDescriptor`s. - Add `SimpleStateDescriptor` to simplify the construction of `ValueStateDescriptor`, `ReducingStateDescriptor` and `FoldingStateDescriptor`. - Change the definition of `KeyedStateBackend` and `AbstractKeyedStateBackend` accordingly. - Modify the implementation of `ListStateDescriptor` accordingly. Changes in HeapStateBackend: - Let `AbstractHeapState` not implement the `State` interface. The `clear()` method is now removed from `AbstractHeapState`. - Add `HeapSimpleState` to simplify the implementation of `HeapValueState`, `HeapReducingState` and `HeapFoldingState`. - Change the implementation of `HeapValueState`, `HeapReducingState` and `HeapFoldingState` accordingly. Changes in RocksDBStateBackend: - Let `AbstractRocksDBState` not implement the `State` interface, removing the `clear()` method. Now, `AbstractRocksDBState` no longer depends on the types of `State` and `StateDescriptor`. - Add `RocksDBSimpleState` to simplify the implementation of `RocksDBValueState`, `RocksDBReducingState` and `RocksDBFoldingState`. - Change the implementation of `RocksDBValueState`, `RocksDBReducingState` and `RocksDBFoldingState` accordingly. Others: - Update the usage of `State`s in the implementation of window operators. - Update the usage of `State`s in unit tests.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5023 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2768.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2768 commit 007a93b454e693bc3662f540ac5f33e899ce9058 Author: xiaogang.sxg <xiaogang@alibaba-inc.com> Date: 2016-11-08T02:38:22Z Refactor the interface of State and StateDescriptor ---
[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/2377 ---
[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/2377#discussion_r75481109 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java --- @@ -248,4 +374,40 @@ void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, I public boolean isConnected() { return resourceManager != null; } + + + /** +* Cancel the current job and notify all listeners the job's cancellation. +* +* @param cause Cause for the cancelling. +*/ + private void cancelAndClearEverything(Throwable cause) { + // currently, nothing to do here + } + + // + // Utility classes + // + private class JobMasterLeaderContender implements LeaderContender { --- End diff -- I originally made JobMaster implement the LeaderContender interface, but after checking Stephan's implementation of TaskExecutor, I changed my implementation :) JobMaster is not only a contender for the JM leadership but also a listener for the RM leader. If we let the JM directly implement the `LeaderContender` interface, then we should also make the JM implement the `LeaderRetrievalListener` interface. Note that both interfaces have a method called `handleError`. Implementing `handleError` would then be very difficult because we would have to check the cause of the exception: if the JM's contention for leadership fails, the JM should kill itself, but if the RM leader listener fails, the JM can just wait for the RM to recover. ---
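The argument for separate inner classes is that each callback interface then gets its own `handleError`, so the source of an error is known without inspecting the exception. The sketch below illustrates this with two simplified stand-in interfaces, `Contender` and `RetrievalListener`, rather than the exact signatures of Flink's `LeaderContender` and `LeaderRetrievalListener`. `SketchJobMaster` and its fields are hypothetical.

```java
import java.util.UUID;

/** Simplified stand-in for a leader contender callback interface. */
interface Contender {
    void grantLeadership(UUID leaderSessionId);

    void handleError(Exception e);
}

/** Simplified stand-in for a leader retrieval listener callback interface. */
interface RetrievalListener {
    void notifyLeaderAddress(String address, UUID leaderSessionId);

    void handleError(Exception e);
}

/** Hypothetical job master that uses one inner class per role. */
final class SketchJobMaster {
    volatile boolean shutDown;
    volatile boolean waitingForResourceManager;
    volatile String resourceManagerAddress;

    final Contender leaderContender = new JobMasterLeaderContender();
    final RetrievalListener resourceManagerListener = new ResourceManagerLeaderListener();

    /** Errors while contending for JM leadership are fatal: the JM shuts itself down. */
    private final class JobMasterLeaderContender implements Contender {
        @Override
        public void grantLeadership(UUID leaderSessionId) {
            // starting the job under this leader session is omitted in this sketch
        }

        @Override
        public void handleError(Exception e) {
            shutDown = true;
        }
    }

    /** Errors while tracking the RM leader are not fatal: wait for the RM to recover. */
    private final class ResourceManagerLeaderListener implements RetrievalListener {
        @Override
        public void notifyLeaderAddress(String address, UUID leaderSessionId) {
            resourceManagerAddress = address;
            waitingForResourceManager = false;
        }

        @Override
        public void handleError(Exception e) {
            resourceManagerAddress = null;
            waitingForResourceManager = true;
        }
    }
}
```

If `SketchJobMaster` implemented both interfaces directly, the two `handleError` methods would collapse into one, and that single method would have to work out which role failed.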