scwhittle commented on code in PR #31902: URL: https://github.com/apache/beam/pull/31902#discussion_r1758496171
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,24 +329,72 @@ public String backendWorkerToken() {
   }
 
   @Override
-  public void shutdown() {
+  public final void shutdown() {
+    // Don't lock here as isShutdown checks are used in the stream to free blocked
+    // threads or as exit conditions to loops.
     if (isShutdown.compareAndSet(false, true)) {
       requestObserver()
           .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
+      shutdownInternal();
     }
   }
 
-  private void setLastError(String error) {
-    lastError.set(error);
-    lastErrorTime.set(DateTime.now());
+  private void recordRestartReason(String error) {
+    lastRestartReason.set(error);
+    lastRestartTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException {
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements StreamObserver<RequestT> {
+
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("this")
+    private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;
+
+    private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) {
+      this.requestObserverSupplier = requestObserverSupplier;
+      this.delegateRequestObserver = null;
+    }
+
+    private synchronized StreamObserver<RequestT> delegate() {
+      return Preconditions.checkNotNull(
+          delegateRequestObserver,
+          "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+    }
+
+    private synchronized void reset() {
+      delegateRequestObserver = requestObserverSupplier.get();
+    }
+
+    @Override
+    public void onNext(RequestT requestT) {
+      delegate().onNext(requestT);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      delegate().onError(throwable);
+    }
+
+    @Override
+    public synchronized void onCompleted() {

Review Comment:
   rm synchronized?

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -264,24 +284,18 @@ protected void startThrottleTimer() {
 
   @Override
   public void adjustBudget(long itemsDelta, long bytesDelta) {

Review Comment:
   Can we name this setBudget?

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -251,14 +300,16 @@ public final void appendSummaryHtml(PrintWriter writer) {
         streamClosed.get());
   }
 
-  // Don't require synchronization on stream, see the appendSummaryHtml comment.
+  /**
+   * @implNote Don't require synchronization on stream, see the {@link
+   *     #appendSummaryHtml(PrintWriter)} comment.
+   */
   protected abstract void appendSpecificHtml(PrintWriter writer);
 
   @Override
-  public final synchronized void halfClose() {
-    // Synchronization of close and onCompleted necessary for correct retry logic in onNewStream.
+  public final void halfClose() {
     clientClosed.set(true);
-    requestObserver().onCompleted();
+    requestObserver.onCompleted();

Review Comment:
   Why is it safe to not synchronize? The comment indicated it was there for a reason.
   
   In general I think we need to synchronize when using the request observer, because gRPC doesn't expect it to be used simultaneously by multiple threads.
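For context on the threading contract referenced above: gRPC StreamObserver instances are not thread-safe, so callers must serialize onNext/onError/onCompleted themselves. A minimal illustrative sketch of that serialization (the wrapper class here is hypothetical, not part of this PR):

    import io.grpc.stub.StreamObserver;

    /** Serializes all calls to a delegate observer to satisfy gRPC's single-threaded contract. */
    final class SerializingStreamObserver<T> implements StreamObserver<T> {
      private final StreamObserver<T> delegate;

      SerializingStreamObserver(StreamObserver<T> delegate) {
        this.delegate = delegate;
      }

      @Override
      public synchronized void onNext(T value) {
        // Holding the monitor prevents onNext from interleaving with onError/onCompleted.
        delegate.onNext(value);
      }

      @Override
      public synchronized void onError(Throwable t) {
        delegate.onError(t);
      }

      @Override
      public synchronized void onCompleted() {
        delegate.onCompleted();
      }
    }

Dropping synchronized from halfClose() is only safe if some other mechanism provides this mutual exclusion.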
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -337,8 +337,17 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
     writer.println(
         "<table border=\"1\" "
             + "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
+    // Columns.
     writer.println(
-        "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th></tr>");
+        "<tr>"
+            + "<th>Key</th>"
+            + "<th>Token</th>"
+            + "<th>Queued</th>"
+            + "<th>Active For</th>"
+            + "<th>State</th>"
+            + "<th>State Active For</th>"
+            + "<th>Produced By</th>"

Review Comment:
   nit: "Produced" is confusing since we use that term in shuffle terminology. How about just "Backend"?

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +334,54 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
   private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn) {
-    while (true) {
+    while (!isShutdown()) {
       request.resetResponseStream();
       try {
         queueRequestAndWait(request);
         return parseFn.parse(request.getResponseStream());
-      } catch (CancellationException e) {
-        // Retry issuing the request since the response stream was cancelled.
-        continue;
+      } catch (AppendableInputStream.InvalidInputStreamStateException
+          | VerifyException
+          | CancellationException e) {
+        handleShutdown(request);
+        if (!(e instanceof CancellationException)) {
+          throw e;
+        }
       } catch (IOException e) {
         LOG.error("Parsing GetData response failed: ", e);
-        continue;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
+        handleShutdown(request);
         throw new RuntimeException(e);
       } finally {
         pending.remove(request.id());
       }
     }
 
+    // If we have exited the loop here, the stream has been shutdown. Cancel the response stream.
+    request.getResponseStream().cancel();
+    throw new WindmillStreamShutdownException(
+        "Cannot send request=[" + request + "] on closed stream.");
+  }
+
+  private void handleShutdown(QueuedRequest request) {
+    if (isShutdown()) {
+      throw new WindmillStreamShutdownException(
+          "Cannot send request=[" + request + "] on closed stream.");

Review Comment:
   If we keep this, should we pass in the exception caught above to add as a suppressed exception?

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +334,54 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
   private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn) {
-    while (true) {
+    while (!isShutdown()) {
       request.resetResponseStream();
       try {
         queueRequestAndWait(request);
         return parseFn.parse(request.getResponseStream());
-      } catch (CancellationException e) {
-        // Retry issuing the request since the response stream was cancelled.
-        continue;
+      } catch (AppendableInputStream.InvalidInputStreamStateException
+          | VerifyException
+          | CancellationException e) {
+        handleShutdown(request);

Review Comment:
   Can we make shutdown handling consistent? We throw an exception for shutdown here, but if we never run the loop because isShutdown is already true, we just return from this function without doing anything.
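One way to address both of the comments above (an illustrative sketch only, reusing isShutdown(), QueuedRequest, and WindmillStreamShutdownException from the diff; the throwIfShutdown helper is hypothetical): route every shutdown check through a single helper that always throws and attaches the triggering exception as suppressed:

    /** Hypothetical single exit path: always throws on shutdown, keeping the original cause. */
    private void throwIfShutdown(QueuedRequest request, @Nullable Throwable cause) {
      if (isShutdown()) {
        WindmillStreamShutdownException shutdownException =
            new WindmillStreamShutdownException(
                "Cannot send request=[" + request + "] on closed stream.");
        if (cause != null) {
          // Preserve whatever interrupted the request for debugging.
          shutdownException.addSuppressed(cause);
        }
        throw shutdownException;
      }
    }

Calling this both before entering the retry loop and from each catch block would give callers the same exception type no matter where shutdown is detected.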
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +332,60 @@ private void setLastError(String error) {
     lastErrorTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException {
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements StreamObserver<RequestT> {
+
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("this")
+    private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;

Review Comment:
   You don't need both: synchronization also ensures that memory changes are visible to other threads, whereas volatile only ensures that a single field's changes are visible to other threads. This seems like a good overview: https://blogs.oracle.com/javamagazine/post/java-thread-synchronization-volatile-final-atomic-deadlocks
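To make the visibility point concrete: when every read and write of a field happens inside synchronized methods on the same monitor, the monitor already establishes the necessary happens-before edges, so volatile is redundant. A self-contained sketch (a hypothetical class, not the PR's code):

    import javax.annotation.Nullable;
    import javax.annotation.concurrent.GuardedBy;
    import javax.annotation.concurrent.ThreadSafe;

    @ThreadSafe
    final class ResettableRef<T> {
      // No volatile needed: the field is only ever accessed while holding "this".
      @GuardedBy("this")
      @Nullable
      private T delegate;

      synchronized void reset(T newDelegate) {
        delegate = newDelegate;
      }

      synchronized T get() {
        if (delegate == null) {
          throw new IllegalStateException("Missing a call to reset() to initialize.");
        }
        return delegate;
      }
    }

volatile would only matter if some reads happened outside the synchronized blocks.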
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -272,42 +263,106 @@ private void startWorkerMetadataConsumer() {
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
     getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
     newWorkerMetadataPublisher.shutdownNow();
     newWorkerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
+  private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
+    consumeEndpoints(newWindmillEndpoints).join();
+  }
+
   /**
    * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
    * new backend worker metadata.
    */
-  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+  private synchronized CompletableFuture<Void> consumeEndpoints(
+      WindmillEndpoints newWindmillEndpoints) {
     LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
     ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
         createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
-
+    CompletableFuture<Void> closeStaleStreams =
+        closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
+    ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillConnections.values()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
             .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
-
     LOG.info(
         "Setting new connections: {}. Previous connections: {}.",
         newConnectionsState,
         connections.get());
     connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+
+    // Close the streams outside the lock.
+    return closeStaleStreams;
+  }
+
+  /** Close the streams that are no longer valid asynchronously. */
+  private CompletableFuture<Void> closeStaleStreams(
+      Collection<WindmillConnection> newWindmillConnections,
+      ImmutableMap<WindmillConnection, WindmillStreamSender> currentStreams) {
+    return CompletableFuture.allOf(
+        currentStreams.entrySet().stream()
+            .filter(
+                connectionAndStream ->
+                    !newWindmillConnections.contains(connectionAndStream.getKey()))
+            .map(
+                entry ->
+                    CompletableFuture.runAsync(
+                        () -> {
+                          LOG.debug("Closing streams to {}", entry.getKey().backendWorkerToken());

Review Comment:
   Debug logs can be enabled via a pipeline option when starting the job. Arun is adding dynamic job settings; it could be cool if we allowed modifying log granularity dynamically in the future to help investigate things.
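For reference on the first point, per-package worker log levels can be raised at submission time; assuming the standard DataflowWorkerLoggingOptions flags, something like the following (the exact package chosen here is illustrative):

    --defaultWorkerLogLevel=INFO \
    --workerLogLevelOverrides='{"org.apache.beam.runners.dataflow.worker.windmill":"DEBUG"}'

Changing granularity mid-job, as suggested, would go beyond what these submission-time flags offer today.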
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamShutdownException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+
+/**
+ * Indicates that a {@link WindmillStream#shutdown()} was called while waiting for some internal
+ * operation to complete. Most common use of this exception should be conversion to a {@link
+ * org.apache.beam.runners.dataflow.worker.WorkItemCancelledException} as the {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem} being processed by {@link
+ * WindmillStream}.
+ */
+final class WindmillStreamShutdownException extends RuntimeException {

Review Comment:
   Good point, that's probably only necessary for public stuff.

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -265,18 +317,21 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream(
         onNewWindmillEndpoints);
   }
 
-  private StreamObserverFactory newStreamObserverFactory() {
+  private StreamObserverFactory newStreamObserverFactory(boolean hasDeadline) {
     return StreamObserverFactory.direct(
-        DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, windmillMessagesBetweenIsReadyChecks);
+        hasDeadline ? DEFAULT_DEADLINE_SECONDS : NO_DEADLINE, windmillMessagesBetweenIsReadyChecks);
   }
 
   @Override
   public void appendSummaryHtml(PrintWriter writer) {
     writer.write("Active Streams:<br>");
-    for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
-      stream.appendSummaryHtml(writer);
-      writer.write("<br>");
-    }
+    streamRegistry.stream()
+        .sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken))
+        .collect(

Review Comment:
   I missed the multimap bit. I agree the grouping is nice. However, isn't the sorting then unnecessary if we're putting it in a map right away?
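If the goal is simply a per-backend grouping with ordered keys, collecting straight into a sorted map avoids the extra pass. An illustrative sketch (reusing streamRegistry and AbstractWindmillStream from the diff; not the PR's exact code):

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.stream.Collectors;

    Map<String, List<AbstractWindmillStream<?, ?>>> streamsByBackend =
        streamRegistry.stream()
            .collect(
                Collectors.groupingBy(
                    AbstractWindmillStream::backendWorkerToken,
                    TreeMap::new, // TreeMap keeps keys sorted, so no separate .sorted(...) pass
                    Collectors.toList()));

If the collector is instead an insertion-ordered multimap, the pre-sort is what determines key order, so whether it is redundant depends on the collector used.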
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -272,42 +263,106 @@ private void startWorkerMetadataConsumer() {
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
     getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
     newWorkerMetadataPublisher.shutdownNow();
     newWorkerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
+  private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
+    consumeEndpoints(newWindmillEndpoints).join();
+  }
+
   /**
    * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
    * new backend worker metadata.
    */
-  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+  private synchronized CompletableFuture<Void> consumeEndpoints(
+      WindmillEndpoints newWindmillEndpoints) {
     LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
     ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
         createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
-
+    CompletableFuture<Void> closeStaleStreams =
+        closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
+    ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillConnections.values()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
             .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
            .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
-
     LOG.info(
         "Setting new connections: {}. Previous connections: {}.",
         newConnectionsState,
         connections.get());
     connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+
+    // Close the streams outside the lock.

Review Comment:
   Yeah, I don't know if we need to block on closing streams; I think we might want to block on creating the new ones. One possible issue is that if we are leaking streams and they never actually close, we might not know. But if we have the stream registry I think we'd see that. Or we can add some logging to closeAllStreams when it is taking a long time.

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java:
##########
@@ -122,12 +132,61 @@ void addRequest(QueuedRequest request) {
     byteSize += request.byteSize();
   }
 
-  void countDown() {
+  /** Let waiting threads know that the request has been successfully sent. */
+  synchronized void notifySent() {
     sent.countDown();
   }
 
-  void await() throws InterruptedException {
+  /** Let waiting threads know that a failure occurred. */
+  synchronized void notifyFailed() {
+    failed = true;
+    sent.countDown();
+  }
+
+  /**
+   * Block until notified of a successful send via {@link #notifySent()} or a non-retryable
+   * failure via {@link #notifyFailed()}. On failure, throw an exception on calling threads.
+   */
+  void waitForSendOrFailNotification() throws InterruptedException {
     sent.await();
+    if (failed) {
+      ImmutableList<String> cancelledRequests = createStreamCancelledErrorMessage();
+      LOG.error("Requests failed for the following batches: {}", cancelledRequests);
+      throw new WindmillStreamShutdownException(
+          "Requests failed for batch containing "
+              + cancelledRequests.stream().limit(3).collect(Collectors.joining(", "))
+              + " ... requests. This is most likely due to the stream being explicitly closed"
+              + " which happens when the work is marked as invalid on the streaming"
+              + " backend when key ranges shuffle around. This is transient and corresponding"
+              + " work will eventually be retried.");
+    }
+  }
+
+  ImmutableList<String> createStreamCancelledErrorMessage() {

Review Comment:
   Add a parameter for the limit, or just inline it above? We're building up a possibly big list just to ignore most of it.
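Inlining the limit would avoid materializing the full list. A sketch of the inlined form (illustrative only; queuedRequests stands in for whatever collection backs createStreamCancelledErrorMessage):

    import java.util.stream.Collectors;

    String failedRequestsSample =
        queuedRequests.stream()
            .map(QueuedRequest::toString) // hypothetical string conversion for each request
            .limit(3) // short-circuits: only the first three are ever formatted
            .collect(Collectors.joining(", "));

Because limit() is applied before collect(), the remaining requests are never converted to strings at all.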
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -74,69 +76,125 @@ public void onNext(T value) {
     while (true) {
       try {
         synchronized (lock) {
+          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+          // careful to observe the phase before observing isReady.
+          if (awaitPhase < 0) {
+            awaitPhase = isReadyNotifier.getPhase();
+            // If getPhase() returns a value less than 0, the phaser has been terminated.
+            if (awaitPhase < 0) {
+              return;
+            }
+          }
+
           // We only check isReady periodically to effectively allow for increasing the outbound
           // buffer periodically. This reduces the overhead of blocking while still restricting
           // memory because there is a limited # of streams, and we have a max messages size of 2MB.
           if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
-            outboundObserver.onNext(value);
+            tryOnNext(value);
             return;
           }
-          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-          // careful to observe the phase before observing isReady.
-          if (awaitPhase < 0) {
-            awaitPhase = phaser.getPhase();
-          }
+
           if (outboundObserver.isReady()) {
             messagesSinceReady = 0;
-            outboundObserver.onNext(value);
+            tryOnNext(value);
            return;
          }
        }
+
         // A callback has been registered to advance the phaser whenever the observer
         // transitions to is ready. Since we are waiting for a phase observed before the
         // outboundObserver.isReady() returned false, we expect it to advance after the
         // channel has become ready. This doesn't always seem to be the case (despite
         // documentation stating otherwise) so we poll periodically and enforce an overall
         // timeout related to the stream deadline.
-        phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        int nextPhase =
+            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        // If nextPhase is a value less than 0, the phaser has been terminated.
+        if (nextPhase < 0) {
+          return;
+        }
+
         synchronized (lock) {
           messagesSinceReady = 0;
-          outboundObserver.onNext(value);
+          tryOnNext(value);
           return;
         }
       } catch (TimeoutException e) {
+        if (isReadyNotifier.isTerminated()) {
+          return;
+        }
+
         totalSecondsWaited += waitSeconds;
-        if (totalSecondsWaited > deadlineSeconds) {
-          LOG.error(
-              "Exceeded timeout waiting for the outboundObserver to become ready meaning "
-                  + "that the stream deadline was not respected.");
-          throw new RuntimeException(e);
+        if (hasDeadlineExpired(totalSecondsWaited)) {
+          String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited);
+          LOG.error(errorMessage);
+          throw new StreamObserverCancelledException(errorMessage, e);
         }
+
         if (totalSecondsWaited > 30) {
           LOG.info(
               "Output channel stalled for {}s, outbound thread {}.",
               totalSecondsWaited,
               Thread.currentThread().getName());
         }
+
         waitSeconds = waitSeconds * 2;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+        StreamObserverCancelledException ex = new StreamObserverCancelledException(e);
+        LOG.error("Interrupted while waiting for outboundObserver to become ready.", ex);
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * Only send the next value if the phaser is not terminated by the time we acquire the lock since
+   * the phaser can be terminated at any time.
+   */
+  private void tryOnNext(T value) {

Review Comment:
   Even if you do check here, there is still a race between checking the phaser and calling onNext. Internally the outboundObserver is already observing phaser termination via getPhase() (and its blocking also respects phaser termination), so the extra check is just mental overhead, I think.
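For reference on the Phaser contract both the code and the comment rely on: once a phaser is terminated, getPhase() and the awaitAdvance family return negative values immediately. A self-contained illustration:

    import java.util.concurrent.Phaser;

    public class PhaserTerminationDemo {
      public static void main(String[] args) {
        Phaser isReadyNotifier = new Phaser(1); // one registered party
        System.out.println(isReadyNotifier.getPhase()); // 0: not yet terminated
        isReadyNotifier.forceTermination();
        System.out.println(isReadyNotifier.getPhase()); // negative: terminated
        // awaitAdvance no longer blocks after termination and also returns a negative value.
        System.out.println(isReadyNotifier.awaitAdvance(0));
      }
    }

Since termination can still occur between a getPhase() check and the subsequent onNext() call, such a check only narrows the race window rather than eliminating it, which is the point above.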