This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new dfe2874b875 [FLINK-32217][runtime] Fix NPE in MetricStore#updateCurrentExecutionAttempts.
dfe2874b875 is described below

commit dfe2874b8757f942d0143e6e5ea6ed3cc13a21d6
Author: xincheng.ljr <xincheng....@alibaba-inc.com>
AuthorDate: Thu Jun 1 15:04:25 2023 +0800

    [FLINK-32217][runtime] Fix NPE in MetricStore#updateCurrentExecutionAttempts.
    
    This closes #22680
    
    (cherry picked from commit 3245e0443b2a4663552a5b707c5c8c46876c1f6d)
---
 .../rest/handler/legacy/metrics/MetricStore.java   | 24 +++++++++++++------
 .../handler/legacy/metrics/MetricStoreTest.java    | 28 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index d3cdec7dd31..965cff49fe9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -100,20 +100,30 @@ public class MetricStore {
                         Map<Integer, Integer> vertexAttempts =
                                 jobRepresentativeAttempts.compute(
                                         vertexId, (k, overwritten) -> new HashMap<>());
-                        JobMetricStore jobMetricStore = this.jobs.get(jobId);
-                        TaskMetricStore taskMetricStore =
-                                jobMetricStore.getTaskMetricStore(vertexId);
+                        Optional<TaskMetricStore> taskMetricStoreOptional =
+                                Optional.ofNullable(this.jobs.get(jobId))
+                                        .map(map -> map.getTaskMetricStore(vertexId));
+
                         // Retains current active subtasks to accommodate dynamic scaling
-                        taskMetricStore.retainSubtasks(subtaskAttempts.keySet());
+                        taskMetricStoreOptional.ifPresent(
+                                taskMetricStore ->
+                                        taskMetricStore.retainSubtasks(subtaskAttempts.keySet()));
+
                         subtaskAttempts.forEach(
                                 (subtaskIndex, attempts) -> {
                                     // Updates representative attempts
                                     vertexAttempts.put(
                                             subtaskIndex, attempts.getRepresentativeAttempt());
                                     // Retains current attempt metrics to avoid memory leak
-                                    taskMetricStore
-                                            .getSubtaskMetricStore(subtaskIndex)
-                                            .retainAttempts(attempts.getCurrentAttempts());
+                                    taskMetricStoreOptional
+                                            .map(
+                                                    taskMetricStore ->
+                                                            taskMetricStore.getSubtaskMetricStore(
+                                                                    subtaskIndex))
+                                            .ifPresent(
+                                                    subtaskMetricStore ->
+                                                            subtaskMetricStore.retainAttempts(
+                                                                    attempts.getCurrentAttempts()));
                                 });
                     });
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 812fe1ba174..95ac5167ef6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 
 /** Tests for the MetricStore. */
 class MetricStoreTest {
@@ -124,6 +125,33 @@ class MetricStoreTest {
         assertThat(store.getJobs()).isEmpty();
     }
 
+    @Test
+    void testUpdateCurrentExecutionAttemptsWithNonExistentComponentMetricStore() {
+        MetricStore metricStore = new MetricStore();
+        assertThat(metricStore.getJobs()).isEmpty();
+
+        JobDetails jobDetail =
+                new JobDetails(
+                        JOB_ID,
+                        "jobname",
+                        0,
+                        0,
+                        0,
+                        JobStatus.RUNNING,
+                        0,
+                        new int[10],
+                        1,
+                        Collections.singletonMap(
+                                "taskid",
+                                Collections.singletonMap(
+                                        1, new CurrentAttempts(1, new HashSet<>()))));
+        assertThatCode(
+                        () ->
+                                metricStore.updateCurrentExecutionAttempts(
+                                        Collections.singletonList(jobDetail)))
+                .doesNotThrowAnyException();
+    }
+
     @Test
     void testTaskMetricStoreCleanup() {
         MetricStore store = setupStore(new MetricStore());

Reply via email to