[jira] [Comment Edited] (FLINK-17464) Standalone HA Cluster crash with non-recoverable cluster state - need to wipe cluster to recover service

2020-06-03 Thread Stephan Ewen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124924#comment-17124924 ]

Stephan Ewen edited comment on FLINK-17464 at 6/3/20, 12:37 PM:


This is a good discussion to have; there are a few thoughts that went into how 
it currently works, some known shortcomings, and different options for the 
future:

*Critical Failures on the Master*

As Till described, it can be helpful to exit the JobManager process if it 
encountered a bad problem. We have seen various cases where the problem was an 
issue with the node's connectivity, and once another master got elected
leader, the system recovered smoothly.

My feeling is that we should keep this behavior, but be careful about what we 
treat as a fatal error that triggers such a process kill.
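
To make the pattern concrete, here is a minimal sketch (the class and method names 
are invented, this is not Flink's actual implementation) of what "treat it as fatal 
and exit, then let a standby take over" means:

{code:java}
/**
 * Illustrative sketch only: the "fail fast on fatal errors" pattern.
 * Exiting the process hands leadership to a standby JobManager (via
 * ZooKeeper leader election in an HA setup), which then retries recovery.
 */
public final class FatalErrorExitSketch {

    /** Called when the master hits a problem it cannot safely handle in-process. */
    public static void onFatalError(Throwable cause) {
        // Log before exiting; nothing after System.exit() will run.
        System.err.println("Fatal error on the JobManager, terminating process: " + cause);

        // A non-zero exit code lets the surrounding supervisor (systemd,
        // Kubernetes, ...) restart the process or promote a standby master.
        System.exit(1);
    }

    private FatalErrorExitSketch() {}
}
{code}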

*initializeOnMaster() behavior*

Exceptions from there are user-level exceptions, even though they could be 
related to connectivity issues. It makes sense to not always fail the cluster 
because of that, or at least make it configurable.
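
For context, such a user-level exception could come, for example, from a custom 
sink along the lines of the following sketch (the class and the external-system 
call are invented for illustration). Since {{initializeGlobal()}} runs on the 
master, anything it throws surfaces inside the JobManager process:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

/** Illustration only: a DataSet sink whose master-side initialization can fail. */
public class ExternalTableOutputFormat implements OutputFormat<String>, InitializeOnMaster {

    @Override
    public void initializeGlobal(int parallelism) throws IOException {
        // Runs on the master before tasks are deployed (and again when a job is recovered).
        // If the external system is unreachable here, the exception is thrown on the JobManager.
        ensureTargetTableExists(); // placeholder for e.g. a JDBC or file-system call
    }

    private void ensureTargetTableExists() throws IOException {
        // placeholder: connect to the external system, create the target if missing
    }

    @Override
    public void configure(Configuration parameters) {}

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {}

    @Override
    public void writeRecord(String record) throws IOException {}

    @Override
    public void close() throws IOException {}
}
{code}

Flink's own {{FileOutputFormat}}, for example, uses this hook to create output 
directories, which is where file-system connectivity problems show up.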

To add some more information: This is part of the DataSet API's functionality 
and is gradually being replaced.
 - For pure batch analytics use cases, the Table API is getting enhanced to 
make this convenient and subsume the DataSet API.
 - For re-processing of streams, we are looking to extend the DataStream API to 
be convenient on offline data.
 - The sources already have a new unified batch/streaming interface in 1.11; we 
will most likely propose something similar for the sinks in 1.12.

*Isolation of Jobs*

If you want strong isolation of jobs from each other, running them not in a 
shared session but as separate applications is a good way to go about this.
 See here for details: 
[https://github.com/apache/flink/blob/master/docs/concepts/flink-architecture.md]
 (docs build bot is currently stuck, hence the markdown link instead of the 
html page)

That way you never have jobs impacting one another.

Depending on how you want to deploy this: if you need an endpoint to be up and 
able to receive jobs, but still want every job to have its own master process, 
this can also be built without too much effort. You can have a shared dispatcher 
(or a set of dispatchers, for high availability) and let it spawn the jobs in 
job/application mode so that the JobManagers are separate, isolated processes.


[jira] [Comment Edited] (FLINK-17464) Standalone HA Cluster crash with non-recoverable cluster state - need to wipe cluster to recover service

2020-05-04 Thread Till Rohrmann (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098982#comment-17098982 ]

Till Rohrmann edited comment on FLINK-17464 at 5/4/20, 2:35 PM:


Thanks for reporting this issue [~johnlon]. Your description of Flink's 
behaviour is correct. The reasoning behind this behaviour is that Flink must 
not lose jobs due to transient exceptions. When recovering jobs, Flink needs to 
interact with external systems and it can happen that certain operations fail. 
If Flink encounters a problem, it takes the conservative approach to fail the 
complete process so that a new process can take over and try again to recover 
the persisted jobs. This will work if the encountered problem eventually 
disappears.

Unfortunately, it won't work when the problem repeats deterministically, as in 
your case, or if someone meddles with some of Flink's internal state 
(e.g. removing persisted blobs belonging to a submitted job). This problem is 
even more problematic in the case of a session cluster, where other jobs are 
affected by one faulty job.

Ideally, one would like to distinguish between transient exceptions and 
deterministic ones. If this were possible, then one could retry for the former 
ones and fail the jobs in case one encounters the latter ones. Since this is in 
general a hard problem for which I don't know a good solution, I guess it is a 
good proposal to make the failure behaviour in case of recoveries configurable. 
As you have suggested, such a sandbox mode could simply transition the job into 
a failed state instead of failing the whole process.

The drawback of such a mode would be that you might fail some jobs that would 
have been recoverable had they been retried a bit longer.
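
Purely to illustrate that trade-off (none of the following exists in Flink, all 
names are invented), a configurable policy could look roughly like this:

{code:java}
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a configurable recovery-failure policy. */
public final class RecoveryFailurePolicy {

    public enum Action { FAIL_PROCESS_AND_RETRY, FAIL_JOB_ONLY }

    /** "Sandbox mode": mark the job as FAILED instead of killing the JobManager process. */
    private final boolean failJobInsteadOfProcess;

    public RecoveryFailurePolicy(boolean failJobInsteadOfProcess) {
        this.failJobInsteadOfProcess = failJobInsteadOfProcess;
    }

    /** Best-effort guess; telling transient and deterministic failures apart is
     *  exactly the hard, in general unsolvable, part. */
    private static boolean looksTransient(Throwable t) {
        return t instanceof IOException || t instanceof TimeoutException;
    }

    public Action onRecoveryFailure(Throwable cause) {
        if (looksTransient(cause)) {
            // A fresh process (or a later attempt) may well succeed, so keep retrying.
            return Action.FAIL_PROCESS_AND_RETRY;
        }
        // Looks deterministic: either give up on this job only (sandbox mode),
        // or keep today's conservative behaviour and fail the whole process.
        return failJobInsteadOfProcess ? Action.FAIL_JOB_ONLY : Action.FAIL_PROCESS_AND_RETRY;
    }
}
{code}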


> Standalone HA Cluster crash with non-recoverable cluster state - need to wipe 
> cluster to recover service
> ---
>
> Key: FLINK-17464
> URL: https://issues.apache.org/jira/browse/FLINK-17464
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: John Lonergan
>Priority: Critical
>
> When recovering job graphs after a failover of the JobManager, or after a 
> restart of the cluster, the HA Cluster can get into a state where it cannot 
> be restarted, and the only resolution we have identified is to destroy the 
> ZooKeeper job graph store.
> This happens when any job graph that is being recovered throws an exception 
> during recovery on the master. 
> Whilst we encountered this issue on a sink that extends "InitializeOnMaster", 
> we believe the vulnerability is generic in nature and the unrecoverable 
> problems encountered will occur if the application code throws any exception 
> for any reason during recovery on the main line. 
> These application exceptions propagate up to the JobManager ClusterEntrypoint 
> class, at which point the JM leader does a System.exit(). If there are 
> remaining JobManagers, they will then follow leader election and encounter 
> the same sequence of events. Ultimately all JMs exit and then all TMs fail 
> as well. 
> The entire cluster is destroyed.
> Because these events happen during job