m-trieu commented on code in PR #32775:
URL: https://github.com/apache/beam/pull/32775#discussion_r1808039560
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java:
##########
@@ -40,147 +38,39 @@ public class EvenGetWorkBudgetDistributorTest {
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
- private static GetWorkBudgetDistributor createBudgetDistributor(GetWorkBudget activeWorkBudget) {
- return GetWorkBudgetDistributors.distributeEvenly(() -> activeWorkBudget);
- }
+ private static GetWorkBudgetSpender createGetWorkBudgetOwner() {
+ // Lambdas are final and cannot be spied.
+ return spy(
+ new GetWorkBudgetSpender() {
- private static GetWorkBudgetDistributor createBudgetDistributor(long activeWorkItemsAndBytes) {
- return createBudgetDistributor(
- GetWorkBudget.builder()
- .setItems(activeWorkItemsAndBytes)
- .setBytes(activeWorkItemsAndBytes)
- .build());
+ @Override
+ public void setBudget(long items, long bytes) {}
+ });
}
@Test
public void testDistributeBudget_doesNothingWhenPassedInStreamsEmpty() {
- createBudgetDistributor(1L)
+ GetWorkBudgetDistributors.distributeEvenly()
.distributeBudget(
ImmutableList.of(),
GetWorkBudget.builder().setItems(10L).setBytes(10L).build());
}
@Test
public void testDistributeBudget_doesNothingWithNoBudget() {
- GetWorkBudgetSpender getWorkBudgetSpender =
- spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget()));
- createBudgetDistributor(1L)
+ GetWorkBudgetSpender getWorkBudgetSpender = spy(createGetWorkBudgetOwner());
+ GetWorkBudgetDistributors.distributeEvenly()
.distributeBudget(ImmutableList.of(getWorkBudgetSpender), GetWorkBudget.noBudget());
verifyNoInteractions(getWorkBudgetSpender);
}
- @Test
- public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHighNoActiveWork() {
- GetWorkBudgetSpender getWorkBudgetSpender =
- spy(
- createGetWorkBudgetOwnerWithRemainingBudgetOf(
- GetWorkBudget.builder().setItems(10L).setBytes(10L).build()));
- createBudgetDistributor(0L)
- .distributeBudget(
- ImmutableList.of(getWorkBudgetSpender),
- GetWorkBudget.builder().setItems(10L).setBytes(10L).build());
-
- verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong());
- }
-
- @Test
- public void
- testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHighWithActiveWork() {
- GetWorkBudgetSpender getWorkBudgetSpender =
- spy(
- createGetWorkBudgetOwnerWithRemainingBudgetOf(
- GetWorkBudget.builder().setItems(5L).setBytes(5L).build()));
- createBudgetDistributor(10L)
- .distributeBudget(
- ImmutableList.of(getWorkBudgetSpender),
- GetWorkBudget.builder().setItems(20L).setBytes(20L).build());
-
- verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong());
- }
-
- @Test
- public void
- testDistributeBudget_adjustsStreamBudgetWhenRemainingItemBudgetTooLowWithNoActiveWork() {
- GetWorkBudget streamRemainingBudget =
- GetWorkBudget.builder().setItems(1L).setBytes(10L).build();
- GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build();
- GetWorkBudgetSpender getWorkBudgetSpender =
- spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget));
- createBudgetDistributor(0L)
- .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget);
-
- verify(getWorkBudgetSpender, times(1))
- .adjustBudget(
- eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
- eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes()));
- }
-
- @Test
- public void
- testDistributeBudget_adjustsStreamBudgetWhenRemainingItemBudgetTooLowWithActiveWork() {
- GetWorkBudget streamRemainingBudget =
- GetWorkBudget.builder().setItems(1L).setBytes(10L).build();
- GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build();
- long activeWorkItemsAndBytes = 2L;
- GetWorkBudgetSpender getWorkBudgetSpender =
- spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget));
- createBudgetDistributor(activeWorkItemsAndBytes)
- .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget);
-
- verify(getWorkBudgetSpender, times(1))
- .adjustBudget(
- eq(
- totalGetWorkBudget.items()
- - streamRemainingBudget.items()
- - activeWorkItemsAndBytes),
- eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes()));
- }
-
- @Test
- public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLowNoActiveWork() {
- GetWorkBudget streamRemainingBudget =
- GetWorkBudget.builder().setItems(10L).setBytes(1L).build();
- GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build();
- GetWorkBudgetSpender getWorkBudgetSpender =
- spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget));
- createBudgetDistributor(0L)
- .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget);
-
- verify(getWorkBudgetSpender, times(1))
- .adjustBudget(
- eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
- eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes()));
- }
-
- @Test
- public void
- testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLowWithActiveWork() {
- GetWorkBudget streamRemainingBudget =
- GetWorkBudget.builder().setItems(10L).setBytes(1L).build();
- GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build();
- long activeWorkItemsAndBytes = 2L;
-
- GetWorkBudgetSpender getWorkBudgetSpender =
- spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget));
- createBudgetDistributor(activeWorkItemsAndBytes)
- .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget);
-
- verify(getWorkBudgetSpender, times(1))
- .adjustBudget(
- eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
- eq(
- totalGetWorkBudget.bytes()
- - streamRemainingBudget.bytes()
- - activeWorkItemsAndBytes));
- }
-
@Test
public void testDistributeBudget_distributesBudgetEvenlyIfPossible() {
long totalItemsAndBytes = 10L;
Review Comment:
done
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java:
##########
@@ -192,17 +82,17 @@ public void testDistributeBudget_distributesBudgetEvenlyIfPossible() {
streams.forEach(
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]