[FLINK-4256] [distributed runtime] Clean up serializability of ExecutionGraph


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9b0dad8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9b0dad8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9b0dad8

Branch: refs/heads/master
Commit: c9b0dad89b79f11ce13fe78f8a0823b8266f930f
Parents: 2a4b222
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 4 16:48:37 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 5 19:57:01 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 36 +++++++++++---------
 .../runtime/executiongraph/ExecutionGraph.java  | 22 ++++--------
 .../executiongraph/ExecutionJobVertex.java      |  9 +++--
 .../runtime/executiongraph/ExecutionVertex.java | 22 ++++++------
 .../api/graph/StreamingJobGraphGenerator.java   |  1 -
 5 files changed, 39 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index fd296c3..5bab780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
+
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -56,7 +57,6 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -87,23 +87,21 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * A single execution of a vertex. While an {@link ExecutionVertex} can be 
executed multiple times (for recovery,
  * or other re-computation), this class tracks the state of a single execution 
of that vertex and the resources.
  * 
- * NOTE ABOUT THE DESIGN RATIONAL:
+ * <p>NOTE ABOUT THE DESIGN RATIONALE:
  * 
- * In several points of the code, we need to deal with possible concurrent 
state changes and actions.
+ * <p>In several points of the code, we need to deal with possible concurrent 
state changes and actions.
  * For example, while the call to deploy a task (send it to the TaskManager) 
happens, the task gets cancelled.
  * 
- * We could lock the entire portion of the code (decision to deploy, deploy, 
set state to running) such that
+ * <p>We could lock the entire portion of the code (decision to deploy, 
deploy, set state to running) such that
  * it is guaranteed that any "cancel command" will only pick up after 
deployment is done and that the "cancel
  * command" call will never overtake the deploying call.
  * 
- * This blocks the threads big time, because the remote calls may take long. 
Depending of their locking behavior, it
+ * <p>This blocks the threads big time, because the remote calls may take long. Depending on their locking behavior, it
  * may even result in distributed deadlocks (unless carefully avoided). We 
therefore use atomic state updates and
  * occasional double-checking to ensure that the state after a completed call 
is as expected, and trigger correcting
  * actions if it is not. Many actions are also idempotent (like canceling).
  */
