This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 07959a4141b [FLINK-32199][rest] Remove redundant metrics in TaskMetricStore after rescale down.
07959a4141b is described below

commit 07959a4141ba599194e1e0a8b6da163793d53d0d
Author: Junrui Lee <jrlee....@gmail.com>
AuthorDate: Tue May 30 20:21:29 2023 +0800

    [FLINK-32199][rest] Remove redundant metrics in TaskMetricStore after rescale down.
    
    This closes #22706
    
    (cherry picked from commit f3ab9626bf18cad993f7cecba23a7bce6e14407b)
---
 .../rest/handler/legacy/metrics/MetricStore.java   | 13 ++++++++
 .../handler/legacy/metrics/MetricStoreTest.java    | 37 ++++++++++++++++------
 2 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 6499c9caeb3..d3cdec7dd31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -503,6 +503,19 @@ public class MetricStore {
         }
 
         void retainSubtasks(Set<Integer> activeSubtasks) {
+            // Retain metrics of pattern subtaskIndex.metricName which are directly stored in
+            // TaskMetricStore
+            metrics.keySet()
+                    .removeIf(
+                            key -> {
+                                // To prevent errors in metric parsing, here we only
+                                // clean up metrics with a pattern of
+                                // "subtaskIndex.metricName"
+                                String index = key.substring(0, Math.max(key.indexOf('.'), 0));
+                                return index.matches("\\d+")
+                                        && !activeSubtasks.contains(Integer.parseInt(index));
+                            });
+
             subtasks.keySet().retainAll(activeSubtasks);
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index dd8d7e69b4d..812fe1ba174 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -27,13 +27,16 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
+import javax.annotation.Nonnull;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -43,7 +46,7 @@ class MetricStoreTest {
     private static final JobID JOB_ID = new JobID();
 
     @Test
-    void testAdd() throws IOException {
+    void testAdd() {
         MetricStore store = setupStore(new MetricStore());
 
         assertThat(store.getJobManagerMetricStore().getMetric("abc.metric1", "-1")).isEqualTo("0");
@@ -124,10 +127,11 @@ class MetricStoreTest {
     @Test
     void testTaskMetricStoreCleanup() {
         MetricStore store = setupStore(new MetricStore());
-        assertThat(
-                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
-                                .getAllSubtaskMetricStores()
-                                .keySet())
+        MetricStore.TaskMetricStore taskMetricStore =
+                store.getTaskMetricStore(JOB_ID.toString(), "taskid");
+        assertThat(taskMetricStore.getAllSubtaskMetricStores().keySet())
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 8));
+        assertThat(getTaskMetricStoreIndexes(taskMetricStore))
                 .containsExactlyInAnyOrderElementsOf(Arrays.asList(1, 8));
 
         Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
@@ -148,13 +152,26 @@ class MetricStoreTest {
                         currentExecutionAttempts);
         store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
 
-        assertThat(
-                        store.getTaskMetricStore(JOB_ID.toString(), "taskid")
-                                .getAllSubtaskMetricStores()
-                                .keySet())
+        assertThat(taskMetricStore.getAllSubtaskMetricStores().keySet())
+                .containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
+
+        assertThat(getTaskMetricStoreIndexes(taskMetricStore))
                 .containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
     }
 
+    @Nonnull
+    private static Set<Integer> getTaskMetricStoreIndexes(
+            MetricStore.TaskMetricStore taskMetricStore) {
+        return taskMetricStore.metrics.keySet().stream()
+                .map(
+                        key -> {
+                            String index = key.substring(0, Math.max(key.indexOf('.'), 0));
+                            return index.matches("\\d+") ? Integer.parseInt(index) : null;
+                        })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+    }
+
     @Test
     void testSubtaskMetricStoreCleanup() {
         MetricStore store = setupStore(new MetricStore());

Reply via email to