[FLINK-6340] [flip-1] Add a termination future to the Execution

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0061272
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0061272
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0061272

Branch: refs/heads/master
Commit: e00612726991a05058168e5a4fbfb53853e645a5
Parents: aadfe45
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 22:49:54 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  26 ++-
 .../runtime/executiongraph/ExecutionGraph.java  | 182 ++++++++++------
 .../executiongraph/ExecutionJobVertex.java      | 116 +++-------
 .../runtime/executiongraph/ExecutionVertex.java |  56 +++--
 .../ExecutionGraphRestartTest.java              |  78 ++++---
 .../executiongraph/ExecutionGraphTestUtils.java |   8 +-
 .../ExecutionStateProgressTest.java             |  94 --------
 .../TerminalStateDeadlockTest.java              | 216 -------------------
 8 files changed, 250 insertions(+), 526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 729e161..2680849 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -121,6 +121,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        private final 
ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> 
partialInputChannelDeploymentDescriptors;
 
+       /** A future that completes once the Execution reaches a terminal 
ExecutionState */
+       private final FlinkCompletableFuture<ExecutionState> terminationFuture;
+
        private volatile ExecutionState state = CREATED;
 
        private volatile SimpleSlot assignedResource;     // once assigned, 
never changes until the execution is archived
@@ -161,6 +164,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                markTimestamp(ExecutionState.CREATED, startTimestamp);
 
                this.partialInputChannelDeploymentDescriptors = new 
ConcurrentLinkedQueue<>();
+               this.terminationFuture = new FlinkCompletableFuture<>();
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -234,6 +238,16 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                this.taskState = checkpointStateHandles;
        }
 
+       /**
+        * Gets a future that completes once the task execution reaches a 
terminal state.
+        * The future will be completed with specific state that the execution 
reached.
+        *
+        * @return A future for the execution's termination
+        */
+       public Future<ExecutionState> getTerminationFuture() {
+               return terminationFuture;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Actions
        // 
--------------------------------------------------------------------------------------------
@@ -473,7 +487,8 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                                }
                                        }
                                        finally {
-                                               vertex.executionCanceled();
+                                               vertex.executionCanceled(this);
+                                               
terminationFuture.complete(CANCELED);
                                        }
                                        return;
                                }
@@ -741,7 +756,8 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                                
vertex.getExecutionGraph().deregisterExecution(this);
                                        }
                                        finally {
-                                               vertex.executionFinished();
+                                               vertex.executionFinished(this);
+                                               
terminationFuture.complete(FINISHED);
                                        }
                                        return;
                                }
@@ -793,7 +809,8 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                                
vertex.getExecutionGraph().deregisterExecution(this);
                                        }
                                        finally {
-                                               vertex.executionCanceled();
+                                               vertex.executionCanceled(this);
+                                               
terminationFuture.complete(CANCELED);
                                        }
                                        return;
                                }
