This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch revert-30425-mt-shutdown-channels
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8681ab9901d38af3c826eb75bbf0816cc55b0627
Author: Yi Hu <huu...@gmail.com>
AuthorDate: Wed Mar 20 14:27:29 2024 -0400

    Revert "Cache and close windmill grpc channels (#30425)"
    
    This reverts commit c1c255a0a433d7cdf1b5f0bc61986d395c7703ad.
---
 .../google-cloud-dataflow-java/worker/build.gradle |  12 --
 .../dataflow/worker/StreamingDataflowWorker.java   |  11 +-
 .../worker/windmill/WindmillConnection.java        |   5 -
 .../windmill/client/grpc/GrpcDispatcherClient.java |   4 +-
 .../windmill/client/grpc/GrpcWindmillServer.java   |  29 +---
 .../client/grpc/StreamingEngineClient.java         |  25 ++--
 .../windmill/client/grpc/stubs/ChannelCache.java   | 117 ----------------
 .../grpc/stubs/ChannelCachingStubFactory.java      |  38 ------
 .../client/grpc/stubs/IsolationChannel.java        |   2 +-
 ...Factory.java => RemoteWindmillStubFactory.java} |  38 +++---
 .../client/grpc/stubs/WindmillChannelFactory.java  |   2 +-
 .../client/grpc/GrpcWindmillServerTest.java        |   3 +-
 .../client/grpc/StreamingEngineClientTest.java     |  31 +++--
 .../client/grpc/WindmillStreamSenderTest.java      |   3 +-
 .../client/grpc/stubs/ChannelCacheTest.java        | 150 ---------------------
 .../windmill/testing/FakeWindmillStubFactory.java  |  27 ++--
 .../budget/EvenGetWorkBudgetDistributorTest.java   |   7 +-
 17 files changed, 78 insertions(+), 426 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle
index 4ddb3e2755e..124c11026df 100644
--- a/runners/google-cloud-dataflow-java/worker/build.gradle
+++ b/runners/google-cloud-dataflow-java/worker/build.gradle
@@ -71,10 +71,6 @@ def excluded_dependencies = [
         library.java.truth                           // Test only
 ]
 
