m-trieu commented on code in PR #32775:
URL: https://github.com/apache/beam/pull/32775#discussion_r1808039560


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java:
##########
@@ -40,147 +38,39 @@ public class EvenGetWorkBudgetDistributorTest {
   @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
 
-  private static GetWorkBudgetDistributor 
createBudgetDistributor(GetWorkBudget activeWorkBudget) {
-    return GetWorkBudgetDistributors.distributeEvenly(() -> activeWorkBudget);
-  }
+  private static GetWorkBudgetSpender createGetWorkBudgetOwner() {
+    // Lambdas are final and cannot be spied.
+    return spy(
+        new GetWorkBudgetSpender() {
 
-  private static GetWorkBudgetDistributor createBudgetDistributor(long 
activeWorkItemsAndBytes) {
-    return createBudgetDistributor(
-        GetWorkBudget.builder()
-            .setItems(activeWorkItemsAndBytes)
-            .setBytes(activeWorkItemsAndBytes)
-            .build());
+          @Override
+          public void setBudget(long items, long bytes) {}
+        });
   }
 
   @Test
   public void testDistributeBudget_doesNothingWhenPassedInStreamsEmpty() {
-    createBudgetDistributor(1L)
+    GetWorkBudgetDistributors.distributeEvenly()
         .distributeBudget(
             ImmutableList.of(), 
GetWorkBudget.builder().setItems(10L).setBytes(10L).build());
   }
 
   @Test
   public void testDistributeBudget_doesNothingWithNoBudget() {
-    GetWorkBudgetSpender getWorkBudgetSpender =
-        
spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget()));
-    createBudgetDistributor(1L)
+    GetWorkBudgetSpender getWorkBudgetSpender = 
spy(createGetWorkBudgetOwner());
+    GetWorkBudgetDistributors.distributeEvenly()
         .distributeBudget(ImmutableList.of(getWorkBudgetSpender), 
GetWorkBudget.noBudget());
     verifyNoInteractions(getWorkBudgetSpender);
   }
 
-  @Test
-  public void 
testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHighNoActiveWork()
 {
-    GetWorkBudgetSpender getWorkBudgetSpender =
-        spy(
-            createGetWorkBudgetOwnerWithRemainingBudgetOf(
-                GetWorkBudget.builder().setItems(10L).setBytes(10L).build()));
-    createBudgetDistributor(0L)
-        .distributeBudget(
-            ImmutableList.of(getWorkBudgetSpender),
-            GetWorkBudget.builder().setItems(10L).setBytes(10L).build());
-
-    verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong());
-  }
-
-  @Test
-  public void
-      
testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHighWithActiveWork()
 {
-    GetWorkBudgetSpender getWorkBudgetSpender =
-        spy(
-            createGetWorkBudgetOwnerWithRemainingBudgetOf(
-                GetWorkBudget.builder().setItems(5L).setBytes(5L).build()));
-    createBudgetDistributor(10L)
-        .distributeBudget(
-            ImmutableList.of(getWorkBudgetSpender),
-            GetWorkBudget.builder().setItems(20L).setBytes(20L).build());
-
-    verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong());
-  }
-
-  @Test
-  public void
-      
testDistributeBudget_adjustsStreamBudgetWhenRemainingItemBudgetTooLowWithNoActiveWork()
 {
-    GetWorkBudget streamRemainingBudget =
-        GetWorkBudget.builder().setItems(1L).setBytes(10L).build();
-    GetWorkBudget totalGetWorkBudget = 
GetWorkBudget.builder().setItems(10L).setBytes(10L).build();
-    GetWorkBudgetSpender getWorkBudgetSpender =
-        
spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget));
-    createBudgetDistributor(0L)
-        .distributeBudget(ImmutableList.of(getWorkBudgetSpender), 
totalGetWorkBudget);
-
-    verify(getWorkBudgetSpender, times(1))
-        .adjustBudget(
-            eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
-            eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes()));
-  }
-
-  @Test
-  public void
-      
testDistributeBudget_adjustsStreamBudgetWhenRemainingItemBudgetTooLowWithActiveWork()
 {
-    GetWorkBudget streamRemainingBudget =
-        GetWorkBudget.builder().setItems(1L).setBytes(10L).build();
-    GetWorkBudget totalGetWorkBudget = 
GetWorkBudget.builder().setItems(10L).setBytes(10L).build();
-    long activeWorkItemsAndBytes = 2L;
-    GetWorkBudgetSpender getWorkBudgetSpender =
-        
spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget));
-    createBudgetDistributor(activeWorkItemsAndBytes)
-        .distributeBudget(ImmutableList.of(getWorkBudgetSpender), 
totalGetWorkBudget);
-
-    verify(getWorkBudgetSpender, times(1))
-        .adjustBudget(
-            eq(
-                totalGetWorkBudget.items()
-                    - streamRemainingBudget.items()
-                    - activeWorkItemsAndBytes),
-            eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes()));
-  }
-
-  @Test
-  public void 
testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLowNoActiveWork()
 {
-    GetWorkBudget streamRemainingBudget =
-        GetWorkBudget.builder().setItems(10L).setBytes(1L).build();
-    GetWorkBudget totalGetWorkBudget = 
GetWorkBudget.builder().setItems(10L).setBytes(10L).build();
-    GetWorkBudgetSpender getWorkBudgetSpender =
-        
spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget));
-    createBudgetDistributor(0L)
-        .distributeBudget(ImmutableList.of(getWorkBudgetSpender), 
totalGetWorkBudget);
-
-    verify(getWorkBudgetSpender, times(1))
-        .adjustBudget(
-            eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
-            eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes()));
-  }
-
-  @Test
-  public void
-      
testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLowWithActiveWork()
 {
-    GetWorkBudget streamRemainingBudget =
-        GetWorkBudget.builder().setItems(10L).setBytes(1L).build();
-    GetWorkBudget totalGetWorkBudget = 
GetWorkBudget.builder().setItems(10L).setBytes(10L).build();
-    long activeWorkItemsAndBytes = 2L;
-
-    GetWorkBudgetSpender getWorkBudgetSpender =
-        
spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget));
-    createBudgetDistributor(activeWorkItemsAndBytes)
-        .distributeBudget(ImmutableList.of(getWorkBudgetSpender), 
totalGetWorkBudget);
-
-    verify(getWorkBudgetSpender, times(1))
-        .adjustBudget(
-            eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
-            eq(
-                totalGetWorkBudget.bytes()
-                    - streamRemainingBudget.bytes()
-                    - activeWorkItemsAndBytes));
-  }
-
   @Test
   public void testDistributeBudget_distributesBudgetEvenlyIfPossible() {
     long totalItemsAndBytes = 10L;

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java:
##########
@@ -192,17 +82,17 @@ public void 
testDistributeBudget_distributesBudgetEvenlyIfPossible() {
     streams.forEach(

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to