This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 37756561d99 [FLINK-34344] Pass JobID to CheckpointStatsTracker 37756561d99 is described below commit 37756561d99ff73ba8cbf445c57f57fe11250867 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Fri Feb 2 16:02:14 2024 +0100 [FLINK-34344] Pass JobID to CheckpointStatsTracker --- .../runtime/checkpoint/CheckpointStatsTracker.java | 6 ++++-- .../scheduler/DefaultExecutionGraphFactory.java | 3 ++- .../checkpoint/CheckpointCoordinatorFailureTest.java | 4 +++- .../CheckpointCoordinatorMasterHooksTest.java | 2 +- .../checkpoint/CheckpointCoordinatorTest.java | 20 +++++++++++++------- .../CheckpointCoordinatorTestingUtils.java | 2 +- .../checkpoint/CheckpointStatsTrackerTest.java | 12 ++++++------ .../flink/runtime/dispatcher/DispatcherTest.java | 4 +++- .../TestingDefaultExecutionGraphBuilder.java | 3 ++- .../AbstractCheckpointStatsHandlerTest.java | 4 +++- 10 files changed, 38 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index cf66341fc06..ea04211d6f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -111,9 +111,11 @@ public class CheckpointStatsTracker { * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in * progress ones. * @param metricGroup Metric group for exposed metrics + * @param jobID ID of the job being checkpointed */ - public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) { - this(numRememberedCheckpoints, metricGroup, new JobID(), Integer.MAX_VALUE); + public CheckpointStatsTracker( + int numRememberedCheckpoints, MetricGroup metricGroup, JobID jobID) { + this(numRememberedCheckpoints, metricGroup, jobID, Integer.MAX_VALUE); } CheckpointStatsTracker( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index aaeb8b6d4c7..67e91a887a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -129,7 +129,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { () -> new CheckpointStatsTracker( configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), - jobManagerJobMetricGroup)); + jobManagerJobMetricGroup, + jobManagerJobMetricGroup.jobId())); this.isDynamicGraph = isDynamicGraph; this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory); this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 8873b938f1a..6e6bcffe762 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; @@ -212,7 +213,8 @@ class CheckpointCoordinatorFailureTest { new FailingCompletedCheckpointStore(failure); CheckpointStatsTracker statsTracker = - new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker( + Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); final AtomicInteger cleanupCallCount = new AtomicInteger(0); final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index e31d7811561..75e637dd078 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -500,7 +500,7 @@ class CheckpointCoordinatorMasterHooksTest { new ExecutionGraphCheckpointPlanCalculatorContext(graph), graph.getVerticesTopologically(), false), - new CheckpointStatsTracker(1, new DummyMetricGroup())); + new CheckpointStatsTracker(1, new DummyMetricGroup(), new JobID())); } private static <T> T mockGeneric(Class<?> clazz) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 5c63552be7d..222b5dd1d8d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -251,7 +251,8 @@ class CheckpointCoordinatorTest { ExecutionVertex lateReportVertex = executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0]; CheckpointStatsTracker statsTracker = - new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker( + Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); CheckpointCoordinator coordinator = new CheckpointCoordinatorBuilder() .setTimer(manuallyTriggeredScheduledExecutor) @@ -501,7 +502,8 @@ class CheckpointCoordinatorTest { jobVertex2.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished(); CheckpointStatsTracker statsTracker = - new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker( + Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() .setTimer(manuallyTriggeredScheduledExecutor) @@ -743,7 +745,8 @@ class CheckpointCoordinatorTest { // given: Checkpoint coordinator which fails on initializeCheckpointLocation. TestFailJobCallback failureCallback = new TestFailJobCallback(); CheckpointStatsTracker statsTracker = - new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker( + Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() .setCheckpointStatsTracker(statsTracker) @@ -2017,7 +2020,8 @@ class CheckpointCoordinatorTest { ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId(); ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId(); CheckpointStatsTracker statsTracker = - new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker( + Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); // set up the coordinator and validate the initial state CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() @@ -2868,7 +2872,7 @@ class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointStatsTracker tracker = - new CheckpointStatsTracker(10, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), new JobID()); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() .setCompletedCheckpointStore(store) @@ -3186,7 +3190,8 @@ class CheckpointCoordinatorTest { TestFailJobCallback failureCallback = new TestFailJobCallback(); CheckpointStatsTracker statsTracker = - new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker( + Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() @@ -3236,7 +3241,8 @@ class CheckpointCoordinatorTest { TestFailJobCallback failureCallback = new TestFailJobCallback(); CheckpointStatsTracker statsTracker = - new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker( + Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); final String exceptionMsg = "Test store exception."; try (SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index 03d8d5ced59..aac4ab51b36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -781,7 +781,7 @@ public class CheckpointCoordinatorTestingUtils { private boolean allowCheckpointsAfterTasksFinished; private CheckpointStatsTracker checkpointStatsTracker = - new CheckpointStatsTracker(1, new DummyMetricGroup()); + new CheckpointStatsTracker(1, new DummyMetricGroup(), new JobID()); private BiFunction< Set<ExecutionJobVertex>, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index a4dc88d5197..83ce318baea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -66,7 +66,7 @@ class CheckpointStatsTrackerTest { ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID); CheckpointStatsTracker tracker = - new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker(0, new UnregisteredMetricsGroup(), new JobID()); PendingCheckpointStats pending = tracker.reportPendingCheckpoint( @@ -114,7 +114,7 @@ class CheckpointStatsTrackerTest { singletonMap(jobVertexID, jobVertex.getParallelism()); CheckpointStatsTracker tracker = - new CheckpointStatsTracker(10, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), new JobID()); // Completed checkpoint PendingCheckpointStats completed1 = @@ -240,7 +240,7 @@ class CheckpointStatsTrackerTest { void testCreateSnapshot() { JobVertexID jobVertexID = new JobVertexID(); CheckpointStatsTracker tracker = - new CheckpointStatsTracker(10, new UnregisteredMetricsGroup()); + new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), new JobID()); CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot(); @@ -295,7 +295,7 @@ class CheckpointStatsTrackerTest { } }; - CheckpointStatsTracker tracker = new CheckpointStatsTracker(10, metricGroup); + CheckpointStatsTracker tracker = new CheckpointStatsTracker(10, metricGroup, new JobID()); PendingCheckpointStats pending = tracker.reportPendingCheckpoint( @@ -422,7 +422,7 @@ class CheckpointStatsTrackerTest { } }; - new CheckpointStatsTracker(0, metricGroup); + new CheckpointStatsTracker(0, metricGroup, new JobID()); // Make sure this test is adjusted when further metrics are added assertThat(registeredGaugeNames) @@ -471,7 +471,7 @@ class CheckpointStatsTrackerTest { .build(EXECUTOR_RESOURCE.getExecutor()); ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID); - CheckpointStatsTracker stats = new CheckpointStatsTracker(0, metricGroup); + CheckpointStatsTracker stats = new CheckpointStatsTracker(0, metricGroup, new JobID()); // Make sure to adjust this test if metrics are added/removed assertThat(registeredGauges).hasSize(12); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 463a737ac3e..e7bfc30ab17 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -737,7 +737,9 @@ public class DispatcherTest extends AbstractDispatcherTest { private CheckpointStatsSnapshot getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() { CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker( - 10, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup()); + 10, + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + new JobID()); checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress(); checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress(); return checkpointStatsTracker.createSnapshot(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index 58e38d62d45..1bbcbe43388 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RpcOptions; @@ -193,7 +194,7 @@ public class TestingDefaultExecutionGraphBuilder { new DefaultVertexAttemptNumberStore(), Optional.ofNullable(vertexParallelismStore) .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)), - () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()), + () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup(), new JobID()), isDynamicGraph, executionJobVertexFactory, markPartitionFinishedStrategy, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java index 9f629a84916..c1ad110491c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java @@ -63,7 +63,9 @@ class AbstractCheckpointStatsHandlerTest { private static final CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker( - 10, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup()); + 10, + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + new JobID()); @Test void testRetrieveSnapshotFromCache() throws Exception {