-// For Java8+ and less than Java11, use versions 2.x.x.
-// Java11+ can use versions 3.x.x per https://github.com/ben-manes/caffeine.
-def caffeine_cache_version = "2.9.3"
-
 applyJavaNature(
         automaticModuleName: 'org.apache.beam.runners.dataflow.worker',
         archivesBaseName: 
'beam-runners-google-cloud-dataflow-java-legacy-worker',
@@ -141,13 +137,6 @@ applyJavaNature(
             relocate("org.eclipse.jetty", 
getWorkerRelocatedPath("org.eclipse.jetty"))
             relocate("javax.servlet", getWorkerRelocatedPath("javax.servlet"))
 
-            // Use Caffeine cache instead of Guava cache.
-            // Context: 
https://guava.dev/releases/snapshot/api/docs/com/google/common/cache/CacheBuilder
-            dependencies {
-                
include(dependency("com.github.ben-manes.caffeine:caffeine:${caffeine_cache_version}}"))
-            }
-            relocate("com.github.ben-manes.caffeine", 
getWorkerRelocatedPath("com.github.ben-manes.caffeine"))
-
             // We don't relocate windmill since it is already underneath the 
org.apache.beam.runners.dataflow.worker namespace and never
             // expect a user pipeline to include it. There is also a JNI 
component that windmill server relies on which makes
             // arbitrary relocation more difficult.
@@ -213,7 +202,6 @@ dependencies {
     implementation "javax.servlet:javax.servlet-api:3.1.0"
     implementation "org.eclipse.jetty:jetty-server:9.2.10.v20150310"
     implementation "org.eclipse.jetty:jetty-servlet:9.2.10.v20150310"
-    implementation 
"com.github.ben-manes.caffeine:caffeine:${caffeine_cache_version}"
     implementation library.java.avro
     implementation library.java.jackson_annotations
     implementation library.java.jackson_core
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index cc5b890bc60..4c3ffd08a0b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -457,13 +457,12 @@ public class StreamingDataflowWorker {
   public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions options) {
     ConcurrentMap<String, ComputationState> computationMap = new 
ConcurrentHashMap<>();
     long clientId = clientIdGenerator.nextLong();
-
-    Consumer<List<Windmill.ComputationHeartbeatResponse>> 
workHeartbeatResponseProcessor =
-        new WorkHeartbeatResponseProcessor(
-            computationId -> 
Optional.ofNullable(computationMap.get(computationId)));
-
     return new StreamingDataflowWorker(
-        createWindmillServerStub(options, clientId, 
workHeartbeatResponseProcessor),
+        createWindmillServerStub(
+            options,
+            clientId,
+            new WorkHeartbeatResponseProcessor(
+                computationId -> 
Optional.ofNullable(computationMap.get(computationId)))),
         clientId,
         computationMap,
         WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()),
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java
index a20c2f02b26..e49a04a7a54 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java
@@ -33,7 +33,6 @@ public abstract class WindmillConnection {
     WindmillConnection.Builder windmillWorkerConnection = 
WindmillConnection.builder();
 
     
windmillEndpoint.workerToken().ifPresent(windmillWorkerConnection::setBackendWorkerToken);
-    
windmillEndpoint.directEndpoint().ifPresent(windmillWorkerConnection::setDirectEndpoint);
     windmillWorkerConnection.setStub(endpointToStubFn.apply(windmillEndpoint));
 
     return windmillWorkerConnection.build();
@@ -45,16 +44,12 @@ public abstract class WindmillConnection {
 
   public abstract Optional<String> backendWorkerToken();
 
-  public abstract Optional<WindmillServiceAddress> directEndpoint();
-
   public abstract CloudWindmillServiceV1Alpha1Stub stub();
 
   @AutoValue.Builder
   abstract static class Builder {
     abstract Builder setBackendWorkerToken(String backendWorkerToken);
 
-    public abstract Builder setDirectEndpoint(WindmillServiceAddress value);
-
     abstract Builder setStub(CloudWindmillServiceV1Alpha1Stub stub);
 
     abstract WindmillConnection build();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index edc193ff99c..845d54588e7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
 
 /** Manages endpoints and stubs for connecting to the Windmill Dispatcher. */
 @ThreadSafe
-public class GrpcDispatcherClient {
+class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
@@ -66,7 +66,7 @@ public class GrpcDispatcherClient {
     this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
-  public static GrpcDispatcherClient create(WindmillStubFactory 
windmillStubFactory) {
+  static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
     return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 52de3ef7bc0..b09e341f29e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -20,7 +20,6 @@ package 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST;
 import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.inProcessChannel;
 import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel;
-import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
@@ -30,7 +29,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -54,13 +52,10 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequ
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.RemoteWindmillStubFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
@@ -156,26 +151,16 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
       GrpcWindmillStreamFactory grpcWindmillStreamFactory,
       Consumer<List<Windmill.ComputationHeartbeatResponse>> 
processHeartbeatResponses)
       throws IOException {
-    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
-        serviceAddress ->
-            remoteChannel(
-                serviceAddress, 
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec());
-    ChannelCache channelCache =
-        ChannelCache.create(
-            serviceAddress ->
-                // IsolationChannel will create and manage separate RPC 
channels to the same
-                // serviceAddress via calling the channelFactory, else just 
directly return the
-                // RPC channel.
-                workerOptions.getUseWindmillIsolatedChannels()
-                    ? IsolationChannel.create(() -> 
channelFactory.apply(serviceAddress))
-                    : channelFactory.apply(serviceAddress));
-    WindmillStubFactory stubFactory =
-        
ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(), 
channelCache);
+
     GrpcWindmillServer grpcWindmillServer =
         new GrpcWindmillServer(
             workerOptions,
             grpcWindmillStreamFactory,
-            GrpcDispatcherClient.create(stubFactory),
+            GrpcDispatcherClient.create(
+                new RemoteWindmillStubFactory(
+                    
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
+                    workerOptions.getGcpCredential(),
+                    workerOptions.getUseWindmillIsolatedChannels())),
             processHeartbeatResponses);
     if (workerOptions.getWindmillServiceEndpoint() != null) {
       grpcWindmillServer.configureWindmillServiceEndpoints();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
index d7573a55c16..0c690cf9775 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
@@ -43,7 +43,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoi
 import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemProcessor;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
@@ -78,7 +78,7 @@ public final class StreamingEngineClient {
   private final JobHeader jobHeader;
   private final GrpcWindmillStreamFactory streamFactory;
   private final WorkItemProcessor workItemProcessor;
-  private final ChannelCachingStubFactory channelCachingStubFactory;
+  private final WindmillStubFactory stubFactory;
   private final GrpcDispatcherClient dispatcherClient;
   private final AtomicBoolean isBudgetRefreshPaused;
   private final GetWorkBudgetRefresher getWorkBudgetRefresher;
@@ -89,7 +89,6 @@ public final class StreamingEngineClient {
   private final long clientId;
   private final Supplier<GetWorkerMetadataStream> getWorkerMetadataStream;
   private final Queue<WindmillEndpoints> newWindmillEndpoints;
-
   /** Writes are guarded by synchronization, reads are lock free. */
   private final AtomicReference<StreamingEngineConnectionState> connections;
 
@@ -100,7 +99,7 @@ public final class StreamingEngineClient {
       AtomicReference<StreamingEngineConnectionState> connections,
       GrpcWindmillStreamFactory streamFactory,
       WorkItemProcessor workItemProcessor,
-      ChannelCachingStubFactory channelCachingStubFactory,
+      WindmillStubFactory stubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
       long clientId) {
@@ -109,7 +108,7 @@ public final class StreamingEngineClient {
     this.streamFactory = streamFactory;
     this.workItemProcessor = workItemProcessor;
     this.connections = connections;
-    this.channelCachingStubFactory = channelCachingStubFactory;
+    this.stubFactory = stubFactory;
     this.dispatcherClient = dispatcherClient;
     this.isBudgetRefreshPaused = new AtomicBoolean(false);
     this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
@@ -165,7 +164,7 @@ public final class StreamingEngineClient {
       GetWorkBudget totalGetWorkBudget,
       GrpcWindmillStreamFactory streamingEngineStreamFactory,
       WorkItemProcessor processWorkItem,
-      ChannelCachingStubFactory channelCachingStubFactory,
+      WindmillStubFactory windmillGrpcStubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient) {
     StreamingEngineClient streamingEngineClient =
@@ -175,7 +174,7 @@ public final class StreamingEngineClient {
             new AtomicReference<>(StreamingEngineConnectionState.EMPTY),
             streamingEngineStreamFactory,
             processWorkItem,
-            channelCachingStubFactory,
+            windmillGrpcStubFactory,
             getWorkBudgetDistributor,
             dispatcherClient,
             new Random().nextLong());
@@ -190,7 +189,7 @@ public final class StreamingEngineClient {
       AtomicReference<StreamingEngineConnectionState> connections,
       GrpcWindmillStreamFactory streamFactory,
       WorkItemProcessor processWorkItem,
-      ChannelCachingStubFactory stubFactory,
+      WindmillStubFactory stubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
       long clientId) {
@@ -235,7 +234,6 @@ public final class StreamingEngineClient {
     getWorkBudgetRefresher.stop();
     newWorkerMetadataPublisher.shutdownNow();
     newWorkerMetadataConsumer.shutdownNow();
-    channelCachingStubFactory.shutdown();
   }
 
   /**
@@ -312,11 +310,8 @@ public final class StreamingEngineClient {
     currentStreams.entrySet().stream()
         .filter(
             connectionAndStream -> 
!newWindmillConnections.contains(connectionAndStream.getKey()))
-        .forEach(
-            entry -> {
-              entry.getValue().closeAllStreams();
-              
entry.getKey().directEndpoint().ifPresent(channelCachingStubFactory::remove);
-            });
+        .map(Entry::getValue)
+        .forEach(WindmillStreamSender::closeAllStreams);
 
     return newWindmillConnections.stream()
         .collect(
@@ -379,7 +374,7 @@ public final class StreamingEngineClient {
   private CloudWindmillServiceV1Alpha1Stub createWindmillStub(Endpoint 
endpoint) {
     return endpoint
         .directEndpoint()
-        .map(channelCachingStubFactory::createWindmillServiceStub)
+        .map(stubFactory::createWindmillServiceStub)
         .orElseGet(dispatcherClient::getWindmillServiceStub);
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
deleted file mode 100644
index f95391cc1df..00000000000
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
-
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.github.benmanes.caffeine.cache.RemovalListener;
-import java.io.PrintWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
-import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
- * href=https://grpc.io/docs/guides/performance/#java>gRPC recommendations</a> 
for re-using channels
- * when possible.
- *
- * @implNote Backed by {@link LoadingCache} which is thread-safe.
- */
-@ThreadSafe
-public final class ChannelCache implements StatusDataProvider {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ChannelCache.class);
-  private final LoadingCache<WindmillServiceAddress, ManagedChannel> 
channelCache;
-
-  private ChannelCache(
-      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
-      RemovalListener<WindmillServiceAddress, ManagedChannel> 
onChannelRemoved) {
-    this.channelCache =
-        
Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);
-  }
-
-  public static ChannelCache create(
-      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
-    return new ChannelCache(
-        channelFactory,
-        // Shutdown the channels as they get removed from the cache, so they 
do not leak.
-        (address, channel, cause) -> shutdownChannel(channel));
-  }
-
-  @VisibleForTesting
-  static ChannelCache forTesting(
-      Function<WindmillServiceAddress, ManagedChannel> channelFactory, 
Runnable onChannelShutdown) {
-    return new ChannelCache(
-        channelFactory,
-        // Shutdown the channels as they get removed from the cache, so they 
do not leak.
-        // Add hook for testing so that we don't have to sleep/wait for 
arbitrary time in test.
-        (address, channel, cause) -> {
-          shutdownChannel(channel);
-          onChannelShutdown.run();
-        });
-  }
-
-  private static void shutdownChannel(ManagedChannel channel) {
-    channel.shutdown();
-    try {
-      channel.awaitTermination(10, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOG.error("Couldn't close gRPC channel={}", channel, e);
-    }
-    channel.shutdownNow();
-  }
-
-  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
-    return channelCache.get(windmillServiceAddress);
-  }
-
-  public void remove(WindmillServiceAddress windmillServiceAddress) {
-    channelCache.invalidate(windmillServiceAddress);
-  }
-
-  public void clear() {
-    channelCache.invalidateAll();
-  }
-
-  /**
-   * Checks to see if the cache is empty. May block the calling thread to 
perform any pending
-   * removal/insert operations first before checking the size. Should be only 
used for testing.
-   */
-  @VisibleForTesting
-  boolean isEmpty() {
-    channelCache.cleanUp();
-    return channelCache.estimatedSize() == 0;
-  }
-
-  @Override
-  public void appendSummaryHtml(PrintWriter writer) {
-    writer.write("Active gRPC Channels:<br>");
-    channelCache
-        .asMap()
-        .forEach(
-            (address, channel) -> {
-              writer.format("Address: [%s]; Channel: [%s].", address, channel);
-              writer.write("<br>");
-            });
-  }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java
deleted file mode 100644
index 9fd4ad00730..00000000000
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
-
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
-import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Channel;
-
-public interface ChannelCachingStubFactory extends WindmillStubFactory {
-
-  /**
-   * Remove and close the gRPC channel used to communicate with the given 
{@link
-   * WindmillServiceAddress}.
-   *
-   * <p>Subsequent calls to {@link
-   * WindmillStubFactory#createWindmillServiceStub(WindmillServiceAddress)} 
will get a stub backed
-   * by a new {@link Channel} instance to the {@link WindmillServiceAddress}. 
Users of stubs backed
-   * by the previously vended {@link Channel} will start to receive errors.
-   */
-  void remove(WindmillServiceAddress windmillServiceAddress);
-
-  /** Shuts down all channels and stubs. */
-  void shutdown();
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java
index 927c42c3e4c..7134e8b478b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
  * that each active rpc has its own channel.
  */
 @Internal
-public class IsolationChannel extends ManagedChannel {
+class IsolationChannel extends ManagedChannel {
   private static final Logger LOG = 
LoggerFactory.getLogger(IsolationChannel.class);
 
   private final Supplier<ManagedChannel> channelFactory;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingRemoteStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/RemoteWindmillStubFactory.java
similarity index 67%
rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingRemoteStubFactory.java
rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/RemoteWindmillStubFactory.java
index 65dbe75670d..9978b74c7aa 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingRemoteStubFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/RemoteWindmillStubFactory.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
 
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
+
 import com.google.auth.Credentials;
+import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
@@ -26,30 +29,29 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Al
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth.VendoredCredentialsAdapter;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.auth.MoreCallCredentials;
 
 /** Creates remote stubs to talk to Streaming Engine. */
 @Internal
 @ThreadSafe
-public final class ChannelCachingRemoteStubFactory implements 
ChannelCachingStubFactory {
+public final class RemoteWindmillStubFactory implements WindmillStubFactory {
+  private final int rpcChannelTimeoutSec;
   private final Credentials gcpCredentials;
-  private final ChannelCache channelCache;
+  private final boolean useIsolatedChannels;
 
-  private ChannelCachingRemoteStubFactory(Credentials gcpCredentials, 
ChannelCache channelCache) {
+  public RemoteWindmillStubFactory(
+      int rpcChannelTimeoutSec, Credentials gcpCredentials, boolean 
useIsolatedChannels) {
+    this.rpcChannelTimeoutSec = rpcChannelTimeoutSec;
     this.gcpCredentials = gcpCredentials;
-    this.channelCache = channelCache;
-  }
-
-  public static ChannelCachingRemoteStubFactory create(
-      Credentials gcpCredentials, ChannelCache channelCache) {
-    return new ChannelCachingRemoteStubFactory(gcpCredentials, channelCache);
+    this.useIsolatedChannels = useIsolatedChannels;
   }
 
   @Override
   public CloudWindmillServiceV1Alpha1Stub createWindmillServiceStub(
       WindmillServiceAddress serviceAddress) {
     CloudWindmillServiceV1Alpha1Stub windmillServiceStub =
-        
CloudWindmillServiceV1Alpha1Grpc.newStub(channelCache.get(serviceAddress));
+        
CloudWindmillServiceV1Alpha1Grpc.newStub(createChannel(serviceAddress));
     return serviceAddress.getKind() != 
WindmillServiceAddress.Kind.AUTHENTICATED_GCP_SERVICE_ADDRESS
         ? windmillServiceStub.withCallCredentials(
             MoreCallCredentials.from(new 
VendoredCredentialsAdapter(gcpCredentials)))
@@ -59,18 +61,16 @@ public final class ChannelCachingRemoteStubFactory implements ChannelCachingStub
   @Override
   public CloudWindmillMetadataServiceV1Alpha1Stub 
createWindmillMetadataServiceStub(
       WindmillServiceAddress serviceAddress) {
-    return 
CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(channelCache.get(serviceAddress))
+    return 
CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(createChannel(serviceAddress))
         .withCallCredentials(
             MoreCallCredentials.from(new 
VendoredCredentialsAdapter(gcpCredentials)));
   }
 
-  @Override
-  public void remove(WindmillServiceAddress windmillServiceAddress) {
-    channelCache.remove(windmillServiceAddress);
-  }
-
-  @Override
-  public void shutdown() {
-    channelCache.clear();
+  private ManagedChannel createChannel(WindmillServiceAddress serviceAddress) {
+    Supplier<ManagedChannel> channelFactory =
+        () -> remoteChannel(serviceAddress, rpcChannelTimeoutSec);
+    // IsolationChannel will create and manage separate RPC channels to the same serviceAddress via
+    // calling the channelFactory, else just directly return the RPC channel.
+    return useIsolatedChannels ? IsolationChannel.create(channelFactory) : 
channelFactory.get();
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
index 9aec29a3ba4..d8e4c064e97 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
@@ -52,7 +52,7 @@ public final class WindmillChannelFactory {
         .build();
   }
 
-  public static ManagedChannel remoteChannel(
+  static ManagedChannel remoteChannel(
       WindmillServiceAddress windmillServiceAddress, int 
windmillServiceRpcChannelTimeoutSec) {
     switch (windmillServiceAddress.getKind()) {
       case IPV6:
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index 454c616db41..37dc7eff917 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -82,7 +82,6 @@ import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientCall;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientInterceptor;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientInterceptors;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Deadline;
-import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.MethodDescriptor;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
@@ -212,7 +211,7 @@ public class GrpcWindmillServerTest {
 
     this.client =
         GrpcWindmillServer.newApplianceTestInstance(
-            inprocessChannel, new FakeWindmillStubFactory(() -> 
(ManagedChannel) inprocessChannel));
+            inprocessChannel, new FakeWindmillStubFactory(() -> 
inprocessChannel));
 
     Windmill.GetWorkResponse response1 = 
client.getWork(GetWorkRequest.getDefaultInstance());
     Windmill.GetWorkResponse response2 = 
client.getWork(GetWorkRequest.getDefaultInstance());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
index 9cdbe01b524..f9011a90c06 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
@@ -47,12 +47,13 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataR
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemProcessor;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
 import 
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
 import 
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessSocketAddress;
@@ -95,24 +96,30 @@ public class StreamingEngineClientTest {
           .build();
 
   @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+
+  private final Set<ManagedChannel> channels = new HashSet<>();
   private final MutableHandlerRegistry serviceRegistry = new 
MutableHandlerRegistry();
   private final GrpcWindmillStreamFactory streamFactory =
       spy(GrpcWindmillStreamFactory.of(JOB_HEADER).build());
-  private final ChannelCachingStubFactory stubFactory =
+  private final WindmillStubFactory stubFactory =
       new FakeWindmillStubFactory(
-          () ->
-              grpcCleanup.register(
-                  
WindmillChannelFactory.inProcessChannel("StreamingEngineClientTest")));
+          () -> {
+            ManagedChannel channel =
+                grpcCleanup.register(
+                    
WindmillChannelFactory.inProcessChannel("StreamingEngineClientTest"));
+            channels.add(channel);
+            return channel;
+          });
   private final GrpcDispatcherClient dispatcherClient =
       GrpcDispatcherClient.forTesting(
           stubFactory, new ArrayList<>(), new ArrayList<>(), new HashSet<>());
   private final AtomicReference<StreamingEngineConnectionState> connections =
       new AtomicReference<>(StreamingEngineConnectionState.EMPTY);
-  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
-
   private Server fakeStreamingEngineServer;
   private CountDownLatch getWorkerMetadataReady;
   private GetWorkerMetadataTestStub fakeGetWorkerMetadataStub;
+
   private StreamingEngineClient streamingEngineClient;
 
   private static WorkItemProcessor noOpProcessWorkItemFn() {
@@ -136,15 +143,13 @@ public class StreamingEngineClientTest {
   }
 
   private static WorkerMetadataResponse.Endpoint 
metadataResponseEndpoint(String workerToken) {
-    return WorkerMetadataResponse.Endpoint.newBuilder()
-        
.setDirectEndpoint(DEFAULT_WINDMILL_SERVICE_ADDRESS.gcpServiceAddress().getHost())
-        .setBackendWorkerToken(workerToken)
-        .build();
+    return 
WorkerMetadataResponse.Endpoint.newBuilder().setBackendWorkerToken(workerToken).build();
   }
 
   @Before
   public void setUp() throws IOException {
-    stubFactory.shutdown();
+    channels.forEach(ManagedChannel::shutdownNow);
+    channels.clear();
     fakeStreamingEngineServer =
         grpcCleanup.register(
             InProcessServerBuilder.forName("StreamingEngineClientTest")
@@ -167,7 +172,7 @@ public class StreamingEngineClientTest {
     Preconditions.checkNotNull(streamingEngineClient).finish();
     fakeGetWorkerMetadataStub.close();
     fakeStreamingEngineServer.shutdownNow();
-    stubFactory.shutdown();
+    channels.forEach(ManagedChannel::shutdownNow);
   }
 
   private StreamingEngineClient newStreamingEngineClient(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
index 0bb80191cc3..2532fca5154 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
@@ -49,9 +49,11 @@ import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class WindmillStreamSenderTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final GetWorkRequest GET_WORK_REQUEST =
       
GetWorkRequest.newBuilder().setClientId(1L).setJobId("job").setProjectId("project").build();
   @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
   private final GrpcWindmillStreamFactory streamFactory =
       spy(
           GrpcWindmillStreamFactory.of(
@@ -68,7 +70,6 @@ public class WindmillStreamSenderTest {
           workItem,
           ackQueuedWorkItem,
           getWorkStreamLatencies) -> {};
-  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private ManagedChannel inProcessChannel;
   private CloudWindmillServiceV1Alpha1Stub stub;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
deleted file mode 100644
index 962ffa6e37d..00000000000
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
-import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
-import org.junit.After;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class ChannelCacheTest {
-
-  private ChannelCache cache;
-
-  private static ChannelCache newCache(
-      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
-    return ChannelCache.forTesting(channelFactory, () -> {});
-  }
-
-  @After
-  public void cleanUp() {
-    if (cache != null) {
-      cache.clear();
-    }
-  }
-
-  private ManagedChannel newChannel(String channelName) {
-    return WindmillChannelFactory.inProcessChannel(channelName);
-  }
-
-  @Test
-  public void testLoadingCacheReturnsExistingChannel() {
-    String channelName = "existingChannel";
-    ManagedChannel channel = newChannel(channelName);
-    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
-        spy(
-            new Function<WindmillServiceAddress, ManagedChannel>() {
-              @Override
-              public ManagedChannel apply(WindmillServiceAddress 
windmillServiceAddress) {
-                return channel;
-              }
-            });
-
-    cache = newCache(channelFactory);
-    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
-    // Initial call to load the cache.
-    assertThat(cache.get(someAddress)).isEqualTo(channel);
-
-    ManagedChannel cachedChannel = cache.get(someAddress);
-    assertSame(channel, cachedChannel);
-    verify(channelFactory, times(1)).apply(eq(someAddress));
-  }
-
-  @Test
-  public void testLoadingCacheReturnsLoadsChannelWhenNotPresent() {
-    String channelName = "existingChannel";
-    ManagedChannel channel = newChannel(channelName);
-    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
-        spy(
-            new Function<WindmillServiceAddress, ManagedChannel>() {
-              @Override
-              public ManagedChannel apply(WindmillServiceAddress 
windmillServiceAddress) {
-                return channel;
-              }
-            });
-
-    cache = newCache(channelFactory);
-    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
-    ManagedChannel cachedChannel = cache.get(someAddress);
-    assertSame(channel, cachedChannel);
-    verify(channelFactory, times(1)).apply(eq(someAddress));
-  }
-
-  @Test
-  public void testRemoveAndClose() throws InterruptedException {
-    String channelName = "existingChannel";
-    CountDownLatch verifyRemovalListenerAsync = new CountDownLatch(1);
-    CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1);
-    cache =
-        ChannelCache.forTesting(
-            ignored -> newChannel(channelName),
-            () -> {
-              try {
-                verifyRemovalListenerAsync.await();
-                notifyWhenChannelClosed.countDown();
-              } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-            });
-
-    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
-    ManagedChannel cachedChannel = cache.get(someAddress);
-    cache.remove(someAddress);
-    // Assert that the removal happened before we check to see if the shutdowns happen to confirm
-    // that removals are async.
-    assertTrue(cache.isEmpty());
-    verifyRemovalListenerAsync.countDown();
-
-    // Assert that the channel gets shutdown.
-    notifyWhenChannelClosed.await();
-    assertTrue(cachedChannel.isShutdown());
-
-    // Get should return a new channel, since we removed the last one.
-    ManagedChannel newChannel = cache.get(someAddress);
-    assertThat(newChannel).isNotSameInstanceAs(cachedChannel);
-  }
-
-  @Test
-  public void testClear() throws InterruptedException {
-    String channelName = "existingChannel";
-    CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1);
-    cache =
-        ChannelCache.forTesting(
-            ignored -> newChannel(channelName), 
notifyWhenChannelClosed::countDown);
-    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
-    ManagedChannel cachedChannel = cache.get(someAddress);
-    cache.clear();
-    notifyWhenChannelClosed.await();
-    assertTrue(cache.isEmpty());
-    assertTrue(cachedChannel.isShutdown());
-  }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java
index af3a3e8295b..3dd40e5d5c7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java
@@ -21,38 +21,27 @@ import java.util.function.Supplier;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
-import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Channel;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 
 @VisibleForTesting
-public final class FakeWindmillStubFactory implements 
ChannelCachingStubFactory {
-  private final ChannelCache channelCache;
+public final class FakeWindmillStubFactory implements WindmillStubFactory {
+  private final Supplier<Channel> channelFactory;
 
-  public FakeWindmillStubFactory(Supplier<ManagedChannel> channelFactory) {
-    this.channelCache = ChannelCache.create(ignored -> channelFactory.get());
+  public FakeWindmillStubFactory(Supplier<Channel> channelFactory) {
+    this.channelFactory = channelFactory;
   }
 
   @Override
   public CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub
       createWindmillServiceStub(WindmillServiceAddress serviceAddress) {
-    return 
CloudWindmillServiceV1Alpha1Grpc.newStub(channelCache.get(serviceAddress));
+    return CloudWindmillServiceV1Alpha1Grpc.newStub(channelFactory.get());
   }
 
   @Override
   public 
CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub
       createWindmillMetadataServiceStub(WindmillServiceAddress serviceAddress) 
{
-    return 
CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(channelCache.get(serviceAddress));
-  }
-
-  @Override
-  public void remove(WindmillServiceAddress windmillServiceAddress) {
-    channelCache.remove(windmillServiceAddress);
-  }
-
-  @Override
-  public void shutdown() {
-    channelCache.clear();
+    return 
CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(channelFactory.get());
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
index 8e5ed66ebb4..249642aa6d1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
@@ -46,8 +46,9 @@ import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class EvenGetWorkBudgetDistributorTest {
-  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
   private ManagedChannel inProcessChannel;
   private CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub 
stub;
 
@@ -259,8 +260,8 @@ public class EvenGetWorkBudgetDistributorTest {
         (computation,
             inputDataWatermark,
             synchronizedProcessingTime,
-            wrappedWorkItem,
-            ackWorkItemQueued,
+            workItem,
+            ackQueuedWorkItem,
             getWorkStreamLatencies) -> {});
   }
 }

Reply via email to