m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1730022188
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -175,36 +178,38 @@ private void sendRequestExtension(GetWorkBudget adjustment) {
.setMaxBytes(adjustment.bytes()))
.build();
- executor()
- .execute(
- () -> {
- try {
- send(extension);
- } catch (IllegalStateException e) {
- // Stream was closed.
- }
- });
+ executeSafely(
+ () -> {
+ try {
+ send(extension);
+ } catch (IllegalStateException e) {
+ // Stream was closed.
+ }
+ });
}
@Override
protected synchronized void onNewStream() {
workItemAssemblers.clear();
- // Add the current in-flight budget to the next adjustment. Only positive values are allowed
- // here
- // with negatives defaulting to 0, since GetWorkBudgets cannot be created with negative values.
- GetWorkBudget budgetAdjustment = nextBudgetAdjustment.get().apply(inFlightBudget.get());
- inFlightBudget.set(budgetAdjustment);
- send(
- StreamingGetWorkRequest.newBuilder()
- .setRequest(
- request
- .toBuilder()
- .setMaxBytes(budgetAdjustment.bytes())
- .setMaxItems(budgetAdjustment.items()))
- .build());
+ if (!isShutdown()) {
+ // Add the current in-flight budget to the next adjustment. Only positive values are allowed
+ // here with negatives defaulting to 0, since GetWorkBudgets cannot be created with negative
+ // values. We just sent the budget, reset it.
+ GetWorkBudget currentBudgetAdjustment =
+ nextBudgetAdjustment.getAndUpdate(ignored -> GetWorkBudget.noBudget());
+ GetWorkBudget budgetAdjustment = currentBudgetAdjustment.apply(inFlightBudget.get());
- // We just sent the budget, reset it.
- nextBudgetAdjustment.set(GetWorkBudget.noBudget());
+ inFlightBudget.updateAndGet(budget -> budget.apply(currentBudgetAdjustment));
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]