This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 4dad2e29a09 [FLINK-29132][rest] Cleanup subtask attempt metrics 
according to the JobDetails to avoid memory leak.
4dad2e29a09 is described below

commit 4dad2e29a090c731a0474a80e320264040c348ce
Author: Gen Luo <luogen...@gmail.com>
AuthorDate: Thu Sep 1 17:08:53 2022 +0800

    [FLINK-29132][rest] Cleanup subtask attempt metrics according to the 
JobDetails to avoid memory leak.
    
    This closes #20733.
---
 .../runtime/messages/webmonitor/JobDetails.java    |  96 ++++++++---------
 .../handler/job/JobVertexBackPressureHandler.java  |  34 +++---
 .../rest/handler/legacy/metrics/MetricStore.java   |  49 +++++++--
 .../messages/webmonitor/JobDetailsTest.java        |  31 ------
 .../job/JobVertexBackPressureHandlerTest.java      |  12 +--
 .../handler/legacy/metrics/MetricStoreTest.java    | 114 ++++++++++++++++-----
 6 files changed, 198 insertions(+), 138 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index 74b2964228c..04fce9483c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -40,8 +41,11 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -60,8 +64,6 @@ public class JobDetails implements Serializable {
     private static final String FIELD_NAME_STATUS = "state";
     private static final String FIELD_NAME_LAST_MODIFICATION = 
"last-modification";
     private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
-    private static final String FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS =
-            "current-execution-attempts";
 
     private final JobID jobId;
 
@@ -84,10 +86,12 @@ public class JobDetails implements Serializable {
     /**
      * The map holds the attempt number of the current execution attempt in 
the Execution, which is
      * considered as the representing execution for the subtask of the vertex. 
The keys and values
-     * are JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber. It is 
used to accumulate
-     * the metrics of a subtask in MetricFetcher.
+     * are JobVertexID -> SubtaskIndex -> CurrentAttempts info.
+     *
+     * <p>The field is excluded from the json. Any usage from the web UI and 
the history server is
+     * not allowed.
      */
-    private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
+    private final Map<String, Map<Integer, CurrentAttempts>> 
currentExecutionAttempts;
 
     @VisibleForTesting
     public JobDetails(
@@ -123,7 +127,7 @@ public class JobDetails implements Serializable {
             long lastUpdateTime,
             int[] tasksPerState,
             int numTasks,
-            Map<String, Map<Integer, Integer>> currentExecutionAttempts) {
+            Map<String, Map<Integer, CurrentAttempts>> 
currentExecutionAttempts) {
         this.jobId = checkNotNull(jobId);
         this.jobName = checkNotNull(jobName);
         this.startTime = startTime;
@@ -150,22 +154,25 @@ public class JobDetails implements Serializable {
         int[] countsPerStatus = new int[ExecutionState.values().length];
         long lastChanged = 0;
         int numTotalTasks = 0;
-        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new 
HashMap<>();
+        Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts = 
new HashMap<>();
 
         for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
             AccessExecutionVertex[] taskVertices = ejv.getTaskVertices();
             numTotalTasks += taskVertices.length;
-            Map<Integer, Integer> vertexAttempts = new HashMap<>();
+            Map<Integer, CurrentAttempts> vertexAttempts = new HashMap<>();
 
             for (AccessExecutionVertex taskVertex : taskVertices) {
-                if (taskVertex.getCurrentExecutions().size() > 1) {
-                    vertexAttempts.put(
-                            taskVertex.getParallelSubtaskIndex(),
-                            
taskVertex.getCurrentExecutionAttempt().getAttemptNumber());
-                }
                 ExecutionState state = taskVertex.getExecutionState();
                 countsPerStatus[state.ordinal()]++;
                 lastChanged = Math.max(lastChanged, 
taskVertex.getStateTimestamp(state));
+
+                vertexAttempts.put(
+                        taskVertex.getParallelSubtaskIndex(),
+                        new CurrentAttempts(
+                                
taskVertex.getCurrentExecutionAttempt().getAttemptNumber(),
+                                taskVertex.getCurrentExecutions().stream()
+                                        .map(AccessExecution::getAttemptNumber)
+                                        .collect(Collectors.toSet())));
             }
 
             if (!vertexAttempts.isEmpty()) {
@@ -226,7 +233,7 @@ public class JobDetails implements Serializable {
         return tasksPerState;
     }
 
-    public Map<String, Map<Integer, Integer>> getCurrentExecutionAttempts() {
+    public Map<String, Map<Integer, CurrentAttempts>> 
getCurrentExecutionAttempts() {
         return currentExecutionAttempts;
     }
     // ------------------------------------------------------------------------
@@ -326,20 +333,6 @@ public class JobDetails implements Serializable {
 
             jsonGenerator.writeEndObject();
 
-            if (!jobDetails.currentExecutionAttempts.isEmpty()) {
-                
jsonGenerator.writeObjectFieldStart(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
-                for (Map.Entry<String, Map<Integer, Integer>> vertex :
-                        jobDetails.currentExecutionAttempts.entrySet()) {
-                    jsonGenerator.writeObjectFieldStart(vertex.getKey());
-                    for (Map.Entry<Integer, Integer> attempt : 
vertex.getValue().entrySet()) {
-                        jsonGenerator.writeNumberField(
-                                String.valueOf(attempt.getKey()), 
attempt.getValue());
-                    }
-                    jsonGenerator.writeEndObject();
-                }
-                jsonGenerator.writeEndObject();
-            }
-
             jsonGenerator.writeEndObject();
         }
     }
@@ -379,28 +372,6 @@ public class JobDetails implements Serializable {
                         jsonNode == null ? 0 : jsonNode.intValue();
             }
 
-            Map<String, Map<Integer, Integer>> attempts = new HashMap<>();
-            JsonNode attemptsNode = 
rootNode.get(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
-            if (attemptsNode != null) {
-                attemptsNode
-                        .fields()
-                        .forEachRemaining(
-                                vertex -> {
-                                    String vertexId = vertex.getKey();
-                                    Map<Integer, Integer> vertexAttempts =
-                                            attempts.computeIfAbsent(
-                                                    vertexId, k -> new 
HashMap<>());
-                                    vertex.getValue()
-                                            .fields()
-                                            .forEachRemaining(
-                                                    attempt ->
-                                                            vertexAttempts.put(
-                                                                    
Integer.parseInt(
-                                                                            
attempt.getKey()),
-                                                                    
attempt.getValue().intValue()));
-                                });
-            }
-
             return new JobDetails(
                     jobId,
                     jobName,
@@ -411,7 +382,30 @@ public class JobDetails implements Serializable {
                     lastUpdateTime,
                     numVerticesPerExecutionState,
                     numTasks,
-                    attempts);
+                    new HashMap<>());
+        }
+    }
+
+    /**
+     * The CurrentAttempts holds the attempt number of the current 
representative execution attempt,
+     * and the attempt numbers of all the running attempts.
+     */
+    public static final class CurrentAttempts implements Serializable {
+        private final int representativeAttempt;
+
+        private final Set<Integer> currentAttempts;
+
+        public CurrentAttempts(int representativeAttempt, Set<Integer> 
currentAttempts) {
+            this.representativeAttempt = representativeAttempt;
+            this.currentAttempts = 
Collections.unmodifiableSet(currentAttempts);
+        }
+
+        public int getRepresentativeAttempt() {
+            return representativeAttempt;
+        }
+
+        public Set<Integer> getCurrentAttempts() {
+            return currentAttempts;
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
index 898b74ba0d0..57d3e4207cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
@@ -82,23 +82,23 @@ public class JobVertexBackPressureHandler
                 metricFetcher
                         .getMetricStore()
                         .getTaskMetricStore(jobId.toString(), 
jobVertexId.toString());
-        Map<String, Map<Integer, Integer>> jobCurrentExecutions =
-                
metricFetcher.getMetricStore().getCurrentExecutionAttempts().get(jobId.toString());
-        Map<Integer, Integer> currentExecutionAttempts =
-                jobCurrentExecutions != null
-                        ? jobCurrentExecutions.get(jobVertexId.toString())
+        Map<String, Map<Integer, Integer>> jobRepresentativeExecutions =
+                
metricFetcher.getMetricStore().getRepresentativeAttempts().get(jobId.toString());
+        Map<Integer, Integer> representativeAttempts =
+                jobRepresentativeExecutions != null
+                        ? 
jobRepresentativeExecutions.get(jobVertexId.toString())
                         : null;
 
         return CompletableFuture.completedFuture(
                 taskMetricStore != null
-                        ? createJobVertexBackPressureInfo(taskMetricStore, 
currentExecutionAttempts)
+                        ? createJobVertexBackPressureInfo(taskMetricStore, 
representativeAttempts)
                         : JobVertexBackPressureInfo.deprecated());
     }
 
     private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
-            TaskMetricStore taskMetricStore, Map<Integer, Integer> 
currentExecutionAttempts) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> 
representativeAttempts) {
         List<SubtaskBackPressureInfo> subtaskBackPressureInfos =
-                createSubtaskBackPressureInfo(taskMetricStore, 
currentExecutionAttempts);
+                createSubtaskBackPressureInfo(taskMetricStore, 
representativeAttempts);
         return new JobVertexBackPressureInfo(
                 JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
                 
getBackPressureLevel(getMaxBackPressureRatio(subtaskBackPressureInfos)),
@@ -107,7 +107,7 @@ public class JobVertexBackPressureHandler
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            TaskMetricStore taskMetricStore, Map<Integer, Integer> 
currentExecutionAttempts) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> 
representativeAttempts) {
         Map<Integer, SubtaskMetricStore> subtaskMetricStores =
                 taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new 
ArrayList<>(subtaskMetricStores.size());
@@ -121,19 +121,19 @@ public class JobVertexBackPressureHandler
                         createSubtaskAttemptBackpressureInfo(
                                 subtaskIndex, null, subtaskMetricStore, null));
             } else {
-                int currentAttempt =
-                        currentExecutionAttempts == null
+                int representativeAttempt =
+                        representativeAttempts == null
                                 ? -1
-                                : 
currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
-                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                                : 
representativeAttempts.getOrDefault(subtaskIndex, -1);
+                if 
(!allAttemptsMetricStores.containsKey(representativeAttempt)) {
                     // allAttemptsMetricStores is not empty here
-                    currentAttempt = 
allAttemptsMetricStores.keySet().iterator().next();
+                    representativeAttempt = 
allAttemptsMetricStores.keySet().iterator().next();
                 }
                 List<SubtaskBackPressureInfo> otherConcurrentAttempts =
                         new ArrayList<>(allAttemptsMetricStores.size() - 1);
                 for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
                         allAttemptsMetricStores.entrySet()) {
-                    if (attemptStore.getKey() == currentAttempt) {
+                    if (attemptStore.getKey() == representativeAttempt) {
                         continue;
                     }
                     otherConcurrentAttempts.add(
@@ -146,8 +146,8 @@ public class JobVertexBackPressureHandler
                 result.add(
                         createSubtaskAttemptBackpressureInfo(
                                 subtaskIndex,
-                                currentAttempt,
-                                allAttemptsMetricStores.get(currentAttempt),
+                                representativeAttempt,
+                                
allAttemptsMetricStores.get(representativeAttempt),
                                 otherConcurrentAttempts));
             }
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index c86ac763f9c..1908afa7427 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -63,7 +65,7 @@ public class MetricStore {
      * CurrentExecutionAttemptNumber. When a metric of an execution attempt is 
added, the metric can
      * also be added to the SubtaskMetricStore when it is of the representing 
execution.
      */
-    private final Map<String, Map<String, Map<Integer, Integer>>> 
currentExecutionAttempts =
+    private final Map<String, Map<String, Map<Integer, Integer>>> 
representativeAttempts =
             new ConcurrentHashMap<>();
 
     /**
@@ -82,18 +84,39 @@ public class MetricStore {
      */
     synchronized void retainJobs(List<String> activeJobs) {
         jobs.keySet().retainAll(activeJobs);
-        currentExecutionAttempts.keySet().retainAll(activeJobs);
+        representativeAttempts.keySet().retainAll(activeJobs);
     }
 
     public synchronized void 
updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
-        jobs.forEach(
-                job ->
-                        currentExecutionAttempts.put(
-                                job.getJobId().toString(), 
job.getCurrentExecutionAttempts()));
+        for (JobDetails job : jobs) {
+            String jobId = job.getJobId().toString();
+            Map<String, Map<Integer, CurrentAttempts>> currentAttempts =
+                    job.getCurrentExecutionAttempts();
+            Map<String, Map<Integer, Integer>> jobRepresentativeAttempts =
+                    representativeAttempts.compute(
+                            jobId, (k, overwritten) -> new 
HashMap<>(currentAttempts.size()));
+            currentAttempts.forEach(
+                    (vertexId, subtaskAttempts) -> {
+                        Map<Integer, Integer> vertexAttempts =
+                                jobRepresentativeAttempts.compute(
+                                        vertexId, (k, overwritten) -> new 
HashMap<>());
+                        TaskMetricStore taskMetricStore = 
getTaskMetricStore(jobId, vertexId);
+                        subtaskAttempts.forEach(
+                                (subtaskIndex, attempts) -> {
+                                    // Updates representative attempts
+                                    vertexAttempts.put(
+                                            subtaskIndex, 
attempts.getRepresentativeAttempt());
+                                    // Retains current attempt metrics to 
avoid memory leak
+                                    taskMetricStore
+                                            
.getSubtaskMetricStore(subtaskIndex)
+                                            
.retainAttempts(attempts.getCurrentAttempts());
+                                });
+                    });
+        }
     }
 
-    public Map<String, Map<String, Map<Integer, Integer>>> 
getCurrentExecutionAttempts() {
-        return currentExecutionAttempts;
+    public Map<String, Map<String, Map<Integer, Integer>>> 
getRepresentativeAttempts() {
+        return representativeAttempts;
     }
 
     /**
@@ -341,7 +364,7 @@ public class MetricStore {
     // which means there should be only one execution
     private boolean isRepresentativeAttempt(
             String jobID, String vertexID, int subtaskIndex, int 
attemptNumber) {
-        return Optional.of(currentExecutionAttempts)
+        return Optional.of(representativeAttempts)
                         .map(m -> m.get(jobID))
                         .map(m -> m.get(vertexID))
                         .map(m -> m.get(subtaskIndex))
@@ -507,6 +530,14 @@ public class MetricStore {
             return unmodifiableMap(attempts);
         }
 
+        void retainAttempts(Set<Integer> currentAttempts) {
+            int latestAttempt = currentAttempts.stream().mapToInt(i -> 
i).max().orElse(0);
+            attempts.keySet()
+                    .removeIf(
+                            attempt ->
+                                    attempt < latestAttempt && 
!currentAttempts.contains(attempt));
+        }
+
         private static SubtaskMetricStore unmodifiable(SubtaskMetricStore 
source) {
             if (source == null) {
                 return null;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 609ef1f8e0d..790ca43ce7d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -29,8 +29,6 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -104,33 +102,4 @@ class JobDetailsTest {
 
         assertThat(unmarshalled).isEqualTo(expected);
     }
-
-    @Test
-    void testJobDetailsWithExecutionAttemptsMarshalling() throws 
JsonProcessingException {
-        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new 
HashMap<>();
-        currentExecutionAttempts.computeIfAbsent("a", k -> new 
HashMap<>()).put(1, 2);
-        currentExecutionAttempts.computeIfAbsent("a", k -> new 
HashMap<>()).put(2, 4);
-        currentExecutionAttempts.computeIfAbsent("b", k -> new 
HashMap<>()).put(3, 1);
-
-        final JobDetails expected =
-                new JobDetails(
-                        new JobID(),
-                        "foobar",
-                        1L,
-                        10L,
-                        9L,
-                        JobStatus.RUNNING,
-                        8L,
-                        new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
-                        42,
-                        currentExecutionAttempts);
-
-        final ObjectMapper objectMapper = 
RestMapperUtils.getStrictObjectMapper();
-
-        final JsonNode marshalled = objectMapper.valueToTree(expected);
-
-        final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, 
JobDetails.class);
-
-        assertThat(unmarshalled).isEqualTo(expected);
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 93714132681..d218d9ffd55 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -285,16 +285,16 @@ class JobVertexBackPressureHandlerTest {
         for (MetricDump metricDump : getMultipleAttemptsMetricDumps()) {
             multipleAttemptsMetricStore.add(metricDump);
         }
-        // Update currentExecutionAttempts directly without JobDetails.
-        Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
-        currentExecutionAttempts.put(0, 1);
-        currentExecutionAttempts.put(1, 0);
+        // Update representativeAttempts directly without JobDetails.
+        Map<Integer, Integer> representativeAttempts = new HashMap<>();
+        representativeAttempts.put(0, 1);
+        representativeAttempts.put(1, 0);
         multipleAttemptsMetricStore
-                .getCurrentExecutionAttempts()
+                .getRepresentativeAttempts()
                 .put(
                         TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
                         Collections.singletonMap(
-                                TEST_JOB_VERTEX_ID.toString(), 
currentExecutionAttempts));
+                                TEST_JOB_VERTEX_ID.toString(), 
representativeAttempts));
 
         JobVertexBackPressureHandler jobVertexBackPressureHandler =
                 new JobVertexBackPressureHandler(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index c54f50291fc..84e5cd6396e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -18,21 +18,30 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the MetricStore. */
 class MetricStoreTest {
 
+    private static final JobID JOB_ID = new JobID();
+
     @Test
     void testAdd() throws IOException {
         MetricStore store = setupStore(new MetricStore());
@@ -40,49 +49,55 @@ class MetricStoreTest {
         assertThat(store.getJobManagerMetricStore().getMetric("abc.metric1", 
"-1")).isEqualTo("0");
         
assertThat(store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", 
"-1"))
                 .isEqualTo("1");
-        assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric3", 
"-1")).isEqualTo("2");
-        assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4", 
"-1")).isEqualTo("3");
+        
assertThat(store.getJobMetricStore(JOB_ID.toString()).getMetric("abc.metric3", 
"-1"))
+                .isEqualTo("2");
+        
assertThat(store.getJobMetricStore(JOB_ID.toString()).getMetric("abc.metric4", 
"-1"))
+                .isEqualTo("3");
 
-        assertThat(store.getTaskMetricStore("jobid", 
"taskid").getMetric("8.abc.metric5", "-1"))
+        assertThat(
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+                                .getMetric("8.abc.metric5", "-1"))
                 .isEqualTo("14");
-        assertThat(store.getSubtaskMetricStore("jobid", "taskid", 
8).getMetric("abc.metric5", "-1"))
+        assertThat(
+                        store.getSubtaskMetricStore(JOB_ID.toString(), 
"taskid", 8)
+                                .getMetric("abc.metric5", "-1"))
                 .isEqualTo("14");
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 
8, 1)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), 
"taskid", 8, 1)
                                 .getMetric("abc.metric5", "-1"))
                 .isEqualTo("4");
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 
8, 2)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), 
"taskid", 8, 2)
                                 .getMetric("abc.metric5", "-1"))
                 .isEqualTo("14");
 
         assertThat(
-                        store.getTaskMetricStore("jobid", "taskid")
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
                                 .getMetric("8.opname.abc.metric6", "-1"))
                 .isEqualTo("5");
         assertThat(
-                        store.getTaskMetricStore("jobid", "taskid")
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
                                 .getMetric("8.opname.abc.metric7", "-1"))
                 .isEqualTo("6");
         assertThat(
-                        store.getTaskMetricStore("jobid", "taskid")
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
                                 .getMetric("1.opname.abc.metric7", "-1"))
                 .isEqualTo("6");
         assertThat(
-                        store.getSubtaskMetricStore("jobid", "taskid", 1)
+                        store.getSubtaskMetricStore(JOB_ID.toString(), 
"taskid", 1)
                                 .getMetric("opname.abc.metric7", "-1"))
                 .isEqualTo("6");
