Ufuk Celebi created FLINK-10439:
-----------------------------------

             Summary: Race condition during job suspension
                 Key: FLINK-10439
                 URL: https://issues.apache.org/jira/browse/FLINK-10439
             Project: Flink
          Issue Type: Bug
          Components: Distributed Coordination
    Affects Versions: 1.7.0
            Reporter: Ufuk Celebi
         Attachments: master-logs.log, race-job-suspension.png, worker-logs.log

When a {{JobMaster}} in an HA setup loses leadership, it suspends the 
execution of its job via {{JobMaster.suspend(Exception, Time)}}. This operation 
involves transitioning to the {{SUSPENDING}} job state and cancelling all 
running tasks. In some executions it may happen that the job does *not* reach 
the terminal {{SUSPENDED}} job state.

This is due to the fact that suspending the job stops related RPC endpoints 
such as the {{JobMaster}} or {{SlotPool}} (in {{JobMaster.suspend(Exception, 
Time)}} and {{JobMaster.suspendExecution(Exception)}}) immediately after 
suspending. Whenever this happens *before* the {{TaskExecutor}} instances have 
cancelled or failed the respective tasks, the job does not transition to 
{{SUSPENDED}}, because the {{ExecutionGraph}} does not receive all 
{{Execution}} state transitions.
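
The timing described above can be illustrated with a small, deterministic model. This is only a sketch under simplifying assumptions: {{SuspendRaceSketch}}, {{Endpoint}}, {{Job}} and {{run}} are hypothetical names made up for illustration and are not Flink classes or APIs. It shows a job that reaches {{SUSPENDED}} only after every task has reported a terminal state, and an endpoint that drops messages once it has been stopped.

```java
// Hypothetical, simplified model of the race; none of these types exist in Flink.
final class SuspendRaceSketch {

    enum JobState { RUNNING, SUSPENDING, SUSPENDED }

    /** Stand-in for an RPC endpoint: messages are dropped once it is stopped. */
    static final class Endpoint {
        private boolean running = true;
        private int discarded = 0;

        void stop() { running = false; }

        void tell(Runnable message) {
            if (running) {
                message.run();
            } else {
                discarded++; // loosely analogous to a "Discarding message" log line
            }
        }

        int discardedCount() { return discarded; }
    }

    /** Stand-in for the job: SUSPENDED is reached only after all tasks report back. */
    static final class Job {
        private JobState state = JobState.RUNNING;
        private int pendingTasks;

        Job(int tasks) { this.pendingTasks = tasks; }

        void suspend() { state = JobState.SUSPENDING; }

        void onTaskTerminated() {
            pendingTasks--;
            if (state == JobState.SUSPENDING && pendingTasks == 0) {
                state = JobState.SUSPENDED;
            }
        }

        JobState state() { return state; }
    }

    /**
     * Runs one suspension. If stopEndpointFirst is true, the endpoint is stopped
     * before the task acknowledgements arrive, so they are all discarded.
     */
    static JobState run(int tasks, boolean stopEndpointFirst) {
        Job job = new Job(tasks);
        Endpoint endpoint = new Endpoint();
        job.suspend();
        if (stopEndpointFirst) {
            endpoint.stop();
        }
        for (int i = 0; i < tasks; i++) {
            endpoint.tell(job::onTaskTerminated);
        }
        if (!stopEndpointFirst) {
            endpoint.stop();
        }
        return job.state();
    }

    public static void main(String[] args) {
        System.out.println("acks before stop: " + run(3, false));
        System.out.println("stop before acks: " + run(3, true));
    }
}
```

In this model, the job gets stuck in {{SUSPENDING}} whenever the endpoint is stopped before the acknowledgements are delivered, which mirrors the ordering suspected in this issue.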

In practice, this should not happen frequently due to the fact that {{JobMaster}} 
and {{TaskExecutor}} instances are notified about the loss of leadership (or 
loss of ZooKeeper connection or similar events) around the same time. In this 
scenario, the {{TaskExecutor}} instances proactively fail the executing tasks 
and notify the {{JobMaster}}. All in all, the impact of this is limited by the 
fact that a new {{JobMaster}} leader will eventually recover the job.

*Steps to reproduce*:
- Start ZooKeeper
- Start a Flink cluster in HA mode and submit job
- Stop ZooKeeper

In some executions you will find that the job does not reach the terminal state 
{{SUSPENDED}}. Furthermore, you may see log messages similar to the following 
in this case:
{code}
The rpc endpoint org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not 
been started yet. Discarding message 
org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing is 
started.
{code}

I've attached logs of a local run that does not transition to {{SUSPENDED}} 
and a sequence diagram of what I think may be a problematic timing.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
