lukecwik commented on code in PR #22190:
URL: https://github.com/apache/beam/pull/22190#discussion_r916254625


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java:
##########
@@ -149,34 +249,51 @@ public synchronized void stop() {
     }
   }
 
+  /** Wait until all published events are processed. Should only be used for testing */
+  @VisibleForTesting
+  void sync() {
+    CompletableFuture<Void> fut = new CompletableFuture<>();
+    disruptor.publishEvent(
+        (state, seq, arg0) -> {
+          state.eventType = StateEventType.SYNCHRONIZE;
+          state.syncFuture = arg0;
+        },
+        fut);
+    try {
+      fut.get();
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
   /** Add the tracker to the sampling set. */
-  synchronized void addTracker(ExecutionStateTracker tracker) {
-    this.activeTrackers.add(tracker);
+  void addTracker(ExecutionStateTracker tracker) {
+    disruptor.publishEvent(
+        (state, seq, arg0) -> {
+          state.eventType = StateEventType.ADD;
+          state.tracker = arg0;
+        },
+        tracker);
   }
 
   /** Remove the tracker from the sampling set. */
   void removeTracker(ExecutionStateTracker tracker) {
-    synchronized (this) {
-      activeTrackers.remove(tracker);
-    }
-
-    // Attribute any remaining time since the last sampling while removing the tracker.
-    //
-    // There is a race condition here; if sampling happens in the time between when we remove the
-    // tracker from activeTrackers and read the lastSampleTicks value, the sampling time will
-    // be lost for the tracker being removed. This is acceptable as sampling is already an
-    // approximation of actual execution time.
-    long millisSinceLastSample = clock.getMillis() - this.lastSampleTimeMillis;
-    if (millisSinceLastSample > 0) {
-      tracker.takeSample(millisSinceLastSample);
-    }
+    disruptor.publishEvent(
+        (state, seq, arg0) -> {
+          state.eventType = StateEventType.REMOVE;
+          state.tracker = arg0;
+        },
+        tracker);
   }
 
   /** Attributing sampling time to trackers. */
   @VisibleForTesting
-  public synchronized void doSampling(long millisSinceLastSample) {
-    for (ExecutionStateTracker tracker : activeTrackers) {
-      tracker.takeSample(millisSinceLastSample);
-    }
+  public void doSampling(long millisSinceLastSample) {

Review Comment:
   I think a number of tests rely on sampling having completed by the time this method returns, and the sync method is a pretty neat solution for that.
   
   I wish our tests didn't invoke this at all, but I can see why they do: it saves writing tests that involve multiple threads.
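
   The idea behind the sync barrier can be sketched in isolation. The following is a minimal illustration, not the PR's implementation: it swaps the Disruptor ring buffer for a plain `LinkedBlockingQueue`, and the names `SyncBarrierSketch`, `Event`, and `totalMillis` are hypothetical. It also assumes that `doSampling` publishes its event and then waits on `sync()`, which the truncated hunk above does not show. Because a single consumer drains events in FIFO order, completing the future carried by the sentinel event implies that every earlier event has already been handled.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the sampler: one consumer thread drains events in FIFO order. */
final class SyncBarrierSketch {

  /** Hypothetical event; a non-null syncFuture marks it as a SYNCHRONIZE sentinel. */
  private static final class Event {
    final long sampleMillis;
    final CompletableFuture<Void> syncFuture;

    Event(long sampleMillis, CompletableFuture<Void> syncFuture) {
      this.sampleMillis = sampleMillis;
      this.syncFuture = syncFuture;
    }
  }

  // Assumption: a blocking queue replaces the Disruptor used in the PR.
  private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
  private final AtomicLong totalMillis = new AtomicLong();

  SyncBarrierSketch() {
    Thread consumer = new Thread(this::drain, "sampler-consumer");
    consumer.setDaemon(true); // do not keep the JVM alive for this sketch
    consumer.start();
  }

  /** Consumer loop: handles events strictly in the order they were published. */
  private void drain() {
    try {
      while (true) {
        Event e = queue.take();
        if (e.syncFuture != null) {
          // Every event published before the sentinel has been processed by now.
          e.syncFuture.complete(null);
        } else {
          totalMillis.addAndGet(e.sampleMillis);
        }
      }
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }

  /** Publishes a sample, then blocks until it has been applied (assumed behavior). */
  void doSampling(long millisSinceLastSample) {
    queue.add(new Event(millisSinceLastSample, null));
    sync();
  }

  /** Blocks the caller until all previously published events are processed. */
  void sync() {
    CompletableFuture<Void> fut = new CompletableFuture<>();
    queue.add(new Event(0, fut));
    fut.join();
  }

  long totalMillis() {
    return totalMillis.get();
  }
}
```

   With this shape a single-threaded test can call `doSampling(5)` and read the accumulated value immediately afterwards, which is the convenience the existing tests appear to depend on.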


