This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ef5060416d9 [Dataflow Streaming] Enabled Heartbeat by Default (#31689)
ef5060416d9 is described below

commit ef5060416d9fed2e08a6682e69657c6fa9f98af4
Author: TongruiLi <12992126+tongru...@users.noreply.github.com>
AuthorDate: Mon Jul 1 06:04:18 2024 -0700

    [Dataflow Streaming] Enabled Heartbeat by Default (#31689)
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  4 +--
 .../windmill/client/grpc/GrpcWindmillServer.java   |  4 +--
 .../client/grpc/GrpcWindmillServerTest.java        | 37 +++++++++-------------
 3 files changed, 19 insertions(+), 26 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index fc1be2cd137..59819db88a0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -602,8 +602,8 @@ public class StreamingDataflowWorker {
         .setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
         .setSendKeyedGetDataRequests(
             !options.isEnableStreamingEngine()
-                || !DataflowRunner.hasExperiment(
-                    options, "streaming_engine_send_new_heartbeat_requests"));
+                || DataflowRunner.hasExperiment(
+                    options, "streaming_engine_disable_new_heartbeat_requests"));
   }
 
   private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index abf85d98548..0ab03a80318 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -176,8 +176,8 @@ public final class GrpcWindmillServer extends WindmillServerStub {
         testOptions(/* enableStreamingEngine= */ true, experiments);
     boolean sendKeyedGetDataRequests =
         !testOptions.isEnableStreamingEngine()
-            || !DataflowRunner.hasExperiment(
-                testOptions, "streaming_engine_send_new_heartbeat_requests");
+            || DataflowRunner.hasExperiment(
+                testOptions, "streaming_engine_disable_new_heartbeat_requests");
     GrpcWindmillStreamFactory windmillStreamFactory =
         GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId))
             .setSendKeyedGetDataRequests(sendKeyedGetDataRequests)
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index b1d5309e12d..6473d5527a8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -124,8 +124,16 @@ public class GrpcWindmillServerTest {
 
   @Before
   public void setUp() throws Exception {
-    String name = "Fake server for " + getClass();
+    startServerAndClient(new ArrayList<>());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    server.shutdownNow();
+  }
 
+  private void startServerAndClient(List<String> experiments) throws Exception {
+    String name = "Fake server for " + getClass();
     this.server =
         InProcessServerBuilder.forName(name)
             .fallbackHandlerRegistry(serviceRegistry)
@@ -136,17 +144,12 @@ public class GrpcWindmillServerTest {
     this.client =
         GrpcWindmillServer.newTestInstance(
             name,
-            new ArrayList<>(),
+            experiments,
             clientId,
             new FakeWindmillStubFactory(
                 () -> grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name))));
   }
 
-  @After
-  public void tearDown() throws Exception {
-    server.shutdownNow();
-  }
-
   private <Stream extends StreamObserver> void maybeInjectError(Stream stream) {
     if (remainingErrors > 0 && ThreadLocalRandom.current().nextInt(20) == 0) {
       try {
@@ -880,6 +883,11 @@ public class GrpcWindmillServerTest {
   public void testStreamingGetDataHeartbeatsAsKeyedGetDataRequests() throws Exception {
     // This server records the heartbeats observed but doesn't respond.
     final Map<String, List<KeyedGetDataRequest>> getDataHeartbeats = new HashMap<>();
+    // Create a client and server different from the one in SetUp so we can add an experiment to the
+    // options passed in. This requires teardown and re-constructing the client and server
+    tearDown();
+    startServerAndClient(
+        Collections.singletonList("streaming_engine_disable_new_heartbeat_requests"));
 
     serviceRegistry.addService(
         new CloudWindmillServiceV1Alpha1ImplBase() {
@@ -973,21 +981,6 @@ public class GrpcWindmillServerTest {
 
   @Test
   public void testStreamingGetDataHeartbeatsAsHeartbeatRequests() throws Exception {
-    // Create a client and server different from the one in SetUp so we can add an experiment to the
-    // options passed in.
-    this.server =
-        InProcessServerBuilder.forName("TestServer")
-            .fallbackHandlerRegistry(serviceRegistry)
-            .executor(Executors.newFixedThreadPool(1))
-            .build()
-            .start();
-    this.client =
-        GrpcWindmillServer.newTestInstance(
-            "TestServer",
-            Collections.singletonList("streaming_engine_send_new_heartbeat_requests"),
-            clientId,
-            new FakeWindmillStubFactory(
-                () -> WindmillChannelFactory.inProcessChannel("TestServer")));
     // This server records the heartbeats observed but doesn't respond.
     final List<ComputationHeartbeatRequest> receivedHeartbeats = new ArrayList<>();
 

Reply via email to