zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1247564430


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -122,37 +121,20 @@ public StreamStateHandle registerReference(
                 LOG.trace(
                         "Duplicated registration under key {} with a 
placeholder (normal case)",
                         registrationKey);
-                scheduledStateDeletion = newHandle;
-            } else if (entry.confirmed) {
-                LOG.info(
-                        "Duplicated registration under key {} of a new state: 
{}. "
-                                + "This might happen if checkpoint 
confirmation was delayed and state backend re-uploaded the state. "
-                                + "Discarding the new state and keeping the 
old one which is included into a completed checkpoint",
-                        registrationKey,
-                        newHandle);
-                scheduledStateDeletion = newHandle;
             } else {
-                // Old entry is not in a confirmed checkpoint yet, and the new 
one differs.
-                // This might result from (omitted KG range here for 
simplicity):
-                // 1. Flink recovers from a failure using a checkpoint 1
-                // 2. State Backend is initialized to UID xyz and a set of 
SST: { 01.sst }
-                // 3. JM triggers checkpoint 2
-                // 4. TM sends handle: "xyz-002.sst"; JM registers it under 
"xyz-002.sst"
-                // 5. TM crashes; everything is repeated from (2)
-                // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 
01.sst }
-                // 7. JM triggers checkpoint 3
-                // 8. TM sends NEW state "xyz-002.sst"
-                // 9. JM discards it as duplicate
-                // 10. checkpoint completes, but a wrong SST file is used
-                // So we use a new entry and discard the old one:
+                // might be a bug expect the StreamStateHandleWrapper used by
+                // ChangelogStateBackendHandleImpl
                 LOG.info(
-                        "Duplicated registration under key {} of a new state: 
{}. "
-                                + "This might happen during the task failover 
if state backend creates different states with the same key before and after 
the failure. "
-                                + "Discarding the OLD state and keeping the 
NEW one which is included into a completed checkpoint",
+                        "the registered handle should equal to the previous 
one or is a placeholder, register key:{}, handle:{}",
                         registrationKey,
                         newHandle);
-                scheduledStateDeletion = entry.stateHandle;
-                entry.stateHandle = newHandle;
+                if (entry.stateHandle instanceof 
EmptyDiscardStateObjectForRegister) {

Review Comment:
   The test passed after rebasing onto the latest master branch :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to