This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 4dad2e29a09 [FLINK-29132][rest] Cleanup subtask attempt metrics according to the JobDetails to avoid memory leak.
4dad2e29a09 is described below

commit 4dad2e29a090c731a0474a80e320264040c348ce
Author: Gen Luo <luogen...@gmail.com>
AuthorDate: Thu Sep 1 17:08:53 2022 +0800

    [FLINK-29132][rest] Cleanup subtask attempt metrics according to the JobDetails to avoid memory leak.

    This closes #20733.
---
 .../runtime/messages/webmonitor/JobDetails.java    |  96 ++++++++---------
 .../handler/job/JobVertexBackPressureHandler.java  |  34 +++---
 .../rest/handler/legacy/metrics/MetricStore.java   |  49 +++++++--
 .../messages/webmonitor/JobDetailsTest.java        |  31 ------
 .../job/JobVertexBackPressureHandlerTest.java      |  12 +--
 .../handler/legacy/metrics/MetricStoreTest.java    | 114 ++++++++++++++++-----
 6 files changed, 198 insertions(+), 138 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index 74b2964228c..04fce9483c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -40,8 +41,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,8 +64,6 @@ public class JobDetails implements Serializable {
     private static final String FIELD_NAME_STATUS = "state";
     private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification";
     private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
-    private static final String FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS =
-            "current-execution-attempts";
 
     private final JobID jobId;
 
@@ -84,10 +86,12 @@ public class JobDetails implements Serializable {
     /**
      * The map holds the attempt number of the current execution attempt in the Execution, which is
      * considered as the representing execution for the subtask of the vertex. The keys and values
-     * are JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber. It is used to accumulate
-     * the metrics of a subtask in MetricFetcher.
+     * are JobVertexID -> SubtaskIndex -> CurrentAttempts info.
+     *
+     * <p>The field is excluded from the json. Any usage from the web UI and the history server is
+     * not allowed.
      */
-    private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
+    private final Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts;
 
     @VisibleForTesting
     public JobDetails(
@@ -123,7 +127,7 @@ public class JobDetails implements Serializable {
             long lastUpdateTime,
             int[] tasksPerState,
             int numTasks,
-            Map<String, Map<Integer, Integer>> currentExecutionAttempts) {
+            Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts) {
         this.jobId = checkNotNull(jobId);
         this.jobName = checkNotNull(jobName);
         this.startTime = startTime;
@@ -150,22 +154,25 @@ public class JobDetails implements Serializable {
         int[] countsPerStatus = new int[ExecutionState.values().length];
         long lastChanged = 0;
         int numTotalTasks = 0;
-        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
+        Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts = new HashMap<>();
 
         for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
             AccessExecutionVertex[] taskVertices = ejv.getTaskVertices();
             numTotalTasks += taskVertices.length;
 
-            Map<Integer, Integer> vertexAttempts = new HashMap<>();
+            Map<Integer, CurrentAttempts> vertexAttempts = new HashMap<>();
             for (AccessExecutionVertex taskVertex : taskVertices) {
-                if (taskVertex.getCurrentExecutions().size() > 1) {
-                    vertexAttempts.put(
-                            taskVertex.getParallelSubtaskIndex(),
-                            taskVertex.getCurrentExecutionAttempt().getAttemptNumber());
-                }
                 ExecutionState state = taskVertex.getExecutionState();
                 countsPerStatus[state.ordinal()]++;
                 lastChanged = Math.max(lastChanged, taskVertex.getStateTimestamp(state));
+
+                vertexAttempts.put(
+                        taskVertex.getParallelSubtaskIndex(),
+                        new CurrentAttempts(
+                                taskVertex.getCurrentExecutionAttempt().getAttemptNumber(),
+                                taskVertex.getCurrentExecutions().stream()
+                                        .map(AccessExecution::getAttemptNumber)
+                                        .collect(Collectors.toSet())));
             }
 
             if (!vertexAttempts.isEmpty()) {
@@ -226,7 +233,7 @@ public class JobDetails implements Serializable {
         return tasksPerState;
     }
 
-    public Map<String, Map<Integer, Integer>> getCurrentExecutionAttempts() {
+    public Map<String, Map<Integer, CurrentAttempts>> getCurrentExecutionAttempts() {
         return currentExecutionAttempts;
     }
 
     // ------------------------------------------------------------------------
@@ -326,20 +333,6 @@ public class JobDetails implements Serializable {
 
             jsonGenerator.writeEndObject();
 
-            if (!jobDetails.currentExecutionAttempts.isEmpty()) {
-                jsonGenerator.writeObjectFieldStart(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
-                for (Map.Entry<String, Map<Integer, Integer>> vertex :
-                        jobDetails.currentExecutionAttempts.entrySet()) {
-                    jsonGenerator.writeObjectFieldStart(vertex.getKey());
-                    for (Map.Entry<Integer, Integer> attempt : vertex.getValue().entrySet()) {
-                        jsonGenerator.writeNumberField(
-                                String.valueOf(attempt.getKey()), attempt.getValue());
-                    }
-                    jsonGenerator.writeEndObject();
-                }
-                jsonGenerator.writeEndObject();
-            }
-
             jsonGenerator.writeEndObject();
         }
     }
@@ -379,28 +372,6 @@ public class JobDetails implements Serializable {
                             jsonNode == null ? 0 : jsonNode.intValue();
             }
 
-            Map<String, Map<Integer, Integer>> attempts = new HashMap<>();
-            JsonNode attemptsNode = rootNode.get(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
-            if (attemptsNode != null) {
-                attemptsNode
-                        .fields()
-                        .forEachRemaining(
-                                vertex -> {
-                                    String vertexId = vertex.getKey();
-                                    Map<Integer, Integer> vertexAttempts =
-                                            attempts.computeIfAbsent(
-                                                    vertexId, k -> new HashMap<>());
-                                    vertex.getValue()
-                                            .fields()
-                                            .forEachRemaining(
-                                                    attempt ->
-                                                            vertexAttempts.put(
-                                                                    Integer.parseInt(
-                                                                            attempt.getKey()),
-                                                                    attempt.getValue().intValue()));
-                                });
-            }
-
             return new JobDetails(
                     jobId,
                     jobName,
@@ -411,7 +382,30 @@ public class JobDetails implements Serializable {
                     lastUpdateTime,
                     numVerticesPerExecutionState,
                     numTasks,
-                    attempts);
+                    new HashMap<>());
         }
     }
+
+    /**
+     * The CurrentAttempts holds the attempt number of the current representative execution attempt,
+     * and the attempt numbers of all the running attempts.
+     */
+    public static final class CurrentAttempts implements Serializable {
+        private final int representativeAttempt;
+
+        private final Set<Integer> currentAttempts;
+
+        public CurrentAttempts(int representativeAttempt, Set<Integer> currentAttempts) {
+            this.representativeAttempt = representativeAttempt;
+            this.currentAttempts = Collections.unmodifiableSet(currentAttempts);
+        }
+
+        public int getRepresentativeAttempt() {
+            return representativeAttempt;
+        }
+
+        public Set<Integer> getCurrentAttempts() {
+            return currentAttempts;
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
index 898b74ba0d0..57d3e4207cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
@@ -82,23 +82,23 @@ public class JobVertexBackPressureHandler
                 metricFetcher
                         .getMetricStore()
                         .getTaskMetricStore(jobId.toString(), jobVertexId.toString());
-        Map<String, Map<Integer, Integer>> jobCurrentExecutions =
-                metricFetcher.getMetricStore().getCurrentExecutionAttempts().get(jobId.toString());
-        Map<Integer, Integer> currentExecutionAttempts =
-                jobCurrentExecutions != null
-                        ? jobCurrentExecutions.get(jobVertexId.toString())
+        Map<String, Map<Integer, Integer>> jobRepresentativeExecutions =
+                metricFetcher.getMetricStore().getRepresentativeAttempts().get(jobId.toString());
+        Map<Integer, Integer> representativeAttempts =
+                jobRepresentativeExecutions != null
+                        ? jobRepresentativeExecutions.get(jobVertexId.toString())
                         : null;
 
         return CompletableFuture.completedFuture(
                 taskMetricStore != null
-                        ? createJobVertexBackPressureInfo(taskMetricStore, currentExecutionAttempts)
+                        ? createJobVertexBackPressureInfo(taskMetricStore, representativeAttempts)
                         : JobVertexBackPressureInfo.deprecated());
     }
 
     private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
-            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> representativeAttempts) {
         List<SubtaskBackPressureInfo> subtaskBackPressureInfos =
-                createSubtaskBackPressureInfo(taskMetricStore, currentExecutionAttempts);
+                createSubtaskBackPressureInfo(taskMetricStore, representativeAttempts);
         return new JobVertexBackPressureInfo(
                 JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
                 getBackPressureLevel(getMaxBackPressureRatio(subtaskBackPressureInfos)),
@@ -107,7 +107,7 @@ public class JobVertexBackPressureHandler
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> representativeAttempts) {
         Map<Integer, SubtaskMetricStore> subtaskMetricStores =
                 taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
@@ -121,19 +121,19 @@ public class JobVertexBackPressureHandler
                         createSubtaskAttemptBackpressureInfo(
                                 subtaskIndex, null, subtaskMetricStore, null));
             } else {
-                int currentAttempt =
-                        currentExecutionAttempts == null
+                int representativeAttempt =
+                        representativeAttempts == null
                                 ? -1
-                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
-                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                                : representativeAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(representativeAttempt)) {
                     // allAttemptsMetricStores is not empty here
-                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                    representativeAttempt = allAttemptsMetricStores.keySet().iterator().next();
                 }
                 List<SubtaskBackPressureInfo> otherConcurrentAttempts =
                         new ArrayList<>(allAttemptsMetricStores.size() - 1);
                 for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
                         allAttemptsMetricStores.entrySet()) {
-                    if (attemptStore.getKey() == currentAttempt) {
+                    if (attemptStore.getKey() == representativeAttempt) {
                         continue;
                     }
                     otherConcurrentAttempts.add(
@@ -146,8 +146,8 @@ public class JobVertexBackPressureHandler
                 result.add(
                         createSubtaskAttemptBackpressureInfo(
                                 subtaskIndex,
-                                currentAttempt,
-                                allAttemptsMetricStores.get(currentAttempt),
+                                representativeAttempt,
+                                allAttemptsMetricStores.get(representativeAttempt),
                                 otherConcurrentAttempts));
             }
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index c86ac763f9c..1908afa7427 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -63,7 +65,7 @@ public class MetricStore {
      * CurrentExecutionAttemptNumber. When a metric of an execution attempt is added, the metric can
     * also be added to the SubtaskMetricStore when it is of the representing execution.
      */
-    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
+    private final Map<String, Map<String, Map<Integer, Integer>>> representativeAttempts =
             new ConcurrentHashMap<>();
 
     /**
@@ -82,18 +84,39 @@ public class MetricStore {
      */
     synchronized void retainJobs(List<String> activeJobs) {
         jobs.keySet().retainAll(activeJobs);
-        currentExecutionAttempts.keySet().retainAll(activeJobs);
+        representativeAttempts.keySet().retainAll(activeJobs);
     }
 
     public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
-        jobs.forEach(
-                job ->
-                        currentExecutionAttempts.put(
-                                job.getJobId().toString(), job.getCurrentExecutionAttempts()));
+        for (JobDetails job : jobs) {
+            String jobId = job.getJobId().toString();
+            Map<String, Map<Integer, CurrentAttempts>> currentAttempts =
+                    job.getCurrentExecutionAttempts();
+            Map<String, Map<Integer, Integer>> jobRepresentativeAttempts =
+                    representativeAttempts.compute(
+                            jobId, (k, overwritten) -> new HashMap<>(currentAttempts.size()));
+            currentAttempts.forEach(
+                    (vertexId, subtaskAttempts) -> {
+                        Map<Integer, Integer> vertexAttempts =
+                                jobRepresentativeAttempts.compute(
+                                        vertexId, (k, overwritten) -> new HashMap<>());
+                        TaskMetricStore taskMetricStore = getTaskMetricStore(jobId, vertexId);
+                        subtaskAttempts.forEach(
+                                (subtaskIndex, attempts) -> {
+                                    // Updates representative attempts
+                                    vertexAttempts.put(
+                                            subtaskIndex, attempts.getRepresentativeAttempt());
+                                    // Retains current attempt metrics to avoid memory leak
+                                    taskMetricStore
+                                            .getSubtaskMetricStore(subtaskIndex)
+                                            .retainAttempts(attempts.getCurrentAttempts());
+                                });
+                    });
+        }
     }
 
-    public Map<String, Map<String, Map<Integer, Integer>>> getCurrentExecutionAttempts() {
-        return currentExecutionAttempts;
+    public Map<String, Map<String, Map<Integer, Integer>>> getRepresentativeAttempts() {
+        return representativeAttempts;
     }
 
     /**
@@ -341,7 +364,7 @@ public class MetricStore {
     // which means there should be only one execution
     private boolean isRepresentativeAttempt(
             String jobID, String vertexID, int subtaskIndex, int attemptNumber) {
-        return Optional.of(currentExecutionAttempts)
+        return Optional.of(representativeAttempts)
                 .map(m -> m.get(jobID))
                 .map(m -> m.get(vertexID))
                 .map(m -> m.get(subtaskIndex))
@@ -507,6 +530,14 @@ public class MetricStore {
             return unmodifiableMap(attempts);
         }
 
+        void retainAttempts(Set<Integer> currentAttempts) {
+            int latestAttempt = currentAttempts.stream().mapToInt(i -> i).max().orElse(0);
+            attempts.keySet()
+                    .removeIf(
+                            attempt ->
+                                    attempt < latestAttempt && !currentAttempts.contains(attempt));
+        }
+
         private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
             if (source == null) {
                 return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 609ef1f8e0d..790ca43ce7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -29,8 +29,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -104,33 +102,4 @@ class JobDetailsTest {
 
         assertThat(unmarshalled).isEqualTo(expected);
     }
-
-    @Test
-    void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException {
-        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
-        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(1, 2);
-        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(2, 4);
-        currentExecutionAttempts.computeIfAbsent("b", k -> new HashMap<>()).put(3, 1);
-
-        final JobDetails expected =
-                new JobDetails(
-                        new JobID(),
-                        "foobar",
-                        1L,
-                        10L,
-                        9L,
-                        JobStatus.RUNNING,
-                        8L,
-                        new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
-                        42,
-                        currentExecutionAttempts);
-
-        final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-
-        final JsonNode marshalled = objectMapper.valueToTree(expected);
-
-        final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
-
-        assertThat(unmarshalled).isEqualTo(expected);
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 93714132681..d218d9ffd55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -285,16 +285,16 @@ class JobVertexBackPressureHandlerTest {
         for (MetricDump metricDump : getMultipleAttemptsMetricDumps()) {
             multipleAttemptsMetricStore.add(metricDump);
         }
-        // Update currentExecutionAttempts directly without JobDetails.
-        Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
-        currentExecutionAttempts.put(0, 1);
-        currentExecutionAttempts.put(1, 0);
+        // Update representativeAttempts directly without JobDetails.
+        Map<Integer, Integer> representativeAttempts = new HashMap<>();
+        representativeAttempts.put(0, 1);
+        representativeAttempts.put(1, 0);
         multipleAttemptsMetricStore
-                .getCurrentExecutionAttempts()
+                .getRepresentativeAttempts()
                 .put(
                         TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
                         Collections.singletonMap(
-                                TEST_JOB_VERTEX_ID.toString(), currentExecutionAttempts));
+                                TEST_JOB_VERTEX_ID.toString(), representativeAttempts));
 
         JobVertexBackPressureHandler jobVertexBackPressureHandler =
                 new JobVertexBackPressureHandler(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index c54f50291fc..84e5cd6396e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -18,21 +18,30 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the MetricStore. */
 class MetricStoreTest {
+    private static final JobID JOB_ID = new JobID();
+
     @Test
     void testAdd() throws IOException {
         MetricStore store = setupStore(new MetricStore());
@@ -40,49 +49,55 @@ class MetricStoreTest {
         assertThat(store.getJobManagerMetricStore().getMetric("abc.metric1", "-1")).isEqualTo("0");
         assertThat(store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"))
                 .isEqualTo("1");
-        assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1")).isEqualTo("2");
-        assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4", "-1")).isEqualTo("3");
+        assertThat(store.getJobMetricStore(JOB_ID.toString()).getMetric("abc.metric3", "-1"))
+                .isEqualTo("2");
+        assertThat(store.getJobMetricStore(JOB_ID.toString()).getMetric("abc.metric4", "-1"))
+                .isEqualTo("3");
 
-        assertThat(store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric5", "-1"))
+        assertThat(
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+                                .getMetric("8.abc.metric5", "-1"))
                 .isEqualTo("14");
-        assertThat(store.getSubtaskMetricStore("jobid", "taskid", 8).getMetric("abc.metric5", "-1"))
+        assertThat(
+                        store.getSubtaskMetricStore(JOB_ID.toString(), "taskid", 8)
+                                .getMetric("abc.metric5", "-1"))
                 .isEqualTo("14");
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 1)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 1)
                                 .getMetric("abc.metric5", "-1"))
                 .isEqualTo("4");
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 2)
                                 .getMetric("abc.metric5", "-1"))
                 .isEqualTo("14");
         assertThat(
-                        store.getTaskMetricStore("jobid", "taskid")
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
                                 .getMetric("8.opname.abc.metric6", "-1"))
                 .isEqualTo("5");
         assertThat(
-                        store.getTaskMetricStore("jobid", "taskid")
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
                                 .getMetric("8.opname.abc.metric7", "-1"))
                 .isEqualTo("6");
         assertThat(
-                        store.getTaskMetricStore("jobid", "taskid")
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
                                 .getMetric("1.opname.abc.metric7", "-1"))
                 .isEqualTo("6");
         assertThat(
-                        store.getSubtaskMetricStore("jobid", "taskid", 1)
+                        store.getSubtaskMetricStore(JOB_ID.toString(), "taskid", 1)
                                 .getMetric("opname.abc.metric7", "-1"))
                 .isEqualTo("6");
-        assertThat(store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 2)).isNull();
+        assertThat(store.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 1, 2)).isNull();
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 3)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 1, 3)
                                 .getMetric("opname.abc.metric7", "-1"))
                 .isEqualTo("6");
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 2)
                                 .getMetric("opname.abc.metric7", "-1"))
                 .isEqualTo("6");
         assertThat(
-                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 4)
+                        store.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 4)
                                 .getMetric("opname.abc.metric7", "-1"))
                 .isEqualTo("16");
     }
