Repository: flink
Updated Branches:
  refs/heads/master 8305c1755 -> 8330539e4


[FLINK-3803] [runtime] Pass CheckpointStatsTracker to ExecutionGraph

`CheckpointStatsTracker` was instantiated in
`ExecutionGraph#enableSnapshotCheckpointing`, where the Flink configuration
is not available, so the checkpoint stats options were read from the job
configuration instead.

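Instead of instantiating the `CheckpointStatsTracker` inside
`ExecutionGraph#enableSnapshotCheckpointing`, the `JobManager` now creates it
from the Flink configuration and passes it to that method as an argument.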

This closes #1927.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8330539e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8330539e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8330539e

Branch: refs/heads/master
Commit: 8330539e4770ce36576149410d66122a81daa03f
Parents: 8305c17
Author: Ufuk Celebi <u...@apache.org>
Authored: Mon Apr 25 15:15:47 2016 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Apr 26 10:22:09 2016 +0200

----------------------------------------------------------------------
 .../stats/SimpleCheckpointStatsTracker.java     | 19 ++++-----
 .../runtime/executiongraph/ExecutionGraph.java  | 37 +++++-------------
 .../flink/runtime/jobmanager/JobManager.scala   | 20 +++++++++-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  4 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java | 41 ++++++++++----------
 5 files changed, 60 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 5ee4fc3..aea18e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint.stats;
 
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.StateForTask;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import scala.Option;
 
@@ -108,22 +108,19 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
 
        public SimpleCheckpointStatsTracker(
                        int historySize,
-                       ExecutionVertex[] tasksToWaitFor) {
+                       List<ExecutionJobVertex> tasksToWaitFor) {
 
                checkArgument(historySize >= 0);
                this.historySize = historySize;
 
-               // We know upfront, which tasks will ack the checkpoints.
-               if (tasksToWaitFor != null && tasksToWaitFor.length > 0) {
-                       taskParallelism = new HashMap<>();
+               // We know upfront which tasks will ack the checkpoints
+               if (tasksToWaitFor != null && !tasksToWaitFor.isEmpty()) {
+                       taskParallelism = new HashMap<>(tasksToWaitFor.size());
 
-                       for (ExecutionVertex vertex : tasksToWaitFor) {
-                               taskParallelism.put(
-                                               vertex.getJobvertexId(),
-                                               
vertex.getTotalNumberOfParallelSubtasks());
+                       for (ExecutionJobVertex vertex : tasksToWaitFor) {
+                               taskParallelism.put(vertex.getJobVertexId(), 
vertex.getParallelism());
                        }
-               }
-               else {
+               } else {
                        taskParallelism = Collections.emptyMap();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3c20647..2ad7832 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,16 +19,15 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
-
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -38,17 +37,14 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.SavepointCoordinator;
 import org.apache.flink.runtime.checkpoint.StateStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
@@ -58,12 +54,10 @@ import 
org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
-
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -72,14 +66,15 @@ import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -358,7 +353,8 @@ public class ExecutionGraph implements Serializable {
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
                        RecoveryMode recoveryMode,
-                       StateStore<CompletedCheckpoint> savepointStore) throws 
Exception {
+                       StateStore<CompletedCheckpoint> savepointStore,
+                       CheckpointStatsTracker statsTracker) throws Exception {
 
                // simple sanity checks
                if (interval < 10 || checkpointTimeout < 10) {
@@ -375,20 +371,7 @@ public class ExecutionGraph implements Serializable {
                // disable to make sure existing checkpoint coordinators are 
cleared
                disableSnaphotCheckpointing();
 
-               boolean isStatsDisabled = jobConfiguration.getBoolean(
-                               
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
-
-               if (isStatsDisabled) {
-                       checkpointStatsTracker = new 
DisabledCheckpointStatsTracker();
-               }
-               else {
-                       int historySize = jobConfiguration.getInteger(
-                                       
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-                                       
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-
-                       checkpointStatsTracker = new 
SimpleCheckpointStatsTracker(historySize, tasksToWaitFor);
-               }
+               checkpointStatsTracker = Objects.requireNonNull(statsTracker, 
"Checkpoint stats tracker");
 
                // create the coordinator that triggers and commits checkpoints 
and holds the state
                checkpointCoordinator = new CheckpointCoordinator(

http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ba9e1ef..a8f1a0a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
+import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, 
SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker}
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -1179,6 +1180,22 @@ class JobManager(
 
           val checkpointIdCounter = 
checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
 
+          // Checkpoint stats tracker
+          val isStatsDisabled: Boolean = flinkConfiguration.getBoolean(
+            ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
+            ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE)
+
+          val checkpointStatsTracker : CheckpointStatsTracker =
+            if (isStatsDisabled) {
+              new DisabledCheckpointStatsTracker()
+            } else {
+              val historySize: Int = flinkConfiguration.getInteger(
+                ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+                
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
+
+              new SimpleCheckpointStatsTracker(historySize, ackVertices)
+            }
+
           executionGraph.enableSnapshotCheckpointing(
             snapshotSettings.getCheckpointInterval,
             snapshotSettings.getCheckpointTimeout,
@@ -1192,7 +1209,8 @@ class JobManager(
             checkpointIdCounter,
             completedCheckpoints,
             recoveryMode,
-            savepointStore)
+            savepointStore,
+            checkpointStatsTracker)
         }
 
         // get notified about job status changes

http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index c788bef..d1c74d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
+import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -72,7 +73,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()),
                                        RecoveryMode.STANDALONE,
-                                       new 
HeapStateStore<CompletedCheckpoint>());
+                                       new 
HeapStateStore<CompletedCheckpoint>(),
+                                       new DisabledCheckpointStatsTracker());
 
                        CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
                        SavepointCoordinator savepointCoordinator = 
executionGraph.getSavepointCoordinator();

http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 56228ef..9751eeb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint.stats;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.StateForTask;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +51,7 @@ public class SimpleCheckpointStatsTrackerTest {
        @Test
        public void testNoCompletedCheckpointYet() throws Exception {
                CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(
-                               0, new ExecutionVertex[0]);
+                               0, Collections.<ExecutionJobVertex>emptyList());
 
                assertFalse(tracker.getJobStats().isDefined());
                assertFalse(tracker.getOperatorStats(new 
JobVertexID()).isDefined());
@@ -59,7 +60,7 @@ public class SimpleCheckpointStatsTrackerTest {
        @Test
        public void testRandomStats() throws Exception {
                CompletedCheckpoint[] checkpoints = 
generateRandomCheckpoints(16);
-               ExecutionVertex[] tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
+               List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
                CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor);
 
                for (int i = 0; i < checkpoints.length; i++) {
@@ -75,7 +76,7 @@ public class SimpleCheckpointStatsTrackerTest {
        @Test
        public void testIllegalOperatorId() throws Exception {
                CompletedCheckpoint[] checkpoints = 
generateRandomCheckpoints(16);
-               ExecutionVertex[] tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
+               List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
                CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor);
 
                for (CompletedCheckpoint checkpoint : checkpoints) {
@@ -90,7 +91,7 @@ public class SimpleCheckpointStatsTrackerTest {
        @Test
        public void testCompletedCheckpointReordering() throws Exception {
                CompletedCheckpoint[] checkpoints = 
generateRandomCheckpoints(2);
-               ExecutionVertex[] tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
+               List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
                CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor);
 
                // First the second checkpoint notifies
@@ -110,7 +111,7 @@ public class SimpleCheckpointStatsTrackerTest {
        @SuppressWarnings("unchecked")
        public void testOperatorStateCachedClearedOnNewCheckpoint() throws 
Exception {
                CompletedCheckpoint[] checkpoints = 
generateRandomCheckpoints(2);
-               ExecutionVertex[] tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
+               List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
                CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor);
 
                tracker.onCompletedCheckpoint(checkpoints[0]);
@@ -230,12 +231,12 @@ public class SimpleCheckpointStatsTrackerTest {
 
        private static void verifySubtaskStats(
                        CheckpointStatsTracker tracker,
-                       ExecutionVertex[] tasksToWaitFor,
+                       List<ExecutionJobVertex> tasksToWaitFor,
                        CompletedCheckpoint checkpoint) {
 
-               for (ExecutionVertex vertex : tasksToWaitFor) {
-                       JobVertexID operatorId = vertex.getJobvertexId();
-                       int parallelism = 
vertex.getTotalNumberOfParallelSubtasks();
+               for (ExecutionJobVertex vertex : tasksToWaitFor) {
+                       JobVertexID operatorId = vertex.getJobVertexId();
+                       int parallelism = vertex.getParallelism();
 
                        OperatorCheckpointStats actualStats = 
tracker.getOperatorStats(operatorId).get();
 
@@ -341,7 +342,8 @@ public class SimpleCheckpointStatsTrackerTest {
                return checkpoints;
        }
 
-       private ExecutionVertex[] createTasksToWaitFor(CompletedCheckpoint 
checkpoint) {
+       private List<ExecutionJobVertex> 
createTasksToWaitFor(CompletedCheckpoint checkpoint) {
+
                Map<JobVertexID, Integer> operators = new HashMap<>();
 
                for (StateForTask state : checkpoint.getStates()) {
@@ -355,17 +357,14 @@ public class SimpleCheckpointStatsTrackerTest {
                        }
                }
 
-               ExecutionVertex[] tasksToWaitFor = new 
ExecutionVertex[operators.size()];
-
-               int i = 0;
-               for (JobVertexID operatorId : operators.keySet()) {
-                       tasksToWaitFor[i] = mock(ExecutionVertex.class);
-                       
when(tasksToWaitFor[i].getJobvertexId()).thenReturn(operatorId);
-                       
when(tasksToWaitFor[i].getTotalNumberOfParallelSubtasks()).thenReturn(operators.get(operatorId));
-
-                       i++;
+               List<ExecutionJobVertex> jobVertices = new 
ArrayList<>(checkpoint.getStates().size());
+               for (JobVertexID vertexId : operators.keySet()) {
+                       ExecutionJobVertex v = mock(ExecutionJobVertex.class);
+                       when(v.getJobVertexId()).thenReturn(vertexId);
+                       
when(v.getParallelism()).thenReturn(operators.get(vertexId));
+                       jobVertices.add(v);
                }
 
-               return tasksToWaitFor;
+               return jobVertices;
        }
 }

Reply via email to