This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3693174c042 [Dataflow Streaming] Add Channelz status page exporting 
GRPC channelz data (#30211)
3693174c042 is described below

commit 3693174c0421d0ff049042ca283db633431892ef
Author: Arun Pandian <arunpandi...@gmail.com>
AuthorDate: Fri Feb 16 03:51:01 2024 -0800

    [Dataflow Streaming] Add Channelz status page exporting GRPC channelz data 
(#30211)
    
    Co-authored-by: Arun Pandian <pandi...@google.com>
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 .../options/DataflowPipelineDebugOptions.java      |   6 +
 .../dataflow/worker/StreamingDataflowWorker.java   |  10 +
 .../worker/windmill/WindmillServerBase.java        |   6 +
 .../worker/windmill/WindmillServerStub.java        |   6 +
 .../windmill/client/grpc/ChannelzServlet.java      | 292 +++++++++++++++++++++
 .../windmill/client/grpc/GrpcDispatcherClient.java |   4 +
 .../windmill/client/grpc/GrpcWindmillServer.java   |   5 +
 .../client/grpc/stubs/WindmillChannelFactory.java  |   2 +
 .../dataflow/worker/FakeWindmillServer.java        |  22 +-
 .../windmill/client/grpc/ChannelzServletTest.java  | 104 ++++++++
 11 files changed, 454 insertions(+), 5 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index fe80b826c56..2376a2c9bbc 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -906,7 +906,7 @@ class BeamModulePlugin implements Plugin<Project> {
         testcontainers_rabbitmq                     : 
"org.testcontainers:rabbitmq:$testcontainers_version",
         truth                                       : 
"com.google.truth:truth:1.1.5",
         threetenbp                                  : 
"org.threeten:threetenbp:1.6.8",
-        vendored_grpc_1_60_1                        : 
"org.apache.beam:beam-vendor-grpc-1_60_1:0.1",
+        vendored_grpc_1_60_1                        : 
"org.apache.beam:beam-vendor-grpc-1_60_1:0.2",
         vendored_guava_32_1_2_jre                   : 
"org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1",
         vendored_calcite_1_28_0                     : 
"org.apache.beam:beam-vendor-calcite-1_28_0:0.2",
         woodstox_core_asl                           : 
"org.codehaus.woodstox:woodstox-core-asl:4.4.1",
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 888b0d3f0b6..9b06fa9b7e2 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -311,6 +311,12 @@ public interface DataflowPipelineDebugOptions
 
   void setWindmillGetDataStreamCount(int value);
 
+  @Description("If true, will only show windmill service channels on 
/channelz")
+  @Default.Boolean(true)
+  boolean getChannelzShowOnlyWindmillServiceChannels();
+
+  void setChannelzShowOnlyWindmillServiceChannels(boolean value);
+
   /**
    * The amount of time before UnboundedReaders are considered idle and closed 
during streaming
    * execution.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 825c3fb78c7..2e0156bae77 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -114,6 +114,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApp
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
@@ -211,6 +212,7 @@ public class StreamingDataflowWorker {
 
   private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION = 
Duration.standardMinutes(5);
   private static final Random clientIdGenerator = new Random();
+  private static final String CHANNELZ_PATH = "/channelz";
   final WindmillStateCache stateCache;
   // Maps from computation ids to per-computation state.
   private final ConcurrentMap<String, ComputationState> computationMap;
@@ -735,6 +737,13 @@ public class StreamingDataflowWorker {
     if (debugCaptureManager != null) {
       debugCaptureManager.start();
     }
+
+    if (windmillServiceEnabled) {
+      ChannelzServlet channelzServlet = new ChannelzServlet(CHANNELZ_PATH, 
options, windmillServer);
+      statusPages.addServlet(channelzServlet);
+      statusPages.addCapturePage(channelzServlet);
+    }
+
     statusPages.addServlet(stateCache.statusServlet());
     statusPages.addServlet(new SpecsServlet());
 
@@ -2081,6 +2090,7 @@ public class StreamingDataflowWorker {
   }
 
   private class MetricsDataProvider implements StatusDataProvider {
+
     @Override
     public void appendSummaryHtml(PrintWriter writer) {
       writer.println(workUnitExecutor.summaryHtml());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
index 8caa79cd3f7..a1160b4f98d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
@@ -23,6 +23,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.Co
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 
 /**
@@ -53,6 +54,11 @@ public class WindmillServerBase extends WindmillServerStub {
     // This class is used for windmill appliance and local runner tests.
   }
 
+  @Override
+  public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
+    return ImmutableSet.of();
+  }
+
   @Override
   public boolean isReady() {
     return true;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index c327e68d7e9..34461ab471f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -25,6 +25,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.Co
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 
 /** Stub for communicating with a Windmill server. */
@@ -40,6 +41,11 @@ public abstract class WindmillServerStub implements 
StatusDataProvider {
    */
   public abstract void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) 
throws IOException;
 
+  /*
+   * Returns the windmill service endpoints set by setWindmillServiceEndpoints
+   */
+  public abstract ImmutableSet<HostAndPort> getWindmillServiceEndpoints();
+
   /** Returns true iff this WindmillServerStub is ready for making API calls. 
*/
   public abstract boolean isReady();
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java
new file mode 100644
index 00000000000..9ab02788603
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import 
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
+import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet;
+import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*;
+import 
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
+
+/** Respond to /path with the GRPC channelz data. */
+@Internal
+public class ChannelzServlet extends BaseStatusServlet implements 
DebugCapture.Capturable {
+
+  private static final int MAX_TOP_CHANNELS_TO_RETURN = 500;
+
+  private final ChannelzService channelzService;
+  private final WindmillServerStub windmillServerStub;
+  private final boolean showOnlyWindmillServiceChannels;
+
+  public ChannelzServlet(
+      String path, StreamingDataflowWorkerOptions options, WindmillServerStub 
windmillServerStub) {
+    super(path);
+    channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN);
+    this.windmillServerStub = windmillServerStub;
+    showOnlyWindmillServiceChannels = 
options.getChannelzShowOnlyWindmillServiceChannels();
+  }
+
+  @Override
+  protected void doGet(HttpServletRequest request, HttpServletResponse 
response)
+      throws IOException, ServletException {
+    response.setStatus(HttpServletResponse.SC_OK);
+    PrintWriter writer = response.getWriter();
+    captureData(writer);
+  }
+
+  @Override
+  public String pageName() {
+    return getPath();
+  }
+
+  @Override
+  public void captureData(PrintWriter writer) {
+    writer.println("<html>");
+    writer.println("<h1>Channelz</h1>");
+    appendTopChannels(writer);
+    writer.println("</html>");
+  }
+
+  // channelz proto says there won't be cycles in the ref graph.
+  // we track visited ids to be defensive and prevent any accidental cycles.
+  private static class VisitedSets {
+
+    Set<Long> channels = new HashSet<>();
+    Set<Long> subchannels = new HashSet<>();
+  }
+
+  private void appendTopChannels(PrintWriter writer) {
+    SettableFuture<GetTopChannelsResponse> future = SettableFuture.create();
+    // IDEA: If there are more than MAX_TOP_CHANNELS_TO_RETURN top channels
+    // in the worker, we might not return all the windmill channels. If we run 
into
+    // such situations, this logic can be modified to loop till we see an empty
+    // GetTopChannelsResponse response with the end bit set.
+    channelzService.getTopChannels(
+        GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future));
+    GetTopChannelsResponse topChannelsResponse;
+    try {
+      topChannelsResponse = future.get();
+    } catch (Exception e) {
+      String msg = "Failed to get channelz: " + e.getMessage();
+      writer.println(msg);
+      return;
+    }
+
+    List<Channel> topChannels = topChannelsResponse.getChannelList();
+    if (showOnlyWindmillServiceChannels) {
+      topChannels = filterWindmillChannels(topChannels);
+    }
+    writer.println("<h2>Top Level Channels</h2>");
+    writer.println("<table border='1'>");
+    VisitedSets visitedSets = new VisitedSets();
+    for (Channel channel : topChannels) {
+      writer.println("<tr>");
+      writer.println("<td>");
+      writer.println("TopChannelId: " + channel.getRef().getChannelId());
+      writer.println("</td>");
+      writer.println("<td>");
+      appendChannel(channel, writer, visitedSets);
+      writer.println("</td>");
+      writer.println("</tr>");
+    }
+    writer.println("</table>");
+  }
+
+  private List<Channel> filterWindmillChannels(List<Channel> channels) {
+    ImmutableSet<HostAndPort> windmillServiceEndpoints =
+        windmillServerStub.getWindmillServiceEndpoints();
+    Set<String> windmillServiceHosts =
+        
windmillServiceEndpoints.stream().map(HostAndPort::getHost).collect(Collectors.toSet());
+    List<Channel> windmillChannels = new ArrayList<>();
+    for (Channel channel : channels) {
+      for (String windmillServiceHost : windmillServiceHosts) {
+        if (channel.getData().getTarget().contains(windmillServiceHost)) {
+          windmillChannels.add(channel);
+          break;
+        }
+      }
+    }
+    return windmillChannels;
+  }
+
+  private void appendChannels(
+      List<ChannelRef> channelRefs, PrintWriter writer, VisitedSets 
visitedSets) {
+    for (ChannelRef channelRef : channelRefs) {
+      writer.println("<tr>");
+      writer.println("<td>");
+      writer.println("Channel: " + channelRef.getChannelId());
+      writer.println("</td>");
+      writer.println("<td>");
+      appendChannel(channelRef, writer, visitedSets);
+      writer.println("</td>");
+      writer.println("</tr>");
+    }
+  }
+
+  private void appendChannel(ChannelRef channelRef, PrintWriter writer, 
VisitedSets visitedSets) {
+    if (visitedSets.channels.contains(channelRef.getChannelId())) {
+      String msg = "Duplicate Channel Id: " + channelRef;
+      writer.println(msg);
+      return;
+    }
+    visitedSets.channels.add(channelRef.getChannelId());
+    SettableFuture<GetChannelResponse> future = SettableFuture.create();
+    channelzService.getChannel(
+        
GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(),
+        getStreamObserver(future));
+    Channel channel;
+    try {
+      channel = future.get().getChannel();
+    } catch (Exception e) {
+      String msg = "Failed to get Channel: " + channelRef;
+      writer.println(msg + " Exception: " + e.getMessage());
+      return;
+    }
+    appendChannel(channel, writer, visitedSets);
+  }
+
+  private void appendChannel(Channel channel, PrintWriter writer, VisitedSets 
visitedSets) {
+    writer.println("<table border='1'>");
+    writer.println("<tr>");
+    writer.println("<td>");
+    writer.println("ChannelId: " + channel.getRef().getChannelId());
+    writer.println("</td>");
+    writer.println("<td><pre>" + channel);
+    writer.println("</pre></td>");
+    writer.println("</tr>");
+    appendChannels(channel.getChannelRefList(), writer, visitedSets);
+    appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets);
+    appendSockets(channel.getSocketRefList(), writer);
+    writer.println("</table>");
+  }
+
+  private void appendSubChannels(
+      List<SubchannelRef> subchannelRefList, PrintWriter writer, VisitedSets 
visitedSets) {
+    for (SubchannelRef subchannelRef : subchannelRefList) {
+      writer.println("<tr>");
+      writer.println("<td>");
+      writer.println("Sub Channel: " + subchannelRef.getSubchannelId());
+      writer.println("</td>");
+      writer.println("<td>");
+      appendSubchannel(subchannelRef, writer, visitedSets);
+      writer.println("</td>");
+      writer.println("</tr>");
+    }
+  }
+
+  private void appendSubchannel(
+      SubchannelRef subchannelRef, PrintWriter writer, VisitedSets 
visitedSets) {
+    if (visitedSets.subchannels.contains(subchannelRef.getSubchannelId())) {
+      String msg = "Duplicate Subchannel Id: " + subchannelRef;
+      writer.println(msg);
+      return;
+    }
+    visitedSets.subchannels.add(subchannelRef.getSubchannelId());
+    SettableFuture<GetSubchannelResponse> future = SettableFuture.create();
+    channelzService.getSubchannel(
+        
GetSubchannelRequest.newBuilder().setSubchannelId(subchannelRef.getSubchannelId()).build(),
+        getStreamObserver(future));
+    Subchannel subchannel;
+    try {
+      subchannel = future.get().getSubchannel();
+    } catch (Exception e) {
+      String msg = "Failed to get Subchannel: " + subchannelRef;
+      writer.println(msg + " Exception: " + e.getMessage());
+      return;
+    }
+
+    writer.println("<table border='1'>");
+    writer.println("<tr>");
+    writer.println("<td>SubchannelId: " + subchannelRef.getSubchannelId());
+    writer.println("</td>");
+    writer.println("<td><pre>" + subchannel.toString());
+    writer.println("</pre></td>");
+    writer.println("</tr>");
+    appendChannels(subchannel.getChannelRefList(), writer, visitedSets);
+    appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets);
+    appendSockets(subchannel.getSocketRefList(), writer);
+    writer.println("</table>");
+  }
+
+  private void appendSockets(List<SocketRef> socketRefList, PrintWriter 
writer) {
+    for (SocketRef socketRef : socketRefList) {
+      writer.println("<tr>");
+      writer.println("<td>");
+      writer.println("Socket: " + socketRef.getSocketId());
+      writer.println("</td>");
+      writer.println("<td>");
+      appendSocket(socketRef, writer);
+      writer.println("</td>");
+      writer.println("</tr>");
+    }
+  }
+
+  private void appendSocket(SocketRef socketRef, PrintWriter writer) {
+    SettableFuture<GetSocketResponse> future = SettableFuture.create();
+    channelzService.getSocket(
+        
GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()).build(),
+        getStreamObserver(future));
+    Socket socket;
+    try {
+      socket = future.get().getSocket();
+    } catch (Exception e) {
+      String msg = "Failed to get Socket: " + socketRef;
+      writer.println(msg + " Exception: " + e.getMessage());
+      return;
+    }
+    writer.println("<pre>" + socket + "</pre>");
+  }
+
+  private <T> StreamObserver<T> getStreamObserver(SettableFuture<T> future) {
+    return new StreamObserver<T>() {
+      @Nullable T response = null;
+
+      @Override
+      public void onNext(T message) {
+        response = message;
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        future.setException(throwable);
+      }
+
+      @Override
+      public void onCompleted() {
+        future.set(response);
+      }
+    };
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index aa15e0a5e1a..845d54588e7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -97,6 +97,10 @@ class GrpcDispatcherClient {
         : randomlySelectNextStub(windmillServiceStubs));
   }
 