@@ -106,11 +121,48 @@ class MetricStoreTest {
         assertThat(store.getJobs()).isEmpty();
     }
 
+    @Test
+    void testSubtaskMetricStoreCleanup() {
+        MetricStore store = setupStore(new MetricStore());
+        assertThat(
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+                                .getSubtaskMetricStore(8)
+                                .getAllAttemptsMetricStores()
+                                .keySet())
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 2, 4, 5));
+
+        Set<Integer> currentAttempts = new HashSet<>(Arrays.asList(1, 4));
+        Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
+                Collections.singletonMap(
+                        "taskid",
+                        Collections.singletonMap(8, new CurrentAttempts(1, currentAttempts)));
+        JobDetails jobDetail =
+                new JobDetails(
+                        JOB_ID,
+                        "jobname",
+                        0,
+                        0,
+                        0,
+                        JobStatus.RUNNING,
+                        0,
+                        new int[10],
+                        1,
+                        currentExecutionAttempts);
+        store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
+
+        assertThat(
+                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
+                                .getSubtaskMetricStore(8)
+                                .getAllAttemptsMetricStores()
+                                .keySet())
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 4, 5));
+    }
+
     static MetricStore setupStore(MetricStore store) {
-        Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
-        currentExecutionAttempts.put(8, 2);
-        store.getCurrentExecutionAttempts()
-                .put("jobid", Collections.singletonMap("taskid", currentExecutionAttempts));
+        Map<Integer, Integer> representativeAttempts = new HashMap<>();
+        representativeAttempts.put(8, 2);
+        store.getRepresentativeAttempts()
+                .put(JOB_ID.toString(), Collections.singletonMap("taskid", representativeAttempts));
 
         QueryScopeInfo.JobManagerQueryScopeInfo jm =
                 new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
@@ -126,7 +178,8 @@ class MetricStoreTest {
         MetricDump.CounterDump cd22 = new MetricDump.CounterDump(tm2, "metric2", 10);
         MetricDump.CounterDump cd22a = new MetricDump.CounterDump(tm2, "metric2b", 10);
 
-        QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
+        QueryScopeInfo.JobQueryScopeInfo job =
+                new QueryScopeInfo.JobQueryScopeInfo(JOB_ID.toString(), "abc");
         MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2);
         MetricDump.CounterDump cd4 = new MetricDump.CounterDump(job, "metric4", 3);
@@ -136,30 +189,41 @@ class MetricStoreTest {
         MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, "metric4", 3);
 
         QueryScopeInfo.TaskQueryScopeInfo task =
-                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 1, "abc");
+                new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), "taskid", 8, 1, "abc");
         MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, "metric5", 4);
 
         QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
