This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new adbb9e48c05 [Dataflow Java Streaming] Fix defaulted flow control
settings for grpc stream. (#35793)
adbb9e48c05 is described below
commit adbb9e48c05fcffdf3a47134fff28b6ab8c5679a
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Aug 7 11:12:10 2025 +0200
[Dataflow Java Streaming] Fix defaulted flow control settings for grpc
stream. (#35793)
---
.../windmill/client/grpc/stubs/ChannelCache.java | 12 +-
.../client/grpc/stubs/ChannelCacheTest.java | 133 +++++++++++++++++++++
2 files changed, 138 insertions(+), 7 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
index 11018bdb2c4..6de661e52cf 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
@@ -38,7 +38,6 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalL
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListeners;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,8 +57,8 @@ public final class ChannelCache implements StatusDataProvider
{
private final LoadingCache<WindmillServiceAddress, ManagedChannel>
channelCache;
@GuardedBy("this")
- @MonotonicNonNull
- private UserWorkerGrpcFlowControlSettings currentFlowControlSettings = null;
+ private UserWorkerGrpcFlowControlSettings currentFlowControlSettings =
+ UserWorkerGrpcFlowControlSettings.getDefaultInstance();
private ChannelCache(
WindmillChannelFactory channelFactory,
@@ -78,7 +77,8 @@ public final class ChannelCache implements StatusDataProvider
{
private UserWorkerGrpcFlowControlSettings
resolveFlowControlSettings(
WindmillServiceAddress.Kind addressType) {
synchronized (ChannelCache.this) {
- if (currentFlowControlSettings == null) {
+ if (currentFlowControlSettings.equals(
+
UserWorkerGrpcFlowControlSettings.getDefaultInstance())) {
return addressType == AUTHENTICATED_GCP_SERVICE_ADDRESS
?
WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS
:
WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS;
@@ -132,9 +132,7 @@ public final class ChannelCache implements
StatusDataProvider {
public synchronized void consumeFlowControlSettings(
UserWorkerGrpcFlowControlSettings flowControlSettings) {
- //noinspection PointlessNullCheck
- if (currentFlowControlSettings == null
- || !flowControlSettings.equals(currentFlowControlSettings)) {
+ if (!flowControlSettings.equals(currentFlowControlSettings)) {
// Refreshing the cache will asynchronously terminate the old channels via the removalListener
// and return a newly created one on the next Cache.load(address). This could be expensive so
// only do it when we have received new flow control settings.
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
index 311bed75ccc..dd039782d1f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
@@ -194,6 +194,139 @@ public class ChannelCacheTest {
assertThat(consumedFlowControlSettings.get()).isEqualTo(flowControlSettings);
}
+ @Test
+ public void testConsumeFlowControlSettings_UsesDefaultOverridesForDirect()
+ throws InterruptedException {
+ String channelName = "channel";
+ AtomicReference<CountDownLatch> notifyWhenChannelClosed =
+ new AtomicReference<>(new CountDownLatch(1));
+ AtomicInteger newChannelsCreated = new AtomicInteger();
+ AtomicReference<UserWorkerGrpcFlowControlSettings>
consumedFlowControlSettings =
+ new AtomicReference<>();
+ cache =
+ ChannelCache.forTesting(
+ (newFlowControlSettings, ignoredServiceAddress) -> {
+ ManagedChannel channel = newChannel(channelName);
+ newChannelsCreated.incrementAndGet();
+ consumedFlowControlSettings.set(newFlowControlSettings);
+ return channel;
+ },
+ () -> notifyWhenChannelClosed.get().countDown());
+ WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+ when(someAddress.getKind())
+
.thenReturn(WindmillServiceAddress.Kind.AUTHENTICATED_GCP_SERVICE_ADDRESS);
+
+ UserWorkerGrpcFlowControlSettings emptyFlowControlSettings =
+ UserWorkerGrpcFlowControlSettings.newBuilder().build();
+
+ // Load the cache w/ this first get.
+ ManagedChannel cachedChannel = cache.get(someAddress);
+ // Verify that the appropriate default was used.
+ assertThat(consumedFlowControlSettings.get())
+ .isEqualTo(WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS);
+
+ // Load empty flow control settings.
+ cache.consumeFlowControlSettings(emptyFlowControlSettings);
+ // This get shouldn't reload the cache, since the same default flow control settings
+ // should be used.
+ assertThat(consumedFlowControlSettings.get())
+ .isEqualTo(WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS);
+ assertThat(cachedChannel).isSameInstanceAs(cache.get(someAddress));
+
+ // This get should reload the cache, since flow control settings have changed
+ UserWorkerGrpcFlowControlSettings flowControlSettingsModified =
+
UserWorkerGrpcFlowControlSettings.newBuilder().setEnableAutoFlowControl(true).build();
+ cache.consumeFlowControlSettings(flowControlSettingsModified);
+ ManagedChannel reloadedChannel = cache.get(someAddress);
+ notifyWhenChannelClosed.get().await();
+ assertThat(cachedChannel).isNotSameInstanceAs(reloadedChannel);
+ assertTrue(cachedChannel.isShutdown());
+ assertFalse(reloadedChannel.isShutdown());
+ assertThat(newChannelsCreated.get()).isEqualTo(2);
+ assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel);
+
assertThat(consumedFlowControlSettings.get()).isEqualTo(flowControlSettingsModified);
+
+ // Change back to empty settings and verify the default is used again.
+ notifyWhenChannelClosed.set(new CountDownLatch(1));
+ cache.consumeFlowControlSettings(emptyFlowControlSettings);
+ ManagedChannel reloadedChannel2 = cache.get(someAddress);
+ notifyWhenChannelClosed.get().await();
+ assertThat(reloadedChannel2).isNotSameInstanceAs(reloadedChannel);
+ assertThat(reloadedChannel2).isNotSameInstanceAs(cachedChannel);
+ assertTrue(reloadedChannel.isShutdown());
+ assertFalse(reloadedChannel2.isShutdown());
+ assertThat(newChannelsCreated.get()).isEqualTo(3);
+ assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel2);
+ assertThat(consumedFlowControlSettings.get())
+ .isEqualTo(WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS);
+ }
+
+ @Test
+ public void testConsumeFlowControlSettings_UsesDefaultOverridesForCloudPath()
+ throws InterruptedException {
+ String channelName = "channel";
+ AtomicReference<CountDownLatch> notifyWhenChannelClosed =
+ new AtomicReference<>(new CountDownLatch(1));
+ AtomicInteger newChannelsCreated = new AtomicInteger();
+ AtomicReference<UserWorkerGrpcFlowControlSettings>
consumedFlowControlSettings =
+ new AtomicReference<>();
+ cache =
+ ChannelCache.forTesting(
+ (newFlowControlSettings, ignoredServiceAddress) -> {
+ ManagedChannel channel = newChannel(channelName);
+ newChannelsCreated.incrementAndGet();
+ consumedFlowControlSettings.set(newFlowControlSettings);
+ return channel;
+ },
+ () -> notifyWhenChannelClosed.get().countDown());
+ WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+
when(someAddress.getKind()).thenReturn(WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS);
+
+ UserWorkerGrpcFlowControlSettings emptyFlowControlSettings =
+ UserWorkerGrpcFlowControlSettings.newBuilder().build();
+
+ // Load the cache w/ this first get.
+ ManagedChannel cachedChannel = cache.get(someAddress);
+ // Verify that the appropriate default was used.
+ assertThat(consumedFlowControlSettings.get())
+ .isEqualTo(WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS);
+
+ // Load empty flow control settings.
+ cache.consumeFlowControlSettings(emptyFlowControlSettings);
+ // This get shouldn't reload the cache, since the same default flow control settings
+ // should be used.
+ assertThat(consumedFlowControlSettings.get())
+ .isEqualTo(WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS);
+ assertThat(cachedChannel).isSameInstanceAs(cache.get(someAddress));
+
+ // This get should reload the cache, since flow control settings have changed
+ UserWorkerGrpcFlowControlSettings flowControlSettingsModified =
+
UserWorkerGrpcFlowControlSettings.newBuilder().setEnableAutoFlowControl(true).build();
+ cache.consumeFlowControlSettings(flowControlSettingsModified);
+ ManagedChannel reloadedChannel = cache.get(someAddress);
+ notifyWhenChannelClosed.get().await();
+ assertThat(cachedChannel).isNotSameInstanceAs(reloadedChannel);
+ assertTrue(cachedChannel.isShutdown());
+ assertFalse(reloadedChannel.isShutdown());
+ assertThat(newChannelsCreated.get()).isEqualTo(2);
+ assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel);
+
assertThat(consumedFlowControlSettings.get()).isEqualTo(flowControlSettingsModified);
+
+ // Change back to empty settings and verify the default is used again.
+ notifyWhenChannelClosed.set(new CountDownLatch(1));
+ cache.consumeFlowControlSettings(emptyFlowControlSettings);
+ ManagedChannel reloadedChannel2 = cache.get(someAddress);
+ notifyWhenChannelClosed.get().await();
+ assertThat(reloadedChannel2).isNotSameInstanceAs(reloadedChannel);
+ assertThat(reloadedChannel2).isNotSameInstanceAs(cachedChannel);
+ assertTrue(reloadedChannel.isShutdown());
+ assertFalse(reloadedChannel2.isShutdown());
+ assertThat(newChannelsCreated.get()).isEqualTo(3);
+ assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel2);
+ assertThat(consumedFlowControlSettings.get())
+ .isEqualTo(WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS);
+ }
+
@Test
public void testConsumeFlowControlSettings_sameFlowControlSettings() {
String channelName = "channel";