scwhittle commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1690287323


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -151,41 +154,41 @@ private static long debugDuration(long nowMs, long 
startMs) {
    */
   protected abstract void startThrottleTimer();
 
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
-    }
-
-    return requestObserver;
-  }
-
   /** Send a request to the server. */
   protected final void send(RequestT request) {
     lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
+      // Check if we should send after we acquire the lock.
+      if (isShutdown()) {
+        LOG.warn("Send called on a shutdown stream.");
+        return;
+      }
+
       if (streamClosed.get()) {
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      requestObserver.onNext(request);
     }
   }
 
   /** Starts the underlying stream. */
   protected final void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
-    while (true) {
+    while (!isShutdown.get()) {

Review Comment:
   I'd just remove this check since you do it first thing below



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -303,71 +368,78 @@ public void onNext(ResponseT response) {
 
     @Override
     public void onError(Throwable t) {
-      onStreamFinished(t);
+      if (isStreamDone()) {
+        return;
+      }
+
+      Status status = Status.fromThrowable(t);
+      setLastError(status.toString());
+      // Don't log every error since it will get noisy, and many errors 
transient.
+      if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+        long nowMillis = Instant.now().getMillis();
+        String responseDebug;
+        if (lastResponseTimeMs.get() == 0) {
+          responseDebug = "never received response";
+        } else {
+          responseDebug = "received response " + (nowMillis - 
lastResponseTimeMs.get()) + "ms ago";
+        }
+        LOG.debug(
+            "{} streaming Windmill RPC errors for {}, last was: {} with status 
{}."
+                + " created {}ms ago, {}. This is normal with autoscaling.",
+            AbstractWindmillStream.this.getClass(),
+            errorCount.get(),
+            t,
+            status,
+            nowMillis - startTimeMs.get(),
+            responseDebug);
+      }
+
+      // If the stream was stopped due to a resource exhausted error then we 
are throttled.
+      if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+        startThrottleTimer();
+      }
+
+      try {
+        long sleep = backoff.nextBackOffMillis();
+        sleepUntil.set(Instant.now().getMillis() + sleep);
+        sleeper.sleep(sleep);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (IOException e) {
+        // Ignore.
+      }
+
+      tryRestartStream();
     }
 
     @Override
     public void onCompleted() {
-      onStreamFinished(null);
+      if (isStreamDone()) {
+        return;
+      }
+      errorCount.incrementAndGet();
+      String error =
+          "Stream completed successfully but did not complete requested 
operations, "
+              + "recreating";
+      LOG.warn(error);
+      setLastError(error);
+      tryRestartStream();
     }
 
-    private void onStreamFinished(@Nullable Throwable t) {
-      synchronized (this) {
-        if (clientClosed.get() && !hasPendingRequests()) {
-          streamRegistry.remove(AbstractWindmillStream.this);
-          finishLatch.countDown();
-          return;
-        }
+    private void tryRestartStream() {
+      if (!isShutdown()) {
+        executeSafely(AbstractWindmillStream.this::startStream);
       }
-      if (t != null) {
-        Status status = null;
-        if (t instanceof StatusRuntimeException) {
-          status = ((StatusRuntimeException) t).getStatus();
-        }
-        String statusError = status == null ? "" : status.toString();
-        setLastError(statusError);
-        if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
-          long nowMillis = Instant.now().getMillis();
-          String responseDebug;
-          if (lastResponseTimeMs.get() == 0) {
-            responseDebug = "never received response";
-          } else {
-            responseDebug =
-                "received response " + (nowMillis - lastResponseTimeMs.get()) 
+ "ms ago";
-          }
-          LOG.debug(
-              "{} streaming Windmill RPC errors for {}, last was: {} with 
status {}."
-                  + " created {}ms ago, {}. This is normal with autoscaling.",
-              AbstractWindmillStream.this.getClass(),
-              errorCount.get(),
-              t,
-              statusError,
-              nowMillis - startTimeMs.get(),
-              responseDebug);
-        }
-        // If the stream was stopped due to a resource exhausted error then we 
are throttled.
-        if (status != null && status.getCode() == 
Status.Code.RESOURCE_EXHAUSTED) {
-          startThrottleTimer();
-        }
+    }
 
-        try {
-          long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        } catch (IOException e) {
-          // Ignore.
-        }
-      } else {
-        errorCount.incrementAndGet();
-        String error =
-            "Stream completed successfully but did not complete requested 
operations, "
-                + "recreating";
-        LOG.warn(error);
-        setLastError(error);
+    private synchronized boolean isStreamDone() {
+      if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) {
+        streamRegistry.remove(AbstractWindmillStream.this);

Review Comment:
   I think this kind of side effect is confusing in a method that just sounds 
like an accessor
   
   how about maybeTeardownStream()?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -295,34 +367,13 @@ private StreamingDataflowWorker(
             .setStatusPages(workerStatusPages)
             .setStateCache(stateCache)
             .setComputationStateCache(this.computationStateCache)
-            
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
-            .setGetDataStatusProvider(getDataClient::printHtml)
+            .setCurrentActiveCommitBytes(currentActiveCommitBytes)

Review Comment:
   could set these above directly and get rid of currentActiveCommitBytes and 
getDataStatusProvider variables



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -176,7 +180,7 @@ private StreamingDataflowWorker(
       DataflowWorkerHarnessOptions options,
       HotKeyLogger hotKeyLogger,
       Supplier<Instant> clock,
-      StreamingWorkerStatusReporter workerStatusReporter,
+      Function<Supplier<Long>, StreamingWorkerStatusReporter> 
streamingWorkerStatusReporterFactory,

Review Comment:
   should this be a functional interface so you can document? it's unclear what 
the long supplier is



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -151,41 +154,41 @@ private static long debugDuration(long nowMs, long 
startMs) {
    */
   protected abstract void startThrottleTimer();
 
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
-    }
-
-    return requestObserver;
-  }
-
   /** Send a request to the server. */
   protected final void send(RequestT request) {
     lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
+      // Check if we should send after we acquire the lock.
+      if (isShutdown()) {
+        LOG.warn("Send called on a shutdown stream.");

Review Comment:
   if this is possible, don't log as customers don't like warning logs and open 
issues about them
   
   if this should not be possible, perhaps better to throw an exception so that 
we notice and fix it.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -303,71 +368,78 @@ public void onNext(ResponseT response) {
 
     @Override
     public void onError(Throwable t) {
-      onStreamFinished(t);
+      if (isStreamDone()) {
+        return;
+      }
+
+      Status status = Status.fromThrowable(t);
+      setLastError(status.toString());
+      // Don't log every error since it will get noisy, and many errors 
transient.
+      if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+        long nowMillis = Instant.now().getMillis();
+        String responseDebug;
+        if (lastResponseTimeMs.get() == 0) {
+          responseDebug = "never received response";
+        } else {
+          responseDebug = "received response " + (nowMillis - 
lastResponseTimeMs.get()) + "ms ago";
+        }
+        LOG.debug(
+            "{} streaming Windmill RPC errors for {}, last was: {} with status 
{}."
+                + " created {}ms ago, {}. This is normal with autoscaling.",
+            AbstractWindmillStream.this.getClass(),
+            errorCount.get(),
+            t,
+            status,
+            nowMillis - startTimeMs.get(),
+            responseDebug);
+      }
+
+      // If the stream was stopped due to a resource exhausted error then we 
are throttled.
+      if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+        startThrottleTimer();
+      }
+
+      try {
+        long sleep = backoff.nextBackOffMillis();
+        sleepUntil.set(Instant.now().getMillis() + sleep);
+        sleeper.sleep(sleep);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (IOException e) {
+        // Ignore.
+      }
+
+      tryRestartStream();
     }
 
     @Override
     public void onCompleted() {
-      onStreamFinished(null);
+      if (isStreamDone()) {
+        return;
+      }
+      errorCount.incrementAndGet();
+      String error =
+          "Stream completed successfully but did not complete requested 
operations, "
+              + "recreating";
+      LOG.warn(error);
+      setLastError(error);
+      tryRestartStream();
     }
 
-    private void onStreamFinished(@Nullable Throwable t) {
-      synchronized (this) {
-        if (clientClosed.get() && !hasPendingRequests()) {
-          streamRegistry.remove(AbstractWindmillStream.this);
-          finishLatch.countDown();
-          return;
-        }
+    private void tryRestartStream() {
+      if (!isShutdown()) {

Review Comment:
   this looks racy, we check shutdown above in isStreamDone with 
synchronization, but then if it is shutdown before here, we end up with an 
error but won't restart the stream or remove it from the registry.
   
   I would remove this one



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +333,83 @@ private void setLastError(String error) {
     lastErrorTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException 
{
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the 
given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements 
StreamObserver<RequestT> {
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("delegateRequestObserver")
+    private final AtomicReference<StreamObserver<RequestT>> 
delegateRequestObserver;

Review Comment:
   All of the reads appear to be guarded, seems like you could either not have 
the atomicref or change delegate() to just return atomic ref without locking.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -166,7 +170,6 @@ private static Watermarks createWatermarks(
   }
 
   private void sendRequestExtension(GetWorkBudget adjustment) {
-    inFlightBudget.getAndUpdate(budget -> budget.apply(adjustment));

Review Comment:
   what is inflight budget tracking if we're not incrementing it here?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -69,6 +70,7 @@ private GrpcCommitWorkStream(
       AtomicLong idGenerator,
       int streamingRpcBatchLimit) {
     super(
+        LOG,

Review Comment:
   can you change type of pending to ConcurrentHashMap above,  no reason to use 
Map if we're relying on concurrency and not exposing it directly.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -163,7 +165,7 @@ protected void onResponse(StreamingCommitResponse response) 
{
         continue;
       }
       PendingRequest done = pending.remove(requestId);
-      if (done == null) {
+      if (done == null && !isShutdown()) {

Review Comment:
   this is going to get a NullPointerException below, instead move the isShutdown 
check to whether or not to log inside this if



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -175,36 +178,38 @@ private void sendRequestExtension(GetWorkBudget 
adjustment) {
                     .setMaxBytes(adjustment.bytes()))
             .build();
 
-    executor()
-        .execute(
-            () -> {
-              try {
-                send(extension);
-              } catch (IllegalStateException e) {
-                // Stream was closed.
-              }
-            });
+    executeSafely(
+        () -> {
+          try {
+            send(extension);
+          } catch (IllegalStateException e) {

Review Comment:
   should we handle illegalstateexception internally? or rely on executeSafely 
to catch it?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +333,83 @@ private void setLastError(String error) {
     lastErrorTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException 
{
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the 
given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements 
StreamObserver<RequestT> {
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("delegateRequestObserver")
+    private final AtomicReference<StreamObserver<RequestT>> 
delegateRequestObserver;
+
+    @GuardedBy("delegateRequestObserver")
+    /* Indicates if onCompleted() has been called for the current 
delegateRequestObserver instance.
+    Reset to false when reset() is called. */
+    private volatile boolean isClosed;

Review Comment:
   ditto seems like this could be just a normal guarded boolean not volatile



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -175,36 +178,38 @@ private void sendRequestExtension(GetWorkBudget 
adjustment) {
                     .setMaxBytes(adjustment.bytes()))
             .build();
 
-    executor()
-        .execute(
-            () -> {
-              try {
-                send(extension);
-              } catch (IllegalStateException e) {
-                // Stream was closed.
-              }
-            });
+    executeSafely(
+        () -> {
+          try {
+            send(extension);
+          } catch (IllegalStateException e) {
+            // Stream was closed.
+          }
+        });
   }
 
   @Override
   protected synchronized void onNewStream() {
     workItemAssemblers.clear();
-    // Add the current in-flight budget to the next adjustment. Only positive 
values are allowed
-    // here
-    // with negatives defaulting to 0, since GetWorkBudgets cannot be created 
with negative values.
-    GetWorkBudget budgetAdjustment = 
nextBudgetAdjustment.get().apply(inFlightBudget.get());
-    inFlightBudget.set(budgetAdjustment);
-    send(
-        StreamingGetWorkRequest.newBuilder()
-            .setRequest(
-                request
-                    .toBuilder()
-                    .setMaxBytes(budgetAdjustment.bytes())
-                    .setMaxItems(budgetAdjustment.items()))
-            .build());
+    if (!isShutdown()) {
+      // Add the current in-flight budget to the next adjustment. Only 
positive values are allowed
+      // here with negatives defaulting to 0, since GetWorkBudgets cannot be 
created with negative
+      // values. We just sent the budget, reset it.
+      GetWorkBudget currentBudgetAdjustment =
+          nextBudgetAdjustment.getAndUpdate(ignored -> 
GetWorkBudget.noBudget());
+      GetWorkBudget budgetAdjustment = 
currentBudgetAdjustment.apply(inFlightBudget.get());
 
-    // We just sent the budget, reset it.
-    nextBudgetAdjustment.set(GetWorkBudget.noBudget());
+      inFlightBudget.updateAndGet(budget -> 
budget.apply(currentBudgetAdjustment));

Review Comment:
   this doesn't make sense to me, currentBudgetAdjustment is 
   (previous nextBudgetAdjustment + inflightBudget) so this is 2*inflightBudget 
+ nextBudgetAdjustment
   
   Also it would be better to use the result of this instead of a separate 
inflightBudget.get()



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -117,7 +119,7 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
   @Override
-  protected synchronized void onNewStream() {
+  protected void onNewStream() {

Review Comment:
   why remove the synchronized? it is beneath a synchronized block in the superclass 
anyway so I think it executes equivalently but it is clearer since we want 
send/pending to be consistent to match the sends below.
   
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +333,83 @@ private void setLastError(String error) {
     lastErrorTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException 
{
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the 
given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements 
StreamObserver<RequestT> {
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("delegateRequestObserver")
+    private final AtomicReference<StreamObserver<RequestT>> 
delegateRequestObserver;
+
+    @GuardedBy("delegateRequestObserver")
+    /* Indicates if onCompleted() has been called for the current 
delegateRequestObserver instance.
+    Reset to false when reset() is called. */
+    private volatile boolean isClosed;
+
+    private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> 
requestObserverSupplier) {
+      this.requestObserverSupplier = requestObserverSupplier;
+      this.delegateRequestObserver = new AtomicReference<>();
+      this.isClosed = false;
+    }
+
+    private StreamObserver<RequestT> delegate() {
+      synchronized (delegateRequestObserver) {

Review Comment:
   can you just simplify on this and use synchronized methods?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -167,67 +170,103 @@ private StreamObserver<RequestT> requestObserver() {
   }
 
   /** Send a request to the server. */
-  protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
-    synchronized (this) {
-      if (streamClosed.get()) {
-        throw new IllegalStateException("Send called on a client closed 
stream.");
+  protected final synchronized void send(RequestT request) {
+    if (isShutdown()) {
+      logger.debug(
+          "Send called on a shutdown stream={} to worker{}.", getClass(), 
backendWorkerToken);
+      return;
+    }
+
+    if (requestObserver.isClosed()) {
+      throw new IllegalStateException("Send called on a client closed 
stream.");
+    }
+
+    try {
+      lastSendTimeMs.set(Instant.now().getMillis());
+      requestObserver.onNext(request);
+    } catch (StreamObserverCancelledException e) {
+      if (isShutdown()) {
+        logger.debug("Stream was closed or shutdown during send.", e);
+        return;
       }
 
-      requestObserver().onNext(request);
+      logger.error(
+          "StreamObserver was unexpectedly cancelled for stream={}, worker={}. 
stacktrace={}",
+          getClass(),
+          backendWorkerToken,
+          e.getStackTrace(),
+          e);
+      throw e;
     }
   }
 
   /** Starts the underlying stream. */
   protected final void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
-    while (true) {
+    while (!isShutdown.get()) {
       try {
         synchronized (this) {
+          if (isShutdown.get()) {
+            break;
+          }
           startTimeMs.set(Instant.now().getMillis());
           lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed.get() && !isShutdown()) {

Review Comment:
   whatever isShutdown() is protecting against it seems racy because it could 
become true right after you observe it.
   
   seems like we could attempt to close the response observer  that shutdown 
called onError on.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -117,8 +121,8 @@ private GrpcDirectGetWorkStream(
     this.heartbeatSender = Suppliers.memoize(heartbeatSender::get);
     this.workCommitter = Suppliers.memoize(workCommitter::get);
     this.getDataClient = Suppliers.memoize(getDataClient::get);
-    this.inFlightBudget = new AtomicReference<>(GetWorkBudget.noBudget());
     this.nextBudgetAdjustment = new 
AtomicReference<>(GetWorkBudget.noBudget());
+    this.inFlightBudget = new AtomicReference<>(GetWorkBudget.noBudget());

Review Comment:
   revert? better to avoid no-op changes



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +333,83 @@ private void setLastError(String error) {
     lastErrorTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException 
{
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the 
given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements 
StreamObserver<RequestT> {
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("delegateRequestObserver")
+    private final AtomicReference<StreamObserver<RequestT>> 
delegateRequestObserver;
+
+    @GuardedBy("delegateRequestObserver")
+    /* Indicates if onCompleted() has been called for the current 
delegateRequestObserver instance.
+    Reset to false when reset() is called. */
+    private volatile boolean isClosed;
+
+    private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> 
requestObserverSupplier) {
+      this.requestObserverSupplier = requestObserverSupplier;
+      this.delegateRequestObserver = new AtomicReference<>();
+      this.isClosed = false;
+    }
+
+    private StreamObserver<RequestT> delegate() {
+      synchronized (delegateRequestObserver) {
+        if (delegateRequestObserver.get() == null) {
+          throw new NullPointerException(
+              "requestObserver cannot be null. Missing a call to startStream() 
to initialize.");
+        }
+
+        return delegateRequestObserver.get();
+      }
+    }
+
+    private void reset() {
+      synchronized (delegateRequestObserver) {
+        delegateRequestObserver.set(requestObserverSupplier.get());
+        isClosed = false;
+      }
+    }
+
+    @Override
+    public void onNext(RequestT requestT) {
+      delegate().onNext(requestT);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      delegate().onError(throwable);
+    }
+
+    @Override
+    public void onCompleted() {
+      synchronized (delegateRequestObserver) {
+        if (!isClosed) {
+          // onCompleted() can only be called once for each StreamObserver 
instance, or else an
+          // IllegalStateException is thrown.
+          delegate().onCompleted();
+          isClosed = true;
+        }
+      }
+    }
+
+    public boolean isClosed() {

Review Comment:
   exposing this for non-status pages seems risky since the synchronized state 
can change between when this is checked and another method is called.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -182,6 +184,12 @@ protected void onResponse(StreamingCommitResponse 
response) {
     }
   }
 
+  @Override
+  protected void shutdownInternal() {
+    pending.values().forEach(pendingRequest -> 
pendingRequest.onDone.accept(CommitStatus.ABORTED));

Review Comment:
   I'm worried if pending has something inserted between iterating and clear
   
   can you instead use an iterator where you remove as you go so everything 
removed is guaranteed to be aborted?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to