This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5034e40fa0f [Dataflow Streaming] Remove nullness suppression of StreamingDataflowWorker (#37797)
5034e40fa0f is described below
commit 5034e40fa0f1bd82f06ffa337a8f2db3d19d00aa
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Mar 10 13:05:33 2026 +0000
[Dataflow Streaming] Remove nullness suppression of StreamingDataflowWorker (#37797)
---
.../dataflow/worker/StreamingDataflowWorker.java | 51 ++++++++++++++--------
1 file changed, 33 insertions(+), 18 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index aad27b86986..98f596141ee 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;
import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.remoteChannel;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.dataflow.model.MapTask;
import com.google.auto.value.AutoValue;
@@ -119,6 +120,8 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheSta
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.checkerframework.checker.initialization.qual.UnknownInitialization;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -129,9 +132,6 @@ import org.slf4j.LoggerFactory;
*
* <p>Implements a Streaming Dataflow worker.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
@Internal
public final class StreamingDataflowWorker {
@@ -189,7 +189,7 @@ public final class StreamingDataflowWorker {
private final StreamingWorkerStatusReporter workerStatusReporter;
private final int numCommitThreads;
private final Supplier<Instant> clock;
- private final GrpcDispatcherClient dispatcherClient;
+ private final @Nullable GrpcDispatcherClient dispatcherClient;
private final ExecutorService harnessSwitchExecutor;
private final long clientId;
private final WindmillServerStub windmillServer;
@@ -271,7 +271,7 @@ public final class StreamingDataflowWorker {
streamingWorkScheduler,
getDataMetricTracker,
memoryMonitor,
- this.dispatcherClient);
+ checkNotNull(this.dispatcherClient));
} else {
harnessFactoryOutput =
createSingleSourceWorkerHarness(
@@ -330,6 +330,8 @@ public final class StreamingDataflowWorker {
}
private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
+ @UnderInitialization()
+ StreamingDataflowWorker this, // Use receiver parameter syntax to
allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
WindmillServerStub windmillServer,
@@ -345,6 +347,7 @@ public final class StreamingDataflowWorker {
GetDataClient getDataClient = new ApplianceGetDataClient(windmillServer,
getDataMetricTracker);
HeartbeatSender heartbeatSender = new
ApplianceHeartbeatSender(windmillServer::getData);
+ @SuppressWarnings("methodref.receiver.bound")
WorkCommitter workCommitter =
StreamingApplianceWorkCommitter.create(windmillServer::commitWork,
this::onCompleteCommit);
GetWorkSender getWorkSender = GetWorkSender.forAppliance(() ->
windmillServer.getWork(request));
@@ -355,7 +358,7 @@ public final class StreamingDataflowWorker {
.setStreamingWorkScheduler(streamingWorkScheduler)
.setWorkCommitter(workCommitter)
.setGetDataClient(getDataClient)
- .setComputationStateFetcher(this.computationStateCache::get)
+
.setComputationStateFetcher(checkNotNull(this.computationStateCache)::get)
.setWaitForResources(() ->
memoryMonitor.waitForResources("GetWork"))
.setHeartbeatSender(heartbeatSender)
.setGetWorkSender(getWorkSender)
@@ -368,6 +371,8 @@ public final class StreamingDataflowWorker {
}
private StreamingWorkerHarnessFactoryOutput
createFanOutStreamingEngineWorkerHarness(
+ @UnknownInitialization()
+ StreamingDataflowWorker this, // Use receiver parameter syntax to
allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
GrpcWindmillStreamFactory windmillStreamFactory,
@@ -376,7 +381,8 @@ public final class StreamingDataflowWorker {
MemoryMonitor memoryMonitor,
GrpcDispatcherClient dispatcherClient) {
WeightedSemaphore<Commit> maxCommitByteSemaphore =
Commits.maxCommitByteSemaphore();
- ChannelCache channelCache = createChannelCache(options, configFetcher);
+ ChannelCache channelCache = createChannelCache(options,
checkNotNull(configFetcher));
+ @SuppressWarnings("methodref.receiver.bound")
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
createJobHeader(options, clientId),
@@ -391,7 +397,7 @@ public final class StreamingDataflowWorker {
processingContext,
drainMode,
getWorkStreamLatencies) ->
- computationStateCache
+ checkNotNull(computationStateCache)
.get(processingContext.computationId())
.ifPresent(
computationState -> {
@@ -407,7 +413,7 @@ public final class StreamingDataflowWorker {
}),
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(),
channelCache),
GetWorkBudgetDistributors.distributeEvenly(),
- Preconditions.checkNotNull(dispatcherClient),
+ checkNotNull(dispatcherClient),
commitWorkStream ->
StreamingEngineWorkCommitter.builder()
// Share the commitByteSemaphore across all created
workCommitters.
@@ -433,6 +439,8 @@ public final class StreamingDataflowWorker {
}
private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
+ @UnknownInitialization()
+ StreamingDataflowWorker this, // Use receiver parameter syntax to
allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
WindmillServerStub windmillServer,
@@ -454,7 +462,11 @@ public final class StreamingDataflowWorker {
new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
HeartbeatSender heartbeatSender =
createStreamingEngineHeartbeatSender(
- options, windmillServer, getDataStreamPool,
configFetcher.getGlobalConfigHandle());
+ options,
+ windmillServer,
+ getDataStreamPool,
+ checkNotNull(configFetcher).getGlobalConfigHandle());
+ @SuppressWarnings("methodref.receiver.bound")
WorkCommitter workCommitter =
StreamingEngineWorkCommitter.builder()
.setCommitWorkStreamFactory(
@@ -476,7 +488,7 @@ public final class StreamingDataflowWorker {
.setStreamingWorkScheduler(streamingWorkScheduler)
.setWorkCommitter(workCommitter)
.setGetDataClient(getDataClient)
- .setComputationStateFetcher(this.computationStateCache::get)
+
.setComputationStateFetcher(checkNotNull(this.computationStateCache)::get)
.setWaitForResources(() ->
memoryMonitor.waitForResources("GetWork"))
.setHeartbeatSender(heartbeatSender)
.setGetWorkSender(getWorkSender)
@@ -489,17 +501,20 @@ public final class StreamingDataflowWorker {
}
private void switchStreamingWorkerHarness(ConnectivityType connectivityType)
{
- if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH
+ if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DEFAULT) {
+ return;
+ }
+ boolean directPath = connectivityType ==
ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH;
+ if ((directPath
&& this.streamingWorkerHarness.get() instanceof
FanOutStreamingEngineWorkerHarness)
- || (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH
- && streamingWorkerHarness.get() instanceof
SingleSourceWorkerHarness)) {
+ || (!directPath && streamingWorkerHarness.get() instanceof
SingleSourceWorkerHarness)) {
return;
}
// Stop the current status pages before switching the harness.
this.statusPages.get().stop();
LOG.debug("Stopped StreamingWorkerStatusPages before switching
connectivity type.");
- StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput = null;
- if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+ StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput;
+ if (directPath) {
// If dataflow experiment `enable_windmill_service_direct_path` is not
set for
// the job, do not switch to FanOutStreamingEngineWorkerHarness. This is
because
// `enable_windmill_service_direct_path` is tied to SDK version and is
only
@@ -524,11 +539,11 @@ public final class StreamingDataflowWorker {
this.streamingWorkScheduler,
this.getDataMetricTracker,
this.memoryMonitor.memoryMonitor(),
- this.dispatcherClient);
+ checkNotNull(this.dispatcherClient));
this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness());
streamingWorkerHarness.get().start();
LOG.debug("Started FanOutStreamingEngineWorkerHarness");
- } else if (connectivityType ==
ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) {
+ } else {
LOG.info("Switching connectivity type from DIRECTPATH to CLOUDPATH");
LOG.debug("Shutting down FanOutStreamingEngineWorkerHarness");
streamingWorkerHarness.get().shutdown();