Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3870#discussion_r116161117 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java --- @@ -180,69 +176,66 @@ public long getStateSize() { @Override public void registerSharedStates(SharedStateRegistry stateRegistry) { + Preconditions.checkState(!registered, "The state handle has already registered its shared states."); - for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { - SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()); + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); - int referenceCount = stateRegistry.register(stateHandle); - Preconditions.checkState(referenceCount == 1); + SharedStateRegistry.Result result = + stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue()); + + // We update our reference with the result from the registry, to prevent the following + // problem: + // A previous checkpoint n has already registered the state. This can happen if a + // following checkpoint (n + x) wants to reference the same state before the backend got + // notified that checkpoint n completed. In this case, the shared registry did + // deduplication and returns the previous reference. 
+ newSstFileEntry.setValue(result.getReference()); } - for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) { - SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue()); + for (Map.Entry<String, StreamStateHandle> oldSstFileName : registeredSstFiles.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey()); + + SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey); - int referenceCount = stateRegistry.register(stateHandle); - Preconditions.checkState(referenceCount > 1); + // Again we update our state handle with the result from the registry, thus replacing + // placeholder state handles with the originals. + oldSstFileName.setValue(result.getReference()); } + // Migrate state from unregistered to registered, so that it will not count as private state + // for #discardState() from now. + registeredSstFiles.putAll(unregisteredSstFiles); + unregisteredSstFiles.clear(); + registered = true; } @Override public void unregisterSharedStates(SharedStateRegistry stateRegistry) { + Preconditions.checkState(registered, "The state handle has not registered its shared states yet."); - for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { - stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue())); + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) { --- End diff -- We should not unregister SST files that were never registered in the first place.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---