[hotfix] [dist. coordination] Add safety check for execution graph state transitions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60895a3c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60895a3c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60895a3c Branch: refs/heads/table-retraction Commit: 60895a3ccd83609088be6ecef3445f7c78c9955a Parents: 874d956 Author: Stephan Ewen <se...@apache.org> Authored: Wed Mar 22 19:53:50 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Mar 29 17:11:49 2017 +0200 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 8 +++++++ .../flink/runtime/jobgraph/JobStatus.java | 22 +++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/60895a3c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index e911f49..1c7b1c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1105,6 +1105,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) { + // consistency check + if (current.isTerminalState()) { + String message = "Job is trying to leave terminal state " + current; + LOG.error(message); + throw new IllegalStateException(message); + } + + // now do the actual state transition if (STATE_UPDATER.compareAndSet(this, current, newState)) { LOG.info("Job {} ({}) switched 
from state {} to {}.", getJobName(), getJobID(), current, newState, error); http://git-wip-us.apache.org/repos/asf/flink/blob/60895a3c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java index 6a0ac97..4ef86bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java @@ -69,11 +69,31 @@ public enum JobStatus { JobStatus(TerminalState terminalState) { this.terminalState = terminalState; } - + + /** + * Checks whether this state is <i>globally terminal</i>. A globally terminal job + * is complete and cannot fail any more and will not be restarted or recovered by another + * standby master node. + * + * <p>When a globally terminal state has been reached, all recovery data for the job is + * dropped from the high-availability services. + * + * @return True, if this job status is globally terminal, false otherwise. + */ public boolean isGloballyTerminalState() { return terminalState == TerminalState.GLOBALLY; } + /** + * Checks whether this state is <i>locally terminal</i>. Locally terminal refers to the + * state of a job's execution graph within an executing JobManager. If the execution graph + * is locally terminal, the JobManager will not continue executing or recovering the job. + * + * <p>The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED}, + * which is typically entered when the executing JobManager loses its leader status. + * + * @return True, if this job status is terminal, false otherwise. + */ public boolean isTerminalState() { return terminalState != TerminalState.NON_TERMINAL; }