@@ -886,7 +903,8 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        
vertex.getExecutionGraph().deregisterExecution(this);
                                }
                                finally {
-                                       vertex.executionFailed(t);
+                                       vertex.executionFailed(this, t);
+                                       terminationFuture.complete(FAILED);
                                }
 
                                if (!isCallback && (current == RUNNING || 
current == DEPLOYING)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 23ed99d..fff1ea2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -89,6 +90,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -188,6 +190,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        /** Registered KvState instances reported by the TaskManagers. */
        private final KvStateLocationRegistry kvStateLocationRegistry;
 
+       private int numVerticesTotal;
+
        // ------ Configuration of the Execution -------
 
        /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
@@ -203,6 +207,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
        // ------ Execution status and progress. These values are volatile, and 
accessed under the lock -------
 
+       private final AtomicInteger verticesFinished;
+
        /** Current status of the job execution */
        private volatile JobStatus state = JobStatus.CREATED;
 
@@ -210,9 +216,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         * that was not recoverable and triggered job failure */
        private volatile Throwable failureCause;
 
-       /** The number of job vertices that have reached a terminal state */
-       private volatile int numFinishedJobVertices;
-
        // ------ Fields that are relevant to the execution and need to be 
cleared before archiving  -------
 
        /** The coordinator for checkpoints, if snapshot checkpoints are 
enabled */
@@ -317,6 +320,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
                this.restartStrategy = restartStrategy;
                this.kvStateLocationRegistry = new 
KvStateLocationRegistry(jobId, getAllVertices());
+
+               this.verticesFinished = new AtomicInteger();
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -454,7 +459,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        return jv.getTaskVertices();
                }
                else {
-                       ArrayList<ExecutionVertex> all = new 
ArrayList<ExecutionVertex>();
+                       ArrayList<ExecutionVertex> all = new ArrayList<>();
                        for (ExecutionJobVertex jv : jobVertices) {
                                if (jv.getGraph() != this) {
                                        throw new IllegalArgumentException("Can 
only use ExecutionJobVertices of this ExecutionGraph");
@@ -586,6 +591,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                };
        }
 
+       public int getTotalNumberOfVertices() {
+               return numVerticesTotal;
+       }
+
        public Map<IntermediateDataSetID, IntermediateResult> 
getAllIntermediateResults() {
                return Collections.unmodifiableMap(this.intermediateResults);
        }
@@ -620,7 +629,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         */
        public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
 
-               Map<String, Accumulator<?, ?>> userAccumulators = new 
HashMap<String, Accumulator<?, ?>>();
+               Map<String, Accumulator<?, ?>> userAccumulators = new 
HashMap<>();
 
                for (ExecutionVertex vertex : getAllExecutionVertices()) {
                        Map<String, Accumulator<?, ?>> next = 
vertex.getCurrentExecutionAttempt().getUserAccumulators();
@@ -657,7 +666,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
                Map<String, Accumulator<?, ?>> accumulatorMap = 
aggregateUserAccumulators();
 
-               Map<String, SerializedValue<Object>> result = new 
HashMap<String, SerializedValue<Object>>();
+               Map<String, SerializedValue<Object>> result = new HashMap<>();
                for (Map.Entry<String, Accumulator<?, ?>> entry : 
accumulatorMap.entrySet()) {
                        result.put(entry.getKey(), new 
SerializedValue<Object>(entry.getValue().getLocalValue()));
                }
@@ -713,6 +722,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        }
 
                        this.verticesInCreationOrder.add(ejv);
+                       this.numVerticesTotal += ejv.getParallelism();
                }
        }
 
@@ -878,9 +888,23 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
                        if (current == JobStatus.RUNNING || current == 
JobStatus.CREATED) {
                                if (transitionState(current, 
JobStatus.CANCELLING)) {
+
+                                       final ArrayList<Future<?>> futures = 
new ArrayList<>(verticesInCreationOrder.size());
+
+                                       // cancel all tasks (that still need 
cancelling)
                                        for (ExecutionJobVertex ejv : 
verticesInCreationOrder) {
-                                               ejv.cancel();
+                                               
futures.add(ejv.cancelWithFuture());
                                        }
+
+                                       // we build a future that is complete 
once all vertices have reached a terminal state
+                                       final ConjunctFuture allTerminal = 
FutureUtils.combineAll(futures);
+                                       allTerminal.thenAccept(new 
AcceptFunction<Void>() {
+                                               @Override
+                                               public void accept(Void value) {
+                                                       
allVerticesInTerminalState();
+                                               }
+                                       });
+
                                        return;
                                }
                        }