-                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 2, "abc");
+                new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), "taskid", 8, 2, "abc");
         MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14);
 
         QueryScopeInfo.OperatorQueryScopeInfo operator =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 2, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        JOB_ID.toString(), "taskid", 8, 2, "opname", "abc");
         MetricDump.CounterDump cd6 = new MetricDump.CounterDump(operator, "metric6", 5);
         MetricDump.CounterDump cd7 = new MetricDump.CounterDump(operator, "metric7", 6);
 
         QueryScopeInfo.OperatorQueryScopeInfo operator2 =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, 3, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        JOB_ID.toString(), "taskid", 1, 3, "opname", "abc");
         MetricDump.CounterDump cd62 = new MetricDump.CounterDump(operator2, "metric6", 5);
         MetricDump.CounterDump cd72 = new MetricDump.CounterDump(operator2, "metric7", 6);
 
         QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator2 =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 4, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        JOB_ID.toString(), "taskid", 8, 4, "opname", "abc");
         MetricDump.CounterDump cd63 =
                 new MetricDump.CounterDump(speculativeOperator2, "metric6", 15);
         MetricDump.CounterDump cd73 =
                 new MetricDump.CounterDump(speculativeOperator2, "metric7", 16);
 
+        QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator3 =
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        JOB_ID.toString(), "taskid", 8, 5, "opname", "abc");
+        MetricDump.CounterDump cd64 =
+                new MetricDump.CounterDump(speculativeOperator3, "metric6", 17);
+        MetricDump.CounterDump cd74 =
+                new MetricDump.CounterDump(speculativeOperator3, "metric7", 18);
+
         store.add(cd1);
         store.add(cd2);
         store.add(cd2a);
@@ -179,6 +243,8 @@ class MetricStoreTest {
         store.add(cd52);
         store.add(cd63);
         store.add(cd73);
+        store.add(cd64);
+        store.add(cd74);
 
         return store;
     }