This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 9a4250d248e [FLINK-29095][state] Improve logging in SharedStateRegistry
9a4250d248e is described below

commit 9a4250d248e93f3e87b211df98ce3d3c66aabca0
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Tue Aug 30 18:04:32 2022 +0200

    [FLINK-29095][state] Improve logging in SharedStateRegistry
---
 .../runtime/state/SharedStateRegistryImpl.java | 122 ++++++++++++---------
 1 file changed, 70 insertions(+), 52 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
index 2e16c528997..3a710428509 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
@@ -79,12 +79,12 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
 
     @Override
     public StreamStateHandle registerReference(
-            SharedStateRegistryKey registrationKey,
-            StreamStateHandle state,
-            long checkpointID,
-            boolean preventDiscardingCreatedCheckpoint) {
+            final SharedStateRegistryKey registrationKey,
+            final StreamStateHandle newHandle,
+            final long checkpointID,
+            final boolean preventDiscardingCreatedCheckpoint) {
 
-        checkNotNull(state);
+        checkNotNull(newHandle, "State handle should not be null.");
 
         StreamStateHandle scheduledStateDeletion = null;
         SharedStateEntry entry;
@@ -95,60 +95,78 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
             entry = registeredStates.get(registrationKey);
 
             if (entry == null) {
-                // Additional check that should never fail, because only state handles that are not
-                // placeholders should
-                // ever be inserted to the registry.
                 checkState(
-                        !isPlaceholder(state),
+                        !isPlaceholder(newHandle),
                         "Attempt to reference unknown state: " + registrationKey);
 
-                entry = new SharedStateEntry(state, checkpointID);
+                LOG.trace(
+                        "Registered new shared state {} under key {}.", newHandle, registrationKey);
+                entry = new SharedStateEntry(newHandle, checkpointID);
                 registeredStates.put(registrationKey, entry);
-                LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey);
 
-            } else {
-                // Delete if this is a real duplicate.
-                // Note that task (backend) is not required to re-upload state
-                // if the confirmation notification was missing.
-                // However, it's also not required to use exactly the same handle or placeholder
-                if (!Objects.equals(state, entry.stateHandle)) {
-                    if (entry.confirmed || isPlaceholder(state)) {
-                        scheduledStateDeletion = state;
-                    } else {
-                        // Old entry is not in a confirmed checkpoint yet, and the new one differs.
-                        // This might result from (omitted KG range here for simplicity):
-                        // 1. Flink recovers from a failure using a checkpoint 1
-                        // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
-                        // 3. JM triggers checkpoint 2
-                        // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
-                        // 5. TM crashes; everything is repeated from (2)
-                        // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
-                        // 7. JM triggers checkpoint 3
-                        // 8. TM sends NEW state "xyz-002.sst"
-                        // 9. JM discards it as duplicate
-                        // 10. checkpoint completes, but a wrong SST file is used
-                        // So we use a new entry and discard the old one:
-                        scheduledStateDeletion = entry.stateHandle;
-                        entry.stateHandle = state;
-                    }
-                    LOG.trace(
-                            "Identified duplicate state registration under key {}. New state {} was determined to "
-                                    + "be an unnecessary copy of existing state {} and will be dropped.",
-                            registrationKey,
-                            state,
-                            entry.stateHandle);
-                }
+                // no further handling
+                return entry.stateHandle;
+
+            } else if (entry.stateHandle == newHandle) {
+                // might be a bug but state backend is not required to use a place-holder
+                LOG.info(
+                        "Duplicated registration under key {} with the same object: {}",
+                        registrationKey,
+                        newHandle);
+            } else if (Objects.equals(entry.stateHandle, newHandle)) {
+                // might be a bug but state backend is not required to use a place-holder
+                LOG.info(
+                        "Duplicated registration under key {} with the new object: {}.",
+                        registrationKey,
+                        newHandle);
+            } else if (isPlaceholder(newHandle)) {
                 LOG.trace(
-                        "Updating last checkpoint for {} from {} to {}",
+                        "Duplicated registration under key {} with a placeholder (normal case)",
+                        registrationKey);
+                scheduledStateDeletion = newHandle;
+            } else if (entry.confirmed) {
+                LOG.info(
+                        "Duplicated registration under key {} of a new state: {}. "
+                                + "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
+                                + "Discarding the new state and keeping the old one which is included into a completed checkpoint",
                         registrationKey,
-                        entry.lastUsedCheckpointID,
-                        checkpointID);
-                entry.advanceLastUsingCheckpointID(checkpointID);
-                if (preventDiscardingCreatedCheckpoint) {
-                    entry.preventDiscardingCreatedCheckpoint();
-                }
+                        newHandle);
+                scheduledStateDeletion = newHandle;
+            } else {
+                // Old entry is not in a confirmed checkpoint yet, and the new one differs.
+                // This might result from (omitted KG range here for simplicity):
+                // 1. Flink recovers from a failure using a checkpoint 1
+                // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
+                // 3. JM triggers checkpoint 2
+                // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
+                // 5. TM crashes; everything is repeated from (2)
+                // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
+                // 7. JM triggers checkpoint 3
+                // 8. TM sends NEW state "xyz-002.sst"
+                // 9. JM discards it as duplicate
+                // 10. checkpoint completes, but a wrong SST file is used
+                // So we use a new entry and discard the old one:
+                LOG.info(
+                        "Duplicated registration under key {} of a new state: {}. "
+                                + "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
+                                + "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
+                        registrationKey,
+                        newHandle);
+                scheduledStateDeletion = entry.stateHandle;
+                entry.stateHandle = newHandle;
             }
-        }
+
+            LOG.trace(
+                    "Updating last checkpoint for {} from {} to {}",
+                    registrationKey,
+                    entry.lastUsedCheckpointID,
+                    checkpointID);
+            entry.advanceLastUsingCheckpointID(checkpointID);
+
+            if (preventDiscardingCreatedCheckpoint) {
+                entry.preventDiscardingCreatedCheckpoint();
+            }
+        } // end of synchronized (registeredStates)
 
         scheduleAsyncDelete(scheduledStateDeletion);
         return entry.stateHandle;
@@ -246,7 +264,7 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
     private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
         // We do the small optimization to not issue discards for placeholders, which are NOPs.
         if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-            LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
+            LOG.debug("Scheduled delete of state handle {}.", streamStateHandle);
             AsyncDisposalRunnable asyncDisposalRunnable =
                     new AsyncDisposalRunnable(streamStateHandle);
             try {