This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ec1fb23ece move heartbeat processor to where it is being used (#31298)
6ec1fb23ece is described below

commit 6ec1fb23ece81721806bdc1323ffb23fa7ce55a0
Author: martin trieu <marti...@google.com>
AuthorDate: Fri Jun 28 02:31:11 2024 -0700

    move heartbeat processor to where it is being used (#31298)
---
 .../dataflow/worker/StreamingDataflowWorker.java   | 172 +++++++++++++--------
 .../windmill/client/grpc/GrpcWindmillServer.java   |  53 +++----
 .../client/grpc/GrpcWindmillStreamFactory.java     | 101 +++++++-----
 .../client/grpc/StreamingEngineClient.java         |  22 +--
 .../windmill/client/grpc/WindmillStreamSender.java |  19 +--
 .../client/grpc/StreamingEngineClientTest.java     |   5 +-
 .../client/grpc/WindmillStreamSenderTest.java      |  14 +-
 .../budget/EvenGetWorkBudgetDistributorTest.java   |   3 +-
 8 files changed, 210 insertions(+), 179 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 98015e2ea71..fc1be2cd137 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -22,6 +22,7 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 
 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.MapTask;
+import com.google.auto.value.AutoValue;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -38,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
 import org.apache.beam.runners.core.metrics.MetricsLogger;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -103,7 +103,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.*;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -121,11 +120,6 @@ public class StreamingDataflowWorker {
       MetricName.named(
           
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
           "throttling-msecs");
-  // Maximum number of threads for processing.  Currently each thread 
processes one key at a time.
-  static final int MAX_PROCESSING_THREADS = 300;
-  static final long THREAD_EXPIRATION_TIME_SEC = 60;
-  static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
-  static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
 
   /**
    * Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the 
amount of data sinked
@@ -135,13 +129,20 @@ public class StreamingDataflowWorker {
    */
   public static final int MAX_SINK_BYTES = 10_000_000;
 
+  // Maximum number of threads for processing.  Currently, each thread 
processes one key at a time.
+  static final int MAX_PROCESSING_THREADS = 300;
+  static final long THREAD_EXPIRATION_TIME_SEC = 60;
+  static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
+  static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamingDataflowWorker.class);
+
   /** The idGenerator to generate unique id globally. */
   private static final IdGenerator ID_GENERATOR = 
IdGenerators.decrementingLongs();
 
   private static final int DEFAULT_STATUS_PORT = 8081;
   // Maximum size of the result of a GetWork request.
   private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m
+
   /** Maximum number of failure stacktraces to report in each update sent to 
backend. */
   private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000;
 
@@ -328,39 +329,27 @@ public class StreamingDataflowWorker {
         threadName ->
             Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat(threadName).build());
-    GrpcWindmillStreamFactory windmillStreamFactory =
-        createWindmillStreamFactory(options, clientId);
-    GrpcDispatcherClient dispatcherClient = 
GrpcDispatcherClient.create(createStubFactory(options));
-
-    // If ComputationConfig.Fetcher is the Streaming Appliance implementation, 
WindmillServerStub
-    // can be created without a heartbeat response processor, as appliance 
does not send heartbeats.
-    Pair<ComputationConfig.Fetcher, Optional<WindmillServerStub>> 
configFetcherAndWindmillClient =
-        createConfigFetcherAndWindmillClient(
-            options,
-            dataflowServiceClient,
-            dispatcherClient,
-            maxWorkItemCommitBytes,
-            windmillStreamFactory);
+    GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
+        createGrpcwindmillStreamFactoryBuilder(options, clientId);
+
+    ConfigFetcherComputationStateCacheAndWindmillClient
+        configFetcherComputationStateCacheAndWindmillClient =
+            createConfigFetcherComputationStateCacheAndWindmillClient(
+                options,
+                dataflowServiceClient,
+                maxWorkItemCommitBytes,
+                windmillStreamFactoryBuilder,
+                configFetcher ->
+                    ComputationStateCache.create(
+                        configFetcher,
+                        workExecutor,
+                        windmillStateCache::forComputation,
+                        ID_GENERATOR));
 
     ComputationStateCache computationStateCache =
-        ComputationStateCache.create(
-            configFetcherAndWindmillClient.getLeft(),
-            workExecutor,
-            windmillStateCache::forComputation,
-            ID_GENERATOR);
-
-    // If WindmillServerStub is not present, it is a Streaming Engine job. We 
now have all the
-    // components created to initialize the GrpcWindmillServer.
+        
configFetcherComputationStateCacheAndWindmillClient.computationStateCache();
     WindmillServerStub windmillServer =
-        configFetcherAndWindmillClient
-            .getRight()
-            .orElseGet(
-                () ->
-                    GrpcWindmillServer.create(
-                        options,
-                        windmillStreamFactory,
-                        dispatcherClient,
-                        new 
WorkHeartbeatResponseProcessor(computationStateCache::get)));
+        configFetcherComputationStateCacheAndWindmillClient.windmillServer();
 
     FailureTracker failureTracker =
         options.isEnableStreamingEngine()
@@ -393,7 +382,7 @@ public class StreamingDataflowWorker {
     return new StreamingDataflowWorker(
         windmillServer,
         clientId,
-        configFetcherAndWindmillClient.getLeft(),
+        configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
         computationStateCache,
         windmillStateCache,
         workExecutor,
@@ -407,20 +396,29 @@ public class StreamingDataflowWorker {
         streamingCounters,
         memoryMonitor,
         maxWorkItemCommitBytes,
-        windmillStreamFactory,
+        
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
         executorSupplier,
         stageInfo);
   }
 
-  private static Pair<ComputationConfig.Fetcher, Optional<WindmillServerStub>>
-      createConfigFetcherAndWindmillClient(
+  /**
+   * {@link ComputationConfig.Fetcher}, {@link ComputationStateCache}, and 
{@link
+   * WindmillServerStub} are constructed in different orders due to cyclic 
dependencies depending on
+   * the underlying implementation. This method simplifies creating them and 
returns an object with
+   * all of these dependencies initialized.
+   */
+  private static ConfigFetcherComputationStateCacheAndWindmillClient
+      createConfigFetcherComputationStateCacheAndWindmillClient(
           DataflowWorkerHarnessOptions options,
           WorkUnitClient dataflowServiceClient,
-          GrpcDispatcherClient dispatcherClient,
           AtomicInteger maxWorkItemCommitBytes,
-          GrpcWindmillStreamFactory windmillStreamFactory) {
+          GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
+          Function<ComputationConfig.Fetcher, ComputationStateCache> 
computationStateCacheFactory) {
     ComputationConfig.Fetcher configFetcher;
-    @Nullable WindmillServerStub windmillServer = null;
+    WindmillServerStub windmillServer;
+    ComputationStateCache computationStateCache;
+    GrpcDispatcherClient dispatcherClient = 
GrpcDispatcherClient.create(createStubFactory(options));
+    GrpcWindmillStreamFactory windmillStreamFactory;
     if (options.isEnableStreamingEngine()) {
       configFetcher =
           StreamingEngineComputationConfigFetcher.create(
@@ -431,13 +429,36 @@ public class StreamingDataflowWorker {
                       config,
                       dispatcherClient::consumeWindmillDispatcherEndpoints,
                       maxWorkItemCommitBytes));
+      computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
+      windmillStreamFactory =
+          windmillStreamFactoryBuilder
+              .setProcessHeartbeatResponses(
+                  new 
WorkHeartbeatResponseProcessor(computationStateCache::get))
+              .setHealthCheckIntervalMillis(
+                  options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+              .build();
+      windmillServer = GrpcWindmillServer.create(options, 
windmillStreamFactory, dispatcherClient);
     } else {
-      windmillServer =
-          createWindmillServerStub(options, windmillStreamFactory, 
dispatcherClient, ignored -> {});
+      if (options.getWindmillServiceEndpoint() != null
+          || options.getLocalWindmillHostport().startsWith("grpc:")) {
+        windmillStreamFactory =
+            windmillStreamFactoryBuilder
+                .setHealthCheckIntervalMillis(
+                    
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+                .build();
+        windmillServer =
+            GrpcWindmillServer.create(options, windmillStreamFactory, 
dispatcherClient);
+      } else {
+        windmillStreamFactory = windmillStreamFactoryBuilder.build();
+        windmillServer = new 
JniWindmillApplianceServer(options.getLocalWindmillHostport());
+      }
+
       configFetcher = new 
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
+      computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
     }
 
-    return Pair.of(configFetcher, Optional.ofNullable(windmillServer));
+    return ConfigFetcherComputationStateCacheAndWindmillClient.create(
+        configFetcher, computationStateCache, windmillServer, 
windmillStreamFactory);
   }
 
   @VisibleForTesting
@@ -516,6 +537,11 @@ public class StreamingDataflowWorker {
             options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
             options.getPerWorkerMetricsUpdateReportingPeriodMillis());
 
+    GrpcWindmillStreamFactory.Builder windmillStreamFactory =
+        createGrpcwindmillStreamFactoryBuilder(options, 1)
+            .setProcessHeartbeatResponses(
+                new 
WorkHeartbeatResponseProcessor(computationStateCache::get));
+
     return new StreamingDataflowWorker(
         windmillServer,
         1L,
@@ -533,7 +559,12 @@ public class StreamingDataflowWorker {
         streamingCounters,
         memoryMonitor,
         maxWorkItemCommitBytes,
-        createWindmillStreamFactory(options, 1),
+        options.isEnableStreamingEngine()
+            ? windmillStreamFactory
+                .setHealthCheckIntervalMillis(
+                    
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+                .build()
+            : windmillStreamFactory.build(),
         executorSupplier,
         stageInfo);
   }
@@ -552,7 +583,7 @@ public class StreamingDataflowWorker {
     }
   }
 
-  private static GrpcWindmillStreamFactory createWindmillStreamFactory(
+  private static GrpcWindmillStreamFactory.Builder 
createGrpcwindmillStreamFactoryBuilder(
       DataflowWorkerHarnessOptions options, long clientId) {
     Duration maxBackoff =
         !options.isEnableStreamingEngine() && 
options.getLocalWindmillHostport() != null
@@ -569,7 +600,10 @@ public class StreamingDataflowWorker {
         .setMaxBackOffSupplier(() -> maxBackoff)
         
.setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures())
         
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
-        .build();
+        .setSendKeyedGetDataRequests(
+            !options.isEnableStreamingEngine()
+                || !DataflowRunner.hasExperiment(
+                    options, "streaming_engine_send_new_heartbeat_requests"));
   }
 
   private static BoundedQueueExecutor 
createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
@@ -619,23 +653,6 @@ public class StreamingDataflowWorker {
     worker.start();
   }
 
-  private static WindmillServerStub createWindmillServerStub(
-      DataflowWorkerHarnessOptions options,
-      GrpcWindmillStreamFactory windmillStreamFactory,
-      GrpcDispatcherClient dispatcherClient,
-      Consumer<List<Windmill.ComputationHeartbeatResponse>> 
processHeartbeatResponses) {
-    if (options.getWindmillServiceEndpoint() != null
-        || options.isEnableStreamingEngine()
-        || options.getLocalWindmillHostport().startsWith("grpc:")) {
-      windmillStreamFactory.scheduleHealthChecks(
-          options.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
-      return GrpcWindmillServer.create(
-          options, windmillStreamFactory, dispatcherClient, 
processHeartbeatResponses);
-    } else {
-      return new 
JniWindmillApplianceServer(options.getLocalWindmillHostport());
-    }
-  }
-
   private static ChannelCachingStubFactory createStubFactory(
       DataflowWorkerHarnessOptions workerOptions) {
     Function<WindmillServiceAddress, ManagedChannel> channelFactory =
@@ -895,4 +912,25 @@ public class StreamingDataflowWorker {
             .pendingCumulativeCounters()
             .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
   }
+
+  @AutoValue
+  abstract static class ConfigFetcherComputationStateCacheAndWindmillClient {
+
+    private static ConfigFetcherComputationStateCacheAndWindmillClient create(
+        ComputationConfig.Fetcher configFetcher,
+        ComputationStateCache computationStateCache,
+        WindmillServerStub windmillServer,
+        GrpcWindmillStreamFactory windmillStreamFactory) {
+      return new 
AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient(
+          configFetcher, computationStateCache, windmillServer, 
windmillStreamFactory);
+    }
+
+    abstract ComputationConfig.Fetcher configFetcher();
+
+    abstract ComputationStateCache computationStateCache();
+
+    abstract WindmillServerStub windmillServer();
+
+    abstract GrpcWindmillStreamFactory windmillStreamFactory();
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 69807c523ed..abf85d98548 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -40,7 +39,6 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Al
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
@@ -96,29 +94,19 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
   private final GrpcDispatcherClient dispatcherClient;
   private final DataflowWorkerHarnessOptions options;
   private final StreamingEngineThrottleTimers throttleTimers;
+  private final GrpcWindmillStreamFactory windmillStreamFactory;
   private Duration maxBackoff;
   private @Nullable WindmillApplianceGrpc.WindmillApplianceBlockingStub 
syncApplianceStub;
-  // If true, then active work refreshes will be sent as KeyedGetDataRequests. 
Otherwise, use the
-  // newer ComputationHeartbeatRequests.
-  private final boolean sendKeyedGetDataRequests;
-  private final Consumer<List<ComputationHeartbeatResponse>> 
processHeartbeatResponses;
-  private final GrpcWindmillStreamFactory windmillStreamFactory;
 
   private GrpcWindmillServer(
       DataflowWorkerHarnessOptions options,
       GrpcWindmillStreamFactory grpcWindmillStreamFactory,
-      GrpcDispatcherClient grpcDispatcherClient,
-      Consumer<List<Windmill.ComputationHeartbeatResponse>> 
processHeartbeatResponses) {
+      GrpcDispatcherClient grpcDispatcherClient) {
     this.options = options;
     this.throttleTimers = StreamingEngineThrottleTimers.create();
     this.maxBackoff = 
Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
     this.dispatcherClient = grpcDispatcherClient;
     this.syncApplianceStub = null;
-    this.sendKeyedGetDataRequests =
-        !options.isEnableStreamingEngine()
-            || !DataflowRunner.hasExperiment(
-                options, "streaming_engine_send_new_heartbeat_requests");
-    this.processHeartbeatResponses = processHeartbeatResponses;
     this.windmillStreamFactory = grpcWindmillStreamFactory;
   }
 
@@ -148,11 +136,9 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
   public static GrpcWindmillServer create(
       DataflowWorkerHarnessOptions workerOptions,
       GrpcWindmillStreamFactory grpcWindmillStreamFactory,
-      GrpcDispatcherClient dispatcherClient,
-      Consumer<List<Windmill.ComputationHeartbeatResponse>> 
processHeartbeatResponses) {
+      GrpcDispatcherClient dispatcherClient) {
     GrpcWindmillServer grpcWindmillServer =
-        new GrpcWindmillServer(
-            workerOptions, grpcWindmillStreamFactory, dispatcherClient, 
processHeartbeatResponses);
+        new GrpcWindmillServer(workerOptions, grpcWindmillStreamFactory, 
dispatcherClient);
     if (workerOptions.getWindmillServiceEndpoint() != null) {
       grpcWindmillServer.configureWindmillServiceEndpoints();
     } else if (!workerOptions.isEnableStreamingEngine()
@@ -188,11 +174,18 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
 
     DataflowWorkerHarnessOptions testOptions =
         testOptions(/* enableStreamingEngine= */ true, experiments);
+    boolean sendKeyedGetDataRequests =
+        !testOptions.isEnableStreamingEngine()
+            || !DataflowRunner.hasExperiment(
+                testOptions, "streaming_engine_send_new_heartbeat_requests");
     GrpcWindmillStreamFactory windmillStreamFactory =
-        GrpcWindmillStreamFactory.of(createJobHeader(testOptions, 
clientId)).build();
-    windmillStreamFactory.scheduleHealthChecks(
-        testOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
-    return new GrpcWindmillServer(testOptions, windmillStreamFactory, 
dispatcherClient, noop -> {});
+        GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId))
+            .setSendKeyedGetDataRequests(sendKeyedGetDataRequests)
+            .setHealthCheckIntervalMillis(
+                
testOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+            .build();
+
+    return new GrpcWindmillServer(testOptions, windmillStreamFactory, 
dispatcherClient);
   }
 
   @VisibleForTesting
@@ -205,8 +198,7 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
             options,
             GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
             // No-op, Appliance does not use Dispatcher to call Streaming 
Engine.
-            GrpcDispatcherClient.create(windmillStubFactory),
-            noop -> {});
+            GrpcDispatcherClient.create(windmillStubFactory));
     testServer.syncApplianceStub = 
createWindmillApplianceStubWithDeadlineInterceptor(channel);
     return testServer;
   }
@@ -253,13 +245,13 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
   }
 
   @Override
-  public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
-    
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
+  public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
+    return dispatcherClient.getDispatcherEndpoints();
   }
 
   @Override
-  public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
-    return dispatcherClient.getDispatcherEndpoints();
+  public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
+    
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
   }
 
   @Override
@@ -357,10 +349,7 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
   @Override
   public GetDataStream getDataStream() {
     return windmillStreamFactory.createGetDataStream(
-        dispatcherClient.getWindmillServiceStub(),
-        throttleTimers.getDataThrottleTimer(),
-        sendKeyedGetDataRequests,
-        this.processHeartbeatResponses);
+        dispatcherClient.getWindmillServiceStub(), 
throttleTimers.getDataThrottleTimer());
   }
 
   @Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
index c652e98e556..14866f3f586 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
@@ -68,6 +68,7 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
   private static final int DEFAULT_LOG_EVERY_N_STREAM_FAILURES = 1;
   private static final int DEFAULT_STREAMING_RPC_BATCH_LIMIT = 
Integer.MAX_VALUE;
   private static final int DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS = 
1;
+  private static final int NO_HEALTH_CHECKS = -1;
 
   private final JobHeader jobHeader;
   private final int logEveryNStreamFailures;
@@ -76,12 +77,18 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
   private final Supplier<BackOff> grpcBackOff;
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
   private final AtomicLong streamIdGenerator;
+  // If true, then active work refreshes will be sent as KeyedGetDataRequests. 
Otherwise, use the
+  // newer ComputationHeartbeatRequests.
+  private final boolean sendKeyedGetDataRequests;
+  private final Consumer<List<ComputationHeartbeatResponse>> 
processHeartbeatResponses;
 
-  GrpcWindmillStreamFactory(
+  private GrpcWindmillStreamFactory(
       JobHeader jobHeader,
       int logEveryNStreamFailures,
       int streamingRpcBatchLimit,
       int windmillMessagesBetweenIsReadyChecks,
+      boolean sendKeyedGetDataRequests,
+      Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses,
       Supplier<Duration> maxBackOffSupplier) {
     this.jobHeader = jobHeader;
     this.logEveryNStreamFailures = logEveryNStreamFailures;
@@ -96,9 +103,53 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
                     .withMaxBackoff(maxBackOffSupplier.get())
                     .backoff());
     this.streamRegistry = ConcurrentHashMap.newKeySet();
+    this.sendKeyedGetDataRequests = sendKeyedGetDataRequests;
+    this.processHeartbeatResponses = processHeartbeatResponses;
     this.streamIdGenerator = new AtomicLong();
   }
 
+  /** @implNote Used for {@link AutoBuilder} {@link Builder} class, do not 
call directly. */
+  static GrpcWindmillStreamFactory create(
+      JobHeader jobHeader,
+      int logEveryNStreamFailures,
+      int streamingRpcBatchLimit,
+      int windmillMessagesBetweenIsReadyChecks,
+      boolean sendKeyedGetDataRequests,
+      Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses,
+      Supplier<Duration> maxBackOffSupplier,
+      int healthCheckIntervalMillis) {
+    GrpcWindmillStreamFactory streamFactory =
+        new GrpcWindmillStreamFactory(
+            jobHeader,
+            logEveryNStreamFailures,
+            streamingRpcBatchLimit,
+            windmillMessagesBetweenIsReadyChecks,
+            sendKeyedGetDataRequests,
+            processHeartbeatResponses,
+            maxBackOffSupplier);
+
+    if (healthCheckIntervalMillis >= 0) {
+      // Health checks are run on background daemon thread, which will only be 
cleaned up on JVM
+      // shutdown.
+      new Timer("WindmillHealthCheckTimer")
+          .schedule(
+              new TimerTask() {
+                @Override
+                public void run() {
+                  Instant reportThreshold =
+                      
Instant.now().minus(Duration.millis(healthCheckIntervalMillis));
+                  for (AbstractWindmillStream<?, ?> stream : 
streamFactory.streamRegistry) {
+                    stream.maybeSendHealthCheck(reportThreshold);
+                  }
+                }
+              },
+              0,
+              healthCheckIntervalMillis);
+    }
+
+    return streamFactory;
+  }
+
   /**
    * Returns a new {@link Builder} for {@link GrpcWindmillStreamFactory} with 
default values set for
    * the given {@link JobHeader}.
@@ -109,7 +160,10 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
         
.setWindmillMessagesBetweenIsReadyChecks(DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS)
         .setMaxBackOffSupplier(() -> DEFAULT_MAX_BACKOFF)
         .setLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_STREAM_FAILURES)
-        .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT);
+        .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT)
+        .setHealthCheckIntervalMillis(NO_HEALTH_CHECKS)
+        .setSendKeyedGetDataRequests(true)
+        .setProcessHeartbeatResponses(ignored -> {});
   }
 
   private static <T extends AbstractStub<T>> T withDefaultDeadline(T stub) {
@@ -156,10 +210,7 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
   }
 
   public GetDataStream createGetDataStream(
-      CloudWindmillServiceV1Alpha1Stub stub,
-      ThrottleTimer getDataThrottleTimer,
-      boolean sendKeyedGetDataRequests,
-      Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses) {
+      CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer 
getDataThrottleTimer) {
     return GrpcGetDataStream.create(
         responseObserver -> 
withDefaultDeadline(stub).getDataStream(responseObserver),
         grpcBackOff.get(),
@@ -174,11 +225,6 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
         processHeartbeatResponses);
   }
 
-  public GetDataStream createGetDataStream(
-      CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer 
getDataThrottleTimer) {
-    return createGetDataStream(stub, getDataThrottleTimer, false, (response) 
-> {});
-  }
-
   public CommitWorkStream createCommitWorkStream(
       CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer 
commitWorkThrottleTimer) {
     return GrpcCommitWorkStream.create(
@@ -214,30 +260,6 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
         DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, 
windmillMessagesBetweenIsReadyChecks);
   }
 
-  /**
-   * Schedules streaming RPC health checks to run on a background daemon 
thread, which will be
-   * cleaned up when the JVM shutdown.
-   */
-  public void scheduleHealthChecks(int healthCheckInterval) {
-    if (healthCheckInterval < 0) {
-      return;
-    }
-
-    new Timer("WindmillHealthCheckTimer")
-        .schedule(
-            new TimerTask() {
-              @Override
-              public void run() {
-                Instant reportThreshold = 
Instant.now().minus(Duration.millis(healthCheckInterval));
-                for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
-                  stream.maybeSendHealthCheck(reportThreshold);
-                }
-              }
-            },
-            0,
-            healthCheckInterval);
-  }
-
   @Override
   public void appendSummaryHtml(PrintWriter writer) {
     writer.write("Active Streams:<br>");
@@ -248,7 +270,7 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
   }
 
   @Internal
-  @AutoBuilder(ofClass = GrpcWindmillStreamFactory.class)
+  @AutoBuilder(callMethod = "create")
   public interface Builder {
     Builder setJobHeader(JobHeader jobHeader);
 
@@ -260,6 +282,13 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
 
     Builder setMaxBackOffSupplier(Supplier<Duration> maxBackOff);
 
+    Builder setSendKeyedGetDataRequests(boolean sendKeyedGetDataRequests);
+
+    Builder setProcessHeartbeatResponses(
+        Consumer<List<ComputationHeartbeatResponse>> 
processHeartbeatResponses);
+
+    Builder setHealthCheckIntervalMillis(int healthCheckIntervalMillis);
+
     GrpcWindmillStreamFactory build();
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
index a9ca749ff1c..4760062c575 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
@@ -30,13 +30,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.CheckReturnValue;
 import javax.annotation.concurrent.ThreadSafe;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
@@ -93,7 +91,6 @@ public final class StreamingEngineClient {
   private final Supplier<GetWorkerMetadataStream> getWorkerMetadataStream;
   private final Queue<WindmillEndpoints> newWindmillEndpoints;
   private final Function<WindmillStream.CommitWorkStream, WorkCommitter> 
workCommitterFactory;
-  private final Consumer<List<Windmill.ComputationHeartbeatResponse>> 
heartbeatResponseProcessor;
 
   /** Writes are guarded by synchronization, reads are lock free. */
   private final AtomicReference<StreamingEngineConnectionState> connections;
@@ -110,8 +107,7 @@ public final class StreamingEngineClient {
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
       long clientId,
-      Function<WindmillStream.CommitWorkStream, WorkCommitter> 
workCommitterFactory,
-      Consumer<List<Windmill.ComputationHeartbeatResponse>> 
heartbeatResponseProcessor) {
+      Function<WindmillStream.CommitWorkStream, WorkCommitter> 
workCommitterFactory) {
     this.jobHeader = jobHeader;
     this.started = false;
     this.streamFactory = streamFactory;
@@ -147,7 +143,6 @@ public final class StreamingEngineClient {
                         newWorkerMetadataPublisher.submit(
                             () -> newWindmillEndpoints.add(endpoints))));
     this.workCommitterFactory = workCommitterFactory;
-    this.heartbeatResponseProcessor = heartbeatResponseProcessor;
   }
 
   private static ExecutorService singleThreadedExecutorServiceOf(String 
threadName) {
@@ -176,8 +171,7 @@ public final class StreamingEngineClient {
       ChannelCachingStubFactory channelCachingStubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
-      Function<WindmillStream.CommitWorkStream, WorkCommitter> 
workCommitterFactory,
-      Consumer<List<Windmill.ComputationHeartbeatResponse>> 
heartbeatProcessor) {
+      Function<WindmillStream.CommitWorkStream, WorkCommitter> 
workCommitterFactory) {
     return new StreamingEngineClient(
         jobHeader,
         totalGetWorkBudget,
@@ -187,8 +181,7 @@ public final class StreamingEngineClient {
         getWorkBudgetDistributor,
         dispatcherClient,
         /* clientId= */ new Random().nextLong(),
-        workCommitterFactory,
-        heartbeatProcessor);
+        workCommitterFactory);
   }
 
   @VisibleForTesting
@@ -201,8 +194,7 @@ public final class StreamingEngineClient {
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
       long clientId,
-      Function<WindmillStream.CommitWorkStream, WorkCommitter> 
workCommitterFactory,
-      Consumer<List<Windmill.ComputationHeartbeatResponse>> 
heartbeatResponseProcessor) {
+      Function<WindmillStream.CommitWorkStream, WorkCommitter> 
workCommitterFactory) {
     StreamingEngineClient streamingEngineClient =
         new StreamingEngineClient(
             jobHeader,
@@ -213,8 +205,7 @@ public final class StreamingEngineClient {
             getWorkBudgetDistributor,
             dispatcherClient,
             clientId,
-            workCommitterFactory,
-            heartbeatResponseProcessor);
+            workCommitterFactory);
     streamingEngineClient.start();
     return streamingEngineClient;
   }
@@ -409,8 +400,7 @@ public final class StreamingEngineClient {
             GetWorkBudget.noBudget(),
             streamFactory,
             workItemScheduler,
-            workCommitterFactory,
-            heartbeatResponseProcessor);
+            workCommitterFactory);
     windmillStreamSender.startStreams();
     return windmillStreamSender;
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java
index ff9ddc00c3f..e9f008eb522 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java
@@ -17,15 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
@@ -73,24 +70,20 @@ public class WindmillStreamSender {
       AtomicReference<GetWorkBudget> getWorkBudget,
       GrpcWindmillStreamFactory streamingEngineStreamFactory,
       WorkItemScheduler workItemScheduler,
-      Function<CommitWorkStream, WorkCommitter> workCommitterFactory,
-      Consumer<List<Windmill.ComputationHeartbeatResponse>> 
heartbeatResponseProcessor) {
+      Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
     this.started = new AtomicBoolean(false);
     this.getWorkBudget = getWorkBudget;
     this.streamingEngineThrottleTimers = 
StreamingEngineThrottleTimers.create();
 
     // All streams are memoized/cached since they are expensive to create and 
some implementations
     // perform side effects on construction (i.e. sending initial requests to 
the stream server to
-    // initiate the streaming RPC connection). Stream instances 
connect/reconnect internally so we
+    // initiate the streaming RPC connection). Stream instances 
connect/reconnect internally, so we
     // can reuse the same instance through the entire lifecycle of 
WindmillStreamSender.
     this.getDataStream =
         Suppliers.memoize(
             () ->
                 streamingEngineStreamFactory.createGetDataStream(
-                    stub,
-                    streamingEngineThrottleTimers.getDataThrottleTimer(),
-                    false,
-                    heartbeatResponseProcessor));
+                    stub, 
streamingEngineThrottleTimers.getDataThrottleTimer()));
     this.commitWorkStream =
         Suppliers.memoize(
             () ->
@@ -116,16 +109,14 @@ public class WindmillStreamSender {
       GetWorkBudget getWorkBudget,
       GrpcWindmillStreamFactory streamingEngineStreamFactory,
       WorkItemScheduler workItemScheduler,
-      Function<CommitWorkStream, WorkCommitter> workCommitterFactory,
-      Consumer<List<Windmill.ComputationHeartbeatResponse>> 
heartbeatResponseProcessor) {
+      Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
     return new WindmillStreamSender(
         stub,
         getWorkRequest,
         new AtomicReference<>(getWorkBudget),
         streamingEngineStreamFactory,
         workItemScheduler,
-        workCommitterFactory,
-        heartbeatResponseProcessor);
+        workCommitterFactory);
   }
 
   private static GetWorkRequest withRequestBudget(GetWorkRequest request, 
GetWorkBudget budget) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
index 9822daa9156..bc3afaff1b3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
@@ -181,8 +181,7 @@ public class StreamingEngineClientTest {
         getWorkBudgetDistributor,
         dispatcherClient,
         CLIENT_ID,
-        ignored -> mock(WorkCommitter.class),
-        ignored -> {});
+        ignored -> mock(WorkCommitter.class));
   }
 
   @Test
@@ -238,7 +237,7 @@ public class StreamingEngineClientTest {
         .createDirectGetWorkStream(
             any(), eq(getWorkRequest(0, 0)), any(), any(), any(), 
eq(noOpProcessWorkItemFn()));
 
-    verify(streamFactory, times(2)).createGetDataStream(any(), any(), 
eq(false), any());
+    verify(streamFactory, times(2)).createGetDataStream(any(), any());
     verify(streamFactory, times(2)).createCommitWorkStream(any(), any());
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
index 496f69dc52d..162c69509ae 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
@@ -107,7 +107,7 @@ public class WindmillStreamSenderTest {
             any(),
             eq(workItemScheduler));
 
-    verify(streamFactory).createGetDataStream(eq(stub), 
any(ThrottleTimer.class), eq(false), any());
+    verify(streamFactory).createGetDataStream(eq(stub), 
any(ThrottleTimer.class));
     verify(streamFactory).createCommitWorkStream(eq(stub), 
any(ThrottleTimer.class));
   }
 
@@ -138,8 +138,7 @@ public class WindmillStreamSenderTest {
             any(),
             eq(workItemScheduler));
 
-    verify(streamFactory, times(1))
-        .createGetDataStream(eq(stub), any(ThrottleTimer.class), eq(false), 
any());
+    verify(streamFactory, times(1)).createGetDataStream(eq(stub), 
any(ThrottleTimer.class));
     verify(streamFactory, times(1)).createCommitWorkStream(eq(stub), 
any(ThrottleTimer.class));
   }
 