-        assertThat(store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 
2)).isNull();
+        assertThat(store.getSubtaskAttemptMetricStore(JOB_ID.toString(), 
"taskid", 1, 2)).isNull();
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 
1, 3)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), 
"taskid", 1, 3)
                                 .getMetric("opname.abc.metric7", "-1"))
                 .isEqualTo("6");
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 
8, 2)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), 
"taskid", 8, 2)
                                 .getMetric("opname.abc.metric7", "-1"))
                 .isEqualTo("6");
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 
8, 4)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), 
"taskid", 8, 4)
                                 .getMetric("opname.abc.metric7", "-1"))
                 .isEqualTo("16");
     }
@@ -106,11 +121,48 @@ class MetricStoreTest {
         assertThat(store.getJobs()).isEmpty();
     }
 
+    @Test
+    void testSubtaskMetricStoreCleanup() {
+        MetricStore store = setupStore(new MetricStore());
+        assertThat(
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+                                .getSubtaskMetricStore(8)
+                                .getAllAttemptsMetricStores()
+                                .keySet())
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 2, 4, 
5));
+
+        Set<Integer> currentAttempts = new HashSet<>(Arrays.asList(1, 4));
+        Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
+                Collections.singletonMap(
+                        "taskid",
+                        Collections.singletonMap(8, new CurrentAttempts(1, 
currentAttempts)));
+        JobDetails jobDetail =
+                new JobDetails(
+                        JOB_ID,
+                        "jobname",
+                        0,
+                        0,
+                        0,
+                        JobStatus.RUNNING,
+                        0,
+                        new int[10],
+                        1,
+                        currentExecutionAttempts);
+        store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
+
+        assertThat(
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+                                .getSubtaskMetricStore(8)
+                                .getAllAttemptsMetricStores()
+                                .keySet())
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 4, 5));
+    }
+
     static MetricStore setupStore(MetricStore store) {
-        Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
-        currentExecutionAttempts.put(8, 2);
-        store.getCurrentExecutionAttempts()
-                .put("jobid", Collections.singletonMap("taskid", 
currentExecutionAttempts));
+        Map<Integer, Integer> representativeAttempts = new HashMap<>();
+        representativeAttempts.put(8, 2);
+        store.getRepresentativeAttempts()
+                .put(JOB_ID.toString(), Collections.singletonMap("taskid", 
representativeAttempts));
 
         QueryScopeInfo.JobManagerQueryScopeInfo jm =
                 new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
@@ -126,7 +178,8 @@ class MetricStoreTest {
         MetricDump.CounterDump cd22 = new MetricDump.CounterDump(tm2, "metric2", 10);
         MetricDump.CounterDump cd22a = new MetricDump.CounterDump(tm2, "metric2b", 10);
 
-        QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
+        QueryScopeInfo.JobQueryScopeInfo job =
+                new QueryScopeInfo.JobQueryScopeInfo(JOB_ID.toString(), "abc");
         MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2);
         MetricDump.CounterDump cd4 = new MetricDump.CounterDump(job, "metric4", 3);
 
@@ -136,30 +189,41 @@ class MetricStoreTest {
         MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, "metric4", 3);
 
         QueryScopeInfo.TaskQueryScopeInfo task =
-                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 1, "abc");
+                new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), "taskid", 8, 1, "abc");
         MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, "metric5", 4);
 
         QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
