m-trieu commented on code in PR #32775:
URL: https://github.com/apache/beam/pull/32775#discussion_r1803919692


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -267,25 +277,100 @@ protected void startThrottleTimer() {
   }
 
   @Override
-  public void adjustBudget(long itemsDelta, long bytesDelta) {
-    GetWorkBudget adjustment =
-        nextBudgetAdjustment
-            // Get the current value, and reset the nextBudgetAdjustment. This 
will be set again
-            // when adjustBudget is called.
-            .getAndUpdate(unused -> GetWorkBudget.noBudget())
-            .apply(itemsDelta, bytesDelta);
-    sendRequestExtension(adjustment);
+  public void setBudget(long newItems, long newBytes) {
+    GetWorkBudget currentMaxGetWorkBudget =
+        maxGetWorkBudget.updateAndGet(
+            ignored -> 
GetWorkBudget.builder().setItems(newItems).setBytes(newBytes).build());
+    GetWorkBudget extension = 
budgetTracker.computeBudgetExtension(currentMaxGetWorkBudget);
+    maybeSendRequestExtension(extension);
   }
 
-  @Override
-  public GetWorkBudget remainingBudget() {
-    // Snapshot the current budgets.
-    GetWorkBudget currentPendingResponseBudget = pendingResponseBudget.get();
-    GetWorkBudget currentNextBudgetAdjustment = nextBudgetAdjustment.get();
-    GetWorkBudget currentInflightBudget = inFlightBudget.get();
-
-    return currentPendingResponseBudget
-        .apply(currentNextBudgetAdjustment)
-        .apply(currentInflightBudget);
+  /**
+   * Tracks sent and received GetWorkBudget and uses this information to 
generate request
+   * extensions.
+   */
+  @AutoValue
+  abstract static class GetWorkBudgetTracker {
+
+    private static GetWorkBudgetTracker create() {
+      return new AutoValue_GrpcDirectGetWorkStream_GetWorkBudgetTracker(
+          new AtomicLong(), new AtomicLong(), new AtomicLong(), new 
AtomicLong());
+    }
+
+    abstract AtomicLong itemsRequested();

Review Comment:
   Done also added tests for this class 



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java:
##########
@@ -67,7 +64,8 @@ public static ManagedChannel remoteChannel(
             windmillServiceRpcChannelTimeoutSec);
       default:
         throw new UnsupportedOperationException(
-            "Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS 
are supported WindmillServiceAddresses.");
+            "Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS 
are supported"

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to