Repository: flink Updated Branches: refs/heads/master 8305c1755 -> 8330539e4
[FLINK-3803] [runtime] Pass CheckpointStatsTracker to ExecutionGraph `CheckpointStatsTracker` was instantiated in `ExecutionGraph#enableSnapshotCheckpointing`, where only the job configuration is available, not the Flink configuration that holds the checkpoint stats settings. Instead of instantiating the `CheckpointStatsTracker` in the `ExecutionGraph` method, the `JobManager` now creates it from the Flink configuration and passes it to the method directly. This closes #1927. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8330539e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8330539e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8330539e Branch: refs/heads/master Commit: 8330539e4770ce36576149410d66122a81daa03f Parents: 8305c17 Author: Ufuk Celebi <u...@apache.org> Authored: Mon Apr 25 15:15:47 2016 +0200 Committer: Ufuk Celebi <u...@apache.org> Committed: Tue Apr 26 10:22:09 2016 +0200 ---------------------------------------------------------------------- .../stats/SimpleCheckpointStatsTracker.java | 19 ++++----- .../runtime/executiongraph/ExecutionGraph.java | 37 +++++------------- .../flink/runtime/jobmanager/JobManager.scala | 20 +++++++++- ...ExecutionGraphCheckpointCoordinatorTest.java | 4 +- .../stats/SimpleCheckpointStatsTrackerTest.java | 41 ++++++++++---------- 5 files changed, 60 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java index 5ee4fc3..aea18e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint.stats; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.StateForTask; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import scala.Option; @@ -108,22 +108,19 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { public SimpleCheckpointStatsTracker( int historySize, - ExecutionVertex[] tasksToWaitFor) { + List<ExecutionJobVertex> tasksToWaitFor) { checkArgument(historySize >= 0); this.historySize = historySize; - // We know upfront, which tasks will ack the checkpoints. - if (tasksToWaitFor != null && tasksToWaitFor.length > 0) { - taskParallelism = new HashMap<>(); + // We know upfront which tasks will ack the checkpoints + if (tasksToWaitFor != null && !tasksToWaitFor.isEmpty()) { + taskParallelism = new HashMap<>(tasksToWaitFor.size()); - for (ExecutionVertex vertex : tasksToWaitFor) { - taskParallelism.put( - vertex.getJobvertexId(), - vertex.getTotalNumberOfParallelSubtasks()); + for (ExecutionJobVertex vertex : tasksToWaitFor) { + taskParallelism.put(vertex.getJobVertexId(), vertex.getParallelism()); } - } - else { + } else { taskParallelism = Collections.emptyMap(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 3c20647..2ad7832 100755 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -19,16 +19,15 @@ package org.apache.flink.runtime.executiongraph; import akka.actor.ActorSystem; - import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; -import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; @@ -38,17 +37,14 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.SavepointCoordinator; import org.apache.flink.runtime.checkpoint.StateStore; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import 
org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.RecoveryMode; @@ -58,12 +54,10 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; -import org.apache.flink.util.SerializedValue; import org.apache.flink.util.ExceptionUtils; - +import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; @@ -72,14 +66,15 @@ import java.io.Serializable; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Collection; -import java.util.HashSet; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -358,7 +353,8 @@ public class ExecutionGraph implements Serializable { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode, - StateStore<CompletedCheckpoint> savepointStore) throws Exception { + StateStore<CompletedCheckpoint> savepointStore, + CheckpointStatsTracker statsTracker) throws Exception { // simple sanity checks if (interval < 10 || checkpointTimeout < 10) { @@ -375,20 +371,7 @@ public class ExecutionGraph implements Serializable { // disable to make sure existing checkpoint coordinators are cleared 
disableSnaphotCheckpointing(); - boolean isStatsDisabled = jobConfiguration.getBoolean( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE); - - if (isStatsDisabled) { - checkpointStatsTracker = new DisabledCheckpointStatsTracker(); - } - else { - int historySize = jobConfiguration.getInteger( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); - - checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, tasksToWaitFor); - } + checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker"); // create the coordinator that triggers and commits checkpoints and holds the state checkpointCoordinator = new CheckpointCoordinator( http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index ba9e1ef..a8f1a0a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -37,6 +37,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ +import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker} import org.apache.flink.runtime.client._ import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.clusterframework.FlinkResourceManager @@ -1179,6 +1180,22 @@ 
class JobManager( val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId) + // Checkpoint stats tracker + val isStatsDisabled: Boolean = flinkConfiguration.getBoolean( + ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE) + + val checkpointStatsTracker : CheckpointStatsTracker = + if (isStatsDisabled) { + new DisabledCheckpointStatsTracker() + } else { + val historySize: Int = flinkConfiguration.getInteger( + ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE) + + new SimpleCheckpointStatsTracker(historySize, ackVertices) + } + executionGraph.enableSnapshotCheckpointing( snapshotSettings.getCheckpointInterval, snapshotSettings.getCheckpointTimeout, @@ -1192,7 +1209,8 @@ class JobManager( checkpointIdCounter, completedCheckpoints, recoveryMode, - savepointStore) + savepointStore, + checkpointStatsTracker) } // get notified about job status changes http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index c788bef..d1c74d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; +import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; @@ -72,7 +73,8 @@ public class ExecutionGraphCheckpointCoordinatorTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()), RecoveryMode.STANDALONE, - new HeapStateStore<CompletedCheckpoint>()); + new HeapStateStore<CompletedCheckpoint>(), + new DisabledCheckpointStatsTracker()); CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator(); http://git-wip-us.apache.org/repos/asf/flink/blob/8330539e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java index 56228ef..9751eeb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint.stats; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.StateForTask; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateHandle; 
import org.apache.flink.util.SerializedValue; @@ -31,6 +31,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -50,7 +51,7 @@ public class SimpleCheckpointStatsTrackerTest { @Test public void testNoCompletedCheckpointYet() throws Exception { CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker( - 0, new ExecutionVertex[0]); + 0, Collections.<ExecutionJobVertex>emptyList()); assertFalse(tracker.getJobStats().isDefined()); assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined()); @@ -59,7 +60,7 @@ public class SimpleCheckpointStatsTrackerTest { @Test public void testRandomStats() throws Exception { CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16); - ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); + List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor); for (int i = 0; i < checkpoints.length; i++) { @@ -75,7 +76,7 @@ public class SimpleCheckpointStatsTrackerTest { @Test public void testIllegalOperatorId() throws Exception { CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16); - ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); + List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor); for (CompletedCheckpoint checkpoint : checkpoints) { @@ -90,7 +91,7 @@ public class SimpleCheckpointStatsTrackerTest { @Test public void testCompletedCheckpointReordering() throws Exception { CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2); - ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); + List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]); CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor); // First the second checkpoint notifies @@ -110,7 +111,7 @@ public class SimpleCheckpointStatsTrackerTest { @SuppressWarnings("unchecked") public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception { CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2); - ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); + List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor); tracker.onCompletedCheckpoint(checkpoints[0]); @@ -230,12 +231,12 @@ public class SimpleCheckpointStatsTrackerTest { private static void verifySubtaskStats( CheckpointStatsTracker tracker, - ExecutionVertex[] tasksToWaitFor, + List<ExecutionJobVertex> tasksToWaitFor, CompletedCheckpoint checkpoint) { - for (ExecutionVertex vertex : tasksToWaitFor) { - JobVertexID operatorId = vertex.getJobvertexId(); - int parallelism = vertex.getTotalNumberOfParallelSubtasks(); + for (ExecutionJobVertex vertex : tasksToWaitFor) { + JobVertexID operatorId = vertex.getJobVertexId(); + int parallelism = vertex.getParallelism(); OperatorCheckpointStats actualStats = tracker.getOperatorStats(operatorId).get(); @@ -341,7 +342,8 @@ public class SimpleCheckpointStatsTrackerTest { return checkpoints; } - private ExecutionVertex[] createTasksToWaitFor(CompletedCheckpoint checkpoint) { + private List<ExecutionJobVertex> createTasksToWaitFor(CompletedCheckpoint checkpoint) { + Map<JobVertexID, Integer> operators = new HashMap<>(); for (StateForTask state : checkpoint.getStates()) { @@ -355,17 +357,14 @@ public class SimpleCheckpointStatsTrackerTest { } } - ExecutionVertex[] tasksToWaitFor = new ExecutionVertex[operators.size()]; - - int i = 0; - for (JobVertexID operatorId : operators.keySet()) { - tasksToWaitFor[i] = mock(ExecutionVertex.class); - 
when(tasksToWaitFor[i].getJobvertexId()).thenReturn(operatorId); - when(tasksToWaitFor[i].getTotalNumberOfParallelSubtasks()).thenReturn(operators.get(operatorId)); - - i++; + List<ExecutionJobVertex> jobVertices = new ArrayList<>(checkpoint.getStates().size()); + for (JobVertexID vertexId : operators.keySet()) { + ExecutionJobVertex v = mock(ExecutionJobVertex.class); + when(v.getJobVertexId()).thenReturn(vertexId); + when(v.getParallelism()).thenReturn(operators.get(vertexId)); + jobVertices.add(v); } - return tasksToWaitFor; + return jobVertices; } }