This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new dfe2874b875 [FLINK-32217][runtime] Fix NPE in MetricStore#updateCurrentExecutionAttempts. dfe2874b875 is described below commit dfe2874b8757f942d0143e6e5ea6ed3cc13a21d6 Author: xincheng.ljr <xincheng....@alibaba-inc.com> AuthorDate: Thu Jun 1 15:04:25 2023 +0800 [FLINK-32217][runtime] Fix NPE in MetricStore#updateCurrentExecutionAttempts. This closes #22680 (cherry picked from commit 3245e0443b2a4663552a5b707c5c8c46876c1f6d) --- .../rest/handler/legacy/metrics/MetricStore.java | 24 +++++++++++++------ .../handler/legacy/metrics/MetricStoreTest.java | 28 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java index d3cdec7dd31..965cff49fe9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java @@ -100,20 +100,30 @@ public class MetricStore { Map<Integer, Integer> vertexAttempts = jobRepresentativeAttempts.compute( vertexId, (k, overwritten) -> new HashMap<>()); - JobMetricStore jobMetricStore = this.jobs.get(jobId); - TaskMetricStore taskMetricStore = - jobMetricStore.getTaskMetricStore(vertexId); + Optional<TaskMetricStore> taskMetricStoreOptional = + Optional.ofNullable(this.jobs.get(jobId)) + .map(map -> map.getTaskMetricStore(vertexId)); + // Retains current active subtasks to accommodate dynamic scaling - taskMetricStore.retainSubtasks(subtaskAttempts.keySet()); + taskMetricStoreOptional.ifPresent( + taskMetricStore -> + taskMetricStore.retainSubtasks(subtaskAttempts.keySet())); + subtaskAttempts.forEach( (subtaskIndex, attempts) -> { // Updates representative attempts vertexAttempts.put( subtaskIndex, attempts.getRepresentativeAttempt()); // Retains current attempt metrics to avoid memory leak - taskMetricStore - .getSubtaskMetricStore(subtaskIndex) - .retainAttempts(attempts.getCurrentAttempts()); + taskMetricStoreOptional + .map( + taskMetricStore -> + taskMetricStore.getSubtaskMetricStore( + subtaskIndex)) + .ifPresent( + subtaskMetricStore -> + subtaskMetricStore.retainAttempts( + attempts.getCurrentAttempts())); }); }); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java index 812fe1ba174..95ac5167ef6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java @@ -39,6 +39,7 @@ import java.util.Set; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; /** Tests for the MetricStore. */ class MetricStoreTest { @@ -124,6 +125,33 @@ class MetricStoreTest { assertThat(store.getJobs()).isEmpty(); } + @Test + void testUpdateCurrentExecutionAttemptsWithNonExistentComponentMetricStore() { + MetricStore metricStore = new MetricStore(); + assertThat(metricStore.getJobs()).isEmpty(); + + JobDetails jobDetail = + new JobDetails( + JOB_ID, + "jobname", + 0, + 0, + 0, + JobStatus.RUNNING, + 0, + new int[10], + 1, + Collections.singletonMap( + "taskid", + Collections.singletonMap( + 1, new CurrentAttempts(1, new HashSet<>())))); + assertThatCode( + () -> + metricStore.updateCurrentExecutionAttempts( + Collections.singletonList(jobDetail))) + .doesNotThrowAnyException(); + } + @Test void testTaskMetricStoreCleanup() { MetricStore store = setupStore(new MetricStore());