This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f6322dae132 Fix possible ConcurrentModificationException in DataflowExecutionStateSampler (#30993)
f6322dae132 is described below

commit f6322dae13230cd3e15da3cae7ac2158b5efc7fb
Author: Sam Whittle <scwhit...@users.noreply.github.com>
AuthorDate: Tue Apr 16 19:16:34 2024 +0200

    Fix possible ConcurrentModificationException in DataflowExecutionStateSampler (#30993)
    
    Also ensure that observing the result does not modify it. Previously, each
    observation merged the active tracker's stats into the completed-metrics map,
    which appeared unintended.
---
 .../dataflow/worker/DataflowExecutionStateSampler.java     | 14 ++++++++------
 .../dataflow/worker/DataflowExecutionStateSamplerTest.java |  3 +++
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
index 1ff9a9be40d..80955185eea 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
@@ -39,6 +39,7 @@ public final class DataflowExecutionStateSampler extends 
ExecutionStateSampler {
 
   private final ConcurrentHashMap<String, DataflowExecutionStateTracker> 
activeTrackersByWorkId =
       new ConcurrentHashMap<>();
+  // The maps within completedProcessingMetrics should not be modified.
   private final ConcurrentHashMap<String, Map<String, IntSummaryStatistics>>
       completedProcessingMetrics = new ConcurrentHashMap<>();
 
@@ -64,7 +65,7 @@ public final class DataflowExecutionStateSampler extends 
ExecutionStateSampler {
     this.activeTrackersByWorkId.put(dfTracker.getWorkItemId(), dfTracker);
   }
 
-  private static Map<String, IntSummaryStatistics> mergeStepStatsMaps(
+  private static void mergeStepStatsMaps(
       Map<String, IntSummaryStatistics> map1, Map<String, 
IntSummaryStatistics> map2) {
     for (Entry<String, IntSummaryStatistics> steps : map2.entrySet()) {
       map1.compute(
@@ -77,7 +78,6 @@ public final class DataflowExecutionStateSampler extends 
ExecutionStateSampler {
             return v;
           });
     }
-    return map1;
   }
 
   @Override
@@ -118,13 +118,15 @@ public final class DataflowExecutionStateSampler extends 
ExecutionStateSampler {
   }
 
   public Map<String, IntSummaryStatistics> 
getProcessingDistributionsForWorkId(String workId) {
+    Map<String, IntSummaryStatistics> result;
     DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
     if (tracker == null) {
-      return completedProcessingMetrics.getOrDefault(workId, new HashMap<>());
+      result = new HashMap<>();
+    } else {
+      result = tracker.getProcessingTimesByStepCopy();
     }
-    return mergeStepStatsMaps(
-        completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
-        tracker.getProcessingTimesByStepCopy());
+    mergeStepStatsMaps(result, completedProcessingMetrics.getOrDefault(workId, 
new HashMap<>()));
+    return result;
   }
 
   public void resetForWorkId(String workId) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
index 920e37d40ec..ab5059bd937 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
@@ -106,6 +106,9 @@ public class DataflowExecutionStateSamplerTest {
     assertThat(sampler.getActiveMessageMetadataForWorkId(workId).get(), 
equalTo(testMetadata));
     assertThat(
         sampler.getProcessingDistributionsForWorkId(workId), 
equalTo(testCompletedProcessingTimes));
+    // Repeated calls should not modify the result.
+    assertThat(
+        sampler.getProcessingDistributionsForWorkId(workId), 
equalTo(testCompletedProcessingTimes));
   }
 
   @Test

Reply via email to