This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 37756561d99 [FLINK-34344] Pass JobID to CheckpointStatsTracker
37756561d99 is described below

commit 37756561d99ff73ba8cbf445c57f57fe11250867
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Fri Feb 2 16:02:14 2024 +0100

    [FLINK-34344] Pass JobID to CheckpointStatsTracker
---
 .../runtime/checkpoint/CheckpointStatsTracker.java   |  6 ++++--
 .../scheduler/DefaultExecutionGraphFactory.java      |  3 ++-
 .../checkpoint/CheckpointCoordinatorFailureTest.java |  4 +++-
 .../CheckpointCoordinatorMasterHooksTest.java        |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java        | 20 +++++++++++++-------
 .../CheckpointCoordinatorTestingUtils.java           |  2 +-
 .../checkpoint/CheckpointStatsTrackerTest.java       | 12 ++++++------
 .../flink/runtime/dispatcher/DispatcherTest.java     |  4 +++-
 .../TestingDefaultExecutionGraphBuilder.java         |  3 ++-
 .../AbstractCheckpointStatsHandlerTest.java          |  4 +++-
 10 files changed, 38 insertions(+), 22 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index cf66341fc06..ea04211d6f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -111,9 +111,11 @@ public class CheckpointStatsTracker {
      * @param numRememberedCheckpoints Maximum number of checkpoints to 
remember, including in
      *     progress ones.
      * @param metricGroup Metric group for exposed metrics
+     * @param jobID ID of the job being checkpointed
      */
-    public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup 
metricGroup) {
-        this(numRememberedCheckpoints, metricGroup, new JobID(), 
Integer.MAX_VALUE);
+    public CheckpointStatsTracker(
+            int numRememberedCheckpoints, MetricGroup metricGroup, JobID 
jobID) {
+        this(numRememberedCheckpoints, metricGroup, jobID, Integer.MAX_VALUE);
     }
 
     CheckpointStatsTracker(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index aaeb8b6d4c7..67e91a887a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -129,7 +129,8 @@ public class DefaultExecutionGraphFactory implements 
ExecutionGraphFactory {
                         () ->
                                 new CheckpointStatsTracker(
                                         
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
-                                        jobManagerJobMetricGroup));
+                                        jobManagerJobMetricGroup,
+                                        jobManagerJobMetricGroup.jobId()));
         this.isDynamicGraph = isDynamicGraph;
         this.executionJobVertexFactory = 
checkNotNull(executionJobVertexFactory);
         this.nonFinishedHybridPartitionShouldBeUnknown = 
nonFinishedHybridPartitionShouldBeUnknown;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 8873b938f1a..6e6bcffe762 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
@@ -212,7 +213,8 @@ class CheckpointCoordinatorFailureTest {
                 new FailingCompletedCheckpointStore(failure);
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         final AtomicInteger cleanupCallCount = new AtomicInteger(0);
         final CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e31d7811561..75e637dd078 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -500,7 +500,7 @@ class CheckpointCoordinatorMasterHooksTest {
                         new 
ExecutionGraphCheckpointPlanCalculatorContext(graph),
                         graph.getVerticesTopologically(),
                         false),
-                new CheckpointStatsTracker(1, new DummyMetricGroup()));
+                new CheckpointStatsTracker(1, new DummyMetricGroup(), new 
JobID()));
     }
 
     private static <T> T mockGeneric(Class<?> clazz) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5c63552be7d..222b5dd1d8d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -251,7 +251,8 @@ class CheckpointCoordinatorTest {
         ExecutionVertex lateReportVertex =
                 
executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0];
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         CheckpointCoordinator coordinator =
                 new CheckpointCoordinatorBuilder()
                         .setTimer(manuallyTriggeredScheduledExecutor)
