scwhittle commented on code in PR #35135:
URL: https://github.com/apache/beam/pull/35135#discussion_r2124946591


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -543,15 +543,21 @@ public void start(String processBundleId) {
 
     @Override
     public void updateIntermediateMonitoringData(Map<String, ByteString> monitoringData) {
-      for (ExecutionStateImpl executionState : executionStates) {
-        executionState.updateMonitoringData(monitoringData);
+      // executionState may get reset in reset() call below
+      synchronized (activeStateTrackers) {
+        for (ExecutionStateImpl executionState : executionStates) {
+          executionState.updateMonitoringData(monitoringData);

Review Comment:
   Can we instead make ExecutionStateImpl synchronize access to 
lastReportedValue and hasReportedValue? It could just synchronize on itself, 
both for that and in reset(). The comment around those fields says they are 
guarded by some other mutex, but they clearly aren't in the reset case.
   
   It's odd to use activeStateTrackers here to guard the internal state of an 
unrelated executionState just because we happen to be holding it in reset() 
when modifying the execution states.
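
   A rough sketch of what I mean, not the actual class: `SelfSyncState` is a 
hypothetical stand-in for ExecutionStateImpl, `msecs` and `addMsecs` are 
made-up names, and the map holds `Long` rather than `ByteString` only to keep 
the example self-contained. The point is just that every access to 
lastReportedValue and hasReportedValue, including reset(), takes the 
object's own monitor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ExecutionStateImpl; names other than
// lastReportedValue, hasReportedValue, updateMonitoringData and reset
// are assumptions made for illustration.
final class SelfSyncState {
  private final String shortId;

  // All three fields below are guarded by "this".
  private long msecs;
  private long lastReportedValue;
  private boolean hasReportedValue;

  SelfSyncState(String shortId) {
    this.shortId = shortId;
  }

  // Assumed helper that accumulates time spent in this state.
  synchronized void addMsecs(long delta) {
    msecs += delta;
  }

  // Reports the current value only if it changed since the last report.
  synchronized void updateMonitoringData(Map<String, Long> monitoringData) {
    if (hasReportedValue && lastReportedValue == msecs) {
      return;
    }
    monitoringData.put(shortId, msecs);
    lastReportedValue = msecs;
    hasReportedValue = true;
  }

  // Takes the same monitor, so it cannot interleave with a report.
  synchronized void reset() {
    msecs = 0;
    lastReportedValue = 0;
    hasReportedValue = false;
  }
}
```

   With something along these lines, updateIntermediateMonitoringData could 
keep its plain loop over executionStates without taking activeStateTrackers, 
assuming nothing else in that loop depends on that lock.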


