This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ef5060416d9 [Dataflow Streaming] Enabled Heartbeat by Default (#31689) ef5060416d9 is described below commit ef5060416d9fed2e08a6682e69657c6fa9f98af4 Author: TongruiLi <12992126+tongru...@users.noreply.github.com> AuthorDate: Mon Jul 1 06:04:18 2024 -0700 [Dataflow Streaming] Enabled Heartbeat by Default (#31689) --- .../dataflow/worker/StreamingDataflowWorker.java | 4 +-- .../windmill/client/grpc/GrpcWindmillServer.java | 4 +-- .../client/grpc/GrpcWindmillServerTest.java | 37 +++++++++------------- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index fc1be2cd137..59819db88a0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -602,8 +602,8 @@ public class StreamingDataflowWorker { .setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit()) .setSendKeyedGetDataRequests( !options.isEnableStreamingEngine() - || !DataflowRunner.hasExperiment( - options, "streaming_engine_send_new_heartbeat_requests")); + || DataflowRunner.hasExperiment( + options, "streaming_engine_disable_new_heartbeat_requests")); } private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 
abf85d98548..0ab03a80318 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -176,8 +176,8 @@ public final class GrpcWindmillServer extends WindmillServerStub { testOptions(/* enableStreamingEngine= */ true, experiments); boolean sendKeyedGetDataRequests = !testOptions.isEnableStreamingEngine() - || !DataflowRunner.hasExperiment( - testOptions, "streaming_engine_send_new_heartbeat_requests"); + || DataflowRunner.hasExperiment( + testOptions, "streaming_engine_disable_new_heartbeat_requests"); GrpcWindmillStreamFactory windmillStreamFactory = GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId)) .setSendKeyedGetDataRequests(sendKeyedGetDataRequests) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java index b1d5309e12d..6473d5527a8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java @@ -124,8 +124,16 @@ public class GrpcWindmillServerTest { @Before public void setUp() throws Exception { - String name = "Fake server for " + getClass(); + startServerAndClient(new ArrayList<>()); + } + + @After + public void tearDown() throws Exception { + server.shutdownNow(); + } + private void startServerAndClient(List<String> experiments) throws Exception { + String name = "Fake server for " + getClass(); this.server = 
InProcessServerBuilder.forName(name) .fallbackHandlerRegistry(serviceRegistry) @@ -136,17 +144,12 @@ public class GrpcWindmillServerTest { this.client = GrpcWindmillServer.newTestInstance( name, - new ArrayList<>(), + experiments, clientId, new FakeWindmillStubFactory( () -> grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name)))); } - @After - public void tearDown() throws Exception { - server.shutdownNow(); - } - private <Stream extends StreamObserver> void maybeInjectError(Stream stream) { if (remainingErrors > 0 && ThreadLocalRandom.current().nextInt(20) == 0) { try { @@ -880,6 +883,11 @@ public class GrpcWindmillServerTest { public void testStreamingGetDataHeartbeatsAsKeyedGetDataRequests() throws Exception { // This server records the heartbeats observed but doesn't respond. final Map<String, List<KeyedGetDataRequest>> getDataHeartbeats = new HashMap<>(); + // Create a client and server different from the one in SetUp so we can add an experiment to the + // options passed in. This requires teardown and re-constructing the client and server + tearDown(); + startServerAndClient( + Collections.singletonList("streaming_engine_disable_new_heartbeat_requests")); serviceRegistry.addService( new CloudWindmillServiceV1Alpha1ImplBase() { @@ -973,21 +981,6 @@ public class GrpcWindmillServerTest { @Test public void testStreamingGetDataHeartbeatsAsHeartbeatRequests() throws Exception { - // Create a client and server different from the one in SetUp so we can add an experiment to the - // options passed in. 
- this.server = - InProcessServerBuilder.forName("TestServer") - .fallbackHandlerRegistry(serviceRegistry) - .executor(Executors.newFixedThreadPool(1)) - .build() - .start(); - this.client = - GrpcWindmillServer.newTestInstance( - "TestServer", - Collections.singletonList("streaming_engine_send_new_heartbeat_requests"), - clientId, - new FakeWindmillStubFactory( - () -> WindmillChannelFactory.inProcessChannel("TestServer"))); // This server records the heartbeats observed but doesn't respond. final List<ComputationHeartbeatRequest> receivedHeartbeats = new ArrayList<>();