This is an automated email from the ASF dual-hosted git repository. scwhittle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new f6322dae132 Fix ConcurrentModification exception possible in DataflowExecutionStateSampler (#30993)
f6322dae132 is described below

commit f6322dae13230cd3e15da3cae7ac2158b5efc7fb
Author: Sam Whittle <scwhit...@users.noreply.github.com>
AuthorDate: Tue Apr 16 19:16:34 2024 +0200

    Fix ConcurrentModification exception possible in DataflowExecutionStateSampler (#30993)

    Also ensure that the result is not modified by observing it. Previously we were merging into completed with each observation which appeared unintended.
---
 .../dataflow/worker/DataflowExecutionStateSampler.java     | 14 ++++++++------
 .../dataflow/worker/DataflowExecutionStateSamplerTest.java |  3 +++
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
index 1ff9a9be40d..80955185eea 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
@@ -39,6 +39,7 @@ public final class DataflowExecutionStateSampler extends ExecutionStateSampler {
   private final ConcurrentHashMap<String, DataflowExecutionStateTracker> activeTrackersByWorkId =
       new ConcurrentHashMap<>();
 
+  // The maps within completeProcessingMetrics should not be modified.
   private final ConcurrentHashMap<String, Map<String, IntSummaryStatistics>>
       completedProcessingMetrics = new ConcurrentHashMap<>();
 
@@ -64,7 +65,7 @@ public final class DataflowExecutionStateSampler extends ExecutionStateSampler {
     this.activeTrackersByWorkId.put(dfTracker.getWorkItemId(), dfTracker);
   }
 
-  private static Map<String, IntSummaryStatistics> mergeStepStatsMaps(
+  private static void mergeStepStatsMaps(
       Map<String, IntSummaryStatistics> map1, Map<String, IntSummaryStatistics> map2) {
     for (Entry<String, IntSummaryStatistics> steps : map2.entrySet()) {
       map1.compute(
@@ -77,7 +78,6 @@ public final class DataflowExecutionStateSampler extends ExecutionStateSampler {
             return v;
           });
     }
-    return map1;
   }
 
   @Override
@@ -118,13 +118,15 @@ public final class DataflowExecutionStateSampler extends ExecutionStateSampler {
   }
 
   public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(String workId) {
+    Map<String, IntSummaryStatistics> result;
     DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
     if (tracker == null) {
-      return completedProcessingMetrics.getOrDefault(workId, new HashMap<>());
+      result = new HashMap<>();
+    } else {
+      result = tracker.getProcessingTimesByStepCopy();
     }
-    return mergeStepStatsMaps(
-        completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
-        tracker.getProcessingTimesByStepCopy());
+    mergeStepStatsMaps(result, completedProcessingMetrics.getOrDefault(workId, new HashMap<>()));
+    return result;
   }
 
   public void resetForWorkId(String workId) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
index 920e37d40ec..ab5059bd937 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
@@ -106,6 +106,9 @@ public class DataflowExecutionStateSamplerTest {
     assertThat(sampler.getActiveMessageMetadataForWorkId(workId).get(), equalTo(testMetadata));
     assertThat(
         sampler.getProcessingDistributionsForWorkId(workId), equalTo(testCompletedProcessingTimes));
+    // Repeated calls should not modify the result.
+    assertThat(
+        sampler.getProcessingDistributionsForWorkId(workId), equalTo(testCompletedProcessingTimes));
   }
 
   @Test