[hotfix] [dist. coordination] Add safety check for execution graph state transitions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60895a3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60895a3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60895a3c

Branch: refs/heads/table-retraction
Commit: 60895a3ccd83609088be6ecef3445f7c78c9955a
Parents: 874d956
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 22 19:53:50 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 17:11:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  8 +++++++
 .../flink/runtime/jobgraph/JobStatus.java       | 22 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60895a3c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e911f49..1c7b1c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1105,6 +1105,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
        }
 
        private boolean transitionState(JobStatus current, JobStatus newState, 
Throwable error) {
+               // consistency check
+               if (current.isTerminalState()) {
+                       String message = "Job is trying to leave terminal state 
" + current;
+                       LOG.error(message);
+                       throw new IllegalStateException(message);
+               }
+
+               // now do the actual state transition
                if (STATE_UPDATER.compareAndSet(this, current, newState)) {
                        LOG.info("Job {} ({}) switched from state {} to {}.", 
getJobName(), getJobID(), current, newState, error);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60895a3c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 6a0ac97..4ef86bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -69,11 +69,31 @@ public enum JobStatus {
        JobStatus(TerminalState terminalState) {
                this.terminalState = terminalState;
        }
-       
+
+       /**
+        * Checks whether this state is <i>globally terminal</i>. A globally terminal job
+        * is complete, can no longer fail, and will not be restarted or recovered by another
+        * standby master node.
+        * 
+        * <p>When a globally terminal state has been reached, all recovery data for the job is
+        * dropped from the high-availability services.
+        * 
+        * @return True, if this job status is globally terminal, false otherwise.
+        */
        public boolean isGloballyTerminalState() {
                return terminalState == TerminalState.GLOBALLY;
        }
 
+       /**
+        * Checks whether this state is <i>locally terminal</i>. Locally terminal refers to the
+        * state of a job's execution graph within an executing JobManager. If the execution graph
+        * is locally terminal, the JobManager will not continue executing or recovering the job.
+        *
+        * <p>The only state that is locally terminal, but not globally terminal, is {@link #SUSPENDED},
+        * which is typically entered when the executing JobManager loses its leader status.
+        * 
+        * @return True, if this job status is locally terminal (including all globally terminal states), false otherwise.
+        */
        public boolean isTerminalState() {
                return terminalState != TerminalState.NON_TERMINAL;
        }

Reply via email to