ricky2129 commented on issue #10675:
URL: https://github.com/apache/seatunnel/issues/10675#issuecomment-4165327043
@davidzollo @dybyte
## Follow-up: orphan worker running without checkpoints after coordinator death
While tracing the full failure chain for this incident, we found a second
related gap that explains
why jobs appeared RUNNING in the UI while producing zero checkpoints for
days.
---
### What happens
When the coordinator pod is killed mid-cleanup (the race condition
described in this issue), the
worker's Debezium reader never receives `CancelTaskOperation`. That
operation is sent from the
coordinator thread via `PhysicalVertex.noticeTaskExecutionServiceCancel()`
— when the pod dies,
the RPC is never delivered. The worker runs indefinitely as an orphan:
data flows to the sink, but
with no checkpoint coordinator alive, no barriers are injected and no S3
writes happen. The
checkpoint position is frozen at the moment the coordinator died.
In our incident a CDC job ran as an orphan for ~14 days — data continued
flowing to the sink
the entire time, but zero checkpoint files were written to S3.
---
### Root cause in the code
`SeaTunnelServer.memberRemoved()` only acts on coordinator nodes:
```java
@Override
public void memberRemoved(MembershipServiceEvent event) {
    try {
        if (isMasterNode()) {
            this.getCoordinatorService().memberRemoved(event);
        }
        // worker nodes do nothing here
    } catch (SeaTunnelEngineException e) {
        LOGGER.severe("Error when handle member removed event", e);
    }
}
```

`TaskExecutionService` has no membership listener, so workers have no reaction at all to a coordinator departure.
---
### Proposed fix direction

The cleanest approach avoids guessing "was the departed member the master?" by tracking which coordinator deployed each task group:

1. In `DeployTaskOperation.runInternal()`, pass `getCallerAddress()` (confirmed available on the Hazelcast 5.1 `Operation` base class) into `deployTask()`
2. Store the deploying coordinator's address in `TaskGroupContext`
3. Add `cancelTaskGroupsDeployedBy(Address address)` to `TaskExecutionService`
4. In `SeaTunnelServer.memberRemoved()`, add a worker branch:

```java
} else {
    // Worker: cancel task groups deployed by the departed coordinator
    if (taskExecutionService != null) {
        taskExecutionService.cancelTaskGroupsDeployedBy(event.getMember().getAddress());
    }
}
```
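The four steps above can be sketched end to end in plain Java. This is an illustrative simulation only, not SeaTunnel code: Hazelcast's `Address` is stood in for by a `String`, and `TaskGroupContextSketch` / `TaskExecutionServiceSketch` are hypothetical stand-ins reduced to the one field and two methods the fix needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for TaskGroupContext, holding only the new field (step 2).
class TaskGroupContextSketch {
    final String taskGroupId;
    final String deployerAddress; // recorded when the coordinator deploys the group

    TaskGroupContextSketch(String taskGroupId, String deployerAddress) {
        this.taskGroupId = taskGroupId;
        this.deployerAddress = deployerAddress;
    }
}

// Hypothetical stand-in for TaskExecutionService.
class TaskExecutionServiceSketch {
    private final Map<String, TaskGroupContextSketch> contexts = new ConcurrentHashMap<>();

    // Steps 1+2: deployTask records the calling coordinator's address with the group.
    void deployTask(String taskGroupId, String callerAddress) {
        contexts.put(taskGroupId, new TaskGroupContextSketch(taskGroupId, callerAddress));
    }

    // Step 3: cancel every task group deployed by the departed coordinator.
    List<String> cancelTaskGroupsDeployedBy(String departedAddress) {
        List<String> cancelled = new ArrayList<>();
        contexts.values().removeIf(ctx -> {
            if (ctx.deployerAddress.equals(departedAddress)) {
                cancelled.add(ctx.taskGroupId); // real code would trigger async cancel here
                return true;
            }
            return false;
        });
        return cancelled;
    }

    int size() {
        return contexts.size();
    }
}
```

Step 4's worker branch would then call `cancelTaskGroupsDeployedBy` with the departed member's address, leaving task groups deployed by other (still alive) coordinators untouched.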
> **Note on null guard:** MASTER-only nodes do not call `startWorker()`, so `taskExecutionService` is null there. Standby master nodes have `isMasterNode() == false` and would otherwise NPE without this check.
>
> **Timing verified:** in the `MembershipManager.java` source, `tryStartMastershipClaim()` calls `setMasterAddress()` before `sendMembershipEventNotifications()` is dispatched. By the time `memberRemoved` fires, `getMasterAddress()` already returns the new master.
---
### Interaction with this issue's zombie fix
The orphan fix is only fully effective when combined with the terminal state check fix proposed here. After the worker cancels its task groups, `notifyTaskStatusToMaster` sends `CANCELED` to the new master. If the new master hasn't restored the job yet, it throws `JobNotFoundException` and the notification is silently dropped (the retry loop exits on `JobNotFoundException`). Without the terminal state check, the zombie entry remains in `runningJobInfoIMap` and gets restored on the next master switch. With both fixes together, the zombie is permanently cleaned from the IMap and the worker's task group is stopped cleanly.
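This interplay can be mimicked with a tiny simulation. All names here are hypothetical stand-ins (the real logic lives in the master's task status handling); the sketch only shows how the terminal-state check changes the outcome when a `CANCELED` report arrives for a job the new master has not restored yet.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the new master's reaction to a status report
// for a job it has not restored; not actual SeaTunnel code.
class NewMasterSketch {
    final Map<Long, String> runningJobInfoIMap = new HashMap<>();
    final boolean terminalStateCheckEnabled; // the fix proposed in this issue

    NewMasterSketch(boolean terminalStateCheckEnabled) {
        this.terminalStateCheckEnabled = terminalStateCheckEnabled;
    }

    // A zombie entry left behind by the dead coordinator.
    void inheritZombieEntry(long jobId) {
        runningJobInfoIMap.put(jobId, "RUNNING");
    }

    // The worker reports a status after cancelling its orphaned task groups.
    void onTaskStatusReport(long jobId, String status) {
        boolean jobRestored = false; // the new master has not restored the job yet
        if (!jobRestored) {
            if (terminalStateCheckEnabled && isTerminal(status)) {
                runningJobInfoIMap.remove(jobId); // zombie cleaned permanently
            }
            // Otherwise: JobNotFoundException path, the notification is silently
            // dropped and the zombie entry survives until the next master switch.
        }
    }

    static boolean isTerminal(String status) {
        return status.equals("CANCELED")
                || status.equals("FINISHED")
                || status.equals("FAILED");
    }
}
```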
---
Would love to hear your thoughts on the fix direction — particularly whether tracking the deployer address in `TaskGroupContext` is the right pattern, or if there's a cleaner approach in the existing architecture.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]