This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 63501f33968 Fix data_race condition in WindmillStreamSenderTest 
(#38589)
63501f33968 is described below

commit 63501f339684d580dc38574c105dda1a10e2740b
Author: parveensania <[email protected]>
AuthorDate: Fri May 22 07:22:33 2026 -0700

    Fix data_race condition in WindmillStreamSenderTest (#38589)
    
    * Instead of memoizing the constructed Backoff instance, memoize the 
builder config
    
    * fix indentation
    
    * indentation fix
---
 .../worker/windmill/client/grpc/GrpcWindmillStreamFactory.java      | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
index 244d2ad3fa1..0184b88d53c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
@@ -117,13 +117,13 @@ public class GrpcWindmillStreamFactory implements 
StatusDataProvider {
     this.streamingRpcBatchLimit = streamingRpcBatchLimit;
     this.windmillMessagesBetweenIsReadyChecks = 
windmillMessagesBetweenIsReadyChecks;
     // Configure backoff to retry calls forever, with a maximum sane retry 
interval.
-    this.grpcBackOff =
+    Supplier<FluentBackoff> backoffConfig =
         Suppliers.memoize(
             () ->
                 FluentBackoff.DEFAULT
                     .withInitialBackoff(MIN_BACKOFF)
-                    .withMaxBackoff(maxBackOffSupplier.get())
-                    .backoff());
+                    .withMaxBackoff(maxBackOffSupplier.get()));
+    this.grpcBackOff = () -> backoffConfig.get().backoff();
     this.streamRegistry = ConcurrentHashMap.newKeySet();
     this.sendKeyedGetDataRequests = sendKeyedGetDataRequests;
     this.requestBatchedGetWorkResponse = requestBatchedGetWorkResponse;

Reply via email to