[FLINK-4256] [distributed runtime] Clean up serializability of ExecutionGraph
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9b0dad8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9b0dad8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9b0dad8 Branch: refs/heads/master Commit: c9b0dad89b79f11ce13fe78f8a0823b8266f930f Parents: 2a4b222 Author: Stephan Ewen <se...@apache.org> Authored: Thu Aug 4 16:48:37 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Aug 5 19:57:01 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 36 +++++++++++--------- .../runtime/executiongraph/ExecutionGraph.java | 22 ++++-------- .../executiongraph/ExecutionJobVertex.java | 9 +++-- .../runtime/executiongraph/ExecutionVertex.java | 22 ++++++------ .../api/graph/StreamingJobGraphGenerator.java | 1 - 5 files changed, 39 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index fd296c3..5bab780 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph; import akka.dispatch.OnComplete; import akka.dispatch.OnFailure; + import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -56,7 +57,6 @@ import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -87,23 +87,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery, * or other re-computation), this class tracks the state of a single execution of that vertex and the resources. * - * NOTE ABOUT THE DESIGN RATIONAL: + * <p>NOTE ABOUT THE DESIGN RATIONAL: * - * In several points of the code, we need to deal with possible concurrent state changes and actions. + * <p>In several points of the code, we need to deal with possible concurrent state changes and actions. * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled. * - * We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that + * <p>We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that * it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel * command" call will never overtake the deploying call. * - * This blocks the threads big time, because the remote calls may take long. Depending of their locking behavior, it + * <p>This blocks the threads big time, because the remote calls may take long. Depending of their locking behavior, it * may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state updates and * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting * actions if it is not. Many actions are also idempotent (like canceling). */ -public class Execution implements Serializable { - - private static final long serialVersionUID = 42L; +public class Execution { private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state"); @@ -136,15 +134,16 @@ public class Execution implements Serializable { private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution + /** The state with which the execution attempt should start */ private SerializedValue<StateHandle<?>> operatorState; - private Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState; - /** The execution context which is used to execute futures. */ - @SuppressWarnings("NonSerializableFieldInSerializableClass") private ExecutionContext executionContext; - /* Lock for updating the accumulators atomically. */ + // ------------------------- Accumulators --------------------------------- + + /* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten + * by partial accumulators on a late heartbeat*/ private final SerializableObject accumulatorLock = new SerializableObject(); /* Continuously updated map of user-defined accumulators */ @@ -217,7 +216,7 @@ public class Execution implements Serializable { } public boolean isFinished() { - return state == FINISHED || state == FAILED || state == CANCELED; + return state.isTerminal(); } /** @@ -236,14 +235,18 @@ public class Execution implements Serializable { } public void setInitialState( - SerializedValue<StateHandle<?>> initialState, - Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) { + SerializedValue<StateHandle<?>> initialState, + Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) { + + if (initialKvState != null && initialKvState.size() > 0) { + throw new UnsupportedOperationException("Error: inconsistent handling of key/value state snapshots"); + } if (state != ExecutionState.CREATED) { throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED"); } + this.operatorState = initialState; - this.operatorKvState = initialKvState; } // -------------------------------------------------------------------------------------------- @@ -371,7 +374,6 @@ public class Execution implements Serializable { attemptId, slot, operatorState, - operatorKvState, attemptNumber); // register this execution at the execution graph, to receive call backs http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 4229105..b778fa6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import akka.actor.ActorSystem; + import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; @@ -58,13 +59,14 @@ import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; -import java.io.Serializable; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; @@ -104,15 +106,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * about deployment of tasks and updates in the task status always use the ExecutionAttemptID to * address the message receiver.</li> * </ul> - * - * <p>The ExecutionGraph implements {@link java.io.Serializable}, because it can be archived by - * sending it to an archive actor via an actor message. The execution graph does contain some - * non-serializable fields. These fields are not required in the archived form and are cleared - * in the {@link #prepareForArchiving()} method.</p> */ -public class ExecutionGraph implements Serializable { - - private static final long serialVersionUID = 42L; +public class ExecutionGraph { private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state"); @@ -179,7 +174,7 @@ public class ExecutionGraph implements Serializable { // ------ Configuration of the Execution ------- /** The execution configuration (see {@link ExecutionConfig}) related to this specific job. */ - private SerializedValue<ExecutionConfig> serializedExecutionConfig; + private final SerializedValue<ExecutionConfig> serializedExecutionConfig; /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able * to deploy them immediately. */ @@ -208,30 +203,25 @@ public class ExecutionGraph implements Serializable { // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- /** The scheduler to use for scheduling new tasks as they are needed */ - @SuppressWarnings("NonSerializableFieldInSerializableClass") private Scheduler scheduler; /** Strategy to use for restarts */ - @SuppressWarnings("NonSerializableFieldInSerializableClass") private RestartStrategy restartStrategy; /** The classloader for the user code. Needed for calls into user code classes */ - @SuppressWarnings("NonSerializableFieldInSerializableClass") private ClassLoader userClassLoader; /** The coordinator for checkpoints, if snapshot checkpoints are enabled */ - @SuppressWarnings("NonSerializableFieldInSerializableClass") private CheckpointCoordinator checkpointCoordinator; /** The coordinator for savepoints, if snapshot checkpoints are enabled */ private transient SavepointCoordinator savepointCoordinator; - /** Checkpoint stats tracker seperate from the coordinator in order to be + /** Checkpoint stats tracker separate from the coordinator in order to be * available after archiving. */ private CheckpointStatsTracker checkpointStatsTracker; /** The execution context which is used to execute futures. */ - @SuppressWarnings("NonSerializableFieldInSerializableClass") private ExecutionContext executionContext; // ------ Fields that are only relevant for archived execution graphs ------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 7d4be79..7b28b31 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -42,20 +42,19 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.util.SerializableObject; + import org.slf4j.Logger; + import scala.concurrent.duration.FiniteDuration; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -public class ExecutionJobVertex implements Serializable { - - private static final long serialVersionUID = 42L; - +public class ExecutionJobVertex { + /** Use the same log for all ExecutionGraph classes */ private static final Logger LOG = ExecutionGraph.LOG; http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index e20f466..08bf57f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -42,9 +42,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; - import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.util.SerializedValue; + import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -69,9 +69,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of * which time it spawns an {@link Execution}. */ -public class ExecutionVertex implements Serializable { - - private static final long serialVersionUID = 42L; +public class ExecutionVertex { private static final Logger LOG = ExecutionGraph.LOG; @@ -91,6 +89,9 @@ public class ExecutionVertex implements Serializable { private final FiniteDuration timeout; + /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */ + private final String taskNameWithSubtask; + private volatile CoLocationConstraint locationConstraint; private volatile Execution currentExecution; // this field must never be null @@ -115,8 +116,11 @@ public class ExecutionVertex implements Serializable { IntermediateResult[] producedDataSets, FiniteDuration timeout, long createTimestamp) { + this.jobVertex = jobVertex; this.subTaskIndex = subTaskIndex; + this.taskNameWithSubtask = String.format("%s (%d/%d)", + jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism()); this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1); @@ -172,11 +176,7 @@ public class ExecutionVertex implements Serializable { } public String getTaskNameWithSubtaskIndex() { - return String.format( - "%s (%d/%d)", - jobVertex.getJobVertex().getName(), - subTaskIndex + 1, - getTotalNumberOfParallelSubtasks()); + return this.taskNameWithSubtask; } public int getTotalNumberOfParallelSubtasks() { @@ -547,10 +547,9 @@ public class ExecutionVertex implements Serializable { */ public void prepareForArchiving() throws IllegalStateException { Execution execution = currentExecution; - ExecutionState state = execution.getState(); // sanity check - if (!(state == FINISHED || state == CANCELED || state == FAILED)) { + if (!execution.isFinished()) { throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state."); } @@ -637,7 +636,6 @@ public class ExecutionVertex implements Serializable { ExecutionAttemptID executionId, SimpleSlot targetSlot, SerializedValue<StateHandle<?>> operatorState, - Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState, int attemptNumber) { // Produced intermediate results http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 71cc7f2..3abecc1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -469,7 +469,6 @@ public class StreamingJobGraphGenerator { if (vertex.isInputVertex()) { triggerVertices.add(vertex.getID()); } - // TODO: add check whether the user function implements the checkpointing interface commitVertices.add(vertex.getID()); ackVertices.add(vertex.getID()); }