This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a4250d248e [FLINK-29095][state] Improve logging in SharedStateRegistry
9a4250d248e is described below

commit 9a4250d248e93f3e87b211df98ce3d3c66aabca0
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Tue Aug 30 18:04:32 2022 +0200

    [FLINK-29095][state] Improve logging in SharedStateRegistry
---
 .../runtime/state/SharedStateRegistryImpl.java     | 122 ++++++++++++---------
 1 file changed, 70 insertions(+), 52 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
index 2e16c528997..3a710428509 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
@@ -79,12 +79,12 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
 
     @Override
     public StreamStateHandle registerReference(
-            SharedStateRegistryKey registrationKey,
-            StreamStateHandle state,
-            long checkpointID,
-            boolean preventDiscardingCreatedCheckpoint) {
+            final SharedStateRegistryKey registrationKey,
+            final StreamStateHandle newHandle,
+            final long checkpointID,
+            final boolean preventDiscardingCreatedCheckpoint) {
 
-        checkNotNull(state);
+        checkNotNull(newHandle, "State handle should not be null.");
 
         StreamStateHandle scheduledStateDeletion = null;
         SharedStateEntry entry;
@@ -95,60 +95,78 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
             entry = registeredStates.get(registrationKey);
 
             if (entry == null) {
-                // Additional check that should never fail, because only state 
handles that are not
-                // placeholders should
-                // ever be inserted to the registry.
                 checkState(
-                        !isPlaceholder(state),
+                        !isPlaceholder(newHandle),
                         "Attempt to reference unknown state: " + 
registrationKey);
 
-                entry = new SharedStateEntry(state, checkpointID);
+                LOG.trace(
+                        "Registered new shared state {} under key {}.", 
newHandle, registrationKey);
+                entry = new SharedStateEntry(newHandle, checkpointID);
                 registeredStates.put(registrationKey, entry);
-                LOG.trace("Registered new shared state {} under key {}.", 
entry, registrationKey);
 
-            } else {
-                // Delete if this is a real duplicate.
-                // Note that task (backend) is not required to re-upload state
-                // if the confirmation notification was missing.
-                // However, it's also not required to use exactly the same 
handle or placeholder
-                if (!Objects.equals(state, entry.stateHandle)) {
-                    if (entry.confirmed || isPlaceholder(state)) {
-                        scheduledStateDeletion = state;
-                    } else {
-                        // Old entry is not in a confirmed checkpoint yet, and 
the new one differs.
-                        // This might result from (omitted KG range here for 
simplicity):
-                        // 1. Flink recovers from a failure using a checkpoint 
1
-                        // 2. State Backend is initialized to UID xyz and a 
set of SST: { 01.sst }
-                        // 3. JM triggers checkpoint 2
-                        // 4. TM sends handle: "xyz-002.sst"; JM registers it 
under "xyz-002.sst"
-                        // 5. TM crashes; everything is repeated from (2)
-                        // 6. TM recovers from CP 1 again: backend UID "xyz", 
SST { 01.sst }
-                        // 7. JM triggers checkpoint 3
-                        // 8. TM sends NEW state "xyz-002.sst"
-                        // 9. JM discards it as duplicate
-                        // 10. checkpoint completes, but a wrong SST file is 
used
-                        // So we use a new entry and discard the old one:
-                        scheduledStateDeletion = entry.stateHandle;
-                        entry.stateHandle = state;
-                    }
-                    LOG.trace(
-                            "Identified duplicate state registration under key 
{}. New state {} was determined to "
-                                    + "be an unnecessary copy of existing 
state {} and will be dropped.",
-                            registrationKey,
-                            state,
-                            entry.stateHandle);
-                }
+                // no further handling
+                return entry.stateHandle;
+
+            } else if (entry.stateHandle == newHandle) {
+                // might be a bug but state backend is not required to use a 
place-holder
+                LOG.info(
+                        "Duplicated registration under key {} with the same 
object: {}",
+                        registrationKey,
+                        newHandle);
+            } else if (Objects.equals(entry.stateHandle, newHandle)) {
+                // might be a bug but state backend is not required to use a 
place-holder
+                LOG.info(
+                        "Duplicated registration under key {} with the new 
object: {}.",
+                        registrationKey,
+                        newHandle);
+            } else if (isPlaceholder(newHandle)) {
                 LOG.trace(
-                        "Updating last checkpoint for {} from {} to {}",
+                        "Duplicated registration under key {} with a 
placeholder (normal case)",
+                        registrationKey);
+                scheduledStateDeletion = newHandle;
+            } else if (entry.confirmed) {
+                LOG.info(
+                        "Duplicated registration under key {} of a new state: 
{}. "
+                                + "This might happen if checkpoint 
confirmation was delayed and state backend re-uploaded the state. "
+                                + "Discarding the new state and keeping the 
old one which is included into a completed checkpoint",
                         registrationKey,
-                        entry.lastUsedCheckpointID,
-                        checkpointID);
-                entry.advanceLastUsingCheckpointID(checkpointID);
-                if (preventDiscardingCreatedCheckpoint) {
-                    entry.preventDiscardingCreatedCheckpoint();
-                }
+                        newHandle);
+                scheduledStateDeletion = newHandle;
+            } else {
+                // Old entry is not in a confirmed checkpoint yet, and the new 
one differs.
+                // This might result from (omitted KG range here for 
simplicity):
+                // 1. Flink recovers from a failure using a checkpoint 1
+                // 2. State Backend is initialized to UID xyz and a set of 
SST: { 01.sst }
+                // 3. JM triggers checkpoint 2
+                // 4. TM sends handle: "xyz-002.sst"; JM registers it under 
"xyz-002.sst"
+                // 5. TM crashes; everything is repeated from (2)
+                // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 
01.sst }
+                // 7. JM triggers checkpoint 3
+                // 8. TM sends NEW state "xyz-002.sst"
+                // 9. JM discards it as duplicate
+                // 10. checkpoint completes, but a wrong SST file is used
+                // So we use a new entry and discard the old one:
+                LOG.info(
+                        "Duplicated registration under key {} of a new state: 
{}. "
+                                + "This might happen during the task failover 
if state backend creates different states with the same key before and after 
the failure. "
+                                + "Discarding the OLD state and keeping the 
NEW one which is included into a completed checkpoint",
+                        registrationKey,
+                        newHandle);
+                scheduledStateDeletion = entry.stateHandle;
+                entry.stateHandle = newHandle;
             }
-        }
+
+            LOG.trace(
+                    "Updating last checkpoint for {} from {} to {}",
+                    registrationKey,
+                    entry.lastUsedCheckpointID,
+                    checkpointID);
+            entry.advanceLastUsingCheckpointID(checkpointID);
+
+            if (preventDiscardingCreatedCheckpoint) {
+                entry.preventDiscardingCreatedCheckpoint();
+            }
+        } // end of synchronized (registeredStates)
 
         scheduleAsyncDelete(scheduledStateDeletion);
         return entry.stateHandle;
@@ -246,7 +264,7 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
     private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
         // We do the small optimization to not issue discards for 
placeholders, which are NOPs.
         if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-            LOG.trace("Scheduled delete of state handle {}.", 
streamStateHandle);
+            LOG.debug("Scheduled delete of state handle {}.", 
streamStateHandle);
             AsyncDisposalRunnable asyncDisposalRunnable =
                     new AsyncDisposalRunnable(streamStateHandle);
             try {

Reply via email to