+  ImmutableSet<HostAndPort> getDispatcherEndpoints() {
+    return dispatcherStubs.get().dispatcherEndpoints();
+  }
+
   CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
     ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
         dispatcherStubs.get().windmillMetadataServiceStubs();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 858aeb15985..f94fc09ac53 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -267,6 +267,11 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
     
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
   }
 
+  @Override
+  public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
+    return dispatcherClient.getDispatcherEndpoints();
+  }
+
   @Override
   public boolean isReady() {
     return dispatcherClient.hasInitializedEndpoints();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
index cf31436d364..d8e4c064e97 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
@@ -37,6 +37,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPor
 public final class WindmillChannelFactory {
   public static final String LOCALHOST = "localhost";
   private static final int DEFAULT_GRPC_PORT = 443;
+  private static final int MAX_REMOTE_TRACE_EVENTS = 100;
 
   private WindmillChannelFactory() {}
 
@@ -139,6 +140,7 @@ public final class WindmillChannelFactory {
 
     return channelBuilder
         .maxInboundMessageSize(Integer.MAX_VALUE)
+        .maxTraceEvents(MAX_REMOTE_TRACE_EVENTS)
         .maxInboundMetadataSize(1024 * 1024);
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index 069fcac07c8..e4985193d1c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -43,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import 
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -64,6 +64,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.Ge
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
 import org.joda.time.Duration;
@@ -73,7 +74,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** An in-memory Windmill server that offers provided work and data. */
-final class FakeWindmillServer extends WindmillServerStub {
+public class FakeWindmillServer extends WindmillServerStub {
   private static final Logger LOG = 
LoggerFactory.getLogger(FakeWindmillServer.class);
   private final ResponseQueue<Windmill.GetWorkRequest, 
Windmill.GetWorkResponse> workToOffer;
   private final ResponseQueue<GetDataRequest, GetDataResponse> dataToOffer;
@@ -91,6 +92,9 @@ final class FakeWindmillServer extends WindmillServerStub {
   private boolean dropStreamingCommits = false;
   private final Consumer<List<Windmill.ComputationHeartbeatResponse>> 
processHeartbeatResponses;
 
+  @GuardedBy("this")
+  private ImmutableSet<HostAndPort> dispatcherEndpoints;
+
   public FakeWindmillServer(
       ErrorCollector errorCollector,
       Function<String, Optional<ComputationState>> computationStateFetcher) {
@@ -475,8 +479,18 @@ final class FakeWindmillServer extends WindmillServerStub {
   }
 
   @Override
-  public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) throws 
IOException {
-    isReady = true;
+  public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
+    synchronized (this) {
+      this.dispatcherEndpoints = ImmutableSet.copyOf(endpoints);
+      isReady = true;
+    }
+  }
+
+  @Override
+  public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
+    synchronized (this) {
+      return dispatcherEndpoints;
+    }
   }
 
   @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java
new file mode 100644
index 00000000000..3ec951d9c14
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static org.junit.Assert.*;
+
+import java.io.*;
+import java.util.Optional;
+import org.apache.beam.runners.dataflow.worker.FakeWindmillServer;
+import 
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import 
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChannelzServletTest {
+
+  @Test
+  public void testRendersAllChannels() throws UnsupportedEncodingException {
+    String windmill1 = "WindmillHost1";
+    String windmill2 = "WindmillHost2";
+    String nonWindmill1 = "NonWindmillHost1";
+    String someOtherHost1 = "SomeOtherHost2";
+    ManagedChannel[] unusedChannels =
+        new ManagedChannel[] {
+          InProcessChannelBuilder.forName(windmill1).build(),
+          InProcessChannelBuilder.forName(windmill2).build(),
+          InProcessChannelBuilder.forName(nonWindmill1).build(),
+          InProcessChannelBuilder.forName(someOtherHost1).build()
+        };
+    StreamingDataflowWorkerOptions options =
+        
PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class);
+    FakeWindmillServer fakeWindmillServer =
+        new FakeWindmillServer(new ErrorCollector(), s -> Optional.empty());
+    fakeWindmillServer.setWindmillServiceEndpoints(
+        ImmutableSet.of(HostAndPort.fromHost(windmill1), 
HostAndPort.fromHost(windmill2)));
+    options.setChannelzShowOnlyWindmillServiceChannels(false);
+    ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", 
options, fakeWindmillServer);
+    StringWriter stringWriter = new StringWriter();
+    PrintWriter writer = new PrintWriter(stringWriter);
+    channelzServlet.captureData(writer);
+    writer.flush();
+    String channelzData = stringWriter.toString();
+    assertTrue(channelzData.contains(windmill1));
+    assertTrue(channelzData.contains(windmill2));
+    assertTrue(channelzData.contains(nonWindmill1));
+    assertTrue(channelzData.contains(someOtherHost1));
+  }
+
+  @Test
+  public void testRendersOnlyWindmillChannels() throws 
UnsupportedEncodingException {
+    String windmill1 = "WindmillHost1";
+    String windmill2 = "WindmillHost2";
+    String nonWindmill1 = "NonWindmillHost1";
+    String someOtherHost1 = "SomeOtherHost2";
+    ManagedChannel[] unusedChannels =
+        new ManagedChannel[] {
+          InProcessChannelBuilder.forName(windmill1).build(),
+          InProcessChannelBuilder.forName(windmill2).build(),
+          InProcessChannelBuilder.forName(nonWindmill1).build(),
+          InProcessChannelBuilder.forName(someOtherHost1).build()
+        };
+    StreamingDataflowWorkerOptions options =
+        
PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class);
+    FakeWindmillServer fakeWindmillServer =
+        new FakeWindmillServer(new ErrorCollector(), s -> Optional.empty());
+    fakeWindmillServer.setWindmillServiceEndpoints(
+        ImmutableSet.of(HostAndPort.fromHost(windmill1), 
HostAndPort.fromHost(windmill2)));
+    options.setChannelzShowOnlyWindmillServiceChannels(true);
+    ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", 
options, fakeWindmillServer);
+    StringWriter stringWriter = new StringWriter();
+    PrintWriter writer = new PrintWriter(stringWriter);
+    channelzServlet.captureData(writer);
+    writer.flush();
+    String channelzData = stringWriter.toString();
+    assertTrue(channelzData.contains(windmill1));
+    assertTrue(channelzData.contains(windmill2));
+    // The logic does a substring match on the target, so
+    // NonWindmillHost1 matches since it contains WindmillHost1, which is a 
windmill host.
+    assertTrue(channelzData.contains(nonWindmill1));
+    assertFalse(channelzData.contains(someOtherHost1));
+  }
+}

Reply via email to