scwhittle commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1756596636


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -338,7 +338,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
         "<table border=\"1\" "
             + "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
     writer.println(
-        "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th></tr>");
+        "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th><th>Produced By</th></tr>");

Review Comment:
   I don't see a matching row change for this header change.
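   
   For illustration, the row-printing code would need a matching cell; a minimal hypothetical sketch (the accessor used for the producing worker is an assumption, not the actual field):
   
      // Hypothetical: assumes the row-printing loop can reach the backend worker token.
      writer.println("<td>" + work.backendWorkerToken() + "</td>");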



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -302,4 +301,12 @@ public Integer create(PipelineOptions options) {
       return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1;
     }
   }
+
+  /** EnableStreamingEngine defaults to false unless one of the two experiments is set. */

Review Comment:
   Looks like just one experiment.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -82,23 +83,25 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
       LoggerFactory.getLogger(FanOutStreamingEngineWorkerHarness.class);
   private static final String PUBLISH_NEW_WORKER_METADATA_THREAD = "PublishNewWorkerMetadataThread";
   private static final String CONSUME_NEW_WORKER_METADATA_THREAD = "ConsumeNewWorkerMetadataThread";
+  private static final String STREAM_STARTER_THREAD = "WindmillStreamStarter";

Review Comment:
   Add an int to the name format for these; the thread name format takes a thread count.
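   
   For example (Guava's ThreadFactoryBuilder substitutes an incrementing per-factory thread count for %d):
   
      private static final String STREAM_STARTER_THREAD = "WindmillStreamStarter-%d";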



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +332,60 @@ private void setLastError(String error) {
     lastErrorTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException {
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements StreamObserver<RequestT> {
+
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("this")
+    private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;
+
+    private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) {
+      this.requestObserverSupplier = requestObserverSupplier;
+      this.delegateRequestObserver = null;
+    }
+
+    private synchronized StreamObserver<RequestT> delegate() {
+      if (delegateRequestObserver == null) {
+        throw new NullPointerException(

Review Comment:
   Use Preconditions.checkNotNull?
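   
   A minimal sketch (the existing exception message could be kept as checkNotNull's second argument):
   
      private synchronized StreamObserver<RequestT> delegate() {
        return Preconditions.checkNotNull(delegateRequestObserver);
      }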



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -317,29 +352,38 @@ public boolean commitWorkItem(
       if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
         return false;
       }
-      PendingRequest request = new PendingRequest(computation, commitRequest, onDone);
+
+      PendingRequest request = PendingRequest.create(computation, commitRequest, onDone);
       add(idGenerator.incrementAndGet(), request);
       return true;
     }
 
     /** Flushes any pending work items to the wire. */
     @Override
     public void flush() {
-      flushInternal(queue);
-      queuedBytes = 0;
-      queue.clear();
+      if (!isShutdown()) {
+        try {
+          flushInternal(queue);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          queuedBytes = 0;
+          queue.clear();

Review Comment:
   Perhaps the finally should cover the isShutdown case too? It seems like we might want to clear the queue anyway.
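   
   Roughly, a sketch of the suggested restructuring so the clear happens on every path:
   
      @Override
      public void flush() {
        try {
          if (!isShutdown()) {
            flushInternal(queue);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          queuedBytes = 0;
          queue.clear();
        }
      }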



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    @Nullable RuntimeException failure = null;
     for (int i = 0; i < response.getRequestIdCount(); ++i) {
       long requestId = response.getRequestId(i);
       if (requestId == HEARTBEAT_REQUEST_ID) {
         continue;
       }
-      PendingRequest done = pending.remove(requestId);
-      if (done == null) {
-        LOG.error("Got unknown commit request ID: {}", requestId);
+      PendingRequest pendingRequest = pending.remove(requestId);
+      if (pendingRequest == null) {
+        // Skip responses when the stream is shutdown since they are now invalid.
+        if (!isShutdown()) {
+          LOG.error("Got unknown commit request ID: {}", requestId);
+        }
       } else {
         try {
-          done.onDone.accept(
+          pendingRequest.completeWithStatus(
               (i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK);
         } catch (RuntimeException e) {
           // Catch possible exceptions to ensure that an exception for one commit does not prevent
-          // other commits from being processed.
+          // other commits from being processed. Aggregate all the failures to throw after
+          // processing the response if they exist.
           LOG.warn("Exception while processing commit response.", e);
-          finalException = e;
+          if (failure == null) failure = e;
+          else failure.addSuppressed(e);
         }
       }
     }
-    if (finalException != null) {
-      throw finalException;
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  @Override
+  protected void shutdownInternal() {
+    Iterator<PendingRequest> pendingRequests = pending.values().iterator();
+    while (pendingRequests.hasNext()) {
+      PendingRequest pendingRequest = pendingRequests.next();

Review Comment:
   What are ConcurrentHashMap's guarantees around concurrent iteration and removal?
   From the docs it appears iteration is weakly consistent, so it may see removed entries, and it is possible that we would call completeWithStatus twice on the same request if the iteration here and onResponse happened at the same time.
   
   To ensure we don't, you could remove everything you iterate and examine the return value, like:
   
      Iterator<Long> requestIds = pending.keySet().iterator();
      while (requestIds.hasNext()) {
        PendingRequest r = pending.remove(requestIds.next());
        if (r != null) {
          r.completeWithStatus(...);
        }
      }
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -165,46 +174,52 @@ private static Watermarks createWatermarks(
         .build();
   }
 
-  private void sendRequestExtension(GetWorkBudget adjustment) {
-    inFlightBudget.getAndUpdate(budget -> budget.apply(adjustment));
-    StreamingGetWorkRequest extension =
-        StreamingGetWorkRequest.newBuilder()
-            .setRequestExtension(
-                Windmill.StreamingGetWorkRequestExtension.newBuilder()
-                    .setMaxItems(adjustment.items())
-                    .setMaxBytes(adjustment.bytes()))
-            .build();
+  private void sendRequestExtension() {

Review Comment:
   Add an implementation note about not synchronizing here: this runs on the gRPC serial executor for the message, and it can deadlock since we send on the stream beneath the synchronization.
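   
   For example, something like this above the method (wording is just a suggestion):
   
      // Implementation note: deliberately not synchronized. This runs on the gRPC serial
      // executor for the message, and sending on the stream while holding a lock can deadlock.
      private void sendRequestExtension() {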



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -74,69 +76,125 @@ public void onNext(T value) {
     while (true) {
       try {
         synchronized (lock) {
+          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+          // careful to observe the phase before observing isReady.
+          if (awaitPhase < 0) {
+            awaitPhase = isReadyNotifier.getPhase();
+            // If getPhase() returns a value less than 0, the phaser has been terminated.
+            if (awaitPhase < 0) {
+              return;
+            }
+          }
+
           // We only check isReady periodically to effectively allow for increasing the outbound
           // buffer periodically. This reduces the overhead of blocking while still restricting
           // memory because there is a limited # of streams, and we have a max messages size of 2MB.
           if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
-            outboundObserver.onNext(value);
+            tryOnNext(value);
             return;
           }
-          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-          // careful to observe the phase before observing isReady.
-          if (awaitPhase < 0) {
-            awaitPhase = phaser.getPhase();
-          }
+
           if (outboundObserver.isReady()) {
             messagesSinceReady = 0;
-            outboundObserver.onNext(value);
+            tryOnNext(value);
             return;
           }
         }
+
         // A callback has been registered to advance the phaser whenever the observer
         // transitions to  is ready. Since we are waiting for a phase observed before the
         // outboundObserver.isReady() returned false, we expect it to advance after the
         // channel has become ready.  This doesn't always seem to be the case (despite
         // documentation stating otherwise) so we poll periodically and enforce an overall
         // timeout related to the stream deadline.
-        phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        int nextPhase =
+            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        // If nextPhase is a value less than 0, the phaser has been terminated.
+        if (nextPhase < 0) {
+          return;
+        }
+
         synchronized (lock) {
           messagesSinceReady = 0;
-          outboundObserver.onNext(value);
+          tryOnNext(value);
           return;
         }
       } catch (TimeoutException e) {
+        if (isReadyNotifier.isTerminated()) {

Review Comment:
   I don't think you need this; awaitAdvanceInterruptibly will return a negative value if the phaser is terminated.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -265,18 +317,21 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream(
         onNewWindmillEndpoints);
   }
 
-  private StreamObserverFactory newStreamObserverFactory() {
+  private StreamObserverFactory newStreamObserverFactory(boolean hasDeadline) {
     return StreamObserverFactory.direct(
-        DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, windmillMessagesBetweenIsReadyChecks);
+        hasDeadline ? DEFAULT_DEADLINE_SECONDS : NO_DEADLINE, windmillMessagesBetweenIsReadyChecks);
   }
 
   @Override
   public void appendSummaryHtml(PrintWriter writer) {
     writer.write("Active Streams:<br>");
-    for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
-      stream.appendSummaryHtml(writer);
-      writer.write("<br>");
-    }
+    streamRegistry.stream()
+        .sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken))
+        .collect(

Review Comment:
   Seems like, once you've sorted, you can remove the toMap collector and just apply printSummaryHtmlForWorker to the AbstractWindmillStream iterable.
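   
   i.e. something like (a sketch; printSummaryHtmlForWorker's exact parameters are an assumption):
   
      streamRegistry.stream()
          .sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken))
          .forEachOrdered(stream -> printSummaryHtmlForWorker(writer, stream));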



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -110,7 +110,18 @@ public static ProcessingContext createProcessingContext(
       GetDataClient getDataClient,
       Consumer<Commit> workCommitter,
       HeartbeatSender heartbeatSender) {
-    return ProcessingContext.create(computationId, getDataClient, workCommitter, heartbeatSender);
+    return ProcessingContext.create(
+        computationId, getDataClient, workCommitter, heartbeatSender, "");
+  }
+
+  public static ProcessingContext createProcessingContext(
+      String backendWorkerToken,

Review Comment:
   nit: put token last to match constructor ordering?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -312,7 +312,7 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
       if (executableWork != null) {
         Work work = executableWork.work();
         if (work.isStuckCommittingAt(stuckCommitDeadline)) {
-          LOG.error(

Review Comment:
   I think this should remain an error; if we have such stuck commits we should debug why.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -125,23 +127,22 @@ private FanOutStreamingEngineWorkerHarness(
     this.connections = new AtomicReference<>(StreamingEngineConnectionState.EMPTY);
     this.channelCachingStubFactory = channelCachingStubFactory;
     this.dispatcherClient = dispatcherClient;
-    this.isBudgetRefreshPaused = new AtomicBoolean(false);
     this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
+    this.windmillStreamStarter =
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder().setNameFormat(STREAM_STARTER_THREAD).build());
+    this.windmillStreamCloser =
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder().setNameFormat(STREAM_CLOSER_THREAD).build());

Review Comment:
   How about a single thread pool for starting/closing, treated as generic stream ops? The stack trace will make it pretty clear which operation is running.
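   
   e.g. a sketch (the pool field and name format are hypothetical):
   
      private static final String STREAM_OPS_THREAD = "WindmillStreamOps-%d";
   
      this.windmillStreamOps =
          Executors.newCachedThreadPool(
              new ThreadFactoryBuilder().setNameFormat(STREAM_OPS_THREAD).build());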



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +332,60 @@ private void setLastError(String error) {
     lastErrorTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException {
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements StreamObserver<RequestT> {
+
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("this")
+    private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;

Review Comment:
   Remove the volatile if synchronized. Though it seems like volatile might be enough if you just read it once in delegate(); you can do that by assigning to a stack variable or using the Preconditions return value.
   
   return Preconditions.checkNotNull(delegateRequestObserver);



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -272,42 +263,106 @@ private void startWorkerMetadataConsumer() {
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
     getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
     newWorkerMetadataPublisher.shutdownNow();
     newWorkerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
+  private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
+    consumeEndpoints(newWindmillEndpoints).join();
+  }
+
   /**
    * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
    * new backend worker metadata.
    */
-  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+  private synchronized CompletableFuture<Void> consumeEndpoints(
+      WindmillEndpoints newWindmillEndpoints) {
     LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
     ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
         createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
-
+    CompletableFuture<Void> closeStaleStreams =
+        closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
+    ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillConnections.values()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
             .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
-
     LOG.info(
         "Setting new connections: {}. Previous connections: {}.",
         newConnectionsState,
         connections.get());
     connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+
+    // Close the streams outside the lock.
+    return closeStaleStreams;
+  }
+
+  /** Close the streams that are no longer valid asynchronously. */
+  private CompletableFuture<Void> closeStaleStreams(
+      Collection<WindmillConnection> newWindmillConnections,
+      ImmutableMap<WindmillConnection, WindmillStreamSender> currentStreams) {
+    return CompletableFuture.allOf(
+        currentStreams.entrySet().stream()
+            .filter(
+                connectionAndStream ->
+                    !newWindmillConnections.contains(connectionAndStream.getKey()))
+            .map(
+                entry ->
+                    CompletableFuture.runAsync(
+                        () -> {
+                          LOG.debug("Closing streams to {}", entry.getKey().backendWorkerToken());

Review Comment:
   Log the full connection here and below? The endpoint matters too.
   
   We can also consider whether we want these to be info logs.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -185,49 +202,69 @@ protected final void startStream() {
     while (true) {
       try {
         synchronized (this) {
+          if (isShutdown.get()) {
+            break;
+          }
           startTimeMs.set(Instant.now().getMillis());
           lastResponseTimeMs.set(0);
           streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          requestObserver.reset();
           onNewStream();
           if (clientClosed.get()) {
             halfClose();
           }
           return;
         }
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
           sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          break;

Review Comment:
   Rethrow the InterruptedException?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -272,42 +263,106 @@ private void startWorkerMetadataConsumer() {
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
     getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
     newWorkerMetadataPublisher.shutdownNow();
     newWorkerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
+  private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
+    consumeEndpoints(newWindmillEndpoints).join();
+  }
+
   /**
    * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
    * new backend worker metadata.
    */
-  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+  private synchronized CompletableFuture<Void> consumeEndpoints(
+      WindmillEndpoints newWindmillEndpoints) {
     LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
     ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
         createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
-
+    CompletableFuture<Void> closeStaleStreams =
+        closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
+    ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillConnections.values()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
             .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
            .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
-
     LOG.info(
         "Setting new connections: {}. Previous connections: {}.",
         newConnectionsState,
         connections.get());
     connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+
+    // Close the streams outside the lock.

Review Comment:
   Instead of a synchronized method, how about using a synchronized block to keep the join in the same method?
   
      CompletableFuture<Void> blocker;
      synchronized (this) {
        // ... update connections and build the close future ...
        blocker = closeStaleStreams;
      }
      blocker.join();



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -185,49 +202,69 @@ protected final void startStream() {
     while (true) {
       try {
         synchronized (this) {
+          if (isShutdown.get()) {
+            break;
+          }
           startTimeMs.set(Instant.now().getMillis());
           lastResponseTimeMs.set(0);
           streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          requestObserver.reset();
           onNewStream();
           if (clientClosed.get()) {
             halfClose();
           }
           return;
         }
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
           sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          break;
+        } catch (IOException ioe) {
           // Keep trying to create the stream.
         }
       }
     }
+
+    // We were never able to start the stream, remove it from the stream registry.
+    streamRegistry.remove(this);

Review Comment:
   If rethrowing above, this will need to be done in that case as well.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -82,13 +86,14 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
   private final CountDownLatch finishLatch;
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
   private final int logEveryNStreamFailures;
-  private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
-  // Indicates if the current stream in requestObserver is closed by calling close() method
-  private final AtomicBoolean streamClosed;
   private final String backendWorkerToken;
-  private @Nullable StreamObserver<RequestT> requestObserver;
+  private final ResettableRequestObserver<RequestT> requestObserver;
+  private final AtomicBoolean isShutdown;
+  private final AtomicBoolean streamClosed;

Review Comment:
   Keep the comment for streamClosed.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    @Nullable RuntimeException failure = null;
     for (int i = 0; i < response.getRequestIdCount(); ++i) {
       long requestId = response.getRequestId(i);
       if (requestId == HEARTBEAT_REQUEST_ID) {
         continue;
       }
-      PendingRequest done = pending.remove(requestId);
-      if (done == null) {
-        LOG.error("Got unknown commit request ID: {}", requestId);
+      PendingRequest pendingRequest = pending.remove(requestId);
+      if (pendingRequest == null) {
+        // Skip responses when the stream is shutdown since they are now invalid.
+        if (!isShutdown()) {
+          LOG.error("Got unknown commit request ID: {}", requestId);
+        }
       } else {
         try {
-          done.onDone.accept(
+          pendingRequest.completeWithStatus(
               (i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK);
         } catch (RuntimeException e) {
           // Catch possible exceptions to ensure that an exception for one commit does not prevent
-          // other commits from being processed.
+          // other commits from being processed. Aggregate all the failures to throw after
+          // processing the response if they exist.
           LOG.warn("Exception while processing commit response.", e);
-          finalException = e;
+          if (failure == null) failure = e;

Review Comment:
   I think Spotless wants braces for if statements.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -165,46 +174,52 @@ private static Watermarks createWatermarks(
         .build();
   }
 
-  private void sendRequestExtension(GetWorkBudget adjustment) {
-    inFlightBudget.getAndUpdate(budget -> budget.apply(adjustment));
-    StreamingGetWorkRequest extension =
-        StreamingGetWorkRequest.newBuilder()
-            .setRequestExtension(
-                Windmill.StreamingGetWorkRequestExtension.newBuilder()
-                    .setMaxItems(adjustment.items())
-                    .setMaxBytes(adjustment.bytes()))
-            .build();
+  private void sendRequestExtension() {
+    GetWorkBudget currentInFlightBudget = inFlightBudget.get();
+    GetWorkBudget currentMaxBudget = maxGetWorkBudget.get();
 
-    executor()
-        .execute(
+    // If the outstanding items or bytes limit has gotten too low, top both off with a
+    // GetWorkExtension. The goal is to keep the limits relatively close to their maximum
+    // values without sending too many extension requests.
+    if (currentInFlightBudget.items() < currentMaxBudget.items() / 2
+        || currentInFlightBudget.bytes() < currentMaxBudget.bytes() / 2) {
+      GetWorkBudget extension = currentMaxBudget.subtract(currentInFlightBudget);
+      if (extension.items() > 0 || extension.bytes() > 0) {

Review Comment:
   It seems like at least one is positive given the if above. But shouldn't we ensure the extension is non-negative for both fields?
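   
   e.g. clamping each field independently, a sketch (assumes GetWorkBudget exposes a builder with setItems/setBytes):
   
      GetWorkBudget extension =
          GetWorkBudget.builder()
              .setItems(Math.max(0, currentMaxBudget.items() - currentInFlightBudget.items()))
              .setBytes(Math.max(0, currentMaxBudget.bytes() - currentInFlightBudget.bytes()))
              .build();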



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -311,71 +401,87 @@ public void onNext(ResponseT response) {
 
     @Override
     public void onError(Throwable t) {
-      onStreamFinished(t);
+      if (maybeTeardownStream()) {
+        return;
+      }
+
+      Status status = Status.fromThrowable(t);
+      setLastError(status.toString());
+
+      if (t instanceof StreamObserverCancelledException) {
+        logger.error(
+            "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}",
+            getClass(),
+            backendWorkerToken,
+            t.getStackTrace(),
+            t);
+      } else if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {

Review Comment:
   Factor out the error handling/logging and share it with the error path in onCompleted.
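   
   e.g. a sketch (the helper name is hypothetical):
   
      private void recordStreamError(String error) {
        setLastError(error);
        if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
          // ... logging currently duplicated between onError and onCompleted ...
        }
      }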



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamShutdownException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+
+/**
+ * Indicates that a {@link WindmillStream#shutdown()} was called while waiting for some internal
+ * operation to complete. Most common use of this exception should be conversion to a {@link
+ * org.apache.beam.runners.dataflow.worker.WorkItemCancelledException} as the {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem} being processed by {@link
+ * WindmillStream}.
+ */
+final class WindmillStreamShutdownException extends RuntimeException {

Review Comment:
   Should this be internal? This should be translated before it gets to a user, correct?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -265,18 +317,21 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream(
         onNewWindmillEndpoints);
   }
 
-  private StreamObserverFactory newStreamObserverFactory() {
+  private StreamObserverFactory newStreamObserverFactory(boolean hasDeadline) {

Review Comment:
   How about passing in the deadline instead? Easier to read at call sites.
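   
   i.e. a sketch (the parameter type is an assumption); call sites would then read newStreamObserverFactory(DEFAULT_DEADLINE_SECONDS) or newStreamObserverFactory(NO_DEADLINE):
   
      private StreamObserverFactory newStreamObserverFactory(long deadlineSeconds) {
        return StreamObserverFactory.direct(deadlineSeconds, windmillMessagesBetweenIsReadyChecks);
      }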



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -39,7 +39,9 @@
 @ThreadSafe
 public final class DirectStreamObserver<T> implements StreamObserver<T> {

Review Comment:
   Maybe this change can be submitted separately? It would be nice to have a test for it showing the previous bug as well.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java:
##########
@@ -67,17 +70,18 @@ public <T extends GetWorkBudgetSpender> void distributeBudget(
       GetWorkBudgetSpender getWorkBudgetSpender = streamAndDesiredBudget.getKey();
       GetWorkBudget desired = streamAndDesiredBudget.getValue();
       GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget();
-      if (isBelowFiftyPercentOfTarget(remaining, desired)) {
+      if (isBelowFiftyPercentOfTarget(remaining, desired) && isActiveWorkBudgetAware) {

Review Comment:
   This is unclear to me. It seems like in one case the adjustment is additive and in the other it is resetting.
   
   Wouldn't we want to only increase by desired - remaining in either case?
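   
   i.e. something like (a sketch; an additive adjustBudget method is an assumption):
   
      GetWorkBudget deficit = desired.subtract(remaining);
      if (isBelowFiftyPercentOfTarget(remaining, desired)) {
        getWorkBudgetSpender.adjustBudget(deficit.items(), deficit.bytes());
      }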



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -74,69 +76,125 @@ public void onNext(T value) {
    while (true) {
      try {
        synchronized (lock) {
+          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+          // careful to observe the phase before observing isReady.
+          if (awaitPhase < 0) {
+            awaitPhase = isReadyNotifier.getPhase();
+            // If getPhase() returns a value less than 0, the phaser has been terminated.
+            if (awaitPhase < 0) {
+              return;
+            }
+          }
+
          // We only check isReady periodically to effectively allow for increasing the outbound
          // buffer periodically. This reduces the overhead of blocking while still restricting
          // memory because there is a limited # of streams, and we have a max messages size of 2MB.
          if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
-            outboundObserver.onNext(value);
+            tryOnNext(value);
            return;
          }
-          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-          // careful to observe the phase before observing isReady.
-          if (awaitPhase < 0) {
-            awaitPhase = phaser.getPhase();
-          }
+
          if (outboundObserver.isReady()) {
            messagesSinceReady = 0;
-            outboundObserver.onNext(value);
+            tryOnNext(value);
            return;
          }
        }
+
        // A callback has been registered to advance the phaser whenever the observer
        // transitions to  is ready. Since we are waiting for a phase observed before the
        // outboundObserver.isReady() returned false, we expect it to advance after the
        // channel has become ready.  This doesn't always seem to be the case (despite
        // documentation stating otherwise) so we poll periodically and enforce an overall
        // timeout related to the stream deadline.
-        phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        int nextPhase =
+            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        // If nextPhase is a value less than 0, the phaser has been terminated.
+        if (nextPhase < 0) {
+          return;
+        }
+
        synchronized (lock) {
          messagesSinceReady = 0;
-          outboundObserver.onNext(value);
+          tryOnNext(value);
          return;
        }
      } catch (TimeoutException e) {
+        if (isReadyNotifier.isTerminated()) {
+          return;
+        }
+
        totalSecondsWaited += waitSeconds;
-        if (totalSecondsWaited > deadlineSeconds) {
-          LOG.error(
-              "Exceeded timeout waiting for the outboundObserver to become ready meaning "
-                  + "that the stream deadline was not respected.");
-          throw new RuntimeException(e);
+        if (hasDeadlineExpired(totalSecondsWaited)) {
+          String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited);
+          LOG.error(errorMessage);
+          throw new StreamObserverCancelledException(errorMessage, e);
        }
+
        if (totalSecondsWaited > 30) {
          LOG.info(
              "Output channel stalled for {}s, outbound thread {}.",
              totalSecondsWaited,
              Thread.currentThread().getName());
        }
+
        waitSeconds = waitSeconds * 2;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+        StreamObserverCancelledException ex = new StreamObserverCancelledException(e);
+        LOG.error("Interrupted while waiting for outboundObserver to become ready.", ex);
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * Only send the next value if the phaser is not terminated by the time we acquire the lock since
+   * the phaser can be terminated at any time.
+   */
+  private void tryOnNext(T value) {

Review Comment:
   Not sure these phaser checks are necessary, since the outboundObserver itself should stop blocking in onNext if the notifier is terminated.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -303,71 +368,78 @@ public void onNext(ResponseT response) {
 
     @Override
     public void onError(Throwable t) {
-      onStreamFinished(t);
+      if (isStreamDone()) {
+        return;
+      }
+
+      Status status = Status.fromThrowable(t);
+      setLastError(status.toString());
+      // Don't log every error since it will get noisy, and many errors transient.
+      if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+        long nowMillis = Instant.now().getMillis();
+        String responseDebug;
+        if (lastResponseTimeMs.get() == 0) {
+          responseDebug = "never received response";
+        } else {
+          responseDebug = "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";
+        }
+        LOG.debug(
+            "{} streaming Windmill RPC errors for {}, last was: {} with status {}."
+                + " created {}ms ago, {}. This is normal with autoscaling.",
+            AbstractWindmillStream.this.getClass(),
+            errorCount.get(),
+            t,
+            status,
+            nowMillis - startTimeMs.get(),
+            responseDebug);
+      }
+
+      // If the stream was stopped due to a resource exhausted error then we are throttled.
+      if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+        startThrottleTimer();
+      }
+
+      try {
+        long sleep = backoff.nextBackOffMillis();
+        sleepUntil.set(Instant.now().getMillis() + sleep);
+        sleeper.sleep(sleep);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (IOException e) {
+        // Ignore.
+      }
+
+      tryRestartStream();
     }
 
     @Override
     public void onCompleted() {
-      onStreamFinished(null);
+      if (isStreamDone()) {
+        return;
+      }
+      errorCount.incrementAndGet();
+      String error =
+          "Stream completed successfully but did not complete requested operations, "
+              + "recreating";
+      LOG.warn(error);
+      setLastError(error);
+      tryRestartStream();
     }
 
-    private void onStreamFinished(@Nullable Throwable t) {
-      synchronized (this) {
-        if (clientClosed.get() && !hasPendingRequests()) {
-          streamRegistry.remove(AbstractWindmillStream.this);
-          finishLatch.countDown();
-          return;
-        }
+    private void tryRestartStream() {
+      if (!isShutdown()) {
+        executeSafely(AbstractWindmillStream.this::startStream);
       }
-      if (t != null) {
-        Status status = null;
-        if (t instanceof StatusRuntimeException) {
-          status = ((StatusRuntimeException) t).getStatus();
-        }
-        String statusError = status == null ? "" : status.toString();
-        setLastError(statusError);
-        if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
-          long nowMillis = Instant.now().getMillis();
-          String responseDebug;
-          if (lastResponseTimeMs.get() == 0) {
-            responseDebug = "never received response";
-          } else {
-            responseDebug =
-                "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";
-          }
-          LOG.debug(
-              "{} streaming Windmill RPC errors for {}, last was: {} with status {}."
-                  + " created {}ms ago, {}. This is normal with autoscaling.",
-              AbstractWindmillStream.this.getClass(),
-              errorCount.get(),
-              t,
-              statusError,
-              nowMillis - startTimeMs.get(),
-              responseDebug);
-        }
-        // If the stream was stopped due to a resource exhausted error then we are throttled.
-        if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
-          startThrottleTimer();
-        }
+    }
 
-        try {
-          long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        } catch (IOException e) {
-          // Ignore.
-        }
-      } else {
-        errorCount.incrementAndGet();
-        String error =
-            "Stream completed successfully but did not complete requested operations, "
-                + "recreating";
-        LOG.warn(error);
-        setLastError(error);
+    private synchronized boolean isStreamDone() {
+      if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) {
+        streamRegistry.remove(AbstractWindmillStream.this);

Review Comment:
   Add a comment on the return value.
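   
   e.g. a sketch of the comment:
   
      /**
       * Returns whether the stream is done: it was shut down, or the client closed it and no
       * requests remain pending. When this returns true the stream has been removed from the
       * registry and should not be restarted.
       */
      private synchronized boolean isStreamDone() {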
   