-                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 2, "abc");
+                new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), "taskid", 8, 2, "abc");
         MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14);
 
         QueryScopeInfo.OperatorQueryScopeInfo operator =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 2, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        JOB_ID.toString(), "taskid", 8, 2, "opname", "abc");
         MetricDump.CounterDump cd6 = new MetricDump.CounterDump(operator, "metric6", 5);
         MetricDump.CounterDump cd7 = new MetricDump.CounterDump(operator, "metric7", 6);
 
         QueryScopeInfo.OperatorQueryScopeInfo operator2 =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, 3, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        JOB_ID.toString(), "taskid", 1, 3, "opname", "abc");
         MetricDump.CounterDump cd62 = new MetricDump.CounterDump(operator2, "metric6", 5);
         MetricDump.CounterDump cd72 = new MetricDump.CounterDump(operator2, "metric7", 6);
 
         QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator2 =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 4, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        JOB_ID.toString(), "taskid", 8, 4, "opname", "abc");
         MetricDump.CounterDump cd63 =
                 new MetricDump.CounterDump(speculativeOperator2, "metric6", 15);
         MetricDump.CounterDump cd73 =
                 new MetricDump.CounterDump(speculativeOperator2, "metric7", 16);
 
+        QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator3 =
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        JOB_ID.toString(), "taskid", 8, 5, "opname", "abc");
+        MetricDump.CounterDump cd64 =
+                new MetricDump.CounterDump(speculativeOperator3, "metric6", 17);
+        MetricDump.CounterDump cd74 =
+                new MetricDump.CounterDump(speculativeOperator3, "metric7", 18);
+
         store.add(cd1);
         store.add(cd2);
         store.add(cd2a);
@@ -179,6 +243,8 @@ class MetricStoreTest {
         store.add(cd52);
         store.add(cd63);
         store.add(cd73);
+        store.add(cd64);
+        store.add(cd74);
 
         return store;
     }

Reply via email to