John Lonergan created FLINK-17464:
-------------------------------------

             Summary: Standalone HA cluster crash with non-recoverable cluster 
state - need to wipe cluster to recover service
                 Key: FLINK-17464
                 URL: https://issues.apache.org/jira/browse/FLINK-17464
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.10.0
            Reporter: John Lonergan


When recovering job graphs after a failover of the JobManager, or after a 
restart of the cluster, the HA cluster can get into a state where it cannot be 
restarted; the only resolution we have identified is to destroy the 
ZooKeeper job graph store.

This happens when any job graph being recovered throws an exception during 
recovery on the master. 

Whilst we encountered this issue on a sink that implements "InitializeOnMaster", 
we believe the vulnerability is generic in nature: the same unrecoverable 
problems will occur if the application code throws any exception, for any 
reason, during recovery on the master. 

These application exceptions propagate up to the JobManager's ClusterEntrypoint 
class, at which point the JM leader does a System.exit(). Any remaining 
JobManagers then follow leader election and encounter the same sequence of 
events. Ultimately all JMs exit and then all TMs fail as well. 

The entire cluster is destroyed.
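
To make the failure mode concrete, here is a small self-contained model in plain 
Java. This is our own illustration, not Flink source; the JobGraphStore interface 
and method names below are ours. The point it captures is that all persisted job 
graphs are recovered in one pass, and any exception escaping that pass is handled 
as fatal for the whole JobManager process.

{code:java}
import java.util.Arrays;
import java.util.List;

// Self-contained model of the failure mode (not Flink source).
public class RecoveryFailureModel {

    interface JobGraphStore {
        List<String> getJobIds();
        String recoverJobGraph(String jobId) throws Exception; // may execute user code
    }

    static void recoverAll(JobGraphStore store) throws Exception {
        for (String jobId : store.getJobIds()) {
            // One failing job graph (e.g. user code in InitializeOnMaster throwing
            // because an output file already exists) aborts recovery of every job.
            System.out.println("recovered " + store.recoverJobGraph(jobId));
        }
    }

    public static void main(String[] args) {
        JobGraphStore store = new JobGraphStore() {
            public List<String> getJobIds() {
                return Arrays.asList("healthy-job", "word-count");
            }
            public String recoverJobGraph(String jobId) throws Exception {
                if (jobId.equals("word-count")) {
                    throw new Exception("Output directory already exists");
                }
                return jobId;
            }
        };
        try {
            recoverAll(store);
        } catch (Throwable t) {
            // The leading JobManager treats this as a fatal error and exits;
            // standby JobManagers repeat the same sequence after leader election.
            System.exit(1);
        }
    }
}
{code}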

Because these events happen during job graph recovery, merely attempting a 
restart of the cluster will fail, leaving destruction of the job graph state as 
the only option. 

If one is running a shared cluster with many jobs then this is effectively a 
denial of service, and it results in prolonged downtime because code or data 
changes are necessary to work around the issue.

--

Of course, if the same issue occurs during job submission using the CLI, we do 
not see the cluster crashing or the cluster state being corrupted.

Our feeling is that the job graph recovery process ought to behave in a similar 
fashion to the job submission process.

If a job submission fails then the job is recorded as failed and there is no 
further impact on the cluster. However, if job recovery fails then the entire 
cluster is taken down and may, as we have seen, become inoperable.

We feel that a failure to restore a single job graph ought merely to result in 
the job being recorded as failed. It should not result in a cluster-wide impact.

We do not understand the rationale for the current design here. However, if the 
existing logic is for the benefit of single-job clusters, then it is a poor 
result for multi-job clusters; in that case we ought to be able to configure a 
cluster for a "multi-job mode" so that job graph recovery is "sandboxed" and 
doesn't take out the entire cluster.


---

It is easy to demonstrate the problem using the built-in Flink streaming 
WordCount example.
For this to work, configure the job to write a single output file, and write it 
to HDFS rather than to a local disk. 
Note that the class FileOutputFormat implements InitializeOnMaster, and the 
initializeGlobal() function executes only when the file is on HDFS, not on 
local disk.
When this function runs it will throw an exception if the output already 
exists.
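
Paraphrasing the relevant check (this is our own illustration of the no-overwrite 
behaviour, not the actual Flink source of FileOutputFormat.initializeGlobal()):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Paraphrased model of the check that fails during recovery (not Flink source):
// with a single output file and no overwrite, a pre-existing output path raises
// an exception on the master -- exactly the exception that kills job graph recovery.
public class InitializeOnMasterCheck {

    static void initializeGlobal(Path outputPath, int parallelism) throws IOException {
        if (parallelism == 1 && Files.exists(outputPath)) {
            throw new IOException("Output path " + outputPath + " already exists");
        }
    }

    public static void main(String[] args) throws IOException {
        Path output = Paths.get(args.length > 0 ? args[0] : "/tmp/wordcount-output");
        // First run: the path does not exist yet, so this succeeds and the job goes
        // on to create the file. On recovery the same check throws.
        initializeGlobal(output, 1);
    }
}
{code}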
To demonstrate the issue, do the following:

- configure the job to write a single output file to HDFS (a sketch of such a 
job follows this list)
- configure the job to read a large input file so that the job takes some time 
to execute and we have time to complete the next few steps before the job 
finishes
- run the job on an HA cluster with two JM nodes
- wait for the job to start and the output file to be created
- kill the leader JM before the job has finished 
- observe JM failover occurring ... 
- recovery during failover will NOT succeed because recovery of the WordCount 
job fails due to the presence of the output file
- observe all JMs and TMs ultimately terminating
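
A minimal job along these lines is sketched below. The HDFS paths are 
placeholders; the essential points are parallelism 1, so that a single output 
file is written, and writeAsText(), whose TextOutputFormat is a FileOutputFormat 
and therefore runs initializeGlobal() on the master when the target is HDFS.

{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// WordCount-style reproduction job; paths are placeholders.
public class RecoveryCrashRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // single output file

        env.readTextFile("hdfs:///data/large-input.txt") // large input so the job runs long enough to kill the leader JM
           .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
               for (String word : line.toLowerCase().split("\\W+")) {
                   out.collect(Tuple2.of(word, 1));
               }
           })
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(t -> t.f0)
           .sum(1)
           .writeAsText("hdfs:///results/wordcount-output"); // single file on HDFS via TextOutputFormat

        env.execute("WordCount HA recovery repro");
    }
}
{code}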

Once the cluster has failed outright, try to restart it.

During restart the cluster will detect the presence of job graphs in ZooKeeper 
and attempt to restore them. This, however, is doomed to fail due to the same 
vulnerability that caused the global outage above.

-------

For operability, Flink needs a change such that the job graph recovery process 
is entirely sandboxed: failure of a given job during job graph recovery should 
result merely in a failed job, not a failed cluster.
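
In terms of the model sketched earlier, the change we are proposing amounts to 
moving the try/catch inside the per-job loop. This is illustrative only, reusing 
the hypothetical JobGraphStore interface from the model above; these are not 
Flink's actual method names.

{code:java}
// Illustrative sketch of the proposed behaviour (not Flink code): recover each
// job graph in isolation, record failures per job, and keep the JobManager --
// and every other job -- alive.
static void recoverAllSandboxed(JobGraphStore store) {
    for (String jobId : store.getJobIds()) {
        try {
            store.recoverJobGraph(jobId);
            // ...schedule the recovered job as today...
        } catch (Throwable t) {
            // Record this one job as failed (mirroring how a failed submission is
            // handled) instead of escalating to a fatal, process-wide exit.
            System.err.println("Recovery of job " + jobId + " failed: " + t.getMessage());
        }
    }
}
{code}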




