This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ba714221d5e organize and refactor GrpcWindmillServer. (#29156)
ba714221d5e is described below

commit ba714221d5efaea58a59bccaad2eaeef70bd4ec4
Author: martin trieu <marti...@google.com>
AuthorDate: Tue Oct 31 02:37:10 2023 -0700

    organize and refactor GrpcWindmillServer. (#29156)
    
    organize and refactor GrpcWindmillServer to prepare for Streaming Engine 
Client changes.
---
 .../google-cloud-dataflow-java/worker/build.gradle |   3 +
 .../worker/MetricTrackingWindmillServerStub.java   |   4 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |   6 +-
 .../options/StreamingDataflowWorkerOptions.java    |   2 +-
 .../worker/windmill/WindmillServerBase.java        |   8 +-
 .../worker/windmill/WindmillServerStub.java        |   8 +-
 .../{ => client}/AbstractWindmillStream.java       |   3 +-
 .../windmill/{ => client}/WindmillStream.java      |  21 +-
 .../windmill/{ => client}/WindmillStreamPool.java  |   2 +-
 .../grpc}/AppendableInputStream.java               |   2 +-
 .../grpc}/GetWorkTimingInfosTracker.java           |   2 +-
 .../grpc}/GrpcCommitWorkStream.java                |   9 +-
 .../grpc}/GrpcDeadlineClientInterceptor.java       |   2 +-
 .../windmill/client/grpc/GrpcDispatcherClient.java | 136 +++++
 .../grpc}/GrpcGetDataStream.java                   |  13 +-
 .../grpc}/GrpcGetDataStreamRequests.java           |   2 +-
 .../grpc}/GrpcGetWorkStream.java                   |  29 +-
 .../grpc}/GrpcGetWorkerMetadataStream.java         |   9 +-
 .../windmill/client/grpc/GrpcWindmillServer.java   | 355 ++++++++++++
 .../client/grpc/GrpcWindmillStreamFactory.java     | 227 ++++++++
 .../grpc/auth/VendoredCredentialsAdapter.java      |  81 +++
 .../VendoredRequestMetadataCallbackAdapter.java    |  51 ++
 .../grpc/observers}/DirectStreamObserver.java      |   2 +-
 .../ForwardingClientResponseObserver.java          |   2 +-
 .../grpc/observers}/StreamObserverFactory.java     |   2 +-
 .../client/grpc/stubs/WindmillChannelFactory.java  | 137 +++++
 .../client/grpc/stubs/WindmillStubFactory.java     |  73 +++
 .../throttling/StreamingEngineThrottleTimers.java  |  41 ++
 .../throttling}/ThrottleTimer.java                 |   6 +-
 .../windmill/grpcclient/GrpcWindmillServer.java    | 607 ---------------------
 .../worker/windmill/work/WorkItemReceiver.java     |  34 ++
 .../worker/windmill/work/budget/GetWorkBudget.java |  98 ++++
 .../dataflow/worker/FakeWindmillServer.java        |  24 +-
 .../{ => client}/WindmillStreamPoolTest.java       |   2 +-
 .../grpc}/GrpcGetWorkerMetadataStreamTest.java     |   9 +-
 .../grpc}/GrpcWindmillServerTest.java              |  15 +-
 .../windmill/work/budget/GetWorkBudgetTest.java    |  72 +++
 37 files changed, 1413 insertions(+), 686 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle 
b/runners/google-cloud-dataflow-java/worker/build.gradle
index ce06063c9b5..1ca9eba2b48 100644
--- a/runners/google-cloud-dataflow-java/worker/build.gradle
+++ b/runners/google-cloud-dataflow-java/worker/build.gradle
@@ -89,6 +89,9 @@ applyJavaNature(
                 // Allow slf4j implementation worker for logging during 
pipeline execution
                 "org/slf4j/impl/**"
         ],
