This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch revert-30425-mt-shutdown-channels in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8681ab9901d38af3c826eb75bbf0816cc55b0627 Author: Yi Hu <huu...@gmail.com> AuthorDate: Wed Mar 20 14:27:29 2024 -0400 Revert "Cache and close windmill grpc channels (#30425)" This reverts commit c1c255a0a433d7cdf1b5f0bc61986d395c7703ad. --- .../google-cloud-dataflow-java/worker/build.gradle | 12 -- .../dataflow/worker/StreamingDataflowWorker.java | 11 +- .../worker/windmill/WindmillConnection.java | 5 - .../windmill/client/grpc/GrpcDispatcherClient.java | 4 +- .../windmill/client/grpc/GrpcWindmillServer.java | 29 +--- .../client/grpc/StreamingEngineClient.java | 25 ++-- .../windmill/client/grpc/stubs/ChannelCache.java | 117 ---------------- .../grpc/stubs/ChannelCachingStubFactory.java | 38 ------ .../client/grpc/stubs/IsolationChannel.java | 2 +- ...Factory.java => RemoteWindmillStubFactory.java} | 38 +++--- .../client/grpc/stubs/WindmillChannelFactory.java | 2 +- .../client/grpc/GrpcWindmillServerTest.java | 3 +- .../client/grpc/StreamingEngineClientTest.java | 31 +++-- .../client/grpc/WindmillStreamSenderTest.java | 3 +- .../client/grpc/stubs/ChannelCacheTest.java | 150 --------------------- .../windmill/testing/FakeWindmillStubFactory.java | 27 ++-- .../budget/EvenGetWorkBudgetDistributorTest.java | 7 +- 17 files changed, 78 insertions(+), 426 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index 4ddb3e2755e..124c11026df 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -71,10 +71,6 @@ def excluded_dependencies = [ library.java.truth // Test only ] -// For Java8+ and less than Java11, use versions 2.x.x. -// Java11+ can use versions 3.x.x per https://github.com/ben-manes/caffeine. -def caffeine_cache_version = "2.9.3" - applyJavaNature( automaticModuleName: 'org.apache.beam.runners.dataflow.worker', archivesBaseName: 'beam-runners-google-cloud-dataflow-java-legacy-worker', @@ -141,13 +137,6 @@ applyJavaNature( relocate("org.eclipse.jetty", getWorkerRelocatedPath("org.eclipse.jetty")) relocate("javax.servlet", getWorkerRelocatedPath("javax.servlet")) - // Use Caffeine cache instead of Guava cache. - // Context: https://guava.dev/releases/snapshot/api/docs/com/google/common/cache/CacheBuilder - dependencies { - include(dependency("com.github.ben-manes.caffeine:caffeine:${caffeine_cache_version}}")) - } - relocate("com.github.ben-manes.caffeine", getWorkerRelocatedPath("com.github.ben-manes.caffeine")) - // We don't relocate windmill since it is already underneath the org.apache.beam.runners.dataflow.worker namespace and never // expect a user pipeline to include it. There is also a JNI component that windmill server relies on which makes // arbitrary relocation more difficult. @@ -213,7 +202,6 @@ dependencies { implementation "javax.servlet:javax.servlet-api:3.1.0" implementation "org.eclipse.jetty:jetty-server:9.2.10.v20150310" implementation "org.eclipse.jetty:jetty-servlet:9.2.10.v20150310" - implementation "com.github.ben-manes.caffeine:caffeine:${caffeine_cache_version}" implementation library.java.avro implementation library.java.jackson_annotations implementation library.java.jackson_core diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index cc5b890bc60..4c3ffd08a0b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -457,13 +457,12 @@ public class StreamingDataflowWorker { public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>(); long clientId = clientIdGenerator.nextLong(); - - Consumer<List<Windmill.ComputationHeartbeatResponse>> workHeartbeatResponseProcessor = - new WorkHeartbeatResponseProcessor( - computationId -> Optional.ofNullable(computationMap.get(computationId))); - return new StreamingDataflowWorker( - createWindmillServerStub(options, clientId, workHeartbeatResponseProcessor), + createWindmillServerStub( + options, + clientId, + new WorkHeartbeatResponseProcessor( + computationId -> Optional.ofNullable(computationMap.get(computationId)))), clientId, computationMap, WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java index a20c2f02b26..e49a04a7a54 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java @@ -33,7 +33,6 @@ public abstract class WindmillConnection { WindmillConnection.Builder windmillWorkerConnection = WindmillConnection.builder(); windmillEndpoint.workerToken().ifPresent(windmillWorkerConnection::setBackendWorkerToken); - windmillEndpoint.directEndpoint().ifPresent(windmillWorkerConnection::setDirectEndpoint); windmillWorkerConnection.setStub(endpointToStubFn.apply(windmillEndpoint)); return windmillWorkerConnection.build(); @@ -45,16 +44,12 @@ public abstract class WindmillConnection { public abstract Optional<String> backendWorkerToken(); - public abstract Optional<WindmillServiceAddress> directEndpoint(); - public abstract CloudWindmillServiceV1Alpha1Stub stub(); @AutoValue.Builder abstract static class Builder { abstract Builder setBackendWorkerToken(String backendWorkerToken); - public abstract Builder setDirectEndpoint(WindmillServiceAddress value); - abstract Builder setStub(CloudWindmillServiceV1Alpha1Stub stub); abstract WindmillConnection build(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index edc193ff99c..845d54588e7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; /** Manages endpoints and stubs for connecting to the Windmill Dispatcher. */ @ThreadSafe -public class GrpcDispatcherClient { +class GrpcDispatcherClient { private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class); private final WindmillStubFactory windmillStubFactory; @@ -66,7 +66,7 @@ public class GrpcDispatcherClient { this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs); } - public static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) { + static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) { return new GrpcDispatcherClient(windmillStubFactory, DispatcherStubs.empty(), new Random()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 52de3ef7bc0..b09e341f29e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST; import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.inProcessChannel; import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel; -import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; @@ -30,7 +29,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -54,13 +52,10 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse; import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.RemoteWindmillStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; @@ -156,26 +151,16 @@ public final class GrpcWindmillServer extends WindmillServerStub { GrpcWindmillStreamFactory grpcWindmillStreamFactory, Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) throws IOException { - Function<WindmillServiceAddress, ManagedChannel> channelFactory = - serviceAddress -> - remoteChannel( - serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()); - ChannelCache channelCache = - ChannelCache.create( - serviceAddress -> - // IsolationChannel will create and manage separate RPC channels to the same - // serviceAddress via calling the channelFactory, else just directly return the - // RPC channel. - workerOptions.getUseWindmillIsolatedChannels() - ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress)) - : channelFactory.apply(serviceAddress)); - WindmillStubFactory stubFactory = - ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(), channelCache); + GrpcWindmillServer grpcWindmillServer = new GrpcWindmillServer( workerOptions, grpcWindmillStreamFactory, - GrpcDispatcherClient.create(stubFactory), + GrpcDispatcherClient.create( + new RemoteWindmillStubFactory( + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(), + workerOptions.getGcpCredential(), + workerOptions.getUseWindmillIsolatedChannels())), processHeartbeatResponses); if (workerOptions.getWindmillServiceEndpoint() != null) { grpcWindmillServer.configureWindmillServiceEndpoints(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java index d7573a55c16..0c690cf9775 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java @@ -43,7 +43,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoi import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemProcessor; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; @@ -78,7 +78,7 @@ public final class StreamingEngineClient { private final JobHeader jobHeader; private final GrpcWindmillStreamFactory streamFactory; private final WorkItemProcessor workItemProcessor; - private final ChannelCachingStubFactory channelCachingStubFactory; + private final WindmillStubFactory stubFactory; private final GrpcDispatcherClient dispatcherClient; private final AtomicBoolean isBudgetRefreshPaused; private final GetWorkBudgetRefresher getWorkBudgetRefresher; @@ -89,7 +89,6 @@ public final class StreamingEngineClient { private final long clientId; private final Supplier<GetWorkerMetadataStream> getWorkerMetadataStream; private final Queue<WindmillEndpoints> newWindmillEndpoints; - /** Writes are guarded by synchronization, reads are lock free. */ private final AtomicReference<StreamingEngineConnectionState> connections; @@ -100,7 +99,7 @@ public final class StreamingEngineClient { AtomicReference<StreamingEngineConnectionState> connections, GrpcWindmillStreamFactory streamFactory, WorkItemProcessor workItemProcessor, - ChannelCachingStubFactory channelCachingStubFactory, + WindmillStubFactory stubFactory, GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, long clientId) { @@ -109,7 +108,7 @@ public final class StreamingEngineClient { this.streamFactory = streamFactory; this.workItemProcessor = workItemProcessor; this.connections = connections; - this.channelCachingStubFactory = channelCachingStubFactory; + this.stubFactory = stubFactory; this.dispatcherClient = dispatcherClient; this.isBudgetRefreshPaused = new AtomicBoolean(false); this.getWorkerMetadataThrottleTimer = new ThrottleTimer(); @@ -165,7 +164,7 @@ public final class StreamingEngineClient { GetWorkBudget totalGetWorkBudget, GrpcWindmillStreamFactory streamingEngineStreamFactory, WorkItemProcessor processWorkItem, - ChannelCachingStubFactory channelCachingStubFactory, + WindmillStubFactory windmillGrpcStubFactory, GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient) { StreamingEngineClient streamingEngineClient = @@ -175,7 +174,7 @@ public final class StreamingEngineClient { new AtomicReference<>(StreamingEngineConnectionState.EMPTY), streamingEngineStreamFactory, processWorkItem, - channelCachingStubFactory, + windmillGrpcStubFactory, getWorkBudgetDistributor, dispatcherClient, new Random().nextLong()); @@ -190,7 +189,7 @@ public final class StreamingEngineClient { AtomicReference<StreamingEngineConnectionState> connections, GrpcWindmillStreamFactory streamFactory, WorkItemProcessor processWorkItem, - ChannelCachingStubFactory stubFactory, + WindmillStubFactory stubFactory, GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, long clientId) { @@ -235,7 +234,6 @@ public final class StreamingEngineClient { getWorkBudgetRefresher.stop(); newWorkerMetadataPublisher.shutdownNow(); newWorkerMetadataConsumer.shutdownNow(); - channelCachingStubFactory.shutdown(); } /** @@ -312,11 +310,8 @@ public final class StreamingEngineClient { currentStreams.entrySet().stream() .filter( connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey())) - .forEach( - entry -> { - entry.getValue().closeAllStreams(); - entry.getKey().directEndpoint().ifPresent(channelCachingStubFactory::remove); - }); + .map(Entry::getValue) + .forEach(WindmillStreamSender::closeAllStreams); return newWindmillConnections.stream() .collect( @@ -379,7 +374,7 @@ public final class StreamingEngineClient { private CloudWindmillServiceV1Alpha1Stub createWindmillStub(Endpoint endpoint) { return endpoint .directEndpoint() - .map(channelCachingStubFactory::createWindmillServiceStub) + .map(stubFactory::createWindmillServiceStub) .orElseGet(dispatcherClient::getWindmillServiceStub); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java deleted file mode 100644 index f95391cc1df..00000000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; - -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.LoadingCache; -import com.github.benmanes.caffeine.cache.RemovalListener; -import java.io.PrintWriter; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a - * href=https://grpc.io/docs/guides/performance/#java>gRPC recommendations</a> for re-using channels - * when possible. - * - * @implNote Backed by {@link LoadingCache} which is thread-safe. - */ -@ThreadSafe -public final class ChannelCache implements StatusDataProvider { - private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class); - private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache; - - private ChannelCache( - Function<WindmillServiceAddress, ManagedChannel> channelFactory, - RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) { - this.channelCache = - Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply); - } - - public static ChannelCache create( - Function<WindmillServiceAddress, ManagedChannel> channelFactory) { - return new ChannelCache( - channelFactory, - // Shutdown the channels as they get removed from the cache, so they do not leak. - (address, channel, cause) -> shutdownChannel(channel)); - } - - @VisibleForTesting - static ChannelCache forTesting( - Function<WindmillServiceAddress, ManagedChannel> channelFactory, Runnable onChannelShutdown) { - return new ChannelCache( - channelFactory, - // Shutdown the channels as they get removed from the cache, so they do not leak. - // Add hook for testing so that we don't have to sleep/wait for arbitrary time in test. - (address, channel, cause) -> { - shutdownChannel(channel); - onChannelShutdown.run(); - }); - } - - private static void shutdownChannel(ManagedChannel channel) { - channel.shutdown(); - try { - channel.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.error("Couldn't close gRPC channel={}", channel, e); - } - channel.shutdownNow(); - } - - public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) { - return channelCache.get(windmillServiceAddress); - } - - public void remove(WindmillServiceAddress windmillServiceAddress) { - channelCache.invalidate(windmillServiceAddress); - } - - public void clear() { - channelCache.invalidateAll(); - } - - /** - * Checks to see if the cache is empty. May block the calling thread to perform any pending - * removal/insert operations first before checking the size. Should be only used for testing. - */ - @VisibleForTesting - boolean isEmpty() { - channelCache.cleanUp(); - return channelCache.estimatedSize() == 0; - } - - @Override - public void appendSummaryHtml(PrintWriter writer) { - writer.write("Active gRPC Channels:<br>"); - channelCache - .asMap() - .forEach( - (address, channel) -> { - writer.format("Address: [%s]; Channel: [%s].", address, channel); - writer.write("<br>"); - }); - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java deleted file mode 100644 index 9fd4ad00730..00000000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; - -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Channel; - -public interface ChannelCachingStubFactory extends WindmillStubFactory { - - /** - * Remove and close the gRPC channel used to communicate with the given {@link - * WindmillServiceAddress}. - * - * <p>Subsequent calls to {@link - * WindmillStubFactory#createWindmillServiceStub(WindmillServiceAddress)} will get a stub backed - * by a new {@link Channel} instance to the {@link WindmillServiceAddress}. Users of stubs backed - * by the previously vended {@link Channel} will start to receive errors. - */ - void remove(WindmillServiceAddress windmillServiceAddress); - - /** Shuts down all channels and stubs. */ - void shutdown(); -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java index 927c42c3e4c..7134e8b478b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; * that each active rpc has its own channel. */ @Internal -public class IsolationChannel extends ManagedChannel { +class IsolationChannel extends ManagedChannel { private static final Logger LOG = LoggerFactory.getLogger(IsolationChannel.class); private final Supplier<ManagedChannel> channelFactory; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingRemoteStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/RemoteWindmillStubFactory.java similarity index 67% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingRemoteStubFactory.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/RemoteWindmillStubFactory.java index 65dbe75670d..9978b74c7aa 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingRemoteStubFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/RemoteWindmillStubFactory.java @@ -17,7 +17,10 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; + import com.google.auth.Credentials; +import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub; @@ -26,30 +29,29 @@ import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Al import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth.VendoredCredentialsAdapter; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.auth.MoreCallCredentials; /** Creates remote stubs to talk to Streaming Engine. */ @Internal @ThreadSafe -public final class ChannelCachingRemoteStubFactory implements ChannelCachingStubFactory { +public final class RemoteWindmillStubFactory implements WindmillStubFactory { + private final int rpcChannelTimeoutSec; private final Credentials gcpCredentials; - private final ChannelCache channelCache; + private final boolean useIsolatedChannels; - private ChannelCachingRemoteStubFactory(Credentials gcpCredentials, ChannelCache channelCache) { + public RemoteWindmillStubFactory( + int rpcChannelTimeoutSec, Credentials gcpCredentials, boolean useIsolatedChannels) { + this.rpcChannelTimeoutSec = rpcChannelTimeoutSec; this.gcpCredentials = gcpCredentials; - this.channelCache = channelCache; - } - - public static ChannelCachingRemoteStubFactory create( - Credentials gcpCredentials, ChannelCache channelCache) { - return new ChannelCachingRemoteStubFactory(gcpCredentials, channelCache); + this.useIsolatedChannels = useIsolatedChannels; } @Override public CloudWindmillServiceV1Alpha1Stub createWindmillServiceStub( WindmillServiceAddress serviceAddress) { CloudWindmillServiceV1Alpha1Stub windmillServiceStub = - CloudWindmillServiceV1Alpha1Grpc.newStub(channelCache.get(serviceAddress)); + CloudWindmillServiceV1Alpha1Grpc.newStub(createChannel(serviceAddress)); return serviceAddress.getKind() != WindmillServiceAddress.Kind.AUTHENTICATED_GCP_SERVICE_ADDRESS ? windmillServiceStub.withCallCredentials( MoreCallCredentials.from(new VendoredCredentialsAdapter(gcpCredentials))) @@ -59,18 +61,16 @@ public final class ChannelCachingRemoteStubFactory implements ChannelCachingStub @Override public CloudWindmillMetadataServiceV1Alpha1Stub createWindmillMetadataServiceStub( WindmillServiceAddress serviceAddress) { - return CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(channelCache.get(serviceAddress)) + return CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(createChannel(serviceAddress)) .withCallCredentials( MoreCallCredentials.from(new VendoredCredentialsAdapter(gcpCredentials))); } - @Override - public void remove(WindmillServiceAddress windmillServiceAddress) { - channelCache.remove(windmillServiceAddress); - } - - @Override - public void shutdown() { - channelCache.clear(); + private ManagedChannel createChannel(WindmillServiceAddress serviceAddress) { + Supplier<ManagedChannel> channelFactory = + () -> remoteChannel(serviceAddress, rpcChannelTimeoutSec); + // IsolationChannel will create and manage separate RPC channels to the same serviceAddress via + // calling the channelFactory, else just directly return the RPC channel. + return useIsolatedChannels ? IsolationChannel.create(channelFactory) : channelFactory.get(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java index 9aec29a3ba4..d8e4c064e97 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java @@ -52,7 +52,7 @@ public final class WindmillChannelFactory { .build(); } - public static ManagedChannel remoteChannel( + static ManagedChannel remoteChannel( WindmillServiceAddress windmillServiceAddress, int windmillServiceRpcChannelTimeoutSec) { switch (windmillServiceAddress.getKind()) { case IPV6: diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java index 454c616db41..37dc7eff917 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java @@ -82,7 +82,6 @@ import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientCall; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientInterceptor; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientInterceptors; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Deadline; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.MethodDescriptor; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status; @@ -212,7 +211,7 @@ public class GrpcWindmillServerTest { this.client = GrpcWindmillServer.newApplianceTestInstance( - inprocessChannel, new FakeWindmillStubFactory(() -> (ManagedChannel) inprocessChannel)); + inprocessChannel, new FakeWindmillStubFactory(() -> inprocessChannel)); Windmill.GetWorkResponse response1 = client.getWork(GetWorkRequest.getDefaultInstance()); Windmill.GetWorkResponse response2 = client.getWork(GetWorkRequest.getDefaultInstance()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java index 9cdbe01b524..f9011a90c06 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java @@ -47,12 +47,13 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataR import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse; import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemProcessor; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessSocketAddress; @@ -95,24 +96,30 @@ public class StreamingEngineClientTest { .build(); @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); + + private final Set<ManagedChannel> channels = new HashSet<>(); private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); private final GrpcWindmillStreamFactory streamFactory = spy(GrpcWindmillStreamFactory.of(JOB_HEADER).build()); - private final ChannelCachingStubFactory stubFactory = + private final WindmillStubFactory stubFactory = new FakeWindmillStubFactory( - () -> - grpcCleanup.register( - WindmillChannelFactory.inProcessChannel("StreamingEngineClientTest"))); + () -> { + ManagedChannel channel = + grpcCleanup.register( + WindmillChannelFactory.inProcessChannel("StreamingEngineClientTest")); + channels.add(channel); + return channel; + }); private final GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.forTesting( stubFactory, new ArrayList<>(), new ArrayList<>(), new HashSet<>()); private final AtomicReference<StreamingEngineConnectionState> connections = new AtomicReference<>(StreamingEngineConnectionState.EMPTY); - @Rule public transient Timeout globalTimeout = Timeout.seconds(600); - private Server fakeStreamingEngineServer; private CountDownLatch getWorkerMetadataReady; private GetWorkerMetadataTestStub fakeGetWorkerMetadataStub; + private StreamingEngineClient streamingEngineClient; private static WorkItemProcessor noOpProcessWorkItemFn() { @@ -136,15 +143,13 @@ public class StreamingEngineClientTest { } private static WorkerMetadataResponse.Endpoint metadataResponseEndpoint(String workerToken) { - return WorkerMetadataResponse.Endpoint.newBuilder() - .setDirectEndpoint(DEFAULT_WINDMILL_SERVICE_ADDRESS.gcpServiceAddress().getHost()) - .setBackendWorkerToken(workerToken) - .build(); + return WorkerMetadataResponse.Endpoint.newBuilder().setBackendWorkerToken(workerToken).build(); } @Before public void setUp() throws IOException { - stubFactory.shutdown(); + channels.forEach(ManagedChannel::shutdownNow); + channels.clear(); fakeStreamingEngineServer = grpcCleanup.register( InProcessServerBuilder.forName("StreamingEngineClientTest") @@ -167,7 +172,7 @@ public class StreamingEngineClientTest { Preconditions.checkNotNull(streamingEngineClient).finish(); fakeGetWorkerMetadataStub.close(); fakeStreamingEngineServer.shutdownNow(); - stubFactory.shutdown(); + channels.forEach(ManagedChannel::shutdownNow); } private StreamingEngineClient newStreamingEngineClient( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java index 0bb80191cc3..2532fca5154 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java @@ -49,9 +49,11 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class WindmillStreamSenderTest { + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private static final GetWorkRequest GET_WORK_REQUEST = GetWorkRequest.newBuilder().setClientId(1L).setJobId("job").setProjectId("project").build(); @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private final GrpcWindmillStreamFactory streamFactory = spy( GrpcWindmillStreamFactory.of( @@ -68,7 +70,6 @@ public class WindmillStreamSenderTest { workItem, ackQueuedWorkItem, getWorkStreamLatencies) -> {}; - @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private ManagedChannel inProcessChannel; private CloudWindmillServiceV1Alpha1Stub stub; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java deleted file mode 100644 index 962ffa6e37d..00000000000 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.concurrent.CountDownLatch; -import java.util.function.Function; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; -import org.junit.After; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class ChannelCacheTest { - - private ChannelCache cache; - - private static ChannelCache newCache( - Function<WindmillServiceAddress, ManagedChannel> channelFactory) { - return ChannelCache.forTesting(channelFactory, () -> {}); - } - - @After - public void cleanUp() { - if (cache != null) { - cache.clear(); - } - } - - private ManagedChannel newChannel(String channelName) { - return WindmillChannelFactory.inProcessChannel(channelName); - } - - @Test - public void testLoadingCacheReturnsExistingChannel() { - String channelName = "existingChannel"; - ManagedChannel channel = newChannel(channelName); - Function<WindmillServiceAddress, ManagedChannel> channelFactory = - spy( - new Function<WindmillServiceAddress, ManagedChannel>() { - @Override - public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) { - return channel; - } - }); - - cache = newCache(channelFactory); - WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class); - // Initial call to load the cache. - assertThat(cache.get(someAddress)).isEqualTo(channel); - - ManagedChannel cachedChannel = cache.get(someAddress); - assertSame(channel, cachedChannel); - verify(channelFactory, times(1)).apply(eq(someAddress)); - } - - @Test - public void testLoadingCacheReturnsLoadsChannelWhenNotPresent() { - String channelName = "existingChannel"; - ManagedChannel channel = newChannel(channelName); - Function<WindmillServiceAddress, ManagedChannel> channelFactory = - spy( - new Function<WindmillServiceAddress, ManagedChannel>() { - @Override - public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) { - return channel; - } - }); - - cache = newCache(channelFactory); - WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class); - ManagedChannel cachedChannel = cache.get(someAddress); - assertSame(channel, cachedChannel); - verify(channelFactory, times(1)).apply(eq(someAddress)); - } - - @Test - public void testRemoveAndClose() throws InterruptedException { - String channelName = "existingChannel"; - CountDownLatch verifyRemovalListenerAsync = new CountDownLatch(1); - CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1); - cache = - ChannelCache.forTesting( - ignored -> newChannel(channelName), - () -> { - try { - verifyRemovalListenerAsync.await(); - notifyWhenChannelClosed.countDown(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - - WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class); - ManagedChannel cachedChannel = cache.get(someAddress); - cache.remove(someAddress); - // Assert that the removal happened before we check to see if the shutdowns happen to confirm - // that removals are async. - assertTrue(cache.isEmpty()); - verifyRemovalListenerAsync.countDown(); - - // Assert that the channel gets shutdown. - notifyWhenChannelClosed.await(); - assertTrue(cachedChannel.isShutdown()); - - // Get should return a new channel, since we removed the last one. - ManagedChannel newChannel = cache.get(someAddress); - assertThat(newChannel).isNotSameInstanceAs(cachedChannel); - } - - @Test - public void testClear() throws InterruptedException { - String channelName = "existingChannel"; - CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1); - cache = - ChannelCache.forTesting( - ignored -> newChannel(channelName), notifyWhenChannelClosed::countDown); - WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class); - ManagedChannel cachedChannel = cache.get(someAddress); - cache.clear(); - notifyWhenChannelClosed.await(); - assertTrue(cache.isEmpty()); - assertTrue(cachedChannel.isShutdown()); - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java index af3a3e8295b..3dd40e5d5c7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java @@ -21,38 +21,27 @@ import java.util.function.Supplier; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Channel; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @VisibleForTesting -public final class FakeWindmillStubFactory implements ChannelCachingStubFactory { - private final ChannelCache channelCache; +public final class FakeWindmillStubFactory implements WindmillStubFactory { + private final Supplier<Channel> channelFactory; - public FakeWindmillStubFactory(Supplier<ManagedChannel> channelFactory) { - this.channelCache = ChannelCache.create(ignored -> channelFactory.get()); + public FakeWindmillStubFactory(Supplier<Channel> channelFactory) { + this.channelFactory = channelFactory; } @Override public CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub createWindmillServiceStub(WindmillServiceAddress serviceAddress) { - return CloudWindmillServiceV1Alpha1Grpc.newStub(channelCache.get(serviceAddress)); + return CloudWindmillServiceV1Alpha1Grpc.newStub(channelFactory.get()); } @Override public CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub createWindmillMetadataServiceStub(WindmillServiceAddress serviceAddress) { - return CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(channelCache.get(serviceAddress)); - } - - @Override - public void remove(WindmillServiceAddress windmillServiceAddress) { - channelCache.remove(windmillServiceAddress); - } - - @Override - public void shutdown() { - channelCache.clear(); + return CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(channelFactory.get()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java index 8e5ed66ebb4..249642aa6d1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java @@ -46,8 +46,9 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class EvenGetWorkBudgetDistributorTest { - @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); @Rule public transient Timeout globalTimeout = Timeout.seconds(600); + @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private ManagedChannel inProcessChannel; private CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub; @@ -259,8 +260,8 @@ public class EvenGetWorkBudgetDistributorTest { (computation, inputDataWatermark, synchronizedProcessingTime, - wrappedWorkItem, - ackWorkItemQueued, + workItem, + ackQueuedWorkItem, getWorkStreamLatencies) -> {}); } }