@@ -173,8 +172,7 @@ public class WindmillStreamSenderTest {
             any(),
             eq(workItemScheduler));
 
-    verify(streamFactory, times(1))
-        .createGetDataStream(eq(stub), any(ThrottleTimer.class), eq(false), 
any());
+    verify(streamFactory, times(1)).createGetDataStream(eq(stub), 
any(ThrottleTimer.class));
     verify(streamFactory, times(1)).createCommitWorkStream(eq(stub), 
any(ThrottleTimer.class));
   }
 
@@ -208,8 +206,7 @@ public class WindmillStreamSenderTest {
             eq(workItemScheduler)))
         .thenReturn(mockGetWorkStream);
 
-    when(mockStreamFactory.createGetDataStream(
-            eq(stub), any(ThrottleTimer.class), eq(false), any()))
+    when(mockStreamFactory.createGetDataStream(eq(stub), 
any(ThrottleTimer.class)))
         .thenReturn(mockGetDataStream);
     when(mockStreamFactory.createCommitWorkStream(eq(stub), 
any(ThrottleTimer.class)))
         .thenReturn(mockCommitWorkStream);
@@ -239,7 +236,6 @@ public class WindmillStreamSenderTest {
         budget,
         streamFactory,
         workItemScheduler,
-        ignored -> mock(WorkCommitter.class),
-        ignored -> {});
+        ignored -> mock(WorkCommitter.class));
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
index 4fa424412ee..83ae8aa22ce 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
@@ -259,7 +259,6 @@ public class EvenGetWorkBudgetDistributorTest {
                     .build())
             .build(),
         (workItem, watermarks, processingContext, ackWorkItemQueued, 
getWorkStreamLatencies) -> {},
-        ignored -> mock(WorkCommitter.class),
-        ignored -> {});
+        ignored -> mock(WorkCommitter.class));
   }
 }


Reply via email to