scwhittle commented on code in PR #31504:
URL: https://github.com/apache/beam/pull/31504#discussion_r1627704626


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -49,6 +56,8 @@
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class MetricTrackingWindmillServerStub {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricTrackingWindmillServerStub.class);
+  private static final String FAN_OUT_REFRESH_WORK_EXECUTOR = "FanOutActiveWorkRefreshExecutor";

Review Comment:
   append NAME to variable name
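   e.g.:

       private static final String FAN_OUT_REFRESH_WORK_EXECUTOR_NAME =
           "FanOutActiveWorkRefreshExecutor";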



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -254,6 +266,27 @@ public Windmill.KeyedGetDataResponse getStateData(
     }
   }
 
+  public Windmill.KeyedGetDataResponse getStateData(
+      GetDataStream getDataStream, String computation, Windmill.KeyedGetDataRequest request) {
+    gcThrashingMonitor.waitForResources("GetStateData");
+    activeStateReads.getAndIncrement();
+    if (getDataStream.isClosed()) {
+      throw new WorkItemCancelledException(request.getShardingKey());

Review Comment:
   not decrementing activeReads in this case, move increment into try block?
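   e.g. a sketch (the read call inside the try is elided by the hunk, so it is represented by a comment here):

       activeStateReads.getAndIncrement();
       try {
         if (getDataStream.isClosed()) {
           throw new WorkItemCancelledException(request.getShardingKey());
         }
         // ... issue the read over getDataStream ...
       } finally {
         activeStateReads.getAndDecrement();
       }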



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -615,8 +676,37 @@ public static void main(String[] args) throws Exception {
     }
 
     JvmInitializers.runBeforeProcessing(options);
-    worker.startStatusPages();
-    worker.start();
+    try {
+      worker.startStatusPages();
+      worker.start();
+    } catch (Throwable e) {
+      LOG.error("Harness shutting down due to uncaught exception.", e);
+      worker.stop();

Review Comment:
   just throw the exception instead of trying to tear down cleanly?
   As is, the process exit status will be OK since the exception is being caught, and stop() could get stuck somehow; we'd rather just tear down, possibly uncleanly.
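
   i.e. a sketch of just rethrowing (catching RuntimeException | Error so the rethrow compiles under main()'s throws Exception):

       try {
         worker.startStatusPages();
         worker.start();
       } catch (RuntimeException | Error e) {
         LOG.error("Harness shutting down due to uncaught exception.", e);
         throw e; // propagate so the process exits with a failure status
       }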



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -615,8 +676,37 @@ public static void main(String[] args) throws Exception {
     }
 
     JvmInitializers.runBeforeProcessing(options);
-    worker.startStatusPages();
-    worker.start();
+    try {
+      worker.startStatusPages();
+      worker.start();
+    } catch (Throwable e) {
+      LOG.error("Harness shutting down due to uncaught exception.", e);
+      worker.stop();
+    }
+  }
+
+  private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) {
+    Preconditions.checkArgument(
+        options.isStreaming(),
+        "%s instantiated with options indicating batch use",
+        StreamingDataflowWorker.class.getName());
+
+    Preconditions.checkArgument(
+        !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT),
+        "%s cannot be main() class with beam_fn_api enabled",
+        StreamingDataflowWorker.class.getSimpleName());
+  }
+
+  private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) {
+    boolean isIpV6Enabled = options.getDataflowServiceOptions().contains(ENABLE_IPV6_EXPERIMENT);
+    if (options.isEnableWindmillServiceDirectPath() && !isIpV6Enabled) {

Review Comment:
   log only if isEnableStreamingEngine is also set?
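   e.g. (sketch; whatever logging/fallback lives in this branch stays the same):

       if (options.isEnableStreamingEngine()
           && options.isEnableWindmillServiceDirectPath()
           && !isIpV6Enabled) {
         // log the direct-path/IPv6 mismatch only for Streaming Engine pipelines
       }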
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -139,6 +140,10 @@ public void invalidateStuckCommits(Instant stuckCommitDeadline) {
         stuckCommitDeadline, this::completeWorkAndScheduleNextWorkForKey);
   }
 
+  public ImmutableListMultimap<ShardedKey, Work> currentActiveWorkReadOnly() {

Review Comment:
   add a comment, presumably Work shouldn't have any mutating methods called on it? Is there a better way to enforce that?
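
   e.g. a doc comment along these lines (a dedicated read-only Work interface would be a stronger, though hypothetical, enforcement):

       /**
        * Returns a read-only snapshot of the current active work. Callers must not invoke
        * mutating methods on the returned {@link Work} instances.
        */
       public ImmutableListMultimap<ShardedKey, Work> currentActiveWorkReadOnly() {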



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -238,7 +239,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
   protected abstract void appendSpecificHtml(PrintWriter writer);
 
   @Override
-  public final synchronized void close() {
+  public synchronized void close() {

Review Comment:
   can this stay final?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##########
@@ -41,6 +41,8 @@ public interface WindmillStream {
   /** Returns when the stream was opened. */
   Instant startTime();
 
+  boolean isClosed();

Review Comment:
   add comment: does this just reflect whether close() was called, or also whether the server closed the stream?
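
   e.g. something like:

       /**
        * Returns whether the stream has been closed, i.e. whether close() has been called
        * (and, if a server-initiated close also counts, say so here).
        */
       boolean isClosed();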



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java:
##########
@@ -55,31 +54,24 @@ public static Channel localhostChannel(int port) {
   public static ManagedChannel remoteChannel(
       WindmillServiceAddress windmillServiceAddress, int windmillServiceRpcChannelTimeoutSec) {
     switch (windmillServiceAddress.getKind()) {
-      case IPV6:
-        return remoteChannel(windmillServiceAddress.ipv6(), windmillServiceRpcChannelTimeoutSec);
       case GCP_SERVICE_ADDRESS:
         return remoteChannel(
             windmillServiceAddress.gcpServiceAddress(), windmillServiceRpcChannelTimeoutSec);
-        // switch is exhaustive will never happen.
       case AUTHENTICATED_GCP_SERVICE_ADDRESS:
-        return remoteDirectChannel(
-            windmillServiceAddress.authenticatedGcpServiceAddress(),
-            windmillServiceRpcChannelTimeoutSec);
+        return remoteDirectChannel(windmillServiceAddress.authenticatedGcpServiceAddress());
+        // switch is exhaustive will never happen.
       default:
         throw new UnsupportedOperationException(
-            "Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS are supported WindmillServiceAddresses.");
+            "Only GCP_SERVICE_ADDRESS or AUTHENTICATED_GCP_SERVICE_ADDRESS are supported WindmillServiceAddress Kinds.");
     }
   }
 
   static ManagedChannel remoteDirectChannel(
-      AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress,
-      int windmillServiceRpcChannelTimeoutSec) {
-    return withDefaultChannelOptions(
-            AltsChannelBuilder.forAddress(
-                    authenticatedGcpServiceAddress.gcpServiceAddress().getHost(),
-                    authenticatedGcpServiceAddress.gcpServiceAddress().getPort())
-                .overrideAuthority(authenticatedGcpServiceAddress.authenticatingService()),
-            windmillServiceRpcChannelTimeoutSec)
+      AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress) {
+    return AltsChannelBuilder.forAddress(

Review Comment:
   I think we still want the withDefaultChannelOptions.
   Probably want the flow control window and SSL context from createRemoteChannel unless we've tested otherwise.
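
   i.e. roughly restore the removed shape (sketch; the trailing builder chaining is cut off by the hunk, so the .build() here is assumed):

       static ManagedChannel remoteDirectChannel(
           AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress,
           int windmillServiceRpcChannelTimeoutSec) {
         return withDefaultChannelOptions(
                 AltsChannelBuilder.forAddress(
                         authenticatedGcpServiceAddress.gcpServiceAddress().getHost(),
                         authenticatedGcpServiceAddress.gcpServiceAddress().getPort())
                     .overrideAuthority(authenticatedGcpServiceAddress.authenticatingService()),
                 windmillServiceRpcChannelTimeoutSec)
             .build();
       }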



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -221,13 +222,11 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
 
   void setWindmillServiceStreamMaxBackoffMillis(int value);
 
-  @Description(
-      "If true, Dataflow streaming pipeline will be running in direct path mode."
-          + " VMs must have IPv6 enabled for this to work.")
-  @Default.Boolean(false)
-  boolean getIsWindmillServiceDirectPathEnabled();

Review Comment:
   might be safer not to change the name of the option; the previous name seems fine.
   
   I've had issues in the past since I think options are in both the worker and SDK jars, and they can have issues if they don't line up.
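
   i.e. keep the removed getter as it was:

       @Description(
           "If true, Dataflow streaming pipeline will be running in direct path mode."
               + " VMs must have IPv6 enabled for this to work.")
       @Default.Boolean(false)
       boolean getIsWindmillServiceDirectPathEnabled();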



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -255,6 +256,16 @@ public final Instant startTime() {
     return new Instant(startTimeMs.get());
   }
 
+  @Override
+  public boolean isClosed() {

Review Comment:
   can this be final?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamCancelledException.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public final class WindmillStreamCancelledException extends RuntimeException {

Review Comment:
   How about Closed instead of Cancelled? Cancelled sounds more like an RPC being cancelled due to a deadline etc., which isn't exactly the same.
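
   i.e.:

       @Internal
       public final class WindmillStreamClosedException extends RuntimeException {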



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -171,6 +181,10 @@ public State getState() {
     return currentState.state();
   }
 
+  public @Nullable GetDataStream getDataStream() {

Review Comment:
   as mentioned in the other change, could this be an interface restricted to heartbeats instead of the full GetDataStream?
   GetDataStream is confusing since it overlaps with getKeyedDataFn.
   
   Having it be some interface also seems like we could share more heartbeat logic between the paths, as in general it could be:
   group work by HeartbeatRefreshBatcher (or better name)
   call batcher.SendHeartbeats(work for that batcher)
   
   and internally that could delegate to a single GetDataStream for the direct case, or for SE could look up a data stream from the stub pool each time, or for SA could do nothing (or we set the batcher to null to do that).
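
   Rough shape of what I mean (all names hypothetical):

       interface HeartbeatSender {
         // Direct path: delegate to the work item's single GetDataStream.
         // Streaming Engine: look up a GetDataStream from the stub pool per call.
         // Streaming Appliance: no-op (or use a null batcher).
         void sendHeartbeats(Collection<HeartbeatRequest> heartbeats);
       }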



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -350,7 +396,23 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
       batch.countDown();
     } else {
       // Wait for this batch to be sent before parsing the response.
-      batch.await();
+      boolean batchSent = false;
+      long secondsWaited = 0;
+      long waitFor = 10;
+      while (!batchSent) {
+        if (isClosed()) {

Review Comment:
   by polling we could add latency in cases where the threads are tied up waiting for reads and thus new work is not processed.
   
   Do we need some finally (batch.countDown()) above to handle exceptions in sendBatch if the stream is closed? In that case it seems like it would unspool without needing this isClosed check here.
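
   e.g. the finally I mean (sketch, using the sendBatch name from above):

       try {
         sendBatch(batch);
       } finally {
         // unblock threads awaiting this batch even if the send failed because the stream closed
         batch.countDown();
       }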



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServiceAddress.java:
##########
@@ -19,38 +19,30 @@
 
 import com.google.auto.value.AutoOneOf;
 import com.google.auto.value.AutoValue;
-import java.net.Inet6Address;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 
 /** Used to create channels to communicate with Streaming Engine via gRpc. */
 @AutoOneOf(WindmillServiceAddress.Kind.class)

Review Comment:
   add internal
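
   i.e.:

       @Internal
       @AutoOneOf(WindmillServiceAddress.Kind.class)
       public abstract class WindmillServiceAddress { ... }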



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -209,11 +261,29 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream(
         onNewWindmillEndpoints);
   }
 
-  private StreamObserverFactory newStreamObserverFactory() {
+  private StreamObserverFactory newBufferringStreamObserverFactory() {
     return StreamObserverFactory.direct(
         DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, windmillMessagesBetweenIsReadyChecks);
   }
 
+  /**
+   * Simple {@link StreamObserverFactory} that does not buffer or provide extra functionality for
+   * request observers.
+   *
+   * @implNote Used to create stream observers for direct path streams that do not share any
+   *     underlying resources between threads.
+   */
+  private StreamObserverFactory newSimpleStreamObserverFactory() {

Review Comment:
   I think that we may still want the isReady checking; otherwise, if outgoing sending falls behind, it just buffers more and more in the gRPC output stream buffer and can OOM.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -209,11 +261,29 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream(
         onNewWindmillEndpoints);
   }
 
-  private StreamObserverFactory newStreamObserverFactory() {
+  private StreamObserverFactory newBufferringStreamObserverFactory() {

Review Comment:
   this is a confusing name since it doesn't buffer beyond what the grpc stream internally does. It just respects grpc flow control, which it seems like we'd want to do in the direct path case as well.
   
   There is a different BufferingStreamObserver for fnapi which does do buffering with a queue.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -331,6 +333,22 @@ synchronized ImmutableList<HeartbeatRequest> getKeyHeartbeats(
         .collect(toImmutableList());
   }
 
+  synchronized ImmutableListMultimap<ShardedKey, Work> getReadOnlyActiveWork() {
+    // Do not return a reference to the underlying workQueue as iterations over it will cause a
+    // ConcurrentModificationException as it is not a thread-safe data structure.
+    ImmutableListMultimap.Builder<ShardedKey, Work> readOnlyActiveWork =
+        ImmutableListMultimap.builder();
+    for (Entry<ShardedKey, Deque<ExecutableWork>> keyedWorkQueues : activeWork.entrySet()) {

Review Comment:
   think you could do
   
       activeWork.entrySet().stream()
           .collect(flatteningToImmutableListMultimap(
               Entry::getKey, e -> e.getValue().stream().map(ExecutableWork::work)));



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -176,6 +176,7 @@ protected final void startStream() {
           onNewStream();
           if (clientClosed.get()) {
             close();
+            streamRegistry.remove(this);

Review Comment:
   it seems like close should trigger onStreamFinished which would do this



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java:
##########
@@ -289,15 +312,18 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
     ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
         createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
 
+    CompletableFuture<Void> closeStaleStreams = closeStaleStreams(newWindmillConnections.values());
+
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
             .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(createAndStartNewStreams(newWindmillConnections.values()).join())
             .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
 
+    closeStaleStreams.join();

Review Comment:
   does building the connection state take any time? Or is there a reason we can't move this join to after updating connections and the budget refresh below, so we start using the new streams earlier?
   
   Otherwise it seems simpler to just keep closeStaleStreamsAndCreateNewStreams, which internally does them in parallel.
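
   i.e. something like (sketch):

       CompletableFuture<Void> closeStaleStreams =
           closeStaleStreams(newWindmillConnections.values());
       // ... build newConnectionsState, swap in the new connections, refresh budgets ...
       closeStaleStreams.join(); // only block on stale-stream teardown after the new streams are in use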


