ricky2129 opened a new issue, #10675:
URL: https://github.com/apache/seatunnel/issues/10675

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   ### What we observed

   A CDC job was running normally with checkpoints completing at regular intervals. At some point during a rolling restart, checkpoint completion logs stopped entirely. From that point onward, the coordinator logs only showed `MessageDelayedEvent` entries — records being streamed from the source — but no `pending checkpoint notify finished` or `start notify checkpoint completed` entries appeared at all.

   Despite this, the job showed as **RUNNING** in both the REST API (`/running-jobs`) and the UI. Data was continuing to flow to the sink. There was no error and no indication anything was wrong.
   
   This continued for several days across multiple coordinator restarts. Each time a new master came up, the job appeared in the running jobs list with RUNNING status. No checkpoint was ever written to S3 after the initial failure point.
   
     During the next rolling restart:
     - The new master called `restoreAllRunningJobFromMasterNodeSwitch()`
     - Found the jobId in `runningJobInfoIMap`
     - Read the last checkpoint from S3 — which pointed to a binlog file from 
the original failure, now purged from MySQL
     - Job failed immediately with:
   
     IllegalStateException: The connector is trying to read binlog starting at
     file=mysql-bin-changelog.XXXXXX, pos=XXXXXXX
     but this is no longer available on the server.
     Reconfigure the connector to use a snapshot when needed.
   
   ### Anticipated root cause
   
   When a job fails and the coordinator starts `cleanJob()`, the sequence is:
   
     1. Coordinator writes terminal state (`FAILED`) to `runningJobStateIMap`
     2. `cleanJob()` is triggered → eventually calls `removeJobIMap()` which 
removes the jobId from `runningJobInfoIMap`
   
     If the coordinator pod is killed (OOM, rolling restart, eviction) **between step 1 and step 2**, the jobId remains as a zombie in `runningJobInfoIMap`, with `FAILED` state in `runningJobStateIMap`.
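   The crash window can be sketched with plain maps standing in for the Hazelcast IMaps — only `runningJobInfoIMap` and `runningJobStateIMap` are real names from the issue; the helper methods here are purely illustrative:

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Sketch of the crash window between step 1 and step 2.
   // Plain maps stand in for the distributed Hazelcast IMaps.
   public class ZombieJobSketch {
       static final Map<Long, String> runningJobInfoIMap = new ConcurrentHashMap<>();
       static final Map<Long, String> runningJobStateIMap = new ConcurrentHashMap<>();

       static void failJobThenCrash(long jobId) {
           // Step 1: coordinator records the terminal state.
           runningJobStateIMap.put(jobId, "FAILED");
           // Step 2 (cleanJob() -> removeJobIMap()) never runs: the pod dies
           // here, so jobId is never removed from runningJobInfoIMap.
       }

       static boolean isZombie(long jobId) {
           // Still present in the info map, yet already in a terminal state.
           return runningJobInfoIMap.containsKey(jobId)
                   && "FAILED".equals(runningJobStateIMap.get(jobId));
       }

       public static void main(String[] args) {
           long jobId = 42L;
           runningJobInfoIMap.put(jobId, "job-info");
           runningJobStateIMap.put(jobId, "RUNNING");
           failJobThenCrash(jobId);
           System.out.println(isZombie(jobId)); // prints "true"
       }
   }
   ```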
   
   On the next master switch, `restoreJobFromMasterActiveSwitch()` does:
   
   ```java
   Object jobState = runningJobStateIMap.get(jobId);
   if (jobState == null) {
       runningJobInfoIMap.remove(jobId);
       return;
   }
   // proceeds to restore — no terminal state check
   ```
   Since FAILED != null, it proceeds to restore the job. A new JobMaster is 
created, the job appears as RUNNING in the API and UI, but the underlying 
checkpoint coordinator may immediately fail again (same root cause), leaving 
the job in a state where:
     - Worker tasks run without an active checkpoint coordinator
     - No checkpoint barriers are injected
     - No checkpoints are written to S3
     - The job shows RUNNING in the UI indefinitely
   
   The last written checkpoint on S3 remains frozen at the pre-failure 
position. On any subsequent master switch or rollout, this stale checkpoint is 
used for restoration, leading to the binlog-purged error above.
   
   ### Fix

   In `CoordinatorService.restoreJobFromMasterActiveSwitch()`, add a terminal state check after the existing null check:

   ```java
   if (jobState == null) {
       runningJobInfoIMap.remove(jobId);
       return;
   }

   // NEW: do not restore jobs already in a terminal state;
   // the coordinator was killed before cleanJob() could remove them from the IMap
   if (jobState instanceof JobStatus && ((JobStatus) jobState).isEndState()) {
       logger.warning(String.format(
           "Job %s is in terminal state %s, skipping restore and cleaning up IMap",
           jobId, jobState));
       runningJobInfoIMap.remove(jobId);
       return;
   }
   ```
   This covers FAILED, CANCELED, FINISHED, SAVEPOINT_DONE, UNKNOWABLE.
   
   ### Additional observation — missing UI visibility
   
   There is currently no way in the SeaTunnel UI or REST API to see when the 
last checkpoint completed for a running job (e.g. "last checkpointed 30s ago"). 
This makes it impossible to detect the silent no-checkpoint scenario described 
above — the job appears healthy with RUNNING status while checkpointing has 
completely stopped.
   
   A `lastCheckpointTime` or `secondsSinceLastCheckpoint` field exposed via `/running-jobs` or the UI would allow operators to immediately isolate jobs where checkpointing has silently stopped, rather than discovering the problem only at the next rollout, when the stale checkpoint causes a restoration failure.
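   As a rough sketch of what such a field could compute — nothing here is an existing SeaTunnel API, and all names are hypothetical:

   ```java
   // Hypothetical helper: derive staleness of the last completed checkpoint
   // for an operator-facing field such as secondsSinceLastCheckpoint.
   public class CheckpointStaleness {
       /**
        * Returns seconds since the last completed checkpoint, or -1 if no
        * checkpoint has completed since the job (re)started.
        */
       public static long secondsSinceLastCheckpoint(long lastCompletedMillis, long nowMillis) {
           if (lastCompletedMillis <= 0) {
               return -1;
           }
           return (nowMillis - lastCompletedMillis) / 1000;
       }

       public static void main(String[] args) {
           long now = 1_700_000_120_000L;
           long last = 1_700_000_000_000L;
           // 120 seconds since the last checkpoint completed
           System.out.println(secondsSinceLastCheckpoint(last, now)); // prints "120"
       }
   }
   ```

   A value of -1 (or a very large staleness) on a RUNNING job would have surfaced the silent no-checkpoint state described above days before the rollout failure.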
   
   ### SeaTunnel Version
   
   **SeaTunnel version:** 2.3.12 (Zeta engine)
   **Environment:** Kubernetes, S3 checkpoint storage
   
   ### SeaTunnel Config
   
   ```conf
   can provide if asked
   ```
   
   ### Running Command
   
   ```shell
   kubernetes deployment
   ```
   
   ### Error Exception
   
   ```log
   The exception is included in the bug details above.
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
