m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1730022188


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -175,36 +178,38 @@ private void sendRequestExtension(GetWorkBudget adjustment) {
                     .setMaxBytes(adjustment.bytes()))
             .build();
 
-    executor()
-        .execute(
-            () -> {
-              try {
-                send(extension);
-              } catch (IllegalStateException e) {
-                // Stream was closed.
-              }
-            });
+    executeSafely(
+        () -> {
+          try {
+            send(extension);
+          } catch (IllegalStateException e) {
+            // Stream was closed.
+          }
+        });
   }
 
   @Override
   protected synchronized void onNewStream() {
     workItemAssemblers.clear();
-    // Add the current in-flight budget to the next adjustment. Only positive values are allowed
-    // here
-    // with negatives defaulting to 0, since GetWorkBudgets cannot be created with negative values.
-    GetWorkBudget budgetAdjustment = nextBudgetAdjustment.get().apply(inFlightBudget.get());
-    inFlightBudget.set(budgetAdjustment);
-    send(
-        StreamingGetWorkRequest.newBuilder()
-            .setRequest(
-                request
-                    .toBuilder()
-                    .setMaxBytes(budgetAdjustment.bytes())
-                    .setMaxItems(budgetAdjustment.items()))
-            .build());
+    if (!isShutdown()) {
+      // Add the current in-flight budget to the next adjustment. Only positive values are allowed
+      // here with negatives defaulting to 0, since GetWorkBudgets cannot be created with negative
+      // values. We just sent the budget, reset it.
+      GetWorkBudget currentBudgetAdjustment =
+          nextBudgetAdjustment.getAndUpdate(ignored -> GetWorkBudget.noBudget());
+      GetWorkBudget budgetAdjustment = currentBudgetAdjustment.apply(inFlightBudget.get());
 
-    // We just sent the budget, reset it.
-    nextBudgetAdjustment.set(GetWorkBudget.noBudget());
+      inFlightBudget.updateAndGet(budget -> budget.apply(currentBudgetAdjustment));

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to