ricky2129 commented on issue #10675:
URL: https://github.com/apache/seatunnel/issues/10675#issuecomment-4165327043

   @davidzollo @dybyte 
   ## Follow-up: orphan worker running without checkpoints after coordinator death

     While tracing the full failure chain for this incident, we found a second related gap that explains why jobs appeared RUNNING in the UI while producing zero checkpoints for days.

     ---

     ### What happens
   
     When the coordinator pod is killed mid-cleanup (the race condition described in this issue), the worker's Debezium reader never receives `CancelTaskOperation`. That operation is sent from the coordinator thread via `PhysicalVertex.noticeTaskExecutionServiceCancel()`; when the pod dies, the RPC is never delivered. The worker runs indefinitely as an orphan: data flows to the sink, but with no checkpoint coordinator alive, no barriers are injected and no S3 writes happen. The checkpoint position is frozen at the moment the coordinator died.
   
     In our incident a CDC job ran as an orphan for ~14 days: data continued flowing to the sink the entire time, but zero checkpoint files were written to S3.
   
     ---
   
     ### Root cause in the code
   
     `SeaTunnelServer.memberRemoved()` only acts on coordinator nodes:

     ```java
     @Override
     public void memberRemoved(MembershipServiceEvent event) {
         try {
             if (isMasterNode()) {
                 this.getCoordinatorService().memberRemoved(event);
             }
             // worker nodes do nothing here
         } catch (SeaTunnelEngineException e) {
             LOGGER.severe("Error when handle member removed event", e);
         }
     }
     ```
   
     `TaskExecutionService` has no membership listener. Workers have zero reaction to coordinator departure.
   
     ---
     ### Proposed fix direction
   
     The cleanest approach avoids guessing "was the departed member the master?" by tracking which coordinator deployed each task group:

     1. In `DeployTaskOperation.runInternal()`, pass `getCallerAddress()` (confirmed available on the Hazelcast 5.1 `Operation` base class) into `deployTask()`
     2. Store the deploying coordinator's address in `TaskGroupContext`
     3. Add `cancelTaskGroupsDeployedBy(Address address)` to `TaskExecutionService`
     4. In `SeaTunnelServer.memberRemoved()`, add a worker branch:
   
     ```java
     } else {
         // Worker: cancel task groups deployed by the departed coordinator
         if (taskExecutionService != null) {
             taskExecutionService.cancelTaskGroupsDeployedBy(event.getMember().getAddress());
         }
     }
     ```
   
     > **Note on null guard:** MASTER-only nodes do not call `startWorker()`, so `taskExecutionService` is null there. Standby master nodes have `isMasterNode() == false` and would otherwise NPE without this check.
   
     > **Timing verified:** From the `MembershipManager.java` source, `tryStartMastershipClaim()` calls `setMasterAddress()` before `sendMembershipEventNotifications()` is dispatched. By the time `memberRemoved` fires, `getMasterAddress()` already returns the new master.
   
     ---
     ### Interaction with this issue's zombie fix
   
     The orphan fix is only fully effective when combined with the terminal state check fix proposed here. After the worker cancels its task groups, `notifyTaskStatusToMaster` sends CANCELED to the new master. If the new master hasn't restored the job yet, it throws `JobNotFoundException` and the notification is silently dropped (the retry loop exits on `JobNotFoundException`). Without the terminal state check, the zombie entry remains in `runningJobInfoIMap` and gets restored on the next master switch. With both fixes together, the zombie is permanently cleaned from the IMap and the worker's task group is stopped cleanly.
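
     As a toy illustration of why the two fixes are needed together, here is a sketch of the master-side decision when a terminal status arrives for a not-yet-restored job. Everything here is hypothetical (`handleStatusNotification`, the string statuses, the plain-`Map` IMap stand-in); it models the interaction described above, not the actual SeaTunnel API.

     ```java
     import java.util.Map;

     // Hypothetical model of master-side handling of a worker status notification.
     public class ZombieInteractionSketch {

         static String handleStatusNotification(
                 Map<Long, String> runningJobInfoIMap, // stand-in for the Hazelcast IMap
                 long jobId,
                 String status,
                 boolean jobRestored,              // has the new master restored this job yet?
                 boolean terminalStateCheckEnabled // the fix proposed in this issue
         ) {
             if (jobRestored) {
                 return "DELIVERED"; // normal path: status reaches the job master
             }
             boolean terminal = status.equals("CANCELED")
                     || status.equals("FINISHED")
                     || status.equals("FAILED");
             if (terminalStateCheckEnabled && terminal) {
                 // Combined behaviour: purge the zombie entry so the next
                 // master switch cannot restore it.
                 runningJobInfoIMap.remove(jobId);
                 return "CLEANED";
             }
             // Current behaviour: JobNotFoundException, the retry loop exits,
             // the notification is lost, and the zombie entry survives.
             return "DROPPED";
         }
     }
     ```

     With the check disabled, the CANCELED notification is dropped and the IMap entry survives; with it enabled, the same notification cleans the entry instead.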
   
     ---
     Would love to hear your thoughts on the fix direction, particularly whether tracking the deployer address in `TaskGroupContext` is the right pattern or whether there's a cleaner approach in the existing architecture.