+        generatedClassPatterns: [
+                /^org\.apache\.beam\.runners\.dataflow\.worker\.windmill.*/
+        ],
         shadowClosure: {
             // Each included dependency must also include all of its necessary 
transitive dependencies
             // or have them provided by the users pipeline during job 
submission. Typically a users
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
index 33b55647213..0e929249b3a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
@@ -29,8 +29,8 @@ import 
org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamPool;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 811250ee785..11849e8b8c4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -104,9 +104,9 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamPool;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
 import org.apache.beam.sdk.coders.Coder;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
index cc5b3302b01..bacfa1eef63 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
 import 
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
-import 
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
index fe81eece138..8caa79cd3f7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.dataflow.worker.windmill;
 
 import java.io.IOException;
 import java.util.Set;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 
 /**
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index 1bb5359e06f..c327e68d7e9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -21,10 +21,10 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Set;
 import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 
 /** Stub for communicating with a Windmill server. */
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
similarity index 98%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index ea7efff7a06..4e47676989a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
similarity index 84%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
index 4dd4164fc4e..fa1f797a191 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.ThreadSafe;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
 import org.joda.time.Instant;
 
 /** Superclass for streams returned by streaming Windmill methods. */
@@ -41,16 +41,11 @@ public interface WindmillStream {
   /** Handle representing a stream of GetWork responses. */
   @ThreadSafe
   interface GetWorkStream extends WindmillStream {
-    /** Functional interface for receiving WorkItems. */
-    @FunctionalInterface
-    interface WorkItemReceiver {
-      void receiveWork(
-          String computation,
-          @Nullable Instant inputDataWatermark,
-          @Nullable Instant synchronizedProcessingTime,
-          Windmill.WorkItem workItem,
-          Collection<Windmill.LatencyAttribution> getWorkStreamLatencies);
-    }
+    /** Adjusts the {@link GetWorkBudget} for the stream. */
+    void adjustBudget(long itemsDelta, long bytesDelta);
+
+    /** Returns the remaining in-flight {@link GetWorkBudget}. */
+    GetWorkBudget remainingBudget();
   }
 
   /** Interface for streaming GetDataRequests to Windmill. */
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java
similarity index 99%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java
index 9cd4ab0ea4a..9f1b67edc1e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java
similarity index 98%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java
index dbd3613ee4c..6a0d0a63d5a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import java.io.IOException;
 import java.io.InputStream;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
similarity index 99%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
index e6710993af9..221b18be164 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import java.util.ArrayList;
 import java.util.Collection;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
similarity index 96%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
index 1bba40805de..5d0a5085fe1 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
@@ -27,15 +27,16 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitRequestChunk;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitWorkRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java
similarity index 97%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java
index 6b0e19cbb48..629006e2359 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
new file mode 100644
index 00000000000..ef9156f9c05
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST;
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Manages endpoints and stubs for connecting to the Windmill Dispatcher. */
+@ThreadSafe
+class GrpcDispatcherClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
+  private final WindmillStubFactory windmillStubFactory;
+
+  @GuardedBy("this")
+  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
+
+  @GuardedBy("this")
+  private final Set<HostAndPort> dispatcherEndpoints;
+
+  @GuardedBy("this")
+  private final Random rand;
+
+  private GrpcDispatcherClient(
+      WindmillStubFactory windmillStubFactory,
+      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      Set<HostAndPort> dispatcherEndpoints,
+      Random rand) {
+    this.windmillStubFactory = windmillStubFactory;
+    this.dispatcherStubs = dispatcherStubs;
+    this.dispatcherEndpoints = dispatcherEndpoints;
+    this.rand = rand;
+  }
+
+  static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
+    return new GrpcDispatcherClient(
+        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+  }
+
+  @VisibleForTesting
+  static GrpcDispatcherClient forTesting(
+      WindmillStubFactory windmillGrpcStubFactory,
+      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      Set<HostAndPort> dispatcherEndpoints) {
+    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    return new GrpcDispatcherClient(
+        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+  }
+
+  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+    Preconditions.checkState(
+        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+
+    return (dispatcherStubs.size() == 1
+        ? dispatcherStubs.get(0)
+        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+  }
+
+  synchronized boolean isReady() {
+    return !dispatcherStubs.isEmpty();
+  }
+
+  synchronized void consumeWindmillDispatcherEndpoints(
+      ImmutableSet<HostAndPort> dispatcherEndpoints) {
+    Preconditions.checkArgument(
+        dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
+        "Cannot set dispatcher endpoints to nothing.");
+    if (this.dispatcherEndpoints.equals(dispatcherEndpoints)) {
+      // The endpoints are equal don't recreate the stubs.
+      return;
+    }
+
+    LOG.info("Creating a new windmill stub, endpoints: {}", 
dispatcherEndpoints);
+    if (!this.dispatcherEndpoints.isEmpty()) {
+      LOG.info("Previous windmill stub endpoints: {}", 
this.dispatcherEndpoints);
+    }
+
+    resetDispatcherEndpoints(dispatcherEndpoints);
+  }
+
+  private synchronized void resetDispatcherEndpoints(
+      ImmutableSet<HostAndPort> newDispatcherEndpoints) {
+    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
newDispatcherEndpoints);
+    this.dispatcherStubs.clear();
+    this.dispatcherEndpoints.clear();
+    this.dispatcherEndpoints.addAll(newDispatcherEndpoints);
+
+    dispatcherEndpoints.stream()
+        .map(this::createDispatcherStubForWindmillService)
+        .forEach(dispatcherStubs::add);
+  }
+
+  private CloudWindmillServiceV1Alpha1Stub 
createDispatcherStubForWindmillService(
+      HostAndPort endpoint) {
+    if (LOCALHOST.equals(endpoint.getHost())) {
+      return 
CloudWindmillServiceV1Alpha1Grpc.newStub(localhostChannel(endpoint.getPort()));
+    }
+
+    // Use an in-process stub if testing.
+    return windmillStubFactory.getKind() == WindmillStubFactory.Kind.IN_PROCESS
+        ? windmillStubFactory.inProcess().get()
+        : 
windmillStubFactory.remote().apply(WindmillServiceAddress.create(endpoint));
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
similarity index 95%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
index 238cc771dce..ea9cd7f0fa3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify;
@@ -33,8 +33,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
@@ -43,9 +41,12 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataReq
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataResponse;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcGetDataStreamRequests.QueuedBatch;
-import 
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcGetDataStreamRequests.QueuedRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
 import org.joda.time.Instant;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java
similarity index 98%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java
index 7da7b13958b..cda9537127d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import com.google.auto.value.AutoOneOf;
 import java.util.ArrayList;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
similarity index 89%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
index 4660fe25b13..d7d9bfddffb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -27,16 +27,18 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequestExtension;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
@@ -44,7 +46,7 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class GrpcGetWorkStream
+public final class GrpcGetWorkStream
     extends AbstractWindmillStream<StreamingGetWorkRequest, 
StreamingGetWorkResponseChunk>
     implements GetWorkStream {
 
@@ -79,7 +81,7 @@ final class GrpcGetWorkStream
     this.inflightBytes = new AtomicLong();
   }
 
-  static GrpcGetWorkStream create(
+  public static GrpcGetWorkStream create(
       Function<
               StreamObserver<StreamingGetWorkResponseChunk>,
               StreamObserver<StreamingGetWorkRequest>>
@@ -190,6 +192,19 @@ final class GrpcGetWorkStream
     getWorkThrottleTimer.start();
   }
 
+  @Override
+  public void adjustBudget(long itemsDelta, long bytesDelta) {
+    // no-op
+  }
+
+  @Override
+  public GetWorkBudget remainingBudget() {
+    return GetWorkBudget.builder()
+        .setBytes(request.getMaxBytes() - inflightBytes.get())
+        .setItems(request.getMaxItems() - inflightMessages.get())
+        .build();
+  }
+
   private class WorkItemBuffer {
     private final GetWorkTimingInfosTracker workTimingInfosTracker;
     private String computation;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java
similarity index 93%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java
index 427fd412ec7..a403feddb45 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.io.PrintWriter;
@@ -23,13 +23,14 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
new file mode 100644
index 00000000000..3a881df7146
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST;
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.inProcessChannel;
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import 
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** gRPC client for communicating with Streaming Engine. */
+@SuppressFBWarnings({
+  // Very likely real potentials for bugs.
+  "JLM_JSR166_UTILCONCURRENT_MONITORENTER", // 
https://github.com/apache/beam/issues/19273
+  "IS2_INCONSISTENT_SYNC" // https://github.com/apache/beam/issues/19271
+})
+@SuppressWarnings("nullness") // 
TODO(https://github.com/apache/beam/issues/20497
+public final class GrpcWindmillServer extends WindmillServerStub {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GrpcWindmillServer.class);
+  private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20;
+  private static final Duration MIN_BACKOFF = Duration.millis(1);
+  private static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
+  private static final int NO_HEALTH_CHECK = -1;
+  private static final String GRPC_LOCALHOST = "grpc:localhost";
+
+  private final GrpcWindmillStreamFactory windmillStreamFactory;
+  private final GrpcDispatcherClient dispatcherClient;
+  private final StreamingDataflowWorkerOptions options;
+  private final StreamingEngineThrottleTimers throttleTimers;
+  private Duration maxBackoff;
+  private @Nullable WindmillApplianceGrpc.WindmillApplianceBlockingStub 
syncApplianceStub;
+
+  private GrpcWindmillServer(
+      StreamingDataflowWorkerOptions options, GrpcDispatcherClient 
grpcDispatcherClient) {
+    this.options = options;
+    this.throttleTimers = StreamingEngineThrottleTimers.create();
+    this.maxBackoff = MAX_BACKOFF;
+    this.windmillStreamFactory =
+        GrpcWindmillStreamFactory.of(
+                JobHeader.newBuilder()
+                    .setJobId(options.getJobId())
+                    .setProjectId(options.getProject())
+                    .setWorkerId(options.getWorkerId())
+                    .build())
+            .setWindmillMessagesBetweenIsReadyChecks(
+                options.getWindmillMessagesBetweenIsReadyChecks())
+            .setMaxBackOffSupplier(() -> maxBackoff)
+            .setLogEveryNStreamFailures(
+                options.getWindmillServiceStreamingLogEveryNStreamFailures())
+            
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
+            .build();
+    windmillStreamFactory.scheduleHealthChecks(
+        options.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
+
+    this.dispatcherClient = grpcDispatcherClient;
+    this.syncApplianceStub = null;
+  }
+
+  private static StreamingDataflowWorkerOptions testOptions(boolean 
enableStreamingEngine) {
+    StreamingDataflowWorkerOptions options =
+        
PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class);
+    options.setProject("project");
+    options.setJobId("job");
+    options.setWorkerId("worker");
+    List<String> experiments =
+        options.getExperiments() == null ? new ArrayList<>() : 
options.getExperiments();
+    if (enableStreamingEngine) {
+      experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+    }
+    options.setExperiments(experiments);
+
+    options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE);
+    options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK);
+    
options.setWindmillServiceStreamingLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_FAILURES);
+
+    return options;
+  }
+
+  /** Create new instance of {@link GrpcWindmillServer}. */
+  public static GrpcWindmillServer create(StreamingDataflowWorkerOptions 
workerOptions)
+      throws IOException {
+
+    GrpcWindmillServer grpcWindmillServer =
+        new GrpcWindmillServer(
+            workerOptions,
+            GrpcDispatcherClient.create(
+                WindmillStubFactory.remoteStubFactory(
+                    
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
+                    workerOptions.getGcpCredential())));
+    if (workerOptions.getWindmillServiceEndpoint() != null) {
+      grpcWindmillServer.configureWindmillServiceEndpoints();
+    } else if (!workerOptions.isEnableStreamingEngine()
+        && workerOptions.getLocalWindmillHostport() != null) {
+      grpcWindmillServer.configureLocalHost();
+    }
+
+    return grpcWindmillServer;
+  }
+
+  @VisibleForTesting
+  static GrpcWindmillServer newTestInstance(String name) {
+    ManagedChannel inProcessChannel = inProcessChannel(name);
+    CloudWindmillServiceV1Alpha1Stub stub =
+        CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel);
+    List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs = 
Lists.newArrayList(stub);
+    Set<HostAndPort> dispatcherEndpoints = 
Sets.newHashSet(HostAndPort.fromHost(name));
+    GrpcDispatcherClient dispatcherClient =
+        GrpcDispatcherClient.forTesting(
+            WindmillStubFactory.inProcessStubFactory(name, unused -> 
inProcessChannel),
+            dispatcherStubs,
+            dispatcherEndpoints);
+    return new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */ 
true), dispatcherClient);
+  }
+
+  @VisibleForTesting
+  static GrpcWindmillServer newApplianceTestInstance(Channel channel) {
+    GrpcWindmillServer testServer =
+        new GrpcWindmillServer(
+            testOptions(/* enableStreamingEngine= */ false),
+            // No-op, Appliance does not use Dispatcher to call Streaming 
Engine.
+            
GrpcDispatcherClient.create(WindmillStubFactory.inProcessStubFactory("test")));
+    testServer.syncApplianceStub = 
createWindmillApplianceStubWithDeadlineInterceptor(channel);
+    return testServer;
+  }
+
+  private static WindmillApplianceGrpc.WindmillApplianceBlockingStub
+      createWindmillApplianceStubWithDeadlineInterceptor(Channel channel) {
+    return WindmillApplianceGrpc.newBlockingStub(channel)
+        
.withInterceptors(GrpcDeadlineClientInterceptor.withDefaultUnaryRpcDeadline());
+  }
+
+  private static UnsupportedOperationException 
unsupportedUnaryRequestInStreamingEngineException(
+      String rpcName) {
+    return new UnsupportedOperationException(
+        String.format("Unary %s calls are not supported in Streaming Engine.", 
rpcName));
+  }
+
+  private void configureWindmillServiceEndpoints() {
+    Set<HostAndPort> endpoints = new HashSet<>();
+    for (String endpoint : 
Splitter.on(',').split(options.getWindmillServiceEndpoint())) {
+      endpoints.add(
+          
HostAndPort.fromString(endpoint).withDefaultPort(options.getWindmillServicePort()));
+    }
+
+    
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
+  }
+
+  private void configureLocalHost() {
+    int portStart = options.getLocalWindmillHostport().lastIndexOf(':');
+    String endpoint = options.getLocalWindmillHostport().substring(0, 
portStart);
+    Preconditions.checkState(GRPC_LOCALHOST.equals(endpoint));
+    int port = 
Integer.parseInt(options.getLocalWindmillHostport().substring(portStart + 1));
+    dispatcherClient.consumeWindmillDispatcherEndpoints(
+        ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port)));
+    initializeLocalHost(port);
+  }
+
+  @Override
+  public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
+    
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
+  }
+
+  @Override
+  public boolean isReady() {
+    return dispatcherClient.isReady();
+  }
+
+  private synchronized void initializeLocalHost(int port) {
+    this.maxBackoff = Duration.millis(500);
+    if (options.isEnableStreamingEngine()) {
+      dispatcherClient.consumeWindmillDispatcherEndpoints(
+          ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port)));
+    } else {
+      this.syncApplianceStub =
+          
createWindmillApplianceStubWithDeadlineInterceptor(localhostChannel(port));
+    }
+  }
+
+  @Override
+  public void appendSummaryHtml(PrintWriter writer) {
+    windmillStreamFactory.appendSummaryHtml(writer);
+  }
+
+  private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
+    // Configure backoff to retry calls forever, with a maximum sane retry 
interval.
+    BackOff backoff =
+        
FluentBackoff.DEFAULT.withInitialBackoff(MIN_BACKOFF).withMaxBackoff(maxBackoff).backoff();
+
+    int rpcErrors = 0;
+    while (true) {
+      try {
+        return function.get();
+      } catch (StatusRuntimeException e) {
+        try {
+          if (++rpcErrors % 20 == 0) {
+            LOG.warn(
+                "Many exceptions calling gRPC. Last exception: {} with status 
{}",
+                e,
+                e.getStatus());
+          }
+          if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
+            throw new RpcException(e);
+          }
+        } catch (IOException | InterruptedException i) {
+          if (i instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+          }
+          RpcException rpcException = new RpcException(e);
+          rpcException.addSuppressed(i);
+          throw rpcException;
+        }
+      }
+    }
+  }
+
+  @Override
+  public GetWorkResponse getWork(GetWorkRequest request) {
+    if (syncApplianceStub != null) {
+      return callWithBackoff(() -> syncApplianceStub.getWork(request));
+    }
+
+    throw new 
RpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork"));
+  }
+
+  @Override
+  public GetDataResponse getData(GetDataRequest request) {
+    if (syncApplianceStub != null) {
+      return callWithBackoff(() -> syncApplianceStub.getData(request));
+    }
+
+    throw new 
RpcException(unsupportedUnaryRequestInStreamingEngineException("GetData"));
+  }
+
+  @Override
+  public CommitWorkResponse commitWork(CommitWorkRequest request) {
+    if (syncApplianceStub != null) {
+      return callWithBackoff(() -> syncApplianceStub.commitWork(request));
+    }
+    throw new 
RpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork"));
+  }
+
+  @Override
+  public GetWorkStream getWorkStream(GetWorkRequest request, WorkItemReceiver 
receiver) {
+    return windmillStreamFactory.createGetWorkStream(
+        dispatcherClient.getDispatcherStub(),
+        GetWorkRequest.newBuilder(request)
+            .setJobId(options.getJobId())
+            .setProjectId(options.getProject())
+            .setWorkerId(options.getWorkerId())
+            .build(),
+        throttleTimers.getWorkThrottleTimer(),
+        receiver);
+  }
+
+  @Override
+  public GetDataStream getDataStream() {
+    return windmillStreamFactory.createGetDataStream(
+        dispatcherClient.getDispatcherStub(), 
throttleTimers.getDataThrottleTimer());
+  }
+
+  @Override
+  public CommitWorkStream commitWorkStream() {
+    return windmillStreamFactory.createCommitWorkStream(
+        dispatcherClient.getDispatcherStub(), 
throttleTimers.commitWorkThrottleTimer());
+  }
+
+  @Override
+  public GetConfigResponse getConfig(GetConfigRequest request) {
+    if (syncApplianceStub != null) {
+      return callWithBackoff(() -> syncApplianceStub.getConfig(request));
+    }
+
+    throw new RpcException(
+        new UnsupportedOperationException("GetConfig not supported in 
Streaming Engine."));
+  }
+
+  @Override
+  public ReportStatsResponse reportStats(ReportStatsRequest request) {
+    if (syncApplianceStub != null) {
+      return callWithBackoff(() -> syncApplianceStub.reportStats(request));
+    }
+
+    throw new RpcException(
+        new UnsupportedOperationException("ReportStats not supported in 
Streaming Engine."));
+  }
+
+  @Override
+  public long getAndResetThrottleTime() {
+    return throttleTimers.getAndResetThrottleTime();
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
new file mode 100644
index 00000000000..e474ebf18b2
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS;
+
+import com.google.auto.value.AutoBuilder;
+import java.io.PrintWriter;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Creates gRPC streaming connections to Windmill Service. Maintains a set of 
all currently opened
+ * RPC streams for health check/heartbeat requests to keep the streams alive.
+ */
+@ThreadSafe
+public final class GrpcWindmillStreamFactory implements StatusDataProvider {
+  private static final Duration MIN_BACKOFF = Duration.millis(1);
+  private static final Duration DEFAULT_MAX_BACKOFF = 
Duration.standardSeconds(30);
+  private static final int DEFAULT_LOG_EVERY_N_STREAM_FAILURES = 1;
+  private static final int DEFAULT_STREAMING_RPC_BATCH_LIMIT = 
Integer.MAX_VALUE;
+  private static final int DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS = 
1;
+
+  private final JobHeader jobHeader;
+  private final int logEveryNStreamFailures;
+  private final int streamingRpcBatchLimit;
+  private final int windmillMessagesBetweenIsReadyChecks;
+  private final Supplier<BackOff> grpcBackOff;
+  private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
+  private final AtomicLong streamIdGenerator;
+
+  GrpcWindmillStreamFactory(
+      JobHeader jobHeader,
+      int logEveryNStreamFailures,
+      int streamingRpcBatchLimit,
+      int windmillMessagesBetweenIsReadyChecks,
+      Supplier<Duration> maxBackOffSupplier) {
+    this.jobHeader = jobHeader;
+    this.logEveryNStreamFailures = logEveryNStreamFailures;
+    this.streamingRpcBatchLimit = streamingRpcBatchLimit;
+    this.windmillMessagesBetweenIsReadyChecks = 
windmillMessagesBetweenIsReadyChecks;
+    // Configure backoff to retry calls forever, with a maximum sane retry 
interval.
+    this.grpcBackOff =
+        Suppliers.memoize(
+            () ->
+                FluentBackoff.DEFAULT
+                    .withInitialBackoff(MIN_BACKOFF)
+                    .withMaxBackoff(maxBackOffSupplier.get())
+                    .backoff());
+    this.streamRegistry = ConcurrentHashMap.newKeySet();
+    this.streamIdGenerator = new AtomicLong();
+  }
+
+  /**
+   * Returns a new {@link Builder} for {@link GrpcWindmillStreamFactory} with 
default values set for
+   * the given {@link JobHeader}.
+   */
+  public static GrpcWindmillStreamFactory.Builder of(JobHeader jobHeader) {
+    return new AutoBuilder_GrpcWindmillStreamFactory_Builder()
+        .setJobHeader(jobHeader)
+        
.setWindmillMessagesBetweenIsReadyChecks(DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS)
+        .setMaxBackOffSupplier(() -> DEFAULT_MAX_BACKOFF)
+        .setLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_STREAM_FAILURES)
+        .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT);
+  }
+
+  private static CloudWindmillServiceV1Alpha1Stub withDeadline(
+      CloudWindmillServiceV1Alpha1Stub stub) {
+    // Deadlines are absolute points in time, so generate a new one everytime 
this function is
+    // called.
+    return stub.withDeadlineAfter(
+        AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, 
TimeUnit.SECONDS);
+  }
+
+  public GetWorkStream createGetWorkStream(
+      CloudWindmillServiceV1Alpha1Stub stub,
+      GetWorkRequest request,
+      ThrottleTimer getWorkThrottleTimer,
+      WorkItemReceiver processWorkItem) {
+    return GrpcGetWorkStream.create(
+        responseObserver -> withDeadline(stub).getWorkStream(responseObserver),
+        request,
+        grpcBackOff.get(),
+        newStreamObserverFactory(),
+        streamRegistry,
+        logEveryNStreamFailures,
+        getWorkThrottleTimer,
+        processWorkItem);
+  }
+
+  public GetDataStream createGetDataStream(
+      CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer 
getDataThrottleTimer) {
+    return GrpcGetDataStream.create(
+        responseObserver -> withDeadline(stub).getDataStream(responseObserver),
+        grpcBackOff.get(),
+        newStreamObserverFactory(),
+        streamRegistry,
+        logEveryNStreamFailures,
+        getDataThrottleTimer,
+        jobHeader,
+        streamIdGenerator,
+        streamingRpcBatchLimit);
+  }
+
+  public CommitWorkStream createCommitWorkStream(
+      CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer 
commitWorkThrottleTimer) {
+    return GrpcCommitWorkStream.create(
+        responseObserver -> 
withDeadline(stub).commitWorkStream(responseObserver),
+        grpcBackOff.get(),
+        newStreamObserverFactory(),
+        streamRegistry,
+        logEveryNStreamFailures,
+        commitWorkThrottleTimer,
+        jobHeader,
+        streamIdGenerator,
+        streamingRpcBatchLimit);
+  }
+
+  public GetWorkerMetadataStream createGetWorkerMetadataStream(
+      CloudWindmillServiceV1Alpha1Stub stub,
+      ThrottleTimer getWorkerMetadataThrottleTimer,
+      Consumer<WindmillEndpoints> onNewWindmillEndpoints) {
+    return GrpcGetWorkerMetadataStream.create(
+        responseObserver -> 
withDeadline(stub).getWorkerMetadataStream(responseObserver),
+        grpcBackOff.get(),
+        newStreamObserverFactory(),
+        streamRegistry,
+        logEveryNStreamFailures,
+        jobHeader,
+        0,
+        getWorkerMetadataThrottleTimer,
+        onNewWindmillEndpoints);
+  }
+
+  private StreamObserverFactory newStreamObserverFactory() {
+    return StreamObserverFactory.direct(
+        DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, 
windmillMessagesBetweenIsReadyChecks);
+  }
+
+  /**
+   * Schedules streaming RPC health checks to run on a background daemon 
thread, which will be
+   * cleaned up when the JVM shutdown.
+   */
+  public void scheduleHealthChecks(int healthCheckInterval) {
+    if (healthCheckInterval < 0) {
+      return;
+    }
+
+    new Timer("WindmillHealthCheckTimer")
+        .schedule(
+            new TimerTask() {
+              @Override
+              public void run() {
+                Instant reportThreshold = 
Instant.now().minus(Duration.millis(healthCheckInterval));
+                for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
+                  stream.maybeSendHealthCheck(reportThreshold);
+                }
+              }
+            },
+            0,
+            healthCheckInterval);
+  }
+
+  @Override
+  public void appendSummaryHtml(PrintWriter writer) {
+    writer.write("Active Streams:<br>");
+    for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
+      stream.appendSummaryHtml(writer);
+      writer.write("<br>");
+    }
+  }
+
+  @AutoBuilder(ofClass = GrpcWindmillStreamFactory.class)
+  interface Builder {
+    Builder setJobHeader(JobHeader jobHeader);
+
+    Builder setLogEveryNStreamFailures(int logEveryNStreamFailures);
+
+    Builder setStreamingRpcBatchLimit(int streamingRpcBatchLimit);
+
+    Builder setWindmillMessagesBetweenIsReadyChecks(int 
windmillMessagesBetweenIsReadyChecks);
+
+    Builder setMaxBackOffSupplier(Supplier<Duration> maxBackOff);
+
+    GrpcWindmillStreamFactory build();
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java
new file mode 100644
index 00000000000..23f6fb801a4
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Create a wrapper around credentials that delegates to the underlying {@link
+ * com.google.auth.Credentials}. Note that this class should override every 
method that is not final
+ * and not static and call the delegate directly.
+ *
+ * <p>TODO: Replace this with an auto generated proxy which calls the 
underlying implementation
+ * delegate to reduce maintenance burden.
+ */
+public class VendoredCredentialsAdapter
+    extends org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials {
+
+  private final com.google.auth.Credentials credentials;
+
+  public VendoredCredentialsAdapter(com.google.auth.Credentials credentials) {
+    this.credentials = credentials;
+  }
+
+  @Override
+  public String getAuthenticationType() {
+    return credentials.getAuthenticationType();
+  }
+
+  @Override
+  public Map<String, List<String>> getRequestMetadata() throws IOException {
+    return credentials.getRequestMetadata();
+  }
+
+  @Override
+  public void getRequestMetadata(
+      final URI uri,
+      Executor executor,
+      final 
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback 
callback) {
+    credentials.getRequestMetadata(
+        uri, executor, new VendoredRequestMetadataCallbackAdapter(callback));
+  }
+
+  @Override
+  public Map<String, List<String>> getRequestMetadata(URI uri) throws 
IOException {
+    return credentials.getRequestMetadata(uri);
+  }
+
+  @Override
+  public boolean hasRequestMetadata() {
+    return credentials.hasRequestMetadata();
+  }
+
+  @Override
+  public boolean hasRequestMetadataOnly() {
+    return credentials.hasRequestMetadataOnly();
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    credentials.refresh();
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java
new file mode 100644
index 00000000000..8b1b695287e
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Create a wrapper around credentials callback that delegates to the 
underlying vendored {@link
+ * com.google.auth.RequestMetadataCallback}. Note that this class should 
override every method that
+ * is not final and not static and call the delegate directly.
+ *
+ * <p>TODO: Replace this with an auto generated proxy which calls the 
underlying implementation
+ * delegate to reduce maintenance burden.
+ */
+public class VendoredRequestMetadataCallbackAdapter
+    implements com.google.auth.RequestMetadataCallback {
+
+  private final 
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback
+      callback;
+
+  VendoredRequestMetadataCallbackAdapter(
+      
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback 
callback) {
+    this.callback = callback;
+  }
+
+  @Override
+  public void onSuccess(Map<String, List<String>> metadata) {
+    callback.onSuccess(metadata);
+  }
+
+  @Override
+  public void onFailure(Throwable exception) {
+    callback.onFailure(exception);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
similarity index 98%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
index 3c7798126e5..5fb22476ab3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
 
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java
similarity index 96%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java
index a1f80598d89..007717d03b5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
 
 import 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java
similarity index 97%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java
index e0878b7b0b9..e3f12687638 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
 
 import java.util.function.Function;
 import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
new file mode 100644
index 00000000000..48cf8ff3f76
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.net.Inet6Address;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLException;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+
+/** Utility class used to create different RPC Channels. */
+public final class WindmillChannelFactory {
+  public static final String LOCALHOST = "localhost";
+  private static final int DEFAULT_GRPC_PORT = 443;
+
+  private WindmillChannelFactory() {}
+
+  public static ManagedChannel inProcessChannel(String channelName) {
+    return 
InProcessChannelBuilder.forName(channelName).directExecutor().build();
+  }
+
+  public static Channel localhostChannel(int port) {
+    return NettyChannelBuilder.forAddress(LOCALHOST, port)
+        .maxInboundMessageSize(Integer.MAX_VALUE)
+        .negotiationType(NegotiationType.PLAINTEXT)
+        .build();
+  }
+
+  static Channel remoteChannel(
+      WindmillServiceAddress windmillServiceAddress, int 
windmillServiceRpcChannelTimeoutSec) {
+    switch (windmillServiceAddress.getKind()) {
+      case IPV6:
+        return remoteChannel(windmillServiceAddress.ipv6(), 
windmillServiceRpcChannelTimeoutSec);
+      case GCP_SERVICE_ADDRESS:
+        return remoteChannel(
+            windmillServiceAddress.gcpServiceAddress(), 
windmillServiceRpcChannelTimeoutSec);
+        // switch is exhaustive will never happen.
+      default:
+        throw new UnsupportedOperationException(
+            "Only IPV6 and GCP_SERVICE_ADDRESS are supported 
WindmillServiceAddresses.");
+    }
+  }
+
+  public static Channel remoteChannel(
+      HostAndPort endpoint, int windmillServiceRpcChannelTimeoutSec) {
+    try {
+      return createRemoteChannel(
+          NettyChannelBuilder.forAddress(endpoint.getHost(), 
endpoint.getPort()),
+          windmillServiceRpcChannelTimeoutSec);
+    } catch (SSLException sslException) {
+      throw new WindmillChannelCreationException(endpoint, sslException);
+    }
+  }
+
+  public static Channel remoteChannel(
+      Inet6Address directEndpoint, int port, int 
windmillServiceRpcChannelTimeoutSec) {
+    try {
+      return createRemoteChannel(
+          NettyChannelBuilder.forAddress(new InetSocketAddress(directEndpoint, 
port)),
+          windmillServiceRpcChannelTimeoutSec);
+    } catch (SSLException sslException) {
+      throw new WindmillChannelCreationException(directEndpoint.toString(), 
sslException);
+    }
+  }
+
+  public static Channel remoteChannel(
+      Inet6Address directEndpoint, int windmillServiceRpcChannelTimeoutSec) {
+    try {
+      return createRemoteChannel(
+          NettyChannelBuilder.forAddress(new InetSocketAddress(directEndpoint, 
DEFAULT_GRPC_PORT)),
+          windmillServiceRpcChannelTimeoutSec);
+    } catch (SSLException sslException) {
+      throw new WindmillChannelCreationException(directEndpoint.toString(), 
sslException);
+    }
+  }
+
+  @SuppressWarnings("nullness")
+  private static Channel createRemoteChannel(
+      NettyChannelBuilder channelBuilder, int 
windmillServiceRpcChannelTimeoutSec)
+      throws SSLException {
+    if (windmillServiceRpcChannelTimeoutSec > 0) {
+      channelBuilder
+          .keepAliveTime(windmillServiceRpcChannelTimeoutSec, TimeUnit.SECONDS)
+          .keepAliveTimeout(windmillServiceRpcChannelTimeoutSec, 
TimeUnit.SECONDS)
+          .keepAliveWithoutCalls(true);
+    }
+
+    return channelBuilder
+        .flowControlWindow(10 * 1024 * 1024)
+        .maxInboundMessageSize(Integer.MAX_VALUE)
+        .maxInboundMetadataSize(1024 * 1024)
+        .negotiationType(NegotiationType.TLS)
+        // Set ciphers(null) to not use GCM, which is disabled for Dataflow
+        // due to it being horribly slow.
+        .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+        .build();
+  }
+
+  public static class WindmillChannelCreationException extends 
IllegalStateException {
+    private WindmillChannelCreationException(HostAndPort endpoint, 
SSLException sourceException) {
+      super(
+          String.format(
+              "Exception thrown when trying to create channel to 
endpoint={host:%s; port:%d}",
+              endpoint.getHost(), endpoint.getPort()),
+          sourceException);
+    }
+
+    WindmillChannelCreationException(String directEndpoint, Throwable 
sourceException) {
+      super(
+          String.format(
+              "Exception thrown when trying to create channel to 
endpoint={%s}", directEndpoint),
+          sourceException);
+    }
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java
new file mode 100644
index 00000000000..0c7719b0bc1
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
+
+import com.google.auth.Credentials;
+import com.google.auto.value.AutoOneOf;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth.VendoredCredentialsAdapter;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.auth.MoreCallCredentials;
+
+/**
+ * Used to create stubs to talk to Streaming Engine. Stubs are either 
in-process for testing, or
+ * remote.
+ */
+@AutoOneOf(WindmillStubFactory.Kind.class)
+public abstract class WindmillStubFactory {
+
+  public static WindmillStubFactory inProcessStubFactory(
+      String testName, Function<String, ManagedChannel> channelFactory) {
+    return AutoOneOf_WindmillStubFactory.inProcess(
+        () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(channelFactory.apply(testName)));
+  }
+
+  public static WindmillStubFactory inProcessStubFactory(String testName) {
+    return AutoOneOf_WindmillStubFactory.inProcess(
+        () ->
+            CloudWindmillServiceV1Alpha1Grpc.newStub(
+                WindmillChannelFactory.inProcessChannel(testName)));
+  }
+
+  public static WindmillStubFactory remoteStubFactory(
+      int rpcChannelTimeoutSec, Credentials gcpCredentials) {
+    return AutoOneOf_WindmillStubFactory.remote(
+        directEndpoint ->
+            CloudWindmillServiceV1Alpha1Grpc.newStub(
+                    remoteChannel(directEndpoint, rpcChannelTimeoutSec))
+                .withCallCredentials(
+                    MoreCallCredentials.from(new 
VendoredCredentialsAdapter(gcpCredentials))));
+  }
+
+  public abstract Kind getKind();
+
+  public abstract Supplier<CloudWindmillServiceV1Alpha1Stub> inProcess();
+
+  public abstract Function<WindmillServiceAddress, 
CloudWindmillServiceV1Alpha1Stub> remote();
+
+  public enum Kind {
+    IN_PROCESS,
+    REMOTE
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java
new file mode 100644
index 00000000000..6b8dd272037
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.throttling;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class StreamingEngineThrottleTimers {
+
+  public static StreamingEngineThrottleTimers create() {
+    return new AutoValue_StreamingEngineThrottleTimers(
+        new ThrottleTimer(), new ThrottleTimer(), new ThrottleTimer());
+  }
+
+  public long getAndResetThrottleTime() {
+    return getWorkThrottleTimer().getAndResetThrottleTime()
+        + getDataThrottleTimer().getAndResetThrottleTime()
+        + commitWorkThrottleTimer().getAndResetThrottleTime();
+  }
+
+  public abstract ThrottleTimer getWorkThrottleTimer();
+
+  public abstract ThrottleTimer getDataThrottleTimer();
+
+  public abstract ThrottleTimer commitWorkThrottleTimer();
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
similarity index 94%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
index 237339aff39..f660112721b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.throttling;
 
 import org.joda.time.Instant;
 
@@ -25,7 +25,7 @@ import org.joda.time.Instant;
  * CommitWork are both blocked for x, totalTime will be 2x. However, if 2 
GetWork streams are both
  * blocked for x totalTime will be x. All methods are thread safe.
  */
-class ThrottleTimer {
+public final class ThrottleTimer {
   // This is -1 if not currently being throttled or the time in
   // milliseconds when throttling for this type started.
   private long startTime = -1;
@@ -36,7 +36,7 @@ class ThrottleTimer {
   /**
    * Starts the timer if it has not been started and does nothing if it has 
already been started.
    */
-  synchronized void start() {
+  public synchronized void start() {
     if (!throttled()) { // This timer is not started yet so start it now.
       startTime = Instant.now().getMillis();
     }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java
deleted file mode 100644
index 19cb90297df..00000000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java
+++ /dev/null
@@ -1,607 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-import 
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequest;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.BackOff;
-import org.apache.beam.sdk.util.BackOffUtils;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.Sleeper;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.auth.MoreCallCredentials;
-import 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** gRPC client for communicating with Streaming Engine. */
-// Very likely real potential for bugs - 
https://github.com/apache/beam/issues/19273
-// Very likely real potential for bugs - 
https://github.com/apache/beam/issues/19271
-@SuppressFBWarnings({"JLM_JSR166_UTILCONCURRENT_MONITORENTER", 
"IS2_INCONSISTENT_SYNC"})
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-public final class GrpcWindmillServer extends WindmillServerStub {
-  private static final Logger LOG = 
LoggerFactory.getLogger(GrpcWindmillServer.class);
-
-  // If a connection cannot be established, gRPC will fail fast so this 
deadline can be relatively
-  // high.
-  private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
-  private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20;
-  private static final String LOCALHOST = "localhost";
-  private static final Duration MIN_BACKOFF = Duration.millis(1);
-  private static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
-  private static final AtomicLong nextId = new AtomicLong(0);
-  private static final int NO_HEALTH_CHECK = -1;
-
-  private final StreamingDataflowWorkerOptions options;
-  private final int streamingRpcBatchLimit;
-  private final 
List<CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub> 
stubList;
-  private final ThrottleTimer getWorkThrottleTimer;
-  private final ThrottleTimer getDataThrottleTimer;
-  private final ThrottleTimer commitWorkThrottleTimer;
-  private final Random rand;
-  private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
-  private ImmutableSet<HostAndPort> endpoints;
-  private int logEveryNStreamFailures;
-  private Duration maxBackoff = MAX_BACKOFF;
-  private WindmillApplianceGrpc.WindmillApplianceBlockingStub 
syncApplianceStub = null;
-
-  private GrpcWindmillServer(StreamingDataflowWorkerOptions options) {
-    this.options = options;
-    this.streamingRpcBatchLimit = 
options.getWindmillServiceStreamingRpcBatchLimit();
-    this.stubList = new ArrayList<>();
-    this.logEveryNStreamFailures = 
options.getWindmillServiceStreamingLogEveryNStreamFailures();
-    this.endpoints = ImmutableSet.of();
-    this.getWorkThrottleTimer = new ThrottleTimer();
-    this.getDataThrottleTimer = new ThrottleTimer();
-    this.commitWorkThrottleTimer = new ThrottleTimer();
-    this.rand = new Random();
-    this.streamRegistry = Collections.newSetFromMap(new ConcurrentHashMap<>());
-  }
-
-  private static StreamingDataflowWorkerOptions testOptions(boolean 
enableStreamingEngine) {
-    StreamingDataflowWorkerOptions options =
-        
PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class);
-    options.setProject("project");
-    options.setJobId("job");
-    options.setWorkerId("worker");
-    List<String> experiments =
-        options.getExperiments() == null ? new ArrayList<>() : 
options.getExperiments();
-    if (enableStreamingEngine) {
-      experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
-    }
-    options.setExperiments(experiments);
-
-    options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE);
-    options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK);
-    
options.setWindmillServiceStreamingLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_FAILURES);
-
-    return options;
-  }
-
-  /** Create new instance of {@link GrpcWindmillServer}. */
-  public static GrpcWindmillServer create(StreamingDataflowWorkerOptions 
workerOptions)
-      throws IOException {
-    GrpcWindmillServer grpcWindmillServer = new 
GrpcWindmillServer(workerOptions);
-    if (workerOptions.getWindmillServiceEndpoint() != null) {
-      grpcWindmillServer.configureWindmillServiceEndpoints();
-    } else if (!workerOptions.isEnableStreamingEngine()
-        && workerOptions.getLocalWindmillHostport() != null) {
-      grpcWindmillServer.configureLocalHost();
-    }
-
-    if (workerOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs() > 0) 
{
-      grpcWindmillServer.scheduleHealthCheckTimer(
-          workerOptions, () -> grpcWindmillServer.streamRegistry);
-    }
-
-    return grpcWindmillServer;
-  }
-
-  @VisibleForTesting
-  static GrpcWindmillServer newTestInstance(String name) {
-    GrpcWindmillServer testServer =
-        new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */ true));
-    
testServer.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel(name)));
-    return testServer;
-  }
-
-  @VisibleForTesting
-  static GrpcWindmillServer newApplianceTestInstance(Channel channel) {
-    GrpcWindmillServer testServer =
-        new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */ 
false));
-    testServer.syncApplianceStub = 
createWindmillApplianceStubWithDeadlineInterceptor(channel);
-    return testServer;
-  }
-
-  private static WindmillApplianceGrpc.WindmillApplianceBlockingStub
-      createWindmillApplianceStubWithDeadlineInterceptor(Channel channel) {
-    return WindmillApplianceGrpc.newBlockingStub(channel)
-        
.withInterceptors(GrpcDeadlineClientInterceptor.withDefaultUnaryRpcDeadline());
-  }
-
-  private static Channel inProcessChannel(String name) {
-    return InProcessChannelBuilder.forName(name).directExecutor().build();
-  }
-
-  private static Channel localhostChannel(int port) {
-    return NettyChannelBuilder.forAddress(LOCALHOST, port)
-        .maxInboundMessageSize(Integer.MAX_VALUE)
-        .negotiationType(NegotiationType.PLAINTEXT)
-        .build();
-  }
-
-  private static UnsupportedOperationException 
unsupportedUnaryRequestInStreamingEngineException(
-      String rpcName) {
-    return new UnsupportedOperationException(
-        String.format("Unary %s calls are not supported in Streaming Engine.", 
rpcName));
-  }
-
-  private void scheduleHealthCheckTimer(
-      StreamingDataflowWorkerOptions options, 
Supplier<Set<AbstractWindmillStream<?, ?>>> streams) {
-    new Timer("WindmillHealthCheckTimer")
-        .schedule(
-            new HealthCheckTimerTask(options, streams),
-            0,
-            options.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
-  }
-
-  private void configureWindmillServiceEndpoints() throws IOException {
-    Set<HostAndPort> endpoints = new HashSet<>();
-    for (String endpoint : 
Splitter.on(',').split(options.getWindmillServiceEndpoint())) {
-      endpoints.add(
-          
HostAndPort.fromString(endpoint).withDefaultPort(options.getWindmillServicePort()));
-    }
-    initializeWindmillService(endpoints);
-  }
-
-  private void configureLocalHost() {
-    int portStart = options.getLocalWindmillHostport().lastIndexOf(':');
-    String endpoint = options.getLocalWindmillHostport().substring(0, 
portStart);
-    assert ("grpc:localhost".equals(endpoint));
-    int port = 
Integer.parseInt(options.getLocalWindmillHostport().substring(portStart + 1));
-    this.endpoints = ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port));
-    initializeLocalHost(port);
-  }
-
-  @Override
-  public synchronized void setWindmillServiceEndpoints(Set<HostAndPort> 
endpoints)
-      throws IOException {
-    Preconditions.checkNotNull(endpoints);
-    if (endpoints.equals(this.endpoints)) {
-      // The endpoints are equal don't recreate the stubs.
-      return;
-    }
-    LOG.info("Creating a new windmill stub, endpoints: {}", endpoints);
-    if (this.endpoints != null) {
-      LOG.info("Previous windmill stub endpoints: {}", this.endpoints);
-    }
-    initializeWindmillService(endpoints);
-  }
-
-  @Override
-  public synchronized boolean isReady() {
-    return !stubList.isEmpty();
-  }
-
-  private synchronized void initializeLocalHost(int port) {
-    this.logEveryNStreamFailures = 1;
-    this.maxBackoff = Duration.millis(500);
-    Channel channel = localhostChannel(port);
-    if (options.isEnableStreamingEngine()) {
-      this.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(channel));
-    } else {
-      this.syncApplianceStub = 
createWindmillApplianceStubWithDeadlineInterceptor(channel);
-    }
-  }
-
-  private synchronized void initializeWindmillService(Set<HostAndPort> 
endpoints)
-      throws IOException {
-    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
endpoints);
-    this.stubList.clear();
-    this.endpoints = ImmutableSet.copyOf(endpoints);
-    for (HostAndPort endpoint : this.endpoints) {
-      if (LOCALHOST.equals(endpoint.getHost())) {
-        initializeLocalHost(endpoint.getPort());
-      } else {
-        this.stubList.add(
-            CloudWindmillServiceV1Alpha1Grpc.newStub(remoteChannel(endpoint))
-                .withCallCredentials(
-                    MoreCallCredentials.from(
-                        new 
VendoredCredentialsAdapter(options.getGcpCredential()))));
-      }
-    }
-  }
-
-  private Channel remoteChannel(HostAndPort endpoint) throws IOException {
-    NettyChannelBuilder builder =
-        NettyChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort());
-    int timeoutSec = options.getWindmillServiceRpcChannelAliveTimeoutSec();
-    if (timeoutSec > 0) {
-      builder
-          .keepAliveTime(timeoutSec, TimeUnit.SECONDS)
-          .keepAliveTimeout(timeoutSec, TimeUnit.SECONDS)
-          .keepAliveWithoutCalls(true);
-    }
-    return builder
-        .flowControlWindow(10 * 1024 * 1024)
-        .maxInboundMessageSize(Integer.MAX_VALUE)
-        .maxInboundMetadataSize(1024 * 1024)
-        .negotiationType(NegotiationType.TLS)
-        // Set ciphers(null) to not use GCM, which is disabled for Dataflow
-        // due to it being horribly slow.
-        .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
-        .build();
-  }
-
-  /**
-   * Stubs returned from this method do not (and should not) have {@link
-   * org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Deadline}(s) set since they 
represent an absolute
-   * point in time. {@link 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Deadline}(s) should not be
-   * treated as a timeout which represents a relative point in time.
-   *
-   * @see <a href=https://grpc.io/blog/deadlines/>Official gRPC deadline 
documentation for more
-   *     details.</a>
-   */
-  private synchronized 
CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub() {
-    if (stubList.isEmpty()) {
-      throw new RuntimeException("windmillServiceEndpoint has not been set");
-    }
-
-    return stubList.size() == 1 ? stubList.get(0) : 
stubList.get(rand.nextInt(stubList.size()));
-  }
-
-  @Override
-  public void appendSummaryHtml(PrintWriter writer) {
-    writer.write("Active Streams:<br>");
-    for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
-      stream.appendSummaryHtml(writer);
-      writer.write("<br>");
-    }
-  }
-
-  // Configure backoff to retry calls forever, with a maximum sane retry 
interval.
-  private BackOff grpcBackoff() {
-    return FluentBackoff.DEFAULT
-        .withInitialBackoff(MIN_BACKOFF)
-        .withMaxBackoff(maxBackoff)
-        .backoff();
-  }
-
-  private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
-    BackOff backoff = grpcBackoff();
-    int rpcErrors = 0;
-    while (true) {
-      try {
-        return function.get();
-      } catch (StatusRuntimeException e) {
-        try {
-          if (++rpcErrors % 20 == 0) {
-            LOG.warn(
-                "Many exceptions calling gRPC. Last exception: {} with status 
{}",
-                e,
-                e.getStatus());
-          }
-          if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
-            throw new RpcException(e);
-          }
-        } catch (IOException | InterruptedException i) {
-          if (i instanceof InterruptedException) {
-            Thread.currentThread().interrupt();
-          }
-          RpcException rpcException = new RpcException(e);
-          rpcException.addSuppressed(i);
-          throw rpcException;
-        }
-      }
-    }
-  }
-
-  @Override
-  public GetWorkResponse getWork(GetWorkRequest request) {
-    if (syncApplianceStub != null) {
-      return callWithBackoff(() -> syncApplianceStub.getWork(request));
-    }
-
-    throw new 
RpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork"));
-  }
-
-  @Override
-  public GetDataResponse getData(GetDataRequest request) {
-    if (syncApplianceStub != null) {
-      return callWithBackoff(() -> syncApplianceStub.getData(request));
-    }
-
-    throw new 
RpcException(unsupportedUnaryRequestInStreamingEngineException("GetData"));
-  }
-
-  @Override
-  public CommitWorkResponse commitWork(CommitWorkRequest request) {
-    if (syncApplianceStub != null) {
-      return callWithBackoff(() -> syncApplianceStub.commitWork(request));
-    }
-    throw new 
RpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork"));
-  }
-
-  private StreamObserverFactory newStreamObserverFactory() {
-    return StreamObserverFactory.direct(
-        DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, 
options.getWindmillMessagesBetweenIsReadyChecks());
-  }
-
-  @Override
-  public GetWorkStream getWorkStream(GetWorkRequest request, WorkItemReceiver 
receiver) {
-    GetWorkRequest getWorkRequest =
-        GetWorkRequest.newBuilder(request)
-            .setJobId(options.getJobId())
-            .setProjectId(options.getProject())
-            .setWorkerId(options.getWorkerId())
-            .build();
-
-    return GrpcGetWorkStream.create(
-        responseObserver ->
-            stub()
-                // Deadlines are absolute points in time, so generate a new 
one everytime this
-                // function is called.
-                .withDeadlineAfter(
-                    
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
-                .getWorkStream(responseObserver),
-        getWorkRequest,
-        grpcBackoff(),
-        newStreamObserverFactory(),
-        streamRegistry,
-        logEveryNStreamFailures,
-        getWorkThrottleTimer,
-        receiver);
-  }
-
-  @Override
-  public GetDataStream getDataStream() {
-    return GrpcGetDataStream.create(
-        responseObserver ->
-            stub()
-                // Deadlines are absolute points in time, so generate a new 
one everytime this
-                // function is called.
-                .withDeadlineAfter(
-                    
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
-                .getDataStream(responseObserver),
-        grpcBackoff(),
-        newStreamObserverFactory(),
-        streamRegistry,
-        logEveryNStreamFailures,
-        getDataThrottleTimer,
-        makeHeader(),
-        nextId,
-        streamingRpcBatchLimit);
-  }
-
-  @Override
-  public CommitWorkStream commitWorkStream() {
-    return GrpcCommitWorkStream.create(
-        responseObserver ->
-            stub()
-                // Deadlines are absolute points in time, so generate a new 
one everytime this
-                // function is called.
-                .withDeadlineAfter(
-                    
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
-                .commitWorkStream(responseObserver),
-        grpcBackoff(),
-        newStreamObserverFactory(),
-        streamRegistry,
-        logEveryNStreamFailures,
-        commitWorkThrottleTimer,
-        makeHeader(),
-        nextId,
-        streamingRpcBatchLimit);
-  }
-
-  @Override
-  public GetConfigResponse getConfig(GetConfigRequest request) {
-    if (syncApplianceStub != null) {
-      return callWithBackoff(() -> syncApplianceStub.getConfig(request));
-    }
-
-    throw new RpcException(
-        new UnsupportedOperationException("GetConfig not supported in 
Streaming Engine."));
-  }
-
-  @Override
-  public ReportStatsResponse reportStats(ReportStatsRequest request) {
-    if (syncApplianceStub != null) {
-      return callWithBackoff(() -> syncApplianceStub.reportStats(request));
-    }
-
-    throw new RpcException(
-        new UnsupportedOperationException("ReportStats not supported in 
Streaming Engine."));
-  }
-
-  @Override
-  public long getAndResetThrottleTime() {
-    return getWorkThrottleTimer.getAndResetThrottleTime()
-        + getDataThrottleTimer.getAndResetThrottleTime()
-        + commitWorkThrottleTimer.getAndResetThrottleTime();
-  }
-
-  private JobHeader makeHeader() {
-    return JobHeader.newBuilder()
-        .setJobId(options.getJobId())
-        .setProjectId(options.getProject())
-        .setWorkerId(options.getWorkerId())
-        .build();
-  }
-
-  /**
-   * Create a wrapper around credentials callback that delegates to the 
underlying vendored {@link
-   * com.google.auth.RequestMetadataCallback}. Note that this class should 
override every method
-   * that is not final and not static and call the delegate directly.
-   *
-   * <p>TODO: Replace this with an auto generated proxy which calls the 
underlying implementation
-   * delegate to reduce maintenance burden.
-   */
-  private static class VendoredRequestMetadataCallbackAdapter
-      implements com.google.auth.RequestMetadataCallback {
-
-    private final 
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback
-        callback;
-
-    private VendoredRequestMetadataCallbackAdapter(
-        
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback 
callback) {
-      this.callback = callback;
-    }
-
-    @Override
-    public void onSuccess(Map<String, List<String>> metadata) {
-      callback.onSuccess(metadata);
-    }
-
-    @Override
-    public void onFailure(Throwable exception) {
-      callback.onFailure(exception);
-    }
-  }
-
-  /**
-   * Create a wrapper around credentials that delegates to the underlying 
{@link
-   * com.google.auth.Credentials}. Note that this class should override every 
method that is not
-   * final and not static and call the delegate directly.
-   *
-   * <p>TODO: Replace this with an auto generated proxy which calls the 
underlying implementation
-   * delegate to reduce maintenance burden.
-   */
-  private static class VendoredCredentialsAdapter
-      extends org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials {
-
-    private final com.google.auth.Credentials credentials;
-
-    private VendoredCredentialsAdapter(com.google.auth.Credentials 
credentials) {
-      this.credentials = credentials;
-    }
-
-    @Override
-    public String getAuthenticationType() {
-      return credentials.getAuthenticationType();
-    }
-
-    @Override
-    public Map<String, List<String>> getRequestMetadata() throws IOException {
-      return credentials.getRequestMetadata();
-    }
-
-    @Override
-    public void getRequestMetadata(
-        final URI uri,
-        Executor executor,
-        final 
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback
-            callback) {
-      credentials.getRequestMetadata(
-          uri, executor, new VendoredRequestMetadataCallbackAdapter(callback));
-    }
-
-    @Override
-    public Map<String, List<String>> getRequestMetadata(URI uri) throws 
IOException {
-      return credentials.getRequestMetadata(uri);
-    }
-
-    @Override
-    public boolean hasRequestMetadata() {
-      return credentials.hasRequestMetadata();
-    }
-
-    @Override
-    public boolean hasRequestMetadataOnly() {
-      return credentials.hasRequestMetadataOnly();
-    }
-
-    @Override
-    public void refresh() throws IOException {
-      credentials.refresh();
-    }
-  }
-
-  private static class HealthCheckTimerTask extends TimerTask {
-    private final StreamingDataflowWorkerOptions options;
-    private final Supplier<Set<AbstractWindmillStream<?, ?>>> streams;
-
-    public HealthCheckTimerTask(
-        StreamingDataflowWorkerOptions options,
-        Supplier<Set<AbstractWindmillStream<?, ?>>> streams) {
-      this.options = options;
-      this.streams = streams;
-    }
-
-    @Override
-    public void run() {
-      Instant reportThreshold =
-          Instant.now()
-              
.minus(Duration.millis(options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()));
-      for (AbstractWindmillStream<?, ?> stream : streams.get()) {
-        stream.maybeSendHealthCheck(reportThreshold);
-      }
-    }
-  }
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
new file mode 100644
index 00000000000..307dfdfa17b
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work;
+
+import java.util.Collection;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/** Functional interface for receiving WorkItems. */
+@FunctionalInterface
+public interface WorkItemReceiver {
+  void receiveWork(
+      String computation,
+      @Nullable Instant inputDataWatermark,
+      @Nullable Instant synchronizedProcessingTime,
+      Windmill.WorkItem workItem,
+      Collection<Windmill.LatencyAttribution> getWorkStreamLatencies);
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java
new file mode 100644
index 00000000000..0038e3e9cc6
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
+
+import com.google.auto.value.AutoValue;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Budget of items and bytes for fetching {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) via 
{@link
+ * WindmillStream.GetWorkStream}. Used to control how "much" work is returned 
from Windmill.
+ */
+@AutoValue
+public abstract class GetWorkBudget {
+  public static GetWorkBudget.Builder builder() {
+    return new AutoValue_GetWorkBudget.Builder();
+  }
+
+  /** {@link GetWorkBudget} of 0. */
+  public static GetWorkBudget noBudget() {
+    return builder().setItems(0).setBytes(0).build();
+  }
+
+  public static GetWorkBudget from(GetWorkRequest getWorkRequest) {
+    return builder()
+        .setItems(getWorkRequest.getMaxItems())
+        .setBytes(getWorkRequest.getMaxBytes())
+        .build();
+  }
+
+  /**
+   * Adds the given bytes and items or the current budget, returning a new 
{@link GetWorkBudget}.
+   * Does not drop below 0.
+   */
+  public GetWorkBudget add(long items, long bytes) {
+    Preconditions.checkArgument(items >= 0 && bytes >= 0);
+    return GetWorkBudget.builder().setBytes(bytes() + bytes).setItems(items() 
+ items).build();
+  }
+
+  public GetWorkBudget add(GetWorkBudget other) {
+    return add(other.items(), other.bytes());
+  }
+
+  /**
+   * Subtracts the given bytes and items or the current budget, returning a 
new {@link
+   * GetWorkBudget}. Does not drop below 0.
+   */
+  public GetWorkBudget subtract(long items, long bytes) {
+    Preconditions.checkArgument(items >= 0 && bytes >= 0);
+    return GetWorkBudget.builder().setBytes(bytes() - bytes).setItems(items() 
- items).build();
+  }
+
+  public GetWorkBudget subtract(GetWorkBudget other) {
+    return subtract(other.items(), other.bytes());
+  }
+
+  /** Budget of bytes for GetWork. Does not drop below 0. */
+  public abstract long bytes();
+
+  /** Budget of items for GetWork. Does not drop below 0. */
+  public abstract long items();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setBytes(long bytes);
+
+    public abstract Builder setItems(long budget);
+
+    abstract long items();
+
+    abstract long bytes();
+
+    abstract GetWorkBudget autoBuild();
+
+    public final GetWorkBudget build() {
+      setItems(Math.max(0, items()));
+      setBytes(Math.max(0, bytes()));
+      return autoBuild();
+    }
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index 4700217dc8a..092f5e59a13 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -53,9 +53,11 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribut
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
 import org.joda.time.Duration;
@@ -198,8 +200,7 @@ class FakeWindmillServer extends WindmillServerStub {
   }
 
   @Override
-  public GetWorkStream getWorkStream(
-      Windmill.GetWorkRequest request, GetWorkStream.WorkItemReceiver 
receiver) {
+  public GetWorkStream getWorkStream(Windmill.GetWorkRequest request, 
WorkItemReceiver receiver) {
     LOG.debug("getWorkStream: {}", request.toString());
     Instant startTime = Instant.now();
     final CountDownLatch done = new CountDownLatch(1);
@@ -209,6 +210,19 @@ class FakeWindmillServer extends WindmillServerStub {
         done.countDown();
       }
 
+      @Override
+      public void adjustBudget(long itemsDelta, long bytesDelta) {
+        // no-op.
+      }
+
+      @Override
+      public GetWorkBudget remainingBudget() {
+        return GetWorkBudget.builder()
+            .setItems(request.getMaxItems())
+            .setBytes(request.getMaxBytes())
+            .build();
+      }
+
       @Override
       public boolean awaitTermination(int time, TimeUnit unit) throws 
InterruptedException {
         while (done.getCount() > 0) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
similarity index 99%
rename from 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java
rename to 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
index 9924bb7d2b2..264540531bf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
similarity index 96%
rename from 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java
rename to 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
index 45ed3381a8b..e3b07bf7aa4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
@@ -15,10 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import static com.google.common.truth.Truth.assertThat;
-import static 
org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS;
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.verify;
@@ -33,13 +33,14 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
similarity index 98%
rename from 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java
rename to 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index 53afc6990e4..d9f4b72716c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -68,9 +68,9 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.Value;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
@@ -99,10 +99,7 @@ import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Unit tests for {@link
- * 
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer}.
- */
+/** Unit tests for {@link GrpcWindmillServer}. */
 @RunWith(JUnit4.class)
 @SuppressWarnings({
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
@@ -114,7 +111,7 @@ public class GrpcWindmillServerTest {
   private final MutableHandlerRegistry serviceRegistry = new 
MutableHandlerRegistry();
   @Rule public ErrorCollector errorCollector = new ErrorCollector();
   private Server server;
-  private 
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer 
client;
+  private GrpcWindmillServer client;
   private int remainingErrors = 20;
 
   @Before
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
new file mode 100644
index 00000000000..76d50839785
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class GetWorkBudgetTest {
+
+  @Test
+  public void testCreateWithNoBudget() {
+    GetWorkBudget getWorkBudget = GetWorkBudget.noBudget();
+    assertEquals(0, getWorkBudget.items());
+    assertEquals(0, getWorkBudget.bytes());
+  }
+
+  @Test
+  public void testBuild_itemsAndBytesNeverBelowZero() {
+    GetWorkBudget getWorkBudget = 
GetWorkBudget.builder().setItems(-10).setBytes(-10).build();
+    assertEquals(0, getWorkBudget.items());
+    assertEquals(0, getWorkBudget.bytes());
+  }
+
+  @Test
+  public void testAdd_doesNotAllowNegativeParameters() {
+    GetWorkBudget getWorkBudget = 
GetWorkBudget.builder().setItems(1).setBytes(1).build();
+    assertThrows(IllegalArgumentException.class, () -> getWorkBudget.add(-1, 
-1));
+  }
+
+  @Test
+  public void testSubtract_itemsAndBytesNeverBelowZero() {
+    GetWorkBudget getWorkBudget = 
GetWorkBudget.builder().setItems(1).setBytes(1).build();
+    GetWorkBudget subtracted = getWorkBudget.subtract(10, 10);
+    assertEquals(0, subtracted.items());
+    assertEquals(0, subtracted.bytes());
+  }
+
+  @Test
+  public void testSubtractGetWorkBudget_itemsAndBytesNeverBelowZero() {
+    GetWorkBudget getWorkBudget = 
GetWorkBudget.builder().setItems(1).setBytes(1).build();
+    GetWorkBudget subtracted =
+        
getWorkBudget.subtract(GetWorkBudget.builder().setItems(10).setBytes(10).build());
+    assertEquals(0, subtracted.items());
+    assertEquals(0, subtracted.bytes());
+  }
+
+  @Test
+  public void testSubtract_doesNotAllowNegativeParameters() {
+    GetWorkBudget getWorkBudget = 
GetWorkBudget.builder().setItems(1).setBytes(1).build();
+    assertThrows(IllegalArgumentException.class, () -> 
getWorkBudget.subtract(-1, -1));
+  }
+}


Reply via email to