This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new 07959a4141b [FLINK-32199][rest] Remove redundant metrics in TaskMetricStore after rescale down. 07959a4141b is described below commit 07959a4141ba599194e1e0a8b6da163793d53d0d Author: Junrui Lee <jrlee....@gmail.com> AuthorDate: Tue May 30 20:21:29 2023 +0800 [FLINK-32199][rest] Remove redundant metrics in TaskMetricStore after rescale down. This closes #22706 (cherry picked from commit f3ab9626bf18cad993f7cecba23a7bce6e14407b) --- .../rest/handler/legacy/metrics/MetricStore.java | 13 ++++++++ .../handler/legacy/metrics/MetricStoreTest.java | 37 ++++++++++++++++------ 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java index 6499c9caeb3..d3cdec7dd31 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java @@ -503,6 +503,19 @@ public class MetricStore { } void retainSubtasks(Set<Integer> activeSubtasks) { + // Retain metrics of pattern subtaskIndex.metricName which are directly stored in + // TaskMetricStore + metrics.keySet() + .removeIf( + key -> { + // To prevent errors in metric parsing, here we only + // clean up metrics with a pattern of + // "subtaskIndex.metricName" + String index = key.substring(0, Math.max(key.indexOf('.'), 0)); + return index.matches("\\d+") + && !activeSubtasks.contains(Integer.parseInt(index)); + }); + subtasks.keySet().retainAll(activeSubtasks); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java index dd8d7e69b4d..812fe1ba174 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java @@ -27,13 +27,16 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.junit.jupiter.api.Test; -import java.io.IOException; +import javax.annotation.Nonnull; + import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -43,7 +46,7 @@ class MetricStoreTest { private static final JobID JOB_ID = new JobID(); @Test - void testAdd() throws IOException { + void testAdd() { MetricStore store = setupStore(new MetricStore()); assertThat(store.getJobManagerMetricStore().getMetric("abc.metric1", "-1")).isEqualTo("0"); @@ -124,10 +127,11 @@ class MetricStoreTest { @Test void testTaskMetricStoreCleanup() { MetricStore store = setupStore(new MetricStore()); - assertThat( - store.getTaskMetricStore(JOB_ID.toString(), "taskid") - .getAllSubtaskMetricStores() - .keySet()) + MetricStore.TaskMetricStore taskMetricStore = + store.getTaskMetricStore(JOB_ID.toString(), "taskid"); + assertThat(taskMetricStore.getAllSubtaskMetricStores().keySet()) + .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 8)); + assertThat(getTaskMetricStoreIndexes(taskMetricStore)) .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 8)); Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts = @@ -148,13 +152,26 @@ class MetricStoreTest { currentExecutionAttempts); store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail)); - assertThat( - store.getTaskMetricStore(JOB_ID.toString(), "taskid") - .getAllSubtaskMetricStores() - .keySet()) + assertThat(taskMetricStore.getAllSubtaskMetricStores().keySet()) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(1)); + + assertThat(getTaskMetricStoreIndexes(taskMetricStore)) .containsExactlyInAnyOrderElementsOf(Collections.singletonList(1)); } + @Nonnull + private static Set<Integer> getTaskMetricStoreIndexes( + MetricStore.TaskMetricStore taskMetricStore) { + return taskMetricStore.metrics.keySet().stream() + .map( + key -> { + String index = key.substring(0, Math.max(key.indexOf('.'), 0)); + return index.matches("\\d+") ? Integer.parseInt(index) : null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + @Test void testSubtaskMetricStoreCleanup() { MetricStore store = setupStore(new MetricStore());