This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new adbb9e48c05 [Dataflow Java Streaming] Fix defaulted flow control settings for grpc stream. (#35793)
adbb9e48c05 is described below

commit adbb9e48c05fcffdf3a47134fff28b6ab8c5679a
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Aug 7 11:12:10 2025 +0200

    [Dataflow Java Streaming] Fix defaulted flow control settings for grpc stream. (#35793)
---
 .../windmill/client/grpc/stubs/ChannelCache.java   |  12 +-
 .../client/grpc/stubs/ChannelCacheTest.java        | 133 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 7 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
index 11018bdb2c4..6de661e52cf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
@@ -38,7 +38,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalL
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListeners;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,8 +57,8 @@ public final class ChannelCache implements StatusDataProvider 
{
   private final LoadingCache<WindmillServiceAddress, ManagedChannel> 
channelCache;
 
   @GuardedBy("this")
-  @MonotonicNonNull
-  private UserWorkerGrpcFlowControlSettings currentFlowControlSettings = null;
+  private UserWorkerGrpcFlowControlSettings currentFlowControlSettings =
+      UserWorkerGrpcFlowControlSettings.getDefaultInstance();
 
   private ChannelCache(
       WindmillChannelFactory channelFactory,
@@ -78,7 +77,8 @@ public final class ChannelCache implements StatusDataProvider 
{
                   private UserWorkerGrpcFlowControlSettings 
resolveFlowControlSettings(
                       WindmillServiceAddress.Kind addressType) {
                     synchronized (ChannelCache.this) {
-                      if (currentFlowControlSettings == null) {
+                      if (currentFlowControlSettings.equals(
+                          
UserWorkerGrpcFlowControlSettings.getDefaultInstance())) {
                         return addressType == AUTHENTICATED_GCP_SERVICE_ADDRESS
                             ? 
WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS
                             : 
WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS;
@@ -132,9 +132,7 @@ public final class ChannelCache implements 
StatusDataProvider {
 
   public synchronized void consumeFlowControlSettings(
       UserWorkerGrpcFlowControlSettings flowControlSettings) {
-    //noinspection PointlessNullCheck
-    if (currentFlowControlSettings == null
-        || !flowControlSettings.equals(currentFlowControlSettings)) {
+    if (!flowControlSettings.equals(currentFlowControlSettings)) {
       // Refreshing the cache will asynchronously terminate the old channels via the removalListener
       // and return a newly created one on the next Cache.load(address). This could be expensive so
       // only do it when we have received new flow control settings.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
index 311bed75ccc..dd039782d1f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
@@ -194,6 +194,139 @@ public class ChannelCacheTest {
     
assertThat(consumedFlowControlSettings.get()).isEqualTo(flowControlSettings);
   }
 
+  @Test
+  public void testConsumeFlowControlSettings_UsesDefaultOverridesForDirect()
+      throws InterruptedException {
+    String channelName = "channel";
+    AtomicReference<CountDownLatch> notifyWhenChannelClosed =
+        new AtomicReference<>(new CountDownLatch(1));
+    AtomicInteger newChannelsCreated = new AtomicInteger();
+    AtomicReference<UserWorkerGrpcFlowControlSettings> 
consumedFlowControlSettings =
+        new AtomicReference<>();
+    cache =
+        ChannelCache.forTesting(
+            (newFlowControlSettings, ignoredServiceAddress) -> {
+              ManagedChannel channel = newChannel(channelName);
+              newChannelsCreated.incrementAndGet();
+              consumedFlowControlSettings.set(newFlowControlSettings);
+              return channel;
+            },
+            () -> notifyWhenChannelClosed.get().countDown());
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    when(someAddress.getKind())
+        
.thenReturn(WindmillServiceAddress.Kind.AUTHENTICATED_GCP_SERVICE_ADDRESS);
+
+    UserWorkerGrpcFlowControlSettings emptyFlowControlSettings =
+        UserWorkerGrpcFlowControlSettings.newBuilder().build();
+
+    // Load the cache w/ this first get.
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    // Verify that the appropriate default was used.
+    assertThat(consumedFlowControlSettings.get())
+        .isEqualTo(WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS);
+
+    // Load empty flow control settings.
+    cache.consumeFlowControlSettings(emptyFlowControlSettings);
+    // This get shouldn't reload the cache, since the same default flow control settings
+    // should be used.
+    assertThat(consumedFlowControlSettings.get())
+        .isEqualTo(WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS);
+    assertThat(cachedChannel).isSameInstanceAs(cache.get(someAddress));
+
+    // This get should reload the cache, since flow control settings have changed
+    UserWorkerGrpcFlowControlSettings flowControlSettingsModified =
+        
UserWorkerGrpcFlowControlSettings.newBuilder().setEnableAutoFlowControl(true).build();
+    cache.consumeFlowControlSettings(flowControlSettingsModified);
+    ManagedChannel reloadedChannel = cache.get(someAddress);
+    notifyWhenChannelClosed.get().await();
+    assertThat(cachedChannel).isNotSameInstanceAs(reloadedChannel);
+    assertTrue(cachedChannel.isShutdown());
+    assertFalse(reloadedChannel.isShutdown());
+    assertThat(newChannelsCreated.get()).isEqualTo(2);
+    assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel);
+    
assertThat(consumedFlowControlSettings.get()).isEqualTo(flowControlSettingsModified);
+
+    // Change back to empty settings and verify the default is used again.
+    notifyWhenChannelClosed.set(new CountDownLatch(1));
+    cache.consumeFlowControlSettings(emptyFlowControlSettings);
+    ManagedChannel reloadedChannel2 = cache.get(someAddress);
+    notifyWhenChannelClosed.get().await();
+    assertThat(reloadedChannel2).isNotSameInstanceAs(reloadedChannel);
+    assertThat(reloadedChannel2).isNotSameInstanceAs(cachedChannel);
+    assertTrue(reloadedChannel.isShutdown());
+    assertFalse(reloadedChannel2.isShutdown());
+    assertThat(newChannelsCreated.get()).isEqualTo(3);
+    assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel2);
+    assertThat(consumedFlowControlSettings.get())
+        .isEqualTo(WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS);
+  }
+
+  @Test
+  public void testConsumeFlowControlSettings_UsesDefaultOverridesForCloudPath()
+      throws InterruptedException {
+    String channelName = "channel";
+    AtomicReference<CountDownLatch> notifyWhenChannelClosed =
+        new AtomicReference<>(new CountDownLatch(1));
+    AtomicInteger newChannelsCreated = new AtomicInteger();
+    AtomicReference<UserWorkerGrpcFlowControlSettings> 
consumedFlowControlSettings =
+        new AtomicReference<>();
+    cache =
+        ChannelCache.forTesting(
+            (newFlowControlSettings, ignoredServiceAddress) -> {
+              ManagedChannel channel = newChannel(channelName);
+              newChannelsCreated.incrementAndGet();
+              consumedFlowControlSettings.set(newFlowControlSettings);
+              return channel;
+            },
+            () -> notifyWhenChannelClosed.get().countDown());
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    
when(someAddress.getKind()).thenReturn(WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS);
+
+    UserWorkerGrpcFlowControlSettings emptyFlowControlSettings =
+        UserWorkerGrpcFlowControlSettings.newBuilder().build();
+
+    // Load the cache w/ this first get.
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    // Verify that the appropriate default was used.
+    assertThat(consumedFlowControlSettings.get())
+        .isEqualTo(WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS);
+
+    // Load empty flow control settings.
+    cache.consumeFlowControlSettings(emptyFlowControlSettings);
+    // This get shouldn't reload the cache, since the same default flow control settings
+    // should be used.
+    assertThat(consumedFlowControlSettings.get())
+        .isEqualTo(WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS);
+    assertThat(cachedChannel).isSameInstanceAs(cache.get(someAddress));
+
+    // This get should reload the cache, since flow control settings have changed
+    UserWorkerGrpcFlowControlSettings flowControlSettingsModified =
+        
UserWorkerGrpcFlowControlSettings.newBuilder().setEnableAutoFlowControl(true).build();
+    cache.consumeFlowControlSettings(flowControlSettingsModified);
+    ManagedChannel reloadedChannel = cache.get(someAddress);
+    notifyWhenChannelClosed.get().await();
+    assertThat(cachedChannel).isNotSameInstanceAs(reloadedChannel);
+    assertTrue(cachedChannel.isShutdown());
+    assertFalse(reloadedChannel.isShutdown());
+    assertThat(newChannelsCreated.get()).isEqualTo(2);
+    assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel);
+    
assertThat(consumedFlowControlSettings.get()).isEqualTo(flowControlSettingsModified);
+
+    // Change back to empty settings and verify the default is used again.
+    notifyWhenChannelClosed.set(new CountDownLatch(1));
+    cache.consumeFlowControlSettings(emptyFlowControlSettings);
+    ManagedChannel reloadedChannel2 = cache.get(someAddress);
+    notifyWhenChannelClosed.get().await();
+    assertThat(reloadedChannel2).isNotSameInstanceAs(reloadedChannel);
+    assertThat(reloadedChannel2).isNotSameInstanceAs(cachedChannel);
+    assertTrue(reloadedChannel.isShutdown());
+    assertFalse(reloadedChannel2.isShutdown());
+    assertThat(newChannelsCreated.get()).isEqualTo(3);
+    assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel2);
+    assertThat(consumedFlowControlSettings.get())
+        .isEqualTo(WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS);
+  }
+
   @Test
   public void testConsumeFlowControlSettings_sameFlowControlSettings() {
     String channelName = "channel";

Reply via email to