ming li created FLINK-21990:
-------------------------------

             Summary: Double check Task status when perform checkpoint.
                 Key: FLINK-21990
                 URL: https://issues.apache.org/jira/browse/FLINK-21990
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.11.0
            Reporter: ming li


We need to double check Task status when making Checkpoint. Otherwise, after a 
Task failed, the checkpoint may still be made successfully.



For example, I try to throw an exception at 17:10:24.069, get the lock at 
17:10:24.070 and start making Checkpoint, and finish making Checkpoint at 
17:10:24.373.
{code:java}
17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed 
(2/4)- execution # 0] INFO  
org.apache.flink.test.checkpointing.RegionCheckpointITCase  - throw expected 
exception
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  
org.apache.flink.runtime.state.AbstractSnapshotStrategy  - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 
0,5,Flink Task Threads] took 0 ms.
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  
org.apache.flink.test.checkpointing.RegionCheckpointITCase  - sleep 300 ms
17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  
org.apache.flink.runtime.state.AbstractSnapshotStrategy  - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 
0,5,Flink Task Threads] took 0 ms.
17:10:24.373 [jobmanager-future-thread-1] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).

{code}
 

 

>From the code point of view, we only judged the state of the task at the 
>beginning, and when the lock was obtained, we directly started to make the 
>Checkpoint.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {

    if (isRunning) {
        actionExecutor.runThrowing(
                () -> {//do checkpoint});
        return true;
    } else {
        ...
    }
}{code}
However, during the period of acquiring the lock, the task state is likely to 
change. Compared with the Flink 1.9 version code, the 1.9 version judges the 
task status after acquiring the lock.

 
{code:java}
private boolean performCheckpoint(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      CheckpointMetrics checkpointMetrics,
      boolean advanceToEndOfTime) throws Exception {

   LOG.debug("Starting checkpoint ({}) {} on task {}",
      checkpointMetaData.getCheckpointId(), 
checkpointOptions.getCheckpointType(), getName());

   final long checkpointId = checkpointMetaData.getCheckpointId();

   synchronized (lock) {
      if (isRunning) {
         //do checkpoint
      } else {
        ...
      }
 }{code}
 

Therefore, I think we need to double check the task status to avoid the 
situation where the task fails but the Checkpoint can still succeed in the 
process of acquiring the lock.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to