-public class Execution implements Serializable {
-
-       private static final long serialVersionUID = 42L;
+public class Execution {
 
        private static final AtomicReferenceFieldUpdater<Execution, 
ExecutionState> STATE_UPDATER =
                        AtomicReferenceFieldUpdater.newUpdater(Execution.class, 
ExecutionState.class, "state");
@@ -136,15 +134,16 @@ public class Execution implements Serializable {
 
        private volatile InstanceConnectionInfo assignedResourceLocation; // 
for the archived execution
 
+       /** The state with which the execution attempt should start */
        private SerializedValue<StateHandle<?>> operatorState;
 
-       private Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState;
-
        /** The execution context which is used to execute futures. */
-       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private ExecutionContext executionContext;
 
-       /* Lock for updating the accumulators atomically. */
+       // ------------------------- Accumulators 
---------------------------------
+       
+       /* Lock for updating the accumulators atomically. Prevents final accumulators from being overwritten
+        * by partial accumulators on a late heartbeat. */
        private final SerializableObject accumulatorLock = new 
SerializableObject();
 
        /* Continuously updated map of user-defined accumulators */
@@ -217,7 +216,7 @@ public class Execution implements Serializable {
        }
 
        public boolean isFinished() {
-               return state == FINISHED || state == FAILED || state == 
CANCELED;
+               return state.isTerminal();
        }
 
        /**
@@ -236,14 +235,18 @@ public class Execution implements Serializable {
        }
 
        public void setInitialState(
-               SerializedValue<StateHandle<?>> initialState,
-               Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
+                       SerializedValue<StateHandle<?>> initialState,
+                       Map<Integer, SerializedValue<StateHandle<?>>> 
initialKvState) {
+
+               if (initialKvState != null && initialKvState.size() > 0) {
+                       throw new UnsupportedOperationException("Error: 
inconsistent handling of key/value state snapshots");
+               }
 
                if (state != ExecutionState.CREATED) {
                        throw new IllegalArgumentException("Can only assign 
operator state when execution attempt is in CREATED");
                }
+
                this.operatorState = initialState;
-               this.operatorKvState = initialKvState;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -371,7 +374,6 @@ public class Execution implements Serializable {
                                attemptId,
                                slot,
                                operatorState,
-                               operatorKvState,
                                attemptNumber);
 
                        // register this execution at the execution graph, to 
receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4229105..b778fa6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -58,13 +59,14 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -104,15 +106,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *         about deployment of tasks and updates in the task status always use 
the ExecutionAttemptID to
  *         address the message receiver.</li>
  * </ul>
- * 
- * <p>The ExecutionGraph implements {@link java.io.Serializable}, because it 
can be archived by
- * sending it to an archive actor via an actor message. The execution graph 
does contain some
- * non-serializable fields. These fields are not required in the archived form 
and are cleared
- * in the {@link #prepareForArchiving()} method.</p>
  */
-public class ExecutionGraph implements Serializable {
-
-       private static final long serialVersionUID = 42L;
+public class ExecutionGraph {
 
        private static final AtomicReferenceFieldUpdater<ExecutionGraph, 
JobStatus> STATE_UPDATER =
                        
AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, 
"state");
@@ -179,7 +174,7 @@ public class ExecutionGraph implements Serializable {
        // ------ Configuration of the Execution -------
 
        /** The execution configuration (see {@link ExecutionConfig}) related 
to this specific job. */
-       private SerializedValue<ExecutionConfig> serializedExecutionConfig;
+       private final SerializedValue<ExecutionConfig> 
serializedExecutionConfig;
 
        /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
         * to deploy them immediately. */
@@ -208,30 +203,25 @@ public class ExecutionGraph implements Serializable {
        // ------ Fields that are relevant to the execution and need to be 
cleared before archiving  -------
 
        /** The scheduler to use for scheduling new tasks as they are needed */
-       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private Scheduler scheduler;
 
        /** Strategy to use for restarts */
-       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private RestartStrategy restartStrategy;
 
        /** The classloader for the user code. Needed for calls into user code 
classes */
-       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private ClassLoader userClassLoader;
 
        /** The coordinator for checkpoints, if snapshot checkpoints are 
enabled */
-       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private CheckpointCoordinator checkpointCoordinator;
 
        /** The coordinator for savepoints, if snapshot checkpoints are enabled 
*/
        private transient SavepointCoordinator savepointCoordinator;
 
-       /** Checkpoint stats tracker seperate from the coordinator in order to 
be
+       /** Checkpoint stats tracker separate from the coordinator in order to 
be
         * available after archiving. */
        private CheckpointStatsTracker checkpointStatsTracker;
 
        /** The execution context which is used to execute futures. */
-       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private ExecutionContext executionContext;
 
        // ------ Fields that are only relevant for archived execution graphs 
------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7d4be79..7b28b31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -42,20 +42,19 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.util.SerializableObject;
+
 import org.slf4j.Logger;
+
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ExecutionJobVertex implements Serializable {
-       
-       private static final long serialVersionUID = 42L;
-       
+public class ExecutionJobVertex {
+
        /** Use the same log for all ExecutionGraph classes */
        private static final Logger LOG = ExecutionGraph.LOG;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e20f466..08bf57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -42,9 +42,9 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -69,9 +69,7 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.FINISHED;
  * The ExecutionVertex is a parallel subtask of the execution. It may be 
executed once, or several times, each of
  * which time it spawns an {@link Execution}.
  */
-public class ExecutionVertex implements Serializable {
-
-       private static final long serialVersionUID = 42L;
+public class ExecutionVertex {
 
        private static final Logger LOG = ExecutionGraph.LOG;
 
@@ -91,6 +89,9 @@ public class ExecutionVertex implements Serializable {
 
        private final FiniteDuration timeout;
 
+       /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+       private final String taskNameWithSubtask;
+
        private volatile CoLocationConstraint locationConstraint;
 
        private volatile Execution currentExecution;    // this field must 
never be null
@@ -115,8 +116,11 @@ public class ExecutionVertex implements Serializable {
                        IntermediateResult[] producedDataSets,
                        FiniteDuration timeout,
                        long createTimestamp) {
+
                this.jobVertex = jobVertex;
                this.subTaskIndex = subTaskIndex;
+               this.taskNameWithSubtask = String.format("%s (%d/%d)",
+                               jobVertex.getJobVertex().getName(), 
subTaskIndex + 1, jobVertex.getParallelism());
 
                this.resultPartitions = new 
LinkedHashMap<IntermediateResultPartitionID, 
IntermediateResultPartition>(producedDataSets.length, 1);
 
@@ -172,11 +176,7 @@ public class ExecutionVertex implements Serializable {
        }
 
        public String getTaskNameWithSubtaskIndex() {
-               return String.format(
-                               "%s (%d/%d)",
-                               jobVertex.getJobVertex().getName(),
-                               subTaskIndex + 1,
-                               getTotalNumberOfParallelSubtasks());
+               return this.taskNameWithSubtask;
        }
 
        public int getTotalNumberOfParallelSubtasks() {
@@ -547,10 +547,9 @@ public class ExecutionVertex implements Serializable {
         */
        public void prepareForArchiving() throws IllegalStateException {
                Execution execution = currentExecution;
-               ExecutionState state = execution.getState();
 
                // sanity check
-               if (!(state == FINISHED || state == CANCELED || state == 
FAILED)) {
+               if (!execution.isFinished()) {
                        throw new IllegalStateException("Cannot archive 
ExecutionVertex that is not in a finished state.");
                }
 
@@ -637,7 +636,6 @@ public class ExecutionVertex implements Serializable {
                        ExecutionAttemptID executionId,
                        SimpleSlot targetSlot,
                        SerializedValue<StateHandle<?>> operatorState,
-                       Map<Integer, SerializedValue<StateHandle<?>>> 
operatorKvState,
                        int attemptNumber) {
 
                // Produced intermediate results

http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 71cc7f2..3abecc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -469,7 +469,6 @@ public class StreamingJobGraphGenerator {
                                if (vertex.isInputVertex()) {
                                        triggerVertices.add(vertex.getID());
                                }
-                               // TODO: add check whether the user function 
implements the checkpointing interface
                                commitVertices.add(vertex.getID());
                                ackVertices.add(vertex.getID());
                        }

Reply via email to