scwhittle commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r855137624


##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
 
   @Override
   public void onNext(T value) {
-    if (maxMessagesBeforeCheck <= 1
-        || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
-      int waitTime = 1;
-      int totalTimeWaited = 0;
-      int phase = phaser.getPhase();
-      while (!outboundObserver.isReady()) {
-        try {
-          phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-          totalTimeWaited += waitTime;
-          waitTime = waitTime * 2;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
+    synchronized (outboundObserver) {
+      if (++numMessages >= maxMessagesBeforeCheck) {
+        numMessages = 0;
+        int waitTime = 1;
+        int totalTimeWaited = 0;
+        int phase = phaser.getPhase();
+        // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
+        // advance.
+        int initialPhase = phase;
+        while (!outboundObserver.isReady()) {
+          try {
+            phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
+          } catch (TimeoutException e) {
+            totalTimeWaited += waitTime;
+            waitTime = waitTime * 2;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+          // There is a chance that we were spuriously woken up but the outboundObserver is no

Review Comment:
   I think awaitAdvanceInterruptibly returns the new phase number once the phaser advances; can you use that instead?
   
   nit: the wakeup doesn't have to be spurious. The observer could have become ready and notified us, and then stopped being ready for some unrelated reason.
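
To illustrate the first point, here is a minimal, self-contained sketch of carrying the phase forward from the value returned by the await call rather than re-reading it. The class and method names (`PhaserReturnValueSketch`, `awaitNextPhase`) are hypothetical and not part of the PR; the single-party `Phaser` with a notifier thread calling `arrive()` is an assumption made only so the example runs on its own.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PhaserReturnValueSketch {
  /**
   * Waits up to one second for the phaser to advance past {@code phase}.
   * Returns the phase number reported by the await call, or the unchanged
   * {@code phase} if the wait timed out (hypothetical helper, not PR code).
   */
  static int awaitNextPhase(Phaser phaser, int phase) {
    try {
      // On success this returns the next arrival phase number, so there is
      // no need for a separate phaser.getPhase() call afterwards.
      return phaser.awaitAdvanceInterruptibly(phase, 1, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      return phase; // no advance observed; keep waiting on the same phase
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Phaser phaser = new Phaser(1); // assumption: one registered party
    int phase = phaser.getPhase();
    // Stand-in for the readiness callback that advances the phaser.
    Thread notifier = new Thread(phaser::arrive);
    notifier.start();
    phase = awaitNextPhase(phaser, phase);
    notifier.join();
    System.out.println("phase from await matches getPhase(): " + (phase == phaser.getPhase()));
  }
}
```

If the phaser has already advanced by the time the await call is made, the call returns immediately with the current phase, so the loop variable stays in sync either way.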
   
   



##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
 
   @Override
   public void onNext(T value) {
-    if (maxMessagesBeforeCheck <= 1
-        || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
-      int waitTime = 1;
-      int totalTimeWaited = 0;
-      int phase = phaser.getPhase();
-      while (!outboundObserver.isReady()) {
-        try {
-          phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-          totalTimeWaited += waitTime;
-          waitTime = waitTime * 2;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
+    synchronized (outboundObserver) {
+      if (++numMessages >= maxMessagesBeforeCheck) {
+        numMessages = 0;
+        int waitTime = 1;
+        int totalTimeWaited = 0;
+        int phase = phaser.getPhase();
+        // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
+        // advance.
+        int initialPhase = phase;
+        while (!outboundObserver.isReady()) {
+          try {
+            phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);

Review Comment:
   One possible difference is that we now block waiting for isReady to transition while holding the outbound observer's monitor.
   Would it be safer to synchronize on something internal to the DirectStreamObserver instead, in case the outbound observer is synchronized on elsewhere? For example, what if the outbound observer (sometimes?) synchronizes on itself to transition from isReady=false to isReady=true, but not for onNext? Or does gRPC document the synchronization for these observers, so that it doesn't matter?
   
   If you don't think that is a concern for some reason, LGTM.
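
The locking concern above can be sketched in isolation. The example below is a hypothetical illustration, not the PR's code: `Sink` stands in for the outbound observer (it is not the gRPC type), and `LockedSink` serializes calls on a private lock object. The demo holds the delegate's monitor from another thread's point of view and shows that the wrapper still makes progress, which would not be the case if it synchronized on the delegate itself.

```java
import java.util.ArrayList;
import java.util.List;

class PrivateLockSketch {
  /** Hypothetical stand-in for the outbound observer; not the gRPC interface. */
  interface Sink<T> {
    void onNext(T value);
  }

  /** Serializes onNext calls on a lock that no outside code can reach. */
  static final class LockedSink<T> implements Sink<T> {
    private final Object lock = new Object();
    private final Sink<T> delegate;

    LockedSink(Sink<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public void onNext(T value) {
      synchronized (lock) {
        delegate.onNext(value);
      }
    }
  }

  /** Writes one element while unrelated code holds the delegate's monitor. */
  static List<String> demo() {
    List<String> received = new ArrayList<>();
    Sink<String> delegate = received::add;
    LockedSink<String> sink = new LockedSink<>(delegate);
    // Simulate some other component synchronizing on the delegate.
    synchronized (delegate) {
      Thread writer = new Thread(() -> sink.onNext("a"));
      writer.start();
      try {
        // Completes because the writer never needs the delegate's monitor.
        writer.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Had `LockedSink.onNext` used `synchronized (delegate)` instead, the `join()` in `demo()` would never return, since the calling thread already holds that monitor. Whether this matters in practice depends on how the real outbound observer synchronizes internally, which this sketch does not model.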



