Ryan van Huuksloot created FLINK-39513:
------------------------------------------

             Summary: CheckpointCoordinator.restoreInitialCheckpointIfPresent 
ignores allowNonRestoredState
                 Key: FLINK-39513
                 URL: https://issues.apache.org/jira/browse/FLINK-39513
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 2.2.0
            Reporter: Ryan van Huuksloot


`CheckpointCoordinator.restoreInitialCheckpointIfPresent` hardcodes 
`allowNonRestoredState=false` when calling 
`restoreLatestCheckpointedStateInternal`, so the HA-checkpoint-restore path 
rejects any checkpoint state that cannot be mapped to an operator in the 
current JobGraph regardless of the user's 
`execution.savepoint.ignore-unclaimed-state` (or 
`execution.state-recovery.ignore-unclaimed-state`) setting.

The current code on master:
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1733|https://github.com/apache/flink/blob/b12302d87e53df5bd191918faad4dda80fbcba85/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1733]

The method was introduced in [FLINK-20222] under the documented assumption
`// JobManager failover means JobGraphs match exactly.` That assumption held 
for pure JM failover, where the JobGraph is guaranteed identical across the 
restart. It no longer holds for the broader set of scenarios that invoke this 
method today — most notably JM startups into a cluster with stale HA checkpoint 
metadata from a modified previous JobGraph (e.g., Flink Kubernetes Operator 
`last-state` upgrades after a non-graceful shutdown, or crash-retries 
mid-upgrade).

The savepoint-restore path (`CheckpointCoordinator.restoreSavepoint`) in the 
same class correctly reads `allowNonRestoredState` from 
`SavepointRestoreSettings` and honors it. The checkpoint-restore path should 
follow the same pattern.

Observed failure in the JobManager:

    IllegalStateException: There is no operator for the state <operator-id-hash>
        at StateAssignmentOperation.checkStateMappingCompleteness
        at CheckpointCoordinator.restoreLatestCheckpointedStateInternal
        at CheckpointCoordinator.restoreInitialCheckpointIfPresent
        at DefaultExecutionGraphFactory.createAndRestoreExecutionGraph

followed by a JM crashloop.
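
To illustrate why the hardcoded `false` produces this exception, here is a minimal, self-contained sketch of a completeness check of this shape. It is a toy model, not Flink's code: `StateMappingSketch`, `checkCompleteness`, and the string operator IDs are hypothetical stand-ins, and only the exception message is taken from the trace above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model of a state-mapping completeness check; names are illustrative only. */
final class StateMappingSketch {

    /**
     * Returns the checkpointed operator IDs that have no match in the job graph.
     * Throws if such an ID exists and skipping unclaimed state is not allowed.
     */
    static List<String> checkCompleteness(
            boolean allowNonRestoredState,
            Set<String> checkpointedOperatorIds,
            Set<String> jobGraphOperatorIds) {
        List<String> skipped = new ArrayList<>();
        for (String id : checkpointedOperatorIds) {
            if (jobGraphOperatorIds.contains(id)) {
                continue; // state can be assigned to an operator in the current graph
            }
            if (!allowNonRestoredState) {
                // With the flag hardcoded to false, this branch is always taken.
                throw new IllegalStateException("There is no operator for the state " + id);
            }
            skipped.add(id); // unclaimed state is dropped instead of failing the restore
        }
        return skipped;
    }

    public static void main(String[] args) {
        Set<String> checkpointed = Set.of("op-a", "op-removed");
        Set<String> current = Set.of("op-a", "op-new");

        System.out.println("skipped: " + checkCompleteness(true, checkpointed, current));
        try {
            checkCompleteness(false, checkpointed, current);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this model the only difference between a successful restore and the crash is the boolean passed in, which is why the user's setting needs to reach the check.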

*Proposed fix*: parameterize `allowNonRestoredState` on 
`restoreInitialCheckpointIfPresent` (the same shape as 
`restoreLatestCheckpointedStateToAll`, which already takes the flag as a 
parameter) and have the single production caller, 
`DefaultExecutionGraphFactory.createAndRestoreExecutionGraph`, read 
`SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE` from the 
`Configuration` and pass it in.
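
A rough sketch of the proposed shape, under stated assumptions: `RestoreFlagSketch`, `ToyCoordinator`, and `createAndRestore` are hypothetical stand-ins for the coordinator and its caller, a plain `Map<String, String>` stands in for `Configuration`, and the default of `false` is assumed. Only the config key and the method name come from this ticket; the real patch would use Flink's own types.

```java
import java.util.Map;
import java.util.Set;

/** Toy model of the proposed parameterization; not Flink's actual classes. */
final class RestoreFlagSketch {

    // Key quoted in this ticket; the lookup below is a simplification.
    static final String IGNORE_UNCLAIMED_STATE_KEY =
            "execution.state-recovery.ignore-unclaimed-state";

    /** Stand-in for the coordinator, holding the operator IDs of the latest HA checkpoint. */
    static final class ToyCoordinator {
        private final Set<String> latestCheckpointOperatorIds; // null: no checkpoint present

        ToyCoordinator(Set<String> latestCheckpointOperatorIds) {
            this.latestCheckpointOperatorIds = latestCheckpointOperatorIds;
        }

        /** The flag is now a parameter instead of a hardcoded false. */
        boolean restoreInitialCheckpointIfPresent(
                Set<String> jobGraphOperatorIds, boolean allowNonRestoredState) {
            if (latestCheckpointOperatorIds == null) {
                return false; // nothing to restore
            }
            for (String id : latestCheckpointOperatorIds) {
                if (!jobGraphOperatorIds.contains(id) && !allowNonRestoredState) {
                    throw new IllegalStateException("There is no operator for the state " + id);
                }
            }
            return true;
        }
    }

    /** Stand-in for the single caller: reads the user's setting and passes it through. */
    static boolean createAndRestore(
            Map<String, String> configuration,
            ToyCoordinator coordinator,
            Set<String> jobGraphOperatorIds) {
        boolean allowNonRestoredState =
                Boolean.parseBoolean(
                        configuration.getOrDefault(IGNORE_UNCLAIMED_STATE_KEY, "false"));
        return coordinator.restoreInitialCheckpointIfPresent(
                jobGraphOperatorIds, allowNonRestoredState);
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator(Set.of("op-a", "op-removed"));
        Map<String, String> conf = Map.of(IGNORE_UNCLAIMED_STATE_KEY, "true");
        System.out.println(createAndRestore(conf, coordinator, Set.of("op-a")));
    }
}
```

Keeping the flag as an explicit parameter, rather than having the coordinator read configuration itself, mirrors how `restoreLatestCheckpointedStateToAll` is described above and leaves the default behavior unchanged when the option is unset.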

*Related Ticket*: [FLINK-30638] — same user-facing symptom filed against the 
Kubernetes Operator component. The runtime root cause was correctly identified 
in the comments on that ticket, but the ticket was left with a workaround 
(switching to savepoint upgrade mode) rather than a fix. This ticket tracks the 
underlying runtime bug.

Bug report courtesy of: Omid Hemmati



--
This message was sent by Atlassian Jira
(v8.20.10#820010)