This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b923a67369e fix bug in getProcessingTimesByStepCopy (#30270)
b923a67369e is described below

commit b923a67369e0707575f974aa4cba700091aeb916
Author: clmccart <clairelmccarth...@gmail.com>
AuthorDate: Tue Feb 13 01:22:11 2024 -0800

    fix bug in getProcessingTimesByStepCopy (#30270)
    
    
    Co-authored-by: Claire McCarthy <clairemccar...@google.com>
---
 .../dataflow/worker/DataflowExecutionContext.java  | 42 +++++++++++++++-------
 1 file changed, 29 insertions(+), 13 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 7d45295b2d8..080fa7c9dac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -29,6 +29,8 @@ import java.util.IntSummaryStatistics;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StepContext;
@@ -177,6 +179,7 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
 
   /** Dataflow specific {@link StepContext}. */
   public abstract static class DataflowStepContext implements StepContext {
+
     private final NameContext nameContext;
 
     public DataflowStepContext(NameContext nameContext) {
@@ -253,10 +256,13 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
      * Metadata on the message whose processing is currently being managed by this tracker. If no
      * message is actively being processed, activeMessageMetadata will be null.
      */
-    @Nullable private ActiveMessageMetadata activeMessageMetadata = null;
+    @GuardedBy("this")
+    @Nullable
+    private ActiveMessageMetadata activeMessageMetadata = null;
 
     private final MillisProvider clock = System::currentTimeMillis;
 
+    @GuardedBy("this")
     private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();
 
     public DataflowExecutionStateTracker(
@@ -313,20 +319,19 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
       if (isDataflowProcessElementState) {
         DataflowExecutionState newDFState = (DataflowExecutionState) newState;
         if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) {
-          if (this.activeMessageMetadata != null) {
-            recordActiveMessageInProcessingTimesMap();
+          recordActiveMessageInProcessingTimesMap();
+          synchronized (this) {
+            this.activeMessageMetadata =
+                ActiveMessageMetadata.create(
+                    newDFState.getStepName().userName(), clock.getMillis());
           }
-          this.activeMessageMetadata =
-              ActiveMessageMetadata.create(newDFState.getStepName().userName(), clock.getMillis());
         }
         elementExecutionTracker.enter(newDFState.getStepName());
       }
 
       return () -> {
         if (isDataflowProcessElementState) {
-          if (this.activeMessageMetadata != null) {
-            recordActiveMessageInProcessingTimesMap();
-          }
+          recordActiveMessageInProcessingTimesMap();
           elementExecutionTracker.exit();
         }
         baseCloseable.close();
@@ -337,12 +342,21 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
       return this.workItemId;
     }
 
-    public Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
+    public synchronized Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
       return Optional.ofNullable(activeMessageMetadata);
     }
 
-    public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
-      Map<String, IntSummaryStatistics> processingTimesCopy = processingTimesByStep;
+    public synchronized Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
+      Map<String, IntSummaryStatistics> processingTimesCopy =
+          processingTimesByStep.entrySet().stream()
+              .collect(
+                  Collectors.toMap(
+                      e -> e.getKey(),
+                      e -> {
+                        IntSummaryStatistics clone = new IntSummaryStatistics();
+                        clone.combine(e.getValue());
+                        return clone;
+                      }));
       return processingTimesCopy;
     }
 
@@ -351,17 +365,19 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
      * processing times map. Sets the activeMessageMetadata to null after the entry has been
      * recorded.
      */
-    private void recordActiveMessageInProcessingTimesMap() {
+    private synchronized void recordActiveMessageInProcessingTimesMap() {
       if (this.activeMessageMetadata == null) {
         return;
       }
+      int processingTime =
+          (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime());
       this.processingTimesByStep.compute(
           this.activeMessageMetadata.userStepName(),
           (k, v) -> {
             if (v == null) {
               v = new IntSummaryStatistics();
             }
-            v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime()));
+            v.accept(processingTime);
             return v;
           });
       this.activeMessageMetadata = null;

Reply via email to