scwhittle commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1758496171


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,24 +329,72 @@ public String backendWorkerToken() {
   }
 
   @Override
-  public void shutdown() {
+  public final void shutdown() {
+    // Don't lock here as isShutdown checks are used in the stream to free 
blocked
+    // threads or as exit conditions to loops.
     if (isShutdown.compareAndSet(false, true)) {
       requestObserver()
           .onError(new WindmillStreamShutdownException("Explicit call to 
shutdown stream."));
+      shutdownInternal();
     }
   }
 
-  private void setLastError(String error) {
-    lastError.set(error);
-    lastErrorTime.set(DateTime.now());
+  private void recordRestartReason(String error) {
+    lastRestartReason.set(error);
+    lastRestartTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException 
{
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the 
given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements 
StreamObserver<RequestT> {
+
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("this")
+    private volatile @Nullable StreamObserver<RequestT> 
delegateRequestObserver;
+
+    private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> 
requestObserverSupplier) {
+      this.requestObserverSupplier = requestObserverSupplier;
+      this.delegateRequestObserver = null;
+    }
+
+    private synchronized StreamObserver<RequestT> delegate() {
+      return Preconditions.checkNotNull(
+          delegateRequestObserver,
+          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+    }
+
+    private synchronized void reset() {
+      delegateRequestObserver = requestObserverSupplier.get();
+    }
+
+    @Override
+    public void onNext(RequestT requestT) {
+      delegate().onNext(requestT);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      delegate().onError(throwable);
+    }
+
+    @Override
+    public synchronized void onCompleted() {

Review Comment:
   Remove `synchronized` here?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -264,24 +284,18 @@ protected void startThrottleTimer() {
 
   @Override
   public void adjustBudget(long itemsDelta, long bytesDelta) {

Review Comment:
   can we name this setBudget?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -251,14 +300,16 @@ public final void appendSummaryHtml(PrintWriter writer) {
         streamClosed.get());
   }
 
-  // Don't require synchronization on stream, see the appendSummaryHtml 
comment.
+  /**
+   * @implNote Don't require synchronization on stream, see the {@link
+   *     #appendSummaryHtml(PrintWriter)} comment.
+   */
   protected abstract void appendSpecificHtml(PrintWriter writer);
 
   @Override
-  public final synchronized void halfClose() {
-    // Synchronization of close and onCompleted necessary for correct retry 
logic in onNewStream.
+  public final void halfClose() {
     clientClosed.set(true);
-    requestObserver().onCompleted();
+    requestObserver.onCompleted();

Review Comment:
   Why is it safe to not synchronize? The comment indicated it was for a 
reason. In general, I think that we need to synchronize use of the request 
observer because gRPC doesn't expect it to be used simultaneously by multiple 
threads.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -337,8 +337,17 @@ synchronized void printActiveWork(PrintWriter writer, 
Instant now) {
     writer.println(
         "<table border=\"1\" "
             + 
"style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
+    // Columns.
     writer.println(
-        "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active 
For</th><th>State</th><th>State Active For</th></tr>");
+        "<tr>"
+            + "<th>Key</th>"
+            + "<th>Token</th>"
+            + "<th>Queued</th>"
+            + "<th>Active For</th>"
+            + "<th>State</th>"
+            + "<th>State Active For</th>"
+            + "<th>Produced By</th>"

Review Comment:
   nit: "Produced" is confusing since we use that for shuffle terminology. How 
about just "Backend"?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +334,54 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
   private <ResponseT> ResponseT issueRequest(QueuedRequest request, 
ParseFn<ResponseT> parseFn) {
-    while (true) {
+    while (!isShutdown()) {
       request.resetResponseStream();
       try {
         queueRequestAndWait(request);
         return parseFn.parse(request.getResponseStream());
-      } catch (CancellationException e) {
-        // Retry issuing the request since the response stream was cancelled.
-        continue;
+      } catch (AppendableInputStream.InvalidInputStreamStateException
+          | VerifyException
+          | CancellationException e) {
+        handleShutdown(request);
+        if (!(e instanceof CancellationException)) {
+          throw e;
+        }
       } catch (IOException e) {
         LOG.error("Parsing GetData response failed: ", e);
-        continue;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
+        handleShutdown(request);
         throw new RuntimeException(e);
       } finally {
         pending.remove(request.id());
       }
     }
+
+    // If we have exited the loop here, the stream has been shutdown. Cancel 
the response stream.
+    request.getResponseStream().cancel();
+    throw new WindmillStreamShutdownException(
+        "Cannot send request=[" + request + "] on closed stream.");
+  }
+
+  private void handleShutdown(QueuedRequest request) {
+    if (isShutdown()) {
+      throw new WindmillStreamShutdownException(
+          "Cannot send request=[" + request + "] on closed stream.");

Review Comment:
   if we keep this, should we pass in the exception above to add as a 
suppressed exception?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +334,54 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
   private <ResponseT> ResponseT issueRequest(QueuedRequest request, 
ParseFn<ResponseT> parseFn) {
-    while (true) {
+    while (!isShutdown()) {
       request.resetResponseStream();
       try {
         queueRequestAndWait(request);
         return parseFn.parse(request.getResponseStream());
-      } catch (CancellationException e) {
-        // Retry issuing the request since the response stream was cancelled.
-        continue;
+      } catch (AppendableInputStream.InvalidInputStreamStateException
+          | VerifyException
+          | CancellationException e) {
+        handleShutdown(request);

Review Comment:
   Can we make shutdown handling consistent?
   We throw an exception for shutdown here, but if we don't run the loop due to 
isShutdown, we just return from this function without doing anything.
   
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +332,60 @@ private void setLastError(String error) {
     lastErrorTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException 
{
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the 
given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements 
StreamObserver<RequestT> {
+
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("this")
+    private volatile @Nullable StreamObserver<RequestT> 
delegateRequestObserver;

Review Comment:
   You don't need both: synchronization also ensures that memory changes are 
viewed by other threads. Volatile just ensures that one field's changes are 
visible to other threads.
   
   This seems like a good overview: 
https://blogs.oracle.com/javamagazine/post/java-thread-synchronization-volatile-final-atomic-deadlocks



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -272,42 +263,106 @@ private void startWorkerMetadataConsumer() {
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
     getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
     newWorkerMetadataPublisher.shutdownNow();
     newWorkerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
+  private void consumeWindmillWorkerEndpoints(WindmillEndpoints 
newWindmillEndpoints) {
+    consumeEndpoints(newWindmillEndpoints).join();
+  }
+
   /**
    * {@link java.util.function.Consumer<WindmillEndpoints>} used to update 
{@link #connections} on
    * new backend worker metadata.
    */
-  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints 
newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+  private synchronized CompletableFuture<Void> consumeEndpoints(
+      WindmillEndpoints newWindmillEndpoints) {
     LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
     ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
         createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
-
+    CompletableFuture<Void> closeStaleStreams =
+        closeStaleStreams(newWindmillConnections.values(), 
connections.get().windmillStreams());
+    ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillConnections.values()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
             .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                
closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
-
     LOG.info(
         "Setting new connections: {}. Previous connections: {}.",
         newConnectionsState,
         connections.get());
     connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), 
totalGetWorkBudget);
+
+    // Close the streams outside the lock.
+    return closeStaleStreams;
+  }
+
+  /** Close the streams that are no longer valid asynchronously. */
+  private CompletableFuture<Void> closeStaleStreams(
+      Collection<WindmillConnection> newWindmillConnections,
+      ImmutableMap<WindmillConnection, WindmillStreamSender> currentStreams) {
+    return CompletableFuture.allOf(
+        currentStreams.entrySet().stream()
+            .filter(
+                connectionAndStream ->
+                    
!newWindmillConnections.contains(connectionAndStream.getKey()))
+            .map(
+                entry ->
+                    CompletableFuture.runAsync(
+                        () -> {
+                          LOG.debug("Closing streams to {}", 
entry.getKey().backendWorkerToken());

Review Comment:
   Debug logs can be enabled via a pipeline option when starting the job.
   Arun is adding dynamic job settings; it could be cool if we allowed 
modifying log granularity dynamically in the future to help investigate things.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamShutdownException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+
+/**
+ * Indicates that a {@link WindmillStream#shutdown()} was called while waiting 
for some internal
+ * operation to complete. Most common use of this exception should be 
conversion to a {@link
+ * org.apache.beam.runners.dataflow.worker.WorkItemCancelledException} as the 
{@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem} being 
processed by {@link
+ * WindmillStream}.
+ */
+final class WindmillStreamShutdownException extends RuntimeException {

Review Comment:
   Good point, that's probably only necessary for public stuff.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -265,18 +317,21 @@ public GetWorkerMetadataStream 
createGetWorkerMetadataStream(
         onNewWindmillEndpoints);
   }
 
-  private StreamObserverFactory newStreamObserverFactory() {
+  private StreamObserverFactory newStreamObserverFactory(boolean hasDeadline) {
     return StreamObserverFactory.direct(
-        DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, 
windmillMessagesBetweenIsReadyChecks);
+        hasDeadline ? DEFAULT_DEADLINE_SECONDS : NO_DEADLINE, 
windmillMessagesBetweenIsReadyChecks);
   }
 
   @Override
   public void appendSummaryHtml(PrintWriter writer) {
     writer.write("Active Streams:<br>");
-    for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
-      stream.appendSummaryHtml(writer);
-      writer.write("<br>");
-    }
+    streamRegistry.stream()
+        
.sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken))
+        .collect(

Review Comment:
   I missed the multimap bit. I agree the grouping is nice. However isn't the 
sorting then unnecessary if we're putting it in a map right away?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -272,42 +263,106 @@ private void startWorkerMetadataConsumer() {
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
     getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
     newWorkerMetadataPublisher.shutdownNow();
     newWorkerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
+  private void consumeWindmillWorkerEndpoints(WindmillEndpoints 
newWindmillEndpoints) {
+    consumeEndpoints(newWindmillEndpoints).join();
+  }
+
   /**
    * {@link java.util.function.Consumer<WindmillEndpoints>} used to update 
{@link #connections} on
    * new backend worker metadata.
    */
-  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints 
newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+  private synchronized CompletableFuture<Void> consumeEndpoints(
+      WindmillEndpoints newWindmillEndpoints) {
     LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
     ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
         createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
-
+    CompletableFuture<Void> closeStaleStreams =
+        closeStaleStreams(newWindmillConnections.values(), 
connections.get().windmillStreams());
+    ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillConnections.values()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
             .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                
closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
-
     LOG.info(
         "Setting new connections: {}. Previous connections: {}.",
         newConnectionsState,
         connections.get());
     connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), 
totalGetWorkBudget);
+
+    // Close the streams outside the lock.

Review Comment:
   yeah I don't know if we need to block on closing streams. I think we might 
want to block on creating the new ones.
   
   one possible thing is that if we are leaking stuff and they never actually 
close we might not know. But if we have the stream registry I think we'd see 
that. Or we can add some logging to closeAllStreams that it is taking a long 
time.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java:
##########
@@ -122,12 +132,61 @@ void addRequest(QueuedRequest request) {
       byteSize += request.byteSize();
     }
 
-    void countDown() {
+    /** Let waiting for threads know that the request has been successfully 
sent. */
+    synchronized void notifySent() {
       sent.countDown();
     }
 
-    void await() throws InterruptedException {
+    /** Let waiting for threads know that a failure occurred. */
+    synchronized void notifyFailed() {
+      failed = true;
+      sent.countDown();
+    }
+
+    /**
+     * Block until notified of a successful send via {@link #notifySent()} or 
a non-retryable
+     * failure via {@link #notifyFailed()}. On failure, throw an exception to 
on calling threads.
+     */
+    void waitForSendOrFailNotification() throws InterruptedException {
       sent.await();
+      if (failed) {
+        ImmutableList<String> cancelledRequests = 
createStreamCancelledErrorMessage();
+        LOG.error("Requests failed for the following batches: {}", 
cancelledRequests);
+        throw new WindmillStreamShutdownException(
+            "Requests failed for batch containing "
+                + 
cancelledRequests.stream().limit(3).collect(Collectors.joining(", "))
+                + " ... requests. This is most likely due to the stream being 
explicitly closed"
+                + " which happens when the work is marked as invalid on the 
streaming"
+                + " backend when key ranges shuffle around. This is transient 
and corresponding"
+                + " work will eventually be retried.");
+      }
+    }
+
+    ImmutableList<String> createStreamCancelledErrorMessage() {

Review Comment:
   add parameter for the limit or just inline above? We're building up a 
possibly big list just to ignore most of it.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -74,69 +76,125 @@ public void onNext(T value) {
     while (true) {
       try {
         synchronized (lock) {
+          // If we awaited previously and timed out, wait for the same phase. 
Otherwise we're
+          // careful to observe the phase before observing isReady.
+          if (awaitPhase < 0) {
+            awaitPhase = isReadyNotifier.getPhase();
+            // If getPhase() returns a value less than 0, the phaser has been 
terminated.
+            if (awaitPhase < 0) {
+              return;
+            }
+          }
+
           // We only check isReady periodically to effectively allow for 
increasing the outbound
           // buffer periodically. This reduces the overhead of blocking while 
still restricting
           // memory because there is a limited # of streams, and we have a max 
messages size of 2MB.
           if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
-            outboundObserver.onNext(value);
+            tryOnNext(value);
             return;
           }
-          // If we awaited previously and timed out, wait for the same phase. 
Otherwise we're
-          // careful to observe the phase before observing isReady.
-          if (awaitPhase < 0) {
-            awaitPhase = phaser.getPhase();
-          }
+
           if (outboundObserver.isReady()) {
             messagesSinceReady = 0;
-            outboundObserver.onNext(value);
+            tryOnNext(value);
             return;
           }
         }
+
         // A callback has been registered to advance the phaser whenever the 
observer
         // transitions to  is ready. Since we are waiting for a phase observed 
before the
         // outboundObserver.isReady() returned false, we expect it to advance 
after the
         // channel has become ready.  This doesn't always seem to be the case 
(despite
         // documentation stating otherwise) so we poll periodically and 
enforce an overall
         // timeout related to the stream deadline.
-        phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, 
TimeUnit.SECONDS);
+        int nextPhase =
+            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, 
TimeUnit.SECONDS);
+        // If nextPhase is a value less than 0, the phaser has been terminated.
+        if (nextPhase < 0) {
+          return;
+        }
+
         synchronized (lock) {
           messagesSinceReady = 0;
-          outboundObserver.onNext(value);
+          tryOnNext(value);
           return;
         }
       } catch (TimeoutException e) {
+        if (isReadyNotifier.isTerminated()) {
+          return;
+        }
+
         totalSecondsWaited += waitSeconds;
-        if (totalSecondsWaited > deadlineSeconds) {
-          LOG.error(
-              "Exceeded timeout waiting for the outboundObserver to become 
ready meaning "
-                  + "that the stream deadline was not respected.");
-          throw new RuntimeException(e);
+        if (hasDeadlineExpired(totalSecondsWaited)) {
+          String errorMessage = 
constructStreamCancelledErrorMessage(totalSecondsWaited);
+          LOG.error(errorMessage);
+          throw new StreamObserverCancelledException(errorMessage, e);
         }
+
         if (totalSecondsWaited > 30) {
           LOG.info(
               "Output channel stalled for {}s, outbound thread {}.",
               totalSecondsWaited,
               Thread.currentThread().getName());
         }
+
         waitSeconds = waitSeconds * 2;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+        StreamObserverCancelledException ex = new 
StreamObserverCancelledException(e);
+        LOG.error("Interrupted while waiting for outboundObserver to become 
ready.", ex);
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * Only send the next value if the phaser is not terminated by the time we 
acquire the lock since
+   * the phaser can be terminated at any time.
+   */
+  private void tryOnNext(T value) {

Review Comment:
   Even if you do check here, there is still a race between checking the phaser 
and calling onNext regardless. Internally the outboundObserver is already 
observing the phaser termination via getPhase() (and also blocking respecting 
phaser termination), so the extra check is just mental overhead, I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to