m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1757462919


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -165,46 +174,52 @@ private static Watermarks createWatermarks(
         .build();
   }
 
-  private void sendRequestExtension(GetWorkBudget adjustment) {
-    inFlightBudget.getAndUpdate(budget -> budget.apply(adjustment));
-    StreamingGetWorkRequest extension =
-        StreamingGetWorkRequest.newBuilder()
-            .setRequestExtension(
-                Windmill.StreamingGetWorkRequestExtension.newBuilder()
-                    .setMaxItems(adjustment.items())
-                    .setMaxBytes(adjustment.bytes()))
-            .build();
+  private void sendRequestExtension() {
+    GetWorkBudget currentInFlightBudget = inFlightBudget.get();
+    GetWorkBudget currentMaxBudget = maxGetWorkBudget.get();
 
-    executor()
-        .execute(
+    // If the outstanding items or bytes limit has gotten too low, top both 
off with a
+    // GetWorkExtension. The goal is to keep the limits relatively close to 
their maximum
+    // values without sending too many extension requests.
+    if (currentInFlightBudget.items() < currentMaxBudget.items() / 2
+        || currentInFlightBudget.bytes() < currentMaxBudget.bytes() / 2) {
+      GetWorkBudget extension = 
currentMaxBudget.subtract(currentInFlightBudget);
+      if (extension.items() > 0 || extension.bytes() > 0) {

Review Comment:
   extension cannot be negative since GetWorkBudget does not allow negative 
values



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to