scwhittle commented on code in PR #31504: URL: https://github.com/apache/beam/pull/31504#discussion_r1627704626
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java: ########## @@ -49,6 +56,8 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class MetricTrackingWindmillServerStub { + private static final Logger LOG = LoggerFactory.getLogger(MetricTrackingWindmillServerStub.class); + private static final String FAN_OUT_REFRESH_WORK_EXECUTOR = "FanOutActiveWorkRefreshExecutor"; Review Comment: append NAME to variable name ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java: ########## @@ -254,6 +266,27 @@ public Windmill.KeyedGetDataResponse getStateData( } } + public Windmill.KeyedGetDataResponse getStateData( + GetDataStream getDataStream, String computation, Windmill.KeyedGetDataRequest request) { + gcThrashingMonitor.waitForResources("GetStateData"); + activeStateReads.getAndIncrement(); + if (getDataStream.isClosed()) { + throw new WorkItemCancelledException(request.getShardingKey()); Review Comment: not decrementing activeReads in this case, move increment into try block? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -615,8 +676,37 @@ public static void main(String[] args) throws Exception { } JvmInitializers.runBeforeProcessing(options); - worker.startStatusPages(); - worker.start(); + try { + worker.startStatusPages(); + worker.start(); + } catch (Throwable e) { + LOG.error("Harness shutting down due to uncaught exception.", e); + worker.stop(); Review Comment: just throw the exception instead of trying to tear-down cleanly? 
As is the process exit status will be ok since the exception is being caught, and stop() could get stuck somehow and we'd rather just teardown possibly uncleanly ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -615,8 +676,37 @@ public static void main(String[] args) throws Exception { } JvmInitializers.runBeforeProcessing(options); - worker.startStatusPages(); - worker.start(); + try { + worker.startStatusPages(); + worker.start(); + } catch (Throwable e) { + LOG.error("Harness shutting down due to uncaught exception.", e); + worker.stop(); + } + } + + private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) { + Preconditions.checkArgument( + options.isStreaming(), + "%s instantiated with options indicating batch use", + StreamingDataflowWorker.class.getName()); + + Preconditions.checkArgument( + !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT), + "%s cannot be main() class with beam_fn_api enabled", + StreamingDataflowWorker.class.getSimpleName()); + } + + private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) { + boolean isIpV6Enabled = options.getDataflowServiceOptions().contains(ENABLE_IPV6_EXPERIMENT); + if (options.isEnableWindmillServiceDirectPath() && !isIpV6Enabled) { Review Comment: log only if isEnableStreamingEngine is also set? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java: ########## @@ -139,6 +140,10 @@ public void invalidateStuckCommits(Instant stuckCommitDeadline) { stuckCommitDeadline, this::completeWorkAndScheduleNextWorkForKey); } + public ImmutableListMultimap<ShardedKey, Work> currentActiveWorkReadOnly() { Review Comment: add a comment, presumably Work shouldn't have any mutating methods called on it? Is there a better way to enforce that? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -238,7 +239,7 @@ public final void appendSummaryHtml(PrintWriter writer) { protected abstract void appendSpecificHtml(PrintWriter writer); @Override - public final synchronized void close() { + public synchronized void close() { Review Comment: can this stay final? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java: ########## @@ -41,6 +41,8 @@ public interface WindmillStream { /** Returns when the stream was opened. */ Instant startTime(); + boolean isClosed(); Review Comment: add comment, does this just reflect if close() is called? or if server closed stream also? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java: ########## @@ -55,31 +54,24 @@ public static Channel localhostChannel(int port) { public static ManagedChannel remoteChannel( WindmillServiceAddress windmillServiceAddress, int windmillServiceRpcChannelTimeoutSec) { switch (windmillServiceAddress.getKind()) { - case IPV6: - return remoteChannel(windmillServiceAddress.ipv6(), windmillServiceRpcChannelTimeoutSec); case GCP_SERVICE_ADDRESS: return remoteChannel( windmillServiceAddress.gcpServiceAddress(), windmillServiceRpcChannelTimeoutSec); - // switch is exhaustive will never happen. case AUTHENTICATED_GCP_SERVICE_ADDRESS: - return remoteDirectChannel( - windmillServiceAddress.authenticatedGcpServiceAddress(), - windmillServiceRpcChannelTimeoutSec); + return remoteDirectChannel(windmillServiceAddress.authenticatedGcpServiceAddress()); + // switch is exhaustive will never happen. 
default: throw new UnsupportedOperationException( - "Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS are supported WindmillServiceAddresses."); + "Only GCP_SERVICE_ADDRESS or AUTHENTICATED_GCP_SERVICE_ADDRESS are supported WindmillServiceAddress Kinds."); } } static ManagedChannel remoteDirectChannel( - AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress, - int windmillServiceRpcChannelTimeoutSec) { - return withDefaultChannelOptions( - AltsChannelBuilder.forAddress( - authenticatedGcpServiceAddress.gcpServiceAddress().getHost(), - authenticatedGcpServiceAddress.gcpServiceAddress().getPort()) - .overrideAuthority(authenticatedGcpServiceAddress.authenticatingService()), - windmillServiceRpcChannelTimeoutSec) + AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress) { + return AltsChannelBuilder.forAddress( Review Comment: I think we still want the withDefaultChannelOptions Probably want the flow control window and ssl context from createRemoteChannel unless we've tested otherwise. ########## runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java: ########## @@ -221,13 +222,11 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setWindmillServiceStreamMaxBackoffMillis(int value); - @Description( - "If true, Dataflow streaming pipeline will be running in direct path mode." - + " VMs must have IPv6 enabled for this to work.") - @Default.Boolean(false) - boolean getIsWindmillServiceDirectPathEnabled(); Review Comment: might be safer not to change the name of the option, the previous name seems fine. I've had issues in the past since I think options are in both worker and sdk jars and can have issues if they don't line up. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -255,6 +256,16 @@ public final Instant startTime() { return new Instant(startTimeMs.get()); } + @Override + public boolean isClosed() { Review Comment: can this be final? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamCancelledException.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import org.apache.beam.sdk.annotations.Internal; + +@Internal +public final class WindmillStreamCancelledException extends RuntimeException { Review Comment: How about Closed instead of Cancelled? 
Cancelled sounds more like an rpc being cancelled due to deadline etc which isn't exactly the same ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java: ########## @@ -171,6 +181,10 @@ public State getState() { return currentState.state(); } + public @Nullable GetDataStream getDataStream() { Review Comment: as mentioned in the other change, could this be an interface restricted to heartbeats instead of full GetDataStream? GetDataStream is confusing since it overlaps with getKeyedDataFn Having it be some interface, also seems like we could possibly share more heartbeat logic between the paths as in general it could be group work by HeartbeatRefreshBatcher (or better name) call batcher.SendHeartbeats(work for that batcher) and internally that could delegate to single GetDataStream for the direct case, or for SE could lookup a data stream from stub pool each time, or for SA could do nothing (or we set the batcher to null to do that). ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -350,7 +396,23 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept batch.countDown(); } else { // Wait for this batch to be sent before parsing the response. - batch.await(); + boolean batchSent = false; + long secondsWaited = 0; + long waitFor = 10; + while (!batchSent) { + if (isClosed()) { Review Comment: by polling we could add latency in cases where the threads are tied up waiting for reads and thus new work is not processed. Do we need some finally (batch.countDown()) above to handle exceptions in sendBatch if the stream is closed? In that case it seems like it would unspool without needing this isClosed check here. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServiceAddress.java: ########## @@ -19,38 +19,30 @@ import com.google.auto.value.AutoOneOf; import com.google.auto.value.AutoValue; -import java.net.Inet6Address; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; /** Used to create channels to communicate with Streaming Engine via gRpc. */ @AutoOneOf(WindmillServiceAddress.Kind.class) Review Comment: add internal ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java: ########## @@ -209,11 +261,29 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream( onNewWindmillEndpoints); } - private StreamObserverFactory newStreamObserverFactory() { + private StreamObserverFactory newBufferringStreamObserverFactory() { return StreamObserverFactory.direct( DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, windmillMessagesBetweenIsReadyChecks); } + /** + * Simple {@link StreamObserverFactory} that does not buffer or provide extra functionality for + * request observers. + * + * @implNote Used to create stream observers for direct path streams that do not share any + * underlying resources between threads. + */ + private StreamObserverFactory newSimpleStreamObserverFactory() { Review Comment: I think that we may still want the isReady checking, as otherwise if outgoing sending is falling behind it just buffers more and more in grpc output stream buffer and can oom. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java: ########## @@ -209,11 +261,29 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream( onNewWindmillEndpoints); } - private StreamObserverFactory newStreamObserverFactory() { + private StreamObserverFactory newBufferringStreamObserverFactory() { Review Comment: this is confusing name since it doesn't buffer beyond what grpc stream internally does. It just respects grpc flow control which it seems like we'd want to do in the direct path case as well. There is a different BufferingStreamObserver for fnapi which does do buffering with a queue. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java: ########## @@ -331,6 +333,22 @@ synchronized ImmutableList<HeartbeatRequest> getKeyHeartbeats( .collect(toImmutableList()); } + synchronized ImmutableListMultimap<ShardedKey, Work> getReadOnlyActiveWork() { + // Do not return a reference to the underlying workQueue as iterations over it will cause a + // ConcurrentModificationException as it is not a thread-safe data structure. 
+ ImmutableListMultimap.Builder<ShardedKey, Work> readOnlyActiveWork = + ImmutableListMultimap.builder(); + for (Entry<ShardedKey, Deque<ExecutableWork>> keyedWorkQueues : activeWork.entrySet()) { Review Comment: think you could do activeWork.entrySet().stream().collect(flatteningToImmutableListMultimap(e -> e.getKey(), e -> e.getValue().stream().map(ExecutableWork::work))); ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -176,6 +176,7 @@ protected final void startStream() { onNewStream(); if (clientClosed.get()) { close(); + streamRegistry.remove(this); Review Comment: it seems like close should trigger onStreamFinished which would do this ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java: ########## @@ -289,15 +312,18 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections = createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints()); + CompletableFuture<Void> closeStaleStreams = closeStaleStreams(newWindmillConnections.values()); + StreamingEngineConnectionState newConnectionsState = StreamingEngineConnectionState.builder() .setWindmillConnections(newWindmillConnections) - .setWindmillStreams( - closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values())) + .setWindmillStreams(createAndStartNewStreams(newWindmillConnections.values()).join()) .setGlobalDataStreams( createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints())) .build(); + closeStaleStreams.join(); Review Comment: does building the connection state take any time? Or is there a reason we can't move this join to after updating connections and the budget refresh below so we start using the new streams earlier? 
Otherwise it seems simpler to just keep the closeStaleStreamsAndCreateNewStreams which internally does them in parallel. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org