scwhittle commented on code in PR #31902: URL: https://github.com/apache/beam/pull/31902#discussion_r1756596636
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java: ########## @@ -338,7 +338,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { "<table border=\"1\" " + "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">"); writer.println( - "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th></tr>"); + "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th><th>Produced By</th></tr>"); Review Comment: don't see matching row change for header change ########## runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java: ########## @@ -302,4 +301,12 @@ public Integer create(PipelineOptions options) { return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1; } } + + /** EnableStreamingEngine defaults to false unless one of the two experiments is set. 
*/ Review Comment: looks like just 1 experiment ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -82,23 +83,25 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker LoggerFactory.getLogger(FanOutStreamingEngineWorkerHarness.class); private static final String PUBLISH_NEW_WORKER_METADATA_THREAD = "PublishNewWorkerMetadataThread"; private static final String CONSUME_NEW_WORKER_METADATA_THREAD = "ConsumeNewWorkerMetadataThread"; + private static final String STREAM_STARTER_THREAD = "WindmillStreamStarter"; Review Comment: add int to format for these, thread name takes in thread count for name ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -290,12 +332,60 @@ private void setLastError(String error) { lastErrorTime.set(DateTime.now()); } + protected abstract void shutdownInternal(); + public static class WindmillStreamShutdownException extends RuntimeException { public WindmillStreamShutdownException(String message) { super(message); } } + /** + * Request observer that allows resetting its internal delegate using the given {@link + * #requestObserverSupplier}. 
+ */ + @ThreadSafe + private static class ResettableRequestObserver<RequestT> implements StreamObserver<RequestT> { + + private final Supplier<StreamObserver<RequestT>> requestObserverSupplier; + + @GuardedBy("this") + private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver; + + private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) { + this.requestObserverSupplier = requestObserverSupplier; + this.delegateRequestObserver = null; + } + + private synchronized StreamObserver<RequestT> delegate() { + if (delegateRequestObserver == null) { + throw new NullPointerException( Review Comment: use Preconditions.checkNotNull? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -317,29 +352,38 @@ public boolean commitWorkItem( if (!canAccept(commitRequest.getSerializedSize() + computation.length())) { return false; } - PendingRequest request = new PendingRequest(computation, commitRequest, onDone); + + PendingRequest request = PendingRequest.create(computation, commitRequest, onDone); add(idGenerator.incrementAndGet(), request); return true; } /** Flushes any pending work items to the wire. */ @Override public void flush() { - flushInternal(queue); - queuedBytes = 0; - queue.clear(); + if (!isShutdown()) { + try { + flushInternal(queue); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + queuedBytes = 0; + queue.clear(); Review Comment: perhaps the finally should cover the isShutdown case too? 
seems like we might want to clear the queue anyway ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -156,29 +162,44 @@ public void sendHealthCheck() { protected void onResponse(StreamingCommitResponse response) { commitWorkThrottleTimer.stop(); - RuntimeException finalException = null; + @Nullable RuntimeException failure = null; for (int i = 0; i < response.getRequestIdCount(); ++i) { long requestId = response.getRequestId(i); if (requestId == HEARTBEAT_REQUEST_ID) { continue; } - PendingRequest done = pending.remove(requestId); - if (done == null) { - LOG.error("Got unknown commit request ID: {}", requestId); + PendingRequest pendingRequest = pending.remove(requestId); + if (pendingRequest == null) { + // Skip responses when the stream is shutdown since they are now invalid. + if (!isShutdown()) { + LOG.error("Got unknown commit request ID: {}", requestId); + } } else { try { - done.onDone.accept( + pendingRequest.completeWithStatus( (i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK); } catch (RuntimeException e) { // Catch possible exceptions to ensure that an exception for one commit does not prevent - // other commits from being processed. + // other commits from being processed. Aggregate all the failures to throw after + // processing the response if they exist. 
LOG.warn("Exception while processing commit response.", e); - finalException = e; + if (failure == null) failure = e; + else failure.addSuppressed(e); } } } - if (finalException != null) { - throw finalException; + if (failure != null) { + throw failure; + } + } + + @Override + protected void shutdownInternal() { + Iterator<PendingRequest> pendingRequests = pending.values().iterator(); + while (pendingRequests.hasNext()) { + PendingRequest pendingRequest = pendingRequests.next(); Review Comment: what are ConcurrentHashMap guarantees around concurrent iteration and removal? From docs it appears iteration is weakly-consistent so it may see removed things and it is possible that we would call completeWithStatus twice on the same request if the iteration here and the onResponse happened at the same time. To ensure we don't you could remove everything you iterate and examine return value like: PendingRequest r = pendingRequests.remove(iter.next()) if (r != null) { r.completeWithStatus(); } ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java: ########## @@ -165,46 +174,52 @@ private static Watermarks createWatermarks( .build(); } - private void sendRequestExtension(GetWorkBudget adjustment) { - inFlightBudget.getAndUpdate(budget -> budget.apply(adjustment)); - StreamingGetWorkRequest extension = - StreamingGetWorkRequest.newBuilder() - .setRequestExtension( - Windmill.StreamingGetWorkRequestExtension.newBuilder() - .setMaxItems(adjustment.items()) - .setMaxBytes(adjustment.bytes())) - .build(); + private void sendRequestExtension() { Review Comment: add impl note about not synchronizing here due to this running on grpc serial executor for message which can deadlock since we send on the stream beneath the synchronization ########## 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ########## @@ -74,69 +76,125 @@ public void onNext(T value) { while (true) { try { synchronized (lock) { + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { + awaitPhase = isReadyNotifier.getPhase(); + // If getPhase() returns a value less than 0, the phaser has been terminated. + if (awaitPhase < 0) { + return; + } + } + // We only check isReady periodically to effectively allow for increasing the outbound // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { - outboundObserver.onNext(value); + tryOnNext(value); return; } - // If we awaited previously and timed out, wait for the same phase. Otherwise we're - // careful to observe the phase before observing isReady. - if (awaitPhase < 0) { - awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; - outboundObserver.onNext(value); + tryOnNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't always seem to be the case (despite // documentation stating otherwise) so we poll periodically and enforce an overall // timeout related to the stream deadline. 
- phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); + int nextPhase = + isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); + // If nextPhase is a value less than 0, the phaser has been terminated. + if (nextPhase < 0) { + return; + } + synchronized (lock) { messagesSinceReady = 0; - outboundObserver.onNext(value); + tryOnNext(value); return; } } catch (TimeoutException e) { + if (isReadyNotifier.isTerminated()) { Review Comment: don't think you need this, awaitAdvanceInterruptibly will return -1 if it's terminated. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java: ########## @@ -265,18 +317,21 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream( onNewWindmillEndpoints); } - private StreamObserverFactory newStreamObserverFactory() { + private StreamObserverFactory newStreamObserverFactory(boolean hasDeadline) { return StreamObserverFactory.direct( - DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, windmillMessagesBetweenIsReadyChecks); + hasDeadline ? 
DEFAULT_DEADLINE_SECONDS : NO_DEADLINE, windmillMessagesBetweenIsReadyChecks); } @Override public void appendSummaryHtml(PrintWriter writer) { writer.write("Active Streams:<br>"); - for (AbstractWindmillStream<?, ?> stream : streamRegistry) { - stream.appendSummaryHtml(writer); - writer.write("<br>"); - } + streamRegistry.stream() + .sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken)) + .collect( Review Comment: seems like if you've sorted you can remove the to map and just apply printSummaryHtmlForWorker to the AbstractWindmillStream iterable ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java: ########## @@ -110,7 +110,18 @@ public static ProcessingContext createProcessingContext( GetDataClient getDataClient, Consumer<Commit> workCommitter, HeartbeatSender heartbeatSender) { - return ProcessingContext.create(computationId, getDataClient, workCommitter, heartbeatSender); + return ProcessingContext.create( + computationId, getDataClient, workCommitter, heartbeatSender, ""); + } + + public static ProcessingContext createProcessingContext( + String backendWorkerToken, Review Comment: nit: put token last to match constructor ordering? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java: ########## @@ -312,7 +312,7 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt( if (executableWork != null) { Work work = executableWork.work(); if (work.isStuckCommittingAt(stuckCommitDeadline)) { - LOG.error( Review Comment: think this should remain an error, if we have such stuck commits we should debug why ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -125,23 +127,22 @@ private FanOutStreamingEngineWorkerHarness( this.connections = new AtomicReference<>(StreamingEngineConnectionState.EMPTY); this.channelCachingStubFactory = channelCachingStubFactory; this.dispatcherClient = dispatcherClient; - this.isBudgetRefreshPaused = new AtomicBoolean(false); this.getWorkerMetadataThrottleTimer = new ThrottleTimer(); + this.windmillStreamStarter = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat(STREAM_STARTER_THREAD).build()); + this.windmillStreamCloser = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat(STREAM_CLOSER_THREAD).build()); Review Comment: just single thread pool for starting/closing as stream ops? The stack will be pretty clear what the operation is. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -290,12 +332,60 @@ private void setLastError(String error) { lastErrorTime.set(DateTime.now()); } + protected abstract void shutdownInternal(); + public static class WindmillStreamShutdownException extends RuntimeException { public WindmillStreamShutdownException(String message) { super(message); } } + /** + * Request observer that allows resetting its internal delegate using the given {@link + * #requestObserverSupplier}. + */ + @ThreadSafe + private static class ResettableRequestObserver<RequestT> implements StreamObserver<RequestT> { + + private final Supplier<StreamObserver<RequestT>> requestObserverSupplier; + + @GuardedBy("this") + private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver; Review Comment: remove the volatile if synchronized. Though it seems like volatile might be enough if you just read it once in delegate. can do that by assigning to stack variable or using precondtions return value. return Preconditions.checkNotNull(delegateRequestObserver); ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -272,42 +263,106 @@ private void startWorkerMetadataConsumer() { public synchronized void shutdown() { Preconditions.checkState(started, "StreamingEngineClient never started."); getWorkerMetadataStream.get().halfClose(); - getWorkBudgetRefresher.stop(); newWorkerMetadataPublisher.shutdownNow(); newWorkerMetadataConsumer.shutdownNow(); channelCachingStubFactory.shutdown(); } + private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { + consumeEndpoints(newWindmillEndpoints).join(); + } + /** * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on * new backend worker metadata. 
*/ - private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { - isBudgetRefreshPaused.set(true); + private synchronized CompletableFuture<Void> consumeEndpoints( + WindmillEndpoints newWindmillEndpoints) { LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections = createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints()); - + CompletableFuture<Void> closeStaleStreams = + closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams()); + ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams = + createAndStartNewStreams(newWindmillConnections.values()).join(); StreamingEngineConnectionState newConnectionsState = StreamingEngineConnectionState.builder() .setWindmillConnections(newWindmillConnections) - .setWindmillStreams( - closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values())) + .setWindmillStreams(newStreams) .setGlobalDataStreams( createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints())) .build(); - LOG.info( "Setting new connections: {}. Previous connections: {}.", newConnectionsState, connections.get()); connections.set(newConnectionsState); - isBudgetRefreshPaused.set(false); - getWorkBudgetRefresher.requestBudgetRefresh(); + getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget); + + // Close the streams outside the lock. + return closeStaleStreams; + } + + /** Close the streams that are no longer valid asynchronously. 
*/ + private CompletableFuture<Void> closeStaleStreams( + Collection<WindmillConnection> newWindmillConnections, + ImmutableMap<WindmillConnection, WindmillStreamSender> currentStreams) { + return CompletableFuture.allOf( + currentStreams.entrySet().stream() + .filter( + connectionAndStream -> + !newWindmillConnections.contains(connectionAndStream.getKey())) + .map( + entry -> + CompletableFuture.runAsync( + () -> { + LOG.debug("Closing streams to {}", entry.getKey().backendWorkerToken()); Review Comment: log the full connection here and below? the endpoint matters too we can also consider if we want these to be info logs ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -185,49 +202,69 @@ protected final void startStream() { while (true) { try { synchronized (this) { + if (isShutdown.get()) { + break; + } startTimeMs.set(Instant.now().getMillis()); lastResponseTimeMs.set(0); streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + requestObserver.reset(); onNewStream(); if (clientClosed.get()) { halfClose(); } return; } } catch (Exception e) { - LOG.error("Failed to create new stream, retrying: ", e); + logger.error("Failed to create new stream, retrying: ", e); try { long sleep = backoff.nextBackOffMillis(); sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); - } catch (InterruptedException | IOException i) { + sleeper.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; Review Comment: rethrow the interrupted exception? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -272,42 +263,106 @@ private void startWorkerMetadataConsumer() { public synchronized void shutdown() { Preconditions.checkState(started, "StreamingEngineClient never started."); getWorkerMetadataStream.get().halfClose(); - getWorkBudgetRefresher.stop(); newWorkerMetadataPublisher.shutdownNow(); newWorkerMetadataConsumer.shutdownNow(); channelCachingStubFactory.shutdown(); } + private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { + consumeEndpoints(newWindmillEndpoints).join(); + } + /** * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on * new backend worker metadata. */ - private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { - isBudgetRefreshPaused.set(true); + private synchronized CompletableFuture<Void> consumeEndpoints( + WindmillEndpoints newWindmillEndpoints) { LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections = createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints()); - + CompletableFuture<Void> closeStaleStreams = + closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams()); + ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams = + createAndStartNewStreams(newWindmillConnections.values()).join(); StreamingEngineConnectionState newConnectionsState = StreamingEngineConnectionState.builder() .setWindmillConnections(newWindmillConnections) - .setWindmillStreams( - closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values())) + .setWindmillStreams(newStreams) .setGlobalDataStreams( createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints())) .build(); - LOG.info( "Setting new connections: {}. 
Previous connections: {}.", newConnectionsState, connections.get()); connections.set(newConnectionsState); - isBudgetRefreshPaused.set(false); - getWorkBudgetRefresher.requestBudgetRefresh(); + getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget); + + // Close the streams outside the lock. Review Comment: instead of synchronized method how about using a synchronized block to keep the join in same method? CompletableFuture<Void> blocker; synchronized(this) {..} blocker.join(); ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -185,49 +202,69 @@ protected final void startStream() { while (true) { try { synchronized (this) { + if (isShutdown.get()) { + break; + } startTimeMs.set(Instant.now().getMillis()); lastResponseTimeMs.set(0); streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + requestObserver.reset(); onNewStream(); if (clientClosed.get()) { halfClose(); } return; } } catch (Exception e) { - LOG.error("Failed to create new stream, retrying: ", e); + logger.error("Failed to create new stream, retrying: ", e); try { long sleep = backoff.nextBackOffMillis(); sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); - } catch (InterruptedException | IOException i) { + sleeper.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } catch (IOException ioe) { // Keep trying to create the stream. } } } + + // We were never able to start the stream, remove it from the stream registry. + streamRegistry.remove(this); Review Comment: if rethrowing above, this will need to be done in that case as well. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -82,13 +86,14 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win private final CountDownLatch finishLatch; private final Set<AbstractWindmillStream<?, ?>> streamRegistry; private final int logEveryNStreamFailures; - private final Supplier<StreamObserver<RequestT>> requestObserverSupplier; - // Indicates if the current stream in requestObserver is closed by calling close() method - private final AtomicBoolean streamClosed; private final String backendWorkerToken; - private @Nullable StreamObserver<RequestT> requestObserver; + private final ResettableRequestObserver<RequestT> requestObserver; + private final AtomicBoolean isShutdown; + private final AtomicBoolean streamClosed; Review Comment: keep comment for streamclosed ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -156,29 +162,44 @@ public void sendHealthCheck() { protected void onResponse(StreamingCommitResponse response) { commitWorkThrottleTimer.stop(); - RuntimeException finalException = null; + @Nullable RuntimeException failure = null; for (int i = 0; i < response.getRequestIdCount(); ++i) { long requestId = response.getRequestId(i); if (requestId == HEARTBEAT_REQUEST_ID) { continue; } - PendingRequest done = pending.remove(requestId); - if (done == null) { - LOG.error("Got unknown commit request ID: {}", requestId); + PendingRequest pendingRequest = pending.remove(requestId); + if (pendingRequest == null) { + // Skip responses when the stream is shutdown since they are now invalid. + if (!isShutdown()) { + LOG.error("Got unknown commit request ID: {}", requestId); + } } else { try { - done.onDone.accept( + pendingRequest.completeWithStatus( (i < response.getStatusCount()) ? 
response.getStatus(i) : CommitStatus.OK); } catch (RuntimeException e) { // Catch possible exceptions to ensure that an exception for one commit does not prevent - // other commits from being processed. + // other commits from being processed. Aggregate all the failures to throw after + // processing the response if they exist. LOG.warn("Exception while processing commit response.", e); - finalException = e; + if (failure == null) failure = e; Review Comment: think spotless likes braces for if statements ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java: ########## @@ -165,46 +174,52 @@ private static Watermarks createWatermarks( .build(); } - private void sendRequestExtension(GetWorkBudget adjustment) { - inFlightBudget.getAndUpdate(budget -> budget.apply(adjustment)); - StreamingGetWorkRequest extension = - StreamingGetWorkRequest.newBuilder() - .setRequestExtension( - Windmill.StreamingGetWorkRequestExtension.newBuilder() - .setMaxItems(adjustment.items()) - .setMaxBytes(adjustment.bytes())) - .build(); + private void sendRequestExtension() { + GetWorkBudget currentInFlightBudget = inFlightBudget.get(); + GetWorkBudget currentMaxBudget = maxGetWorkBudget.get(); - executor() - .execute( + // If the outstanding items or bytes limit has gotten too low, top both off with a + // GetWorkExtension. The goal is to keep the limits relatively close to their maximum + // values without sending too many extension requests. + if (currentInFlightBudget.items() < currentMaxBudget.items() / 2 + || currentInFlightBudget.bytes() < currentMaxBudget.bytes() / 2) { + GetWorkBudget extension = currentMaxBudget.subtract(currentInFlightBudget); + if (extension.items() > 0 || extension.bytes() > 0) { Review Comment: it seems like at least one is positive by the above if. But it seems like we might want to ensure the extension is non-negative for both fields? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -311,71 +401,87 @@ public void onNext(ResponseT response) { @Override public void onError(Throwable t) { - onStreamFinished(t); + if (maybeTeardownStream()) { + return; + } + + Status status = Status.fromThrowable(t); + setLastError(status.toString()); + + if (t instanceof StreamObserverCancelledException) { + logger.error( + "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}", + getClass(), + backendWorkerToken, + t.getStackTrace(), + t); + } else if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) { Review Comment: factor out the error handling/logging and share it with the error path in onCompleted ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamShutdownException.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; + +/** + * Indicates that a {@link WindmillStream#shutdown()} was called while waiting for some internal + * operation to complete. Most common use of this exception should be conversion to a {@link + * org.apache.beam.runners.dataflow.worker.WorkItemCancelledException} as the {@link + * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem} being processed by {@link + * WindmillStream}. + */ +final class WindmillStreamShutdownException extends RuntimeException { Review Comment: internal? this should be translated before it gets to a user correct? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java: ########## @@ -265,18 +317,21 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream( onNewWindmillEndpoints); } - private StreamObserverFactory newStreamObserverFactory() { + private StreamObserverFactory newStreamObserverFactory(boolean hasDeadline) { Review Comment: how about passing in deadline instead? easier to read at call-sites ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ########## @@ -39,7 +39,9 @@ @ThreadSafe public final class DirectStreamObserver<T> implements StreamObserver<T> { Review Comment: maybe this change can be submitted separately? Would be nice to have a test for it showing the previous bug as well. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java: ########## @@ -67,17 +70,18 @@ public <T extends GetWorkBudgetSpender> void distributeBudget( GetWorkBudgetSpender getWorkBudgetSpender = streamAndDesiredBudget.getKey(); GetWorkBudget desired = streamAndDesiredBudget.getValue(); GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget(); - if (isBelowFiftyPercentOfTarget(remaining, desired)) { + if (isBelowFiftyPercentOfTarget(remaining, desired) && isActiveWorkBudgetAware) { Review Comment: this is unclear to me. It seems like in one case the adjustment is additive and another is resetting. Wouldn't we want to only increase by the desired-remaining in either case? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ########## @@ -74,69 +76,125 @@ public void onNext(T value) { while (true) { try { synchronized (lock) { + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. 
- if (awaitPhase < 0) { - awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; - outboundObserver.onNext(value); + tryOnNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't always seem to be the case (despite // documentation stating otherwise) so we poll periodically and enforce an overall // timeout related to the stream deadline. - phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); + int nextPhase = + isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); + // If nextPhase is a value less than 0, the phaser has been terminated. + if (nextPhase < 0) { + return; + } + synchronized (lock) { messagesSinceReady = 0; - outboundObserver.onNext(value); + tryOnNext(value); return; } } catch (TimeoutException e) { + if (isReadyNotifier.isTerminated()) { + return; + } + totalSecondsWaited += waitSeconds; - if (totalSecondsWaited > deadlineSeconds) { - LOG.error( - "Exceeded timeout waiting for the outboundObserver to become ready meaning " - + "that the stream deadline was not respected."); - throw new RuntimeException(e); + if (hasDeadlineExpired(totalSecondsWaited)) { + String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited); + LOG.error(errorMessage); + throw new StreamObserverCancelledException(errorMessage, e); } + if (totalSecondsWaited > 30) { LOG.info( "Output channel stalled for {}s, outbound thread {}.", totalSecondsWaited, Thread.currentThread().getName()); } + waitSeconds = waitSeconds * 2; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException(e); + StreamObserverCancelledException ex = new StreamObserverCancelledException(e); + 
LOG.error("Interrupted while waiting for outboundObserver to become ready.", ex); + throw ex; + } + } + } + + /** + * Only send the next value if the phaser is not terminated by the time we acquire the lock since + * the phaser can be terminated at any time. + */ + private void tryOnNext(T value) { Review Comment: not sure these phaser checks are necessary since the outboundObserver itself should stop blocking for onNext if the notifier is terminated. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -303,71 +368,78 @@ public void onNext(ResponseT response) { @Override public void onError(Throwable t) { - onStreamFinished(t); + if (isStreamDone()) { + return; + } + + Status status = Status.fromThrowable(t); + setLastError(status.toString()); + // Don't log every error since it will get noisy, and many errors transient. + if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) { + long nowMillis = Instant.now().getMillis(); + String responseDebug; + if (lastResponseTimeMs.get() == 0) { + responseDebug = "never received response"; + } else { + responseDebug = "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago"; + } + LOG.debug( + "{} streaming Windmill RPC errors for {}, last was: {} with status {}." + + " created {}ms ago, {}. This is normal with autoscaling.", + AbstractWindmillStream.this.getClass(), + errorCount.get(), + t, + status, + nowMillis - startTimeMs.get(), + responseDebug); + } + + // If the stream was stopped due to a resource exhausted error then we are throttled. + if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { + startThrottleTimer(); + } + + try { + long sleep = backoff.nextBackOffMillis(); + sleepUntil.set(Instant.now().getMillis() + sleep); + sleeper.sleep(sleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (IOException e) { + // Ignore. 
+ } + + tryRestartStream(); } @Override public void onCompleted() { - onStreamFinished(null); + if (isStreamDone()) { + return; + } + errorCount.incrementAndGet(); + String error = + "Stream completed successfully but did not complete requested operations, " + + "recreating"; + LOG.warn(error); + setLastError(error); + tryRestartStream(); } - private void onStreamFinished(@Nullable Throwable t) { - synchronized (this) { - if (clientClosed.get() && !hasPendingRequests()) { - streamRegistry.remove(AbstractWindmillStream.this); - finishLatch.countDown(); - return; - } + private void tryRestartStream() { + if (!isShutdown()) { + executeSafely(AbstractWindmillStream.this::startStream); } - if (t != null) { - Status status = null; - if (t instanceof StatusRuntimeException) { - status = ((StatusRuntimeException) t).getStatus(); - } - String statusError = status == null ? "" : status.toString(); - setLastError(statusError); - if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) { - long nowMillis = Instant.now().getMillis(); - String responseDebug; - if (lastResponseTimeMs.get() == 0) { - responseDebug = "never received response"; - } else { - responseDebug = - "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago"; - } - LOG.debug( - "{} streaming Windmill RPC errors for {}, last was: {} with status {}." - + " created {}ms ago, {}. This is normal with autoscaling.", - AbstractWindmillStream.this.getClass(), - errorCount.get(), - t, - statusError, - nowMillis - startTimeMs.get(), - responseDebug); - } - // If the stream was stopped due to a resource exhausted error then we are throttled. - if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { - startThrottleTimer(); - } + } - try { - long sleep = backoff.nextBackOffMillis(); - sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (IOException e) { - // Ignore. 
- } - } else { - errorCount.incrementAndGet(); - String error = - "Stream completed successfully but did not complete requested operations, " - + "recreating"; - LOG.warn(error); - setLastError(error); + private synchronized boolean isStreamDone() { + if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) { + streamRegistry.remove(AbstractWindmillStream.this); Review Comment: Add a comment on the return value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org