[FLINK-1638] [jobmanager] Cleanups in the ExecutionGraph for streaming fault 
tolerance


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bba2b3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bba2b3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bba2b3f

Branch: refs/heads/master
Commit: 2bba2b3f0447be1802872a19344008cffb997b2b
Parents: f2b5c21
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 10 13:42:59 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:59:02 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 111 ++++++++++++++-----
 .../runtime/executiongraph/ExecutionVertex.java |  14 +--
 2 files changed, 93 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2bba2b3f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c319a5c..81f83e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -58,6 +58,30 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static akka.dispatch.Futures.future;
 
+/**
+ * The execution graph is the central data structure that coordinates the 
distributed
+ * execution of a data flow. It keeps representations of each parallel task, 
each
+ * intermediate result, and the communication between them.
+ *
+ * The execution graph consists of the following constructs:
+ * <ul>
+ *     <li>The {@link ExecutionJobVertex} represents one vertex from the 
JobGraph (usually one operation like
+ *         "map" or "join") during execution. It holds the aggregated state of 
all parallel subtasks.
+ *         The ExecutionJobVertex is identified inside the graph by the {@link 
JobVertexID}, which it takes
+ *         from the JobGraph's corresponding JobVertex.</li>
+ *     <li>The {@link ExecutionVertex} represents one parallel subtask. For 
each ExecutionJobVertex, there are
+ *         as many ExecutionVertices as the degree of parallelism. The 
ExecutionVertex is identified by
+ *         the ExecutionJobVertex and the number of the parallel subtask</li>
+ *     <li>The {@link Execution} is one attempt to execute an ExecutionVertex. 
There may be multiple Executions
+ *         for the ExecutionVertex, in case of a failure, or in the case where 
some data needs to be recomputed
+ *         because it is no longer available when requested by later 
operations. An Execution is always
+ *         identified by an {@link ExecutionAttemptID}. All messages between 
the JobManager and the TaskManager
+ *         about deployment of tasks and updates in the task status always use 
the ExecutionAttemptID to
+ *         address the message receiver.</li>
+ * </ul>
+ *
+ *
+ */
 public class ExecutionGraph implements Serializable {
 
        private static final long serialVersionUID = 42L;
@@ -79,7 +103,7 @@ public class ExecutionGraph implements Serializable {
        /** The job configuration that was originally attached to the JobGraph. 
*/
        private final Configuration jobConfiguration;
 
-       /** The classloader of the user code. */
+       /** The classloader for the user code. Needed for calls into user code 
classes */
        private ClassLoader userClassLoader;
 
        /** All job vertices that are part of this graph */
@@ -94,31 +118,63 @@ public class ExecutionGraph implements Serializable {
        /** The currently executed tasks, for callbacks */
        private final ConcurrentHashMap<ExecutionAttemptID, Execution> 
currentExecutions;
 
+       /** A list of all libraries required during the job execution. 
Libraries have to be stored
+        * inside the BlobService and are referenced via the BLOB keys. */
        private final List<BlobKey> requiredJarFiles;
 
        private final List<ActorRef> jobStatusListenerActors;
 
        private final List<ActorRef> executionListenerActors;
 
+       /** Timestamps (in milliseconds as returned by {@code 
System.currentTimeMillis()}) when
+        * the execution graph transitioned into a certain state. The index 
into this array is the
+        * ordinal of the enum value, i.e. the timestamp when the graph went 
into state "RUNNING" is
+        * at {@code stateTimestamps[RUNNING.ordinal()]}. */
        private final long[] stateTimestamps;
 
+       /** The lock used to secure all access to mutable fields, especially 
the tracking of progress
+        * within the job. */
        private final Object progressLock = new Object();
 
-       private int nextVertexToFinish;
+       /** The timeout for all messages that require a 
response/acknowledgement */
+       private final FiniteDuration timeout;
 
+
+       // ------ Configuration of the Execution -------
+
+       /** The number of times failed executions should be retried. */
        private int numberOfRetriesLeft;
 
+       /** The delay that the system should wait before restarting failed 
executions. */
        private long delayBeforeRetrying;
 
-       private final FiniteDuration timeout;
+       /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
+        * to deploy them immediately. */
+       private boolean allowQueuedScheduling = false;
+
+       /** The mode of scheduling. Decides how to select the initial set of 
tasks to be deployed.
+        * May indicate to deploy all sources, or to deploy everything, or to 
deploy via backtracking
+        * from results that need to be materialized. */
+       private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
+
+       // ------ Execution status and progress -------
+
+       /** Current status of the job execution */
        private volatile JobStatus state = JobStatus.CREATED;
 
+       /** The exception that caused the job to fail. This is set to the first 
root exception
+        * that was not recoverable and triggered job failure */
        private volatile Throwable failureCause;
 
+       /** The scheduler to use for scheduling new tasks as they are needed */
        private Scheduler scheduler;
 
-       private boolean allowQueuedScheduling = true;
+       /** The position of the vertex that is next expected to finish.
+        * This is an index into the "verticesInCreationOrder" collection.
+        * Once this value has reached the number of vertices, the job is done. 
*/
+       private int nextVertexToFinish;
+
 
        private ActorContext parentContext;
 
@@ -128,7 +184,6 @@ public class ExecutionGraph implements Serializable {
        
        private long monitoringInterval = 10000;
 
-       private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
        
        public ExecutionGraph(JobID jobId, String jobName, Configuration 
jobConfig, FiniteDuration timeout) {
@@ -397,8 +452,7 @@ public class ExecutionGraph implements Serializable {
                                        throw new JobException("BACKTRACKING is 
currently not supported as schedule mode.");
                        }
 
-                       if(monitoringEnabled)
-                       {
+                       if (monitoringEnabled) {
                                stateMonitorActor = 
StreamCheckpointCoordinator.spawn(parentContext, this,
                                                
Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
                        }
@@ -567,18 +621,17 @@ public class ExecutionGraph implements Serializable {
                }
        }
        
-       public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, 
Integer, Long> , StateHandle> states)
-       {
-               for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , StateHandle> 
state : states.entrySet())
-               {
-                       
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
+       public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , 
StateHandle> states) {
+               synchronized (this.progressLock) {
+                       for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, 
StateHandle> state : states.entrySet()) {
+                               
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
+                       }
                }
        }
 
        public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, 
int partitionIndex) {
                Execution execution = currentExecutions.get(executionId);
 
-
                if (execution == null) {
                        fail(new IllegalStateException("Cannot find execution 
for execution ID " +
                                        executionId));
@@ -630,10 +683,12 @@ public class ExecutionGraph implements Serializable {
         * NOTE: This method never throws an error, only logs errors caused by 
the notified listeners.
         */
        private void notifyJobStatusChange(JobStatus newState, Throwable error) 
{
-               if(jobStatusListenerActors.size() > 0){
-                       for(ActorRef listener: jobStatusListenerActors){
-                               listener.tell(new 
ExecutionGraphMessages.JobStatusChanged(jobID, newState, 
System.currentTimeMillis(),
-                                                               error), 
ActorRef.noSender());
+               if (jobStatusListenerActors.size() > 0) {
+                       ExecutionGraphMessages.JobStatusChanged message =
+                                       new 
ExecutionGraphMessages.JobStatusChanged(jobID, newState, 
System.currentTimeMillis(), error);
+
+                       for (ActorRef listener: jobStatusListenerActors) {
+                               listener.tell(message, ActorRef.noSender());
                        }
                }
        }
@@ -642,16 +697,20 @@ public class ExecutionGraph implements Serializable {
         * NOTE: This method never throws an error, only logs errors caused by 
the notified listeners.
         */
        void notifyExecutionChange(JobVertexID vertexId, int subtask, 
ExecutionAttemptID executionID, ExecutionState
-                                                       newExecutionState, 
Throwable error) {
+                                                       newExecutionState, 
Throwable error)
+       {
                ExecutionJobVertex vertex = getJobVertex(vertexId);
 
-               if(executionListenerActors.size() >0){
+               if (executionListenerActors.size() > 0) {
                        String message = error == null ? null : 
ExceptionUtils.stringifyException(error);
-                       for(ActorRef listener : executionListenerActors){
-                               listener.tell(new 
ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId,
-                                                               
vertex.getJobVertex().getName(), vertex.getParallelism(), subtask,
-                                                               executionID, 
newExecutionState, System.currentTimeMillis(),
-                                                               message), 
ActorRef.noSender());
+                       ExecutionGraphMessages.ExecutionStateChanged 
actorMessage =
+                                       new 
ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId,  
vertex.getJobVertex().getName(),
+                                                                               
                                                        
vertex.getParallelism(), subtask,
+                                                                               
                                                        executionID, 
newExecutionState,
+                                                                               
                                                        
System.currentTimeMillis(), message);
+
+                       for (ActorRef listener : executionListenerActors) {
+                               listener.tell(actorMessage, 
ActorRef.noSender());
                        }
                }
 
@@ -674,7 +733,7 @@ public class ExecutionGraph implements Serializable {
                                        throw new IllegalStateException("Can 
only restart job from state restarting.");
                                }
                                if (scheduler == null) {
-                                       throw new IllegalStateException("The 
execution graph has not been schedudled before - scheduler is null.");
+                                       throw new IllegalStateException("The 
execution graph has not been scheduled before - scheduler is null.");
                                }
 
                                this.currentExecutions.clear();
@@ -718,5 +777,7 @@ public class ExecutionGraph implements Serializable {
                executionListenerActors.clear();
                
                scheduler = null;
+               parentContext = null;
+               stateMonitorActor = null;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2bba2b3f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 24bcf21..41b78f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -50,6 +50,9 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be 
executed once, or several times, each of
@@ -377,8 +380,7 @@ public class ExecutionVertex implements Serializable {
                        Execution execution = currentExecution;
                        ExecutionState state = execution.getState();
 
-                       if (state == ExecutionState.FINISHED || state == 
ExecutionState.CANCELED
-                                       || state == ExecutionState.FAILED) {
+                       if (state == FINISHED || state == CANCELED || state 
==FAILED) {
                                priorExecutions.add(execution);
                                currentExecution = new Execution(this, 
execution.getAttemptNumber()+1,
                                                System.currentTimeMillis(), 
timeout);
@@ -388,8 +390,7 @@ public class ExecutionVertex implements Serializable {
                                        this.locationConstraint = 
grp.getLocationConstraint(subTaskIndex);
                                }
                                
-                               if(operatorState!=null)
-                               {
+                               if (operatorState != null) {
                                        
execution.setOperatorState(operatorState);
                                }
                                
@@ -436,9 +437,8 @@ public class ExecutionVertex implements Serializable {
                ExecutionState state = execution.getState();
 
                // sanity check
-               if (!(state == ExecutionState.FINISHED || state == 
ExecutionState.CANCELED || state == ExecutionState.FAILED)) {
-                       throw new IllegalStateException(
-                                       "Cannot archive ExecutionVertex that is 
not in a finished state.");
+               if (!(state == FINISHED || state == CANCELED || state == 
FAILED)) {
+                       throw new IllegalStateException("Cannot archive 
ExecutionVertex that is not in a finished state.");
                }
                
                // prepare the current execution for archiving

Reply via email to