[FLINK-1638] [jobmanager] Cleanups in the ExecutionGraph for streaming fault tolerance
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bba2b3f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bba2b3f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bba2b3f Branch: refs/heads/master Commit: 2bba2b3f0447be1802872a19344008cffb997b2b Parents: f2b5c21 Author: Stephan Ewen <se...@apache.org> Authored: Tue Mar 10 13:42:59 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:59:02 2015 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 111 ++++++++++++++----- .../runtime/executiongraph/ExecutionVertex.java | 14 +-- 2 files changed, 93 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2bba2b3f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index c319a5c..81f83e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -58,6 +58,30 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static akka.dispatch.Futures.future; +/** + * The execution graph is the central data structure that coordinates the distributed + * execution of a data flow. It keeps representations of each parallel task, each + * intermediate result, and the communication between them. 
+ * + * The execution graph consists of the following constructs: + * <ul> + * <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like + * "map" or "join") during execution. It holds the aggregated state of all parallel subtasks. + * The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes + * from the JobGraph's corresponding JobVertex.</li> + * <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are + * as many ExecutionVertices as the degree of parallelism. The ExecutionVertex is identified by + * the ExecutionJobVertex and the number of the parallel subtask.</li> + * <li>The {@link Execution} is one attempt to execute an ExecutionVertex. There may be multiple Executions + * for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed + * because it is no longer available when requested by later operations. An Execution is always + * identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager + * about deployment of tasks and updates in the task status always use the ExecutionAttemptID to + * address the message receiver.</li> + * </ul> + * + * + */ public class ExecutionGraph implements Serializable { private static final long serialVersionUID = 42L; @@ -79,7 +103,7 @@ public class ExecutionGraph implements Serializable { /** The job configuration that was originally attached to the JobGraph. */ private final Configuration jobConfiguration; - /** The classloader of the user code. */ + /** The classloader for the user code.
Needed for calls into user code classes */ private ClassLoader userClassLoader; /** All job vertices that are part of this graph */ @@ -94,31 +118,63 @@ public class ExecutionGraph implements Serializable { /** The currently executed tasks, for callbacks */ private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions; + /** A list of all libraries required during the job execution. Libraries have to be stored + * inside the BlobService and are referenced via the BLOB keys. */ private final List<BlobKey> requiredJarFiles; private final List<ActorRef> jobStatusListenerActors; private final List<ActorRef> executionListenerActors; + /** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()}) when + * the execution graph transitioned into a certain state. The index into this array is the + * ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is + * at {@code stateTimestamps[RUNNING.ordinal()]}. */ private final long[] stateTimestamps; + /** The lock used to secure all access to mutable fields, especially the tracking of progress + * within the job. */ private final Object progressLock = new Object(); - private int nextVertexToFinish; + /** The timeout for all messages that require a response/acknowledgement */ + private final FiniteDuration timeout; + + // ------ Configuration of the Execution ------- + + /** The number of times failed executions should be retried. */ private int numberOfRetriesLeft; + /** The delay that the system should wait before restarting failed executions. */ private long delayBeforeRetrying; - private final FiniteDuration timeout; + /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able + * to deploy them immediately. */ + private boolean allowQueuedScheduling = false; + + /** The mode of scheduling. Decides how to select the initial set of tasks to be deployed.
+ * May indicate to deploy all sources, or to deploy everything, or to deploy via backtracking + * from results that need to be materialized. */ + private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; + + // ------ Execution status and progress ------- + + /** Current status of the job execution */ private volatile JobStatus state = JobStatus.CREATED; + /** The exception that caused the job to fail. This is set to the first root exception + * that was not recoverable and triggered job failure */ private volatile Throwable failureCause; + /** The scheduler to use for scheduling new tasks as they are needed */ private Scheduler scheduler; - private boolean allowQueuedScheduling = true; + /** The position of the vertex that is next expected to finish. + * This is an index into the "verticesInCreationOrder" collection. + * Once this value has reached the number of vertices, the job is done. */ + private int nextVertexToFinish; + private ActorContext parentContext; @@ -128,7 +184,6 @@ public class ExecutionGraph implements Serializable { private long monitoringInterval = 10000; - private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) { @@ -397,8 +452,7 @@ public class ExecutionGraph implements Serializable { throw new JobException("BACKTRACKING is currently not supported as schedule mode."); } - if(monitoringEnabled) - { + if (monitoringEnabled) { stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this, Duration.create(monitoringInterval, TimeUnit.MILLISECONDS)); } @@ -567,18 +621,17 @@ public class ExecutionGraph implements Serializable { } } - public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states) - { - for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , StateHandle> state : states.entrySet()) - {
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue()); + public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states) { + synchronized (this.progressLock) { + for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, StateHandle> state : states.entrySet()) { + tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue()); + } } } public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) { Execution execution = currentExecutions.get(executionId); - if (execution == null) { fail(new IllegalStateException("Cannot find execution for execution ID " + executionId)); @@ -630,10 +683,12 @@ public class ExecutionGraph implements Serializable { * NOTE: This method never throws an error, only logs errors caused by the notified listeners. */ private void notifyJobStatusChange(JobStatus newState, Throwable error) { - if(jobStatusListenerActors.size() > 0){ - for(ActorRef listener: jobStatusListenerActors){ - listener.tell(new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(), - error), ActorRef.noSender()); + if (jobStatusListenerActors.size() > 0) { + ExecutionGraphMessages.JobStatusChanged message = + new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(), error); + + for (ActorRef listener: jobStatusListenerActors) { + listener.tell(message, ActorRef.noSender()); } } } @@ -642,16 +697,20 @@ public class ExecutionGraph implements Serializable { * NOTE: This method never throws an error, only logs errors caused by the notified listeners. 
*/ void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState - newExecutionState, Throwable error) { + newExecutionState, Throwable error) + { ExecutionJobVertex vertex = getJobVertex(vertexId); - if(executionListenerActors.size() >0){ + if (executionListenerActors.size() > 0) { String message = error == null ? null : ExceptionUtils.stringifyException(error); - for(ActorRef listener : executionListenerActors){ - listener.tell(new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId, - vertex.getJobVertex().getName(), vertex.getParallelism(), subtask, - executionID, newExecutionState, System.currentTimeMillis(), - message), ActorRef.noSender()); + ExecutionGraphMessages.ExecutionStateChanged actorMessage = + new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId, vertex.getJobVertex().getName(), + vertex.getParallelism(), subtask, + executionID, newExecutionState, + System.currentTimeMillis(), message); + + for (ActorRef listener : executionListenerActors) { + listener.tell(actorMessage, ActorRef.noSender()); } } @@ -674,7 +733,7 @@ public class ExecutionGraph implements Serializable { throw new IllegalStateException("Can only restart job from state restarting."); } if (scheduler == null) { - throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null."); + throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null."); } this.currentExecutions.clear(); @@ -718,5 +777,7 @@ public class ExecutionGraph implements Serializable { executionListenerActors.clear(); scheduler = null; + parentContext = null; + stateMonitorActor = null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/2bba2b3f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 24bcf21..41b78f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -50,6 +50,9 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import static com.google.common.base.Preconditions.checkElementIndex; +import static org.apache.flink.runtime.execution.ExecutionState.CANCELED; +import static org.apache.flink.runtime.execution.ExecutionState.FAILED; +import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; /** * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of @@ -377,8 +380,7 @@ public class ExecutionVertex implements Serializable { Execution execution = currentExecution; ExecutionState state = execution.getState(); - if (state == ExecutionState.FINISHED || state == ExecutionState.CANCELED - || state == ExecutionState.FAILED) { + if (state == FINISHED || state == CANCELED || state ==FAILED) { priorExecutions.add(execution); currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis(), timeout); @@ -388,8 +390,7 @@ public class ExecutionVertex implements Serializable { this.locationConstraint = grp.getLocationConstraint(subTaskIndex); } - if(operatorState!=null) - { + if (operatorState != null) { execution.setOperatorState(operatorState); } @@ -436,9 +437,8 @@ public class ExecutionVertex implements Serializable { ExecutionState state = execution.getState(); // sanity check - if (!(state == ExecutionState.FINISHED || state == ExecutionState.CANCELED || state == ExecutionState.FAILED)) { - throw new IllegalStateException( - "Cannot archive ExecutionVertex that is not in a finished state."); + if 
(!(state == FINISHED || state == CANCELED || state == FAILED)) { + throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state."); } // prepare the current execution for archiving