@@ -501,7 +502,8 @@ class CheckpointCoordinatorTest {
         
jobVertex2.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished();
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
                         .setTimer(manuallyTriggeredScheduledExecutor)
@@ -743,7 +745,8 @@ class CheckpointCoordinatorTest {
         // given: Checkpoint coordinator which fails on 
initializeCheckpointLocation.
         TestFailJobCallback failureCallback = new TestFailJobCallback();
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
                         .setCheckpointStatsTracker(statsTracker)
@@ -2017,7 +2020,8 @@ class CheckpointCoordinatorTest {
         ExecutionAttemptID attemptID1 = 
vertex1.getCurrentExecutionAttempt().getAttemptId();
         ExecutionAttemptID attemptID2 = 
vertex2.getCurrentExecutionAttempt().getAttemptId();
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         // set up the coordinator and validate the initial state
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -2868,7 +2872,7 @@ class CheckpointCoordinatorTest {
 
         // set up the coordinator and validate the initial state
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), 
new JobID());
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
                         .setCompletedCheckpointStore(store)
@@ -3186,7 +3190,8 @@ class CheckpointCoordinatorTest {
         TestFailJobCallback failureCallback = new TestFailJobCallback();
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
 
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -3236,7 +3241,8 @@ class CheckpointCoordinatorTest {
         TestFailJobCallback failureCallback = new TestFailJobCallback();
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
 
         final String exceptionMsg = "Test store exception.";
         try (SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl()) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 03d8d5ced59..aac4ab51b36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -781,7 +781,7 @@ public class CheckpointCoordinatorTestingUtils {
         private boolean allowCheckpointsAfterTasksFinished;
 
         private CheckpointStatsTracker checkpointStatsTracker =
-                new CheckpointStatsTracker(1, new DummyMetricGroup());
+                new CheckpointStatsTracker(1, new DummyMetricGroup(), new 
JobID());
 
         private BiFunction<
                         Set<ExecutionJobVertex>,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index a4dc88d5197..83ce318baea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -66,7 +66,7 @@ class CheckpointStatsTrackerTest {
         ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
 
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(0, new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(0, new UnregisteredMetricsGroup(), 
new JobID());
 
         PendingCheckpointStats pending =
                 tracker.reportPendingCheckpoint(
@@ -114,7 +114,7 @@ class CheckpointStatsTrackerTest {
                 singletonMap(jobVertexID, jobVertex.getParallelism());
 
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), 
new JobID());
 
         // Completed checkpoint
         PendingCheckpointStats completed1 =
@@ -240,7 +240,7 @@ class CheckpointStatsTrackerTest {
     void testCreateSnapshot() {
         JobVertexID jobVertexID = new JobVertexID();
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), 
new JobID());
 
         CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
 
@@ -295,7 +295,7 @@ class CheckpointStatsTrackerTest {
                     }
                 };
 
-        CheckpointStatsTracker tracker = new CheckpointStatsTracker(10, 
metricGroup);
+        CheckpointStatsTracker tracker = new CheckpointStatsTracker(10, 
metricGroup, new JobID());
 
         PendingCheckpointStats pending =
                 tracker.reportPendingCheckpoint(
@@ -422,7 +422,7 @@ class CheckpointStatsTrackerTest {
                     }
                 };
 
-        new CheckpointStatsTracker(0, metricGroup);
+        new CheckpointStatsTracker(0, metricGroup, new JobID());
 
         // Make sure this test is adjusted when further metrics are added
         assertThat(registeredGaugeNames)
@@ -471,7 +471,7 @@ class CheckpointStatsTrackerTest {
                         .build(EXECUTOR_RESOURCE.getExecutor());
         ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
 
-        CheckpointStatsTracker stats = new CheckpointStatsTracker(0, 
metricGroup);
+        CheckpointStatsTracker stats = new CheckpointStatsTracker(0, 
metricGroup, new JobID());
 
         // Make sure to adjust this test if metrics are added/removed
         assertThat(registeredGauges).hasSize(12);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 463a737ac3e..e7bfc30ab17 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -737,7 +737,9 @@ public class DispatcherTest extends AbstractDispatcherTest {
     private CheckpointStatsSnapshot 
getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() {
         CheckpointStatsTracker checkpointStatsTracker =
                 new CheckpointStatsTracker(
-                        10, 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+                        10,
+                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                        new JobID());
         checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
         checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
         return checkpointStatsTracker.createSnapshot();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 58e38d62d45..1bbcbe43388 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RpcOptions;
@@ -193,7 +194,7 @@ public class TestingDefaultExecutionGraphBuilder {
                 new DefaultVertexAttemptNumberStore(),
                 Optional.ofNullable(vertexParallelismStore)
                         .orElseGet(() -> 
SchedulerBase.computeVertexParallelismStore(jobGraph)),
-                () -> new CheckpointStatsTracker(0, new 
UnregisteredMetricsGroup()),
+                () -> new CheckpointStatsTracker(0, new 
UnregisteredMetricsGroup(), new JobID()),
                 isDynamicGraph,
                 executionJobVertexFactory,
                 markPartitionFinishedStrategy,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
index 9f629a84916..c1ad110491c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
@@ -63,7 +63,9 @@ class AbstractCheckpointStatsHandlerTest {
 
     private static final CheckpointStatsTracker checkpointStatsTracker =
             new CheckpointStatsTracker(
-                    10, 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+                    10,
+                    
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                    new JobID());
 
     @Test
     void testRetrieveSnapshotFromCache() throws Exception {

Reply via email to