@@ -968,26 +992,33 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                current == JobStatus.SUSPENDED ||
                                current.isGloballyTerminalState()) {
                                return;
-                       } else if (current == JobStatus.RESTARTING) {
+                       }
+                       else if (current == JobStatus.RESTARTING) {
                                this.failureCause = t;
 
                                if (tryRestartOrFail()) {
                                        return;
                                }
-                               // concurrent job status change, let's check 
again
-                       } else if (transitionState(current, JobStatus.FAILING, 
t)) {
+                       }
+                       else if (transitionState(current, JobStatus.FAILING, 
t)) {
                                this.failureCause = t;
 
-                               if (!verticesInCreationOrder.isEmpty()) {
-                                       // cancel all. what is failed will not 
cancel but stay failed
-                                       for (ExecutionJobVertex ejv : 
verticesInCreationOrder) {
-                                               ejv.cancel();
-                                       }
-                               } else {
-                                       // set the state of the job to failed
-                                       transitionState(JobStatus.FAILING, 
JobStatus.FAILED, t);
+                               // we build a future that is complete once all 
vertices have reached a terminal state
+                               final ArrayList<Future<?>> futures = new 
ArrayList<>(verticesInCreationOrder.size());
+
+                               // cancel all tasks (that still need cancelling)
+                               for (ExecutionJobVertex ejv : 
verticesInCreationOrder) {
+                                       futures.add(ejv.cancelWithFuture());
                                }
 
+                               final ConjunctFuture allTerminal = 
FutureUtils.combineAll(futures);
+                               allTerminal.thenAccept(new 
AcceptFunction<Void>() {
+                                       @Override
+                                       public void accept(Void value) {
+                                               allVerticesInTerminalState();
+                                       }
+                               });
+
                                return;
                        }
 
@@ -1039,7 +1070,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                                stateTimestamps[i] = 0;
                                        }
                                }
-                               numFinishedJobVertices = 0;
+
                                transitionState(JobStatus.RESTARTING, 
JobStatus.CREATED);
 
                                // if we have checkpointed state, reload it 
into the executions
@@ -1097,9 +1128,24 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         * For testing: This waits until the job execution has finished.
         */
        public void waitUntilFinished() throws InterruptedException {
-               synchronized (progressLock) {
-                       while (!state.isTerminalState()) {
-                               progressLock.wait();
+               // we may need multiple attempts in the presence of failures / 
recovery
+               while (true) {
+                       for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+                               for (ExecutionVertex ev : 
ejv.getTaskVertices()) {
+                                       try {
+                                               
ev.getCurrentExecutionAttempt().getTerminationFuture().get();
+                                       }
+                                       catch (ExecutionException e) {
+                                               // this should never happen
+                                               throw new RuntimeException(e);
+                                       }
+                               }
+                       }
+
+                       // now that all vertices have been (at some point) in a 
terminal state,
+                       // we need to check if the job as a whole has entered a 
final state
+                       if (state.isTerminalState()) {
+                               return;
                        }
                }
        }
@@ -1129,59 +1175,57 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
-       void jobVertexInFinalState() {
-               synchronized (progressLock) {
-                       if (numFinishedJobVertices >= 
verticesInCreationOrder.size()) {
-                               throw new IllegalStateException("All vertices 
are already finished, cannot transition vertex to finished.");
-                       }
-
-                       numFinishedJobVertices++;
+       void vertexFinished() {
+               int numFinished = verticesFinished.incrementAndGet();
+               if (numFinished == numVerticesTotal) {
+                       // done :-)
+                       allVerticesInTerminalState();
+               }
+       }
 
-                       if (numFinishedJobVertices == 
verticesInCreationOrder.size()) {
+       void vertexUnFinished() {
+               verticesFinished.getAndDecrement();
+       }
 
-                               // we are done, transition to the final state
-                               JobStatus current;
-                               while (true) {
-                                       current = this.state;
+       private void allVerticesInTerminalState() {
+               // we are done, transition to the final state
+               JobStatus current;
+               while (true) {
+                       current = this.state;
 
-                                       if (current == JobStatus.RUNNING) {
-                                               if (transitionState(current, 
JobStatus.FINISHED)) {
-                                                       postRunCleanup();
-                                                       break;
-                                               }
-                                       }
-                                       else if (current == 
JobStatus.CANCELLING) {
-                                               if (transitionState(current, 
JobStatus.CANCELED)) {
-                                                       postRunCleanup();
-                                                       break;
-                                               }
-                                       }
-                                       else if (current == JobStatus.FAILING) {
-                                               if (tryRestartOrFail()) {
-                                                       break;
-                                               }
-                                               // concurrent job status 
change, let's check again
-                                       }
-                                       else if (current == 
JobStatus.SUSPENDED) {
-                                               // we've already cleaned up 
when entering the SUSPENDED state
-                                               break;
-                                       }
-                                       else if 
(current.isGloballyTerminalState()) {
-                                               LOG.warn("Job has entered 
globally terminal state without waiting for all " +
-                                                       "job vertices to reach 
final state.");
-                                               break;
-                                       }
-                                       else {
-                                               fail(new 
Exception("ExecutionGraph went into final state from state " + current));
-                                               break;
-                                       }
+                       if (current == JobStatus.RUNNING) {
+                               if (transitionState(current, 
JobStatus.FINISHED)) {
+                                       postRunCleanup();
+                                       break;
                                }
-                               // done transitioning the state
-
-                               // also, notify waiters
-                               progressLock.notifyAll();
+                       }
+                       else if (current == JobStatus.CANCELLING) {
+                               if (transitionState(current, 
JobStatus.CANCELED)) {
+                                       postRunCleanup();
+                                       break;
+                               }
+                       }
+                       else if (current == JobStatus.FAILING) {
+                               if (tryRestartOrFail()) {
+                                       break;
+                               }
+                               // concurrent job status change, let's check 
again
+                       }
+                       else if (current == JobStatus.SUSPENDED) {
+                               // we've already cleaned up when entering the 
SUSPENDED state
+                               break;
+                       }
+                       else if (current.isGloballyTerminalState()) {
+                               LOG.warn("Job has entered globally terminal 
state without waiting for all " +
+                                               "job vertices to reach final 
state.");
+                               break;
+                       }
+                       else {
+                               fail(new Exception("ExecutionGraph went into 
final state from state " + current));
+                               break;
                        }
                }
+               // done transitioning the state
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2e5de64..3197e65 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -65,9 +66,9 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
        public static final int VALUE_NOT_SET = -1;
 
        private final Object stateMonitor = new Object();
-       
+
        private final ExecutionGraph graph;
-       
+
        private final JobVertex jobVertex;
 
        /**
@@ -91,12 +92,10 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
        private final ExecutionVertex[] taskVertices;
 
        private final IntermediateResult[] producedDataSets;
-       
+
        private final List<IntermediateResult> inputs;
-       
-       private final int parallelism;
 
-       private final boolean[] finishedSubtasks;
+       private final int parallelism;
 
        private final SlotSharingGroup slotSharingGroup;
 
@@ -108,8 +107,6 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
 
        private int maxParallelism;
 
-       private volatile int numSubtasksInFinalState;
-
        /**
         * Serialized task information which is for all sub tasks the same. 
Thus, it avoids to
         * serialize the same information multiple times in order to create the
@@ -231,8 +228,6 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                catch (Throwable t) {
                        throw new JobException("Creating the input splits 
caused an error: " + t.getMessage(), t);
                }
-               
-               finishedSubtasks = new boolean[parallelism];
        }
 
        /**
@@ -360,10 +355,6 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                return serializedTaskInformation;
        }
 
-       public boolean isInFinalState() {
-               return numSubtasksInFinalState == parallelism;
-       }
-
        @Override
        public ExecutionState getAggregateState() {
                int[] num = new int[ExecutionState.values().length];
@@ -484,51 +475,51 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                return slots;
        }
 
+       /**
+        * Cancels all currently running vertex executions.
+        */
        public void cancel() {
                for (ExecutionVertex ev : getTaskVertices()) {
                        ev.cancel();
                }
        }
-       
-       public void fail(Throwable t) {
+
+       /**
+        * Cancels all currently running vertex executions.
+        * 
+        * @return A future that is complete once all tasks have canceled.
+        */
+       public Future<Void> cancelWithFuture() {
+               // we collect all futures from the task cancellations
+               ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
+
+               // cancel each vertex
                for (ExecutionVertex ev : getTaskVertices()) {
-                       ev.fail(t);
+                       futures.add(ev.cancel());
                }
+
+               // return a conjunct future, which is complete once all 
individual tasks are canceled
+               return FutureUtils.combineAll(futures);
        }
-       
-       public void waitForAllVerticesToReachFinishingState() throws 
InterruptedException {
-               synchronized (stateMonitor) {
-                       while (numSubtasksInFinalState < parallelism) {
-                               stateMonitor.wait();
-                       }
+
+       public void fail(Throwable t) {
+               for (ExecutionVertex ev : getTaskVertices()) {
+                       ev.fail(t);
                }
        }
-       
+
        public void resetForNewExecution() {
-               if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState 
== parallelism)) {
-                       throw new IllegalStateException("Cannot reset vertex 
that is not in final state");
-               }
-               
+
                synchronized (stateMonitor) {
                        // check and reset the sharing groups with scheduler 
hints
                        if (slotSharingGroup != null) {
                                slotSharingGroup.clearTaskAssignment();
                        }
-                       
-                       // reset vertices one by one. if one reset fails, the 
"vertices in final state"
-                       // fields will be consistent to handle triggered cancel 
calls
+
                        for (int i = 0; i < parallelism; i++) {
                                taskVertices[i].resetForNewExecution();
-                               if (finishedSubtasks[i]) {
-                                       finishedSubtasks[i] = false;
-                                       numSubtasksInFinalState--;
-                               }
-                       }
-                       
-                       if (numSubtasksInFinalState != 0) {
-                               throw new RuntimeException("Bug: resetting the 
execution job vertex failed.");
                        }
-                       
+
                        // set up the input splits again
                        try {
                                if (this.inputSplits != null) {
@@ -548,51 +539,6 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                        }
                }
        }
-       
-       
//---------------------------------------------------------------------------------------------
-       //  Notifications
-       
//---------------------------------------------------------------------------------------------
-       
-       void vertexFinished(int subtask) {
-               subtaskInFinalState(subtask);
-       }
-       
-       void vertexCancelled(int subtask) {
-               subtaskInFinalState(subtask);
-       }
-       
-       void vertexFailed(int subtask, Throwable error) {
-               subtaskInFinalState(subtask);
-       }
-       
-       private void subtaskInFinalState(int subtask) {
-               synchronized (stateMonitor) {
-                       if (!finishedSubtasks[subtask]) {
-                               finishedSubtasks[subtask] = true;
-                               
-                               if (numSubtasksInFinalState+1 == parallelism) {
-                                       
-                                       // call finalizeOnMaster hook
-                                       try {
-                                               
getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
-                                       }
-                                       catch (Throwable t) {
-                                               getGraph().fail(t);
-                                       }
-
-                                       numSubtasksInFinalState++;
-                                       
-                                       // we are in our final state
-                                       stateMonitor.notifyAll();
-                                       
-                                       // tell the graph
-                                       graph.jobVertexInFinalState();
-                               } else {
-                                       numSubtasksInFinalState++;
-                               }
-                       }
-               }
-       }
 
        // 
--------------------------------------------------------------------------------------------
        //  Accumulators / Metrics

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3f6ce88..bcf7a7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -60,8 +61,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 /**
@@ -509,30 +508,41 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
        //   Actions
        // 
--------------------------------------------------------------------------------------------
 
-       public void resetForNewExecution() {
+       public Execution resetForNewExecution() {
 
                LOG.debug("Resetting execution vertex {} for new execution.", 
getTaskNameWithSubtaskIndex());
 
                synchronized (priorExecutions) {
-                       Execution execution = currentExecution;
-                       ExecutionState state = execution.getState();
+                       final Execution oldExecution = currentExecution;
+                       final ExecutionState oldState = oldExecution.getState();
 
-                       if (state == FINISHED || state == CANCELED || state == 
FAILED) {
-                               priorExecutions.add(execution);
-                               currentExecution = new Execution(
+                       if (oldState.isTerminal()) {
+                               priorExecutions.add(oldExecution);
+
+                               final Execution newExecution = new Execution(
                                        getExecutionGraph().getFutureExecutor(),
                                        this,
-                                       execution.getAttemptNumber()+1,
+                                               
oldExecution.getAttemptNumber()+1,
                                        System.currentTimeMillis(),
                                        timeout);
 
+                               this.currentExecution = newExecution;
+
                                CoLocationGroup grp = 
jobVertex.getCoLocationGroup();
                                if (grp != null) {
                                        this.locationConstraint = 
grp.getLocationConstraint(subTaskIndex);
                                }
+
+                               // if the execution was 'FINISHED' before, tell 
the ExecutionGraph that
+                               // we take one step back on the road to 
reaching global FINISHED
+                               if (oldState == FINISHED) {
+                                       getExecutionGraph().vertexUnFinished();
+                               }
+
+                               return newExecution;
                        }
                        else {
-                               throw new IllegalStateException("Cannot reset a 
vertex that is in state " + state);
+                               throw new IllegalStateException("Cannot reset a 
vertex that is in non-terminal state " + oldState);
                        }
                }
        }
@@ -545,8 +555,16 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                this.currentExecution.deployToSlot(slot);
        }
 
-       public void cancel() {
-               this.currentExecution.cancel();
+       /**
+        *  
+        * @return A future that completes once the execution has reached its 
final state.
+        */
+       public Future<ExecutionState> cancel() {
+               // to avoid any case of mixup in the presence of concurrent 
calls,
+               // we copy a reference to the stack to make sure both calls go 
to the same Execution 
+               final Execution exec = this.currentExecution;
+               exec.cancel();
+               return exec.getTerminationFuture();
        }
 
        public void stop() {
@@ -621,16 +639,18 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
        //   Notifications from the Execution Attempt
        // 
--------------------------------------------------------------------------------------------
 
-       void executionFinished() {
-               jobVertex.vertexFinished(subTaskIndex);
+       void executionFinished(Execution execution) {
+               if (execution == currentExecution) {
+                       getExecutionGraph().vertexFinished();
+               }
        }
 
-       void executionCanceled() {
-               jobVertex.vertexCancelled(subTaskIndex);
+       void executionCanceled(Execution execution) {
+               // nothing to do
        }
 
-       void executionFailed(Throwable t) {
-               jobVertex.vertexFailed(subTaskIndex, t);
+       void executionFailed(Execution execution, Throwable cause) {
+               // nothing to do
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1729582..1ebfcac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -60,12 +60,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doNothing;
+
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -288,48 +288,58 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
 
        @Test
        public void testCancelWhileFailing() throws Exception {
-               // We want to manually control the restart and delay
-               RestartStrategy restartStrategy = new 
InfiniteDelayRestartStrategy();
-               Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = 
createSpyExecutionGraph(restartStrategy);
-               ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
-               Instance instance = executionGraphInstanceTuple.f1;
-               doNothing().when(executionGraph).jobVertexInFinalState();
+               final RestartStrategy restartStrategy = new 
InfiniteDelayRestartStrategy();
+               final ExecutionGraph graph = 
createExecutionGraph(restartStrategy).f0;
 
-               // Kill the instance...
-               instance.markDead();
+               assertEquals(JobStatus.RUNNING, graph.getState());
 
-               Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+               // switch all tasks to running
+               for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+                       vertex.getCurrentExecutionAttempt().switchToRunning();
+               }
 
-               // ...and wait for all vertices to be in state FAILED. The
-               // jobVertexInFinalState does nothing, that's why we don't wait 
on the
-               // job status.
-               boolean success = false;
-               while (deadline.hasTimeLeft() && !success) {
-                       success = true;
-                       for (ExecutionVertex vertex : 
executionGraph.getAllExecutionVertices()) {
-                               ExecutionState state = 
vertex.getExecutionState();
-                               if (state != ExecutionState.FAILED && state != 
ExecutionState.CANCELED) {
-                                       success = false;
-                                       Thread.sleep(100);
-                                       break;
-                               }
-                       }
+               graph.fail(new Exception("test"));
+
+               assertEquals(JobStatus.FAILING, graph.getState());
+
+               graph.cancel();
+
+               assertEquals(JobStatus.CANCELLING, graph.getState());
+
+               // let all tasks finish cancelling
+               for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+                       vertex.getCurrentExecutionAttempt().cancelingComplete();
                }
 
-               // Still in failing
-               assertEquals(JobStatus.FAILING, executionGraph.getState());
+               assertEquals(JobStatus.CANCELED, graph.getState());
+       }
 
-               // The cancel call needs to change the state to CANCELLING
-               executionGraph.cancel();
+       @Test
+       public void testFailWhileCanceling() throws Exception {
+               final RestartStrategy restartStrategy = new NoRestartStrategy();
+               final ExecutionGraph graph = 
createExecutionGraph(restartStrategy).f0;
 
-               assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+               assertEquals(JobStatus.RUNNING, graph.getState());
 
-               // Unspy and finalize the job state
-               doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+               // switch all tasks to running
+               for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+                       vertex.getCurrentExecutionAttempt().switchToRunning();
+               }
 
-               executionGraph.jobVertexInFinalState();
+               graph.cancel();
 
-               assertEquals(JobStatus.CANCELED, executionGraph.getState());
+               assertEquals(JobStatus.CANCELLING, graph.getState());
+
+               graph.fail(new Exception("test"));
+
+               assertEquals(JobStatus.FAILING, graph.getState());
+
+               // let all tasks finish cancelling
+               for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+                       vertex.getCurrentExecutionAttempt().cancelingComplete();
+               }
+
+               assertEquals(JobStatus.FAILED, graph.getState());
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b0137fa..73838dc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -52,9 +51,10 @@ import 
org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
-import org.mockito.Matchers;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 
@@ -198,10 +198,6 @@ public class ExecutionGraphTestUtils {
                        }
                };
 
-               doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
-               doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), 
Matchers.any(Throwable.class));
-               doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
-
                return ejv;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
deleted file mode 100644
index bd51c81..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Collections;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
-       @Test
-       public void testAccumulatedStateFinished() {
-               try {
-                       final JobID jid = new JobID();
-                       final JobVertexID vid = new JobVertexID();
-
-                       JobVertex ajv = new JobVertex("TestVertex", vid);
-                       ajv.setParallelism(3);
-                       
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
-                       ExecutionGraph graph = new ExecutionGraph(
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               jid, 
-                               "test job", 
-                               new Configuration(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy(),
-                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
-                       graph.attachJobGraph(Collections.singletonList(ajv));
-
-                       setGraphStatus(graph, JobStatus.RUNNING);
-
-                       ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
-                       // mock resources and mock taskmanager
-                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               SimpleSlot slot = getInstance(
-                                       new ActorTaskManagerGateway(
-                                               new SimpleActorGateway(
-                                                       
TestingUtils.defaultExecutionContext()))
-                               ).allocateSimpleSlot(jid);
-                               ee.deployToSlot(slot);
-                       }
-
-                       // finish all
-                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               ee.executionFinished();
-                       }
-
-                       assertTrue(ejv.isInFinalState());
-                       assertEquals(JobStatus.FINISHED, graph.getState());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
deleted file mode 100644
index d717986..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.*;
-
-public class TerminalStateDeadlockTest {
-
-       private final Field stateField;
-       private final Field resourceField;
-       private final Field execGraphStateField;
-       private final Field execGraphSlotProviderField;
-
-       private final SimpleSlot resource;
-
-
-       public TerminalStateDeadlockTest() {
-               try {
-                       // the reflection fields to access the private fields
-                       this.stateField = 
Execution.class.getDeclaredField("state");
-                       this.stateField.setAccessible(true);
-
-                       this.resourceField = 
Execution.class.getDeclaredField("assignedResource");
-                       this.resourceField.setAccessible(true);
-
-                       this.execGraphStateField = 
ExecutionGraph.class.getDeclaredField("state");
-                       this.execGraphStateField.setAccessible(true);
-
-                       this.execGraphSlotProviderField = 
ExecutionGraph.class.getDeclaredField("slotProvider");
-                       this.execGraphSlotProviderField.setAccessible(true);
-                       
-                       // the dummy resource
-                       ResourceID resourceId = ResourceID.generate();
-                       InetAddress address = 
InetAddress.getByName("127.0.0.1");
-                       TaskManagerLocation ci = new 
TaskManagerLocation(resourceId, address, 12345);
-                               
-                       HardwareDescription resources = new 
HardwareDescription(4, 4000000, 3000000, 2000000);
-                       Instance instance = new Instance(
-                               new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE), ci, new InstanceID(), 
resources, 4);
-
-                       this.resource = instance.allocateSimpleSlot(new 
JobID());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-                       
-                       // silence the compiler
-                       throw new RuntimeException();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testProvokeDeadlock() {
-               try {
-                       final JobID jobId = resource.getJobID();
-                       final JobVertexID vid1 = new JobVertexID();
-                       final JobVertexID vid2 = new JobVertexID();
-                       
-                       final List<JobVertex> vertices;
-                       {
-                               JobVertex v1 = new JobVertex("v1", vid1);
-                               JobVertex v2 = new JobVertex("v2", vid2);
-                               v1.setParallelism(1);
-                               v2.setParallelism(1);
-                               v1.setInvokableClass(DummyInvokable.class);
-                               v2.setInvokableClass(DummyInvokable.class);
-                               vertices = Arrays.asList(v1, v2);
-                       }
-                       
-                       final Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-                       
-                       final Executor executor = 
Executors.newFixedThreadPool(4);
-                       
-                       // try a lot!
-                       for (int i = 0; i < 20000; i++) {
-                               final TestExecGraph eg = new 
TestExecGraph(jobId);
-                               eg.attachJobGraph(vertices);
-
-                               final Execution e1 = 
eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
-                               final Execution e2 = 
eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();
-
-                               initializeExecution(e1);
-                               initializeExecution(e2);
-
-                               execGraphStateField.set(eg, JobStatus.FAILING);
-                               execGraphSlotProviderField.set(eg, scheduler);
-                               
-                               Runnable r1 = new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               e1.cancelingComplete();
-                                       }
-                               };
-                               Runnable r2 = new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               e2.cancelingComplete();
-                                       }
-                               };
-                               
-                               executor.execute(r1);
-                               executor.execute(r2);
-                               
-                               eg.waitTillDone();
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       private void initializeExecution(Execution exec) throws 
IllegalAccessException {
-               // set state to canceling
-               stateField.set(exec, ExecutionState.CANCELING);
-               
-               // assign a resource
-               resourceField.set(exec, resource);
-       }
-       
-       
-       static class TestExecGraph extends ExecutionGraph {
-
-               private static final Configuration EMPTY_CONFIG = new 
Configuration();
-
-               private static final Time TIMEOUT = Time.seconds(30L);
-
-               private volatile boolean done;
-
-               TestExecGraph(JobID jobId) throws IOException {
-                       super(
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               jobId,
-                               "test graph",
-                               EMPTY_CONFIG,
-                               new SerializedValue<>(new ExecutionConfig()),
-                               TIMEOUT,
-                               new FixedDelayRestartStrategy(1, 0),
-                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
-               }
-
-               @Override
-               public void scheduleForExecution() {
-                       // notify that we are done with the "restarting"
-                       synchronized (this) {
-                               done = true;
-                               this.notifyAll();
-                       }
-               }
-
-               public void waitTillDone() {
-                       try {
-                               synchronized (this) {
-                                       while (!done) {
-                                               this.wait();
-                                       }
-                               }
-                       }
-                       catch (InterruptedException e) {
-                               throw new RuntimeException(e);
-                       }
-               }
-       }
-}

Reply via email to