zhuzhurk commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r474639086



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -274,6 +277,73 @@ public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, Int
                assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true));
        }
 
+       @Test
+       public void testSchedulePendingRequestBulkTimeoutCheck() {
+               TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
+               AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+               context.allocateSlotsFor(EV1, EV3);
+               PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
+
+               assertThat(bulk.getPendingRequests(), hasSize(2));
+               assertThat(bulk.getPendingRequests(), containsInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE));
+               assertThat(bulk.getAllocationIdsOfFulfilledRequests(), hasSize(0));
+               assertThat(bulkChecker.getTimeout(), is(ALLOCATION_TIMEOUT));
+       }
+
+       @Test
+       public void testRequestFulfilledInBulk() {
+               TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
+               AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+               context.allocateSlotsFor(EV1, EV3);
+               AllocationID allocationId = new AllocationID();
+               ResourceProfile pendingSlotResourceProfile = fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, allocationId);
+               PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
+
+               assertThat(bulk.getPendingRequests(), hasSize(1));
+               assertThat(bulk.getPendingRequests(), containsInAnyOrder(pendingSlotResourceProfile));
+               assertThat(bulk.getAllocationIdsOfFulfilledRequests(), hasSize(1));
+               assertThat(bulk.getAllocationIdsOfFulfilledRequests(), containsInAnyOrder(allocationId));
+       }
+
+       @Test
+       public void testRequestBulkCancel() {
+               TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
+               AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+               // allocate 2 physical slots for 2 groups
+               List<SlotExecutionVertexAssignment> assignments1 = context.allocateSlotsFor(EV1, EV3);
+               fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new AllocationID());
+               PhysicalSlotRequestBulk bulk1 = bulkChecker.getBulk();
+               List<SlotExecutionVertexAssignment> assignments2 = context.allocateSlotsFor(EV2);
+               // cancelling of (EV1, EV3) releases assignments1 and only one physical slot for EV3
+               // the second physical slot is held by sharing EV2 from the next bulk
+               bulk1.cancel(new Throwable());
+               // EV3 needs again a physical slot, therefore there are 3 requests overall
+               context.allocateSlotsFor(EV1, EV3);
+               boolean ev1failed = assignments1.get(0).getLogicalSlotFuture().isCompletedExceptionally();
+               boolean ev2failed = assignments1.get(0).getLogicalSlotFuture().isCompletedExceptionally();
+
+               assertThat(context.getSlotProvider().getRequests().values(), hasSize(3));
+               // either EV1 or EV3 logical slot future is fulfilled before cancellation
+               assertThat(ev1failed != ev2failed, is(false));

Review comment:
       Do you actually want to verify `assertThat(ev1failed != ev2failed, is(true));`?
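
       To illustrate the question, here is a minimal standalone sketch (plain `CompletableFuture`, no Flink classes; the class and method names are hypothetical). For booleans, `!=` behaves as exclusive-or, so asserting `is(true)` checks that exactly one future failed, while `is(false)` checks that both or neither did. Note also that the quoted diff reads both flags from `assignments1.get(0)`, and comparing a future with itself always yields `false`, which may be why the `is(false)` form passes.

```java
import java.util.concurrent.CompletableFuture;

public class ExactlyOneFailedSketch {

    // Hypothetical helper: true when exactly one of the two futures
    // completed exceptionally.
    static boolean exactlyOneFailed(CompletableFuture<?> first, CompletableFuture<?> second) {
        boolean firstFailed = first.isCompletedExceptionally();
        boolean secondFailed = second.isCompletedExceptionally();
        // For booleans, != acts as exclusive-or.
        return firstFailed != secondFailed;
    }

    public static void main(String[] args) {
        // One future fulfilled normally, one failed (stand-ins for the two logical slot futures).
        CompletableFuture<String> fulfilled = CompletableFuture.completedFuture("slot");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("bulk cancelled"));

        // Two distinct futures, exactly one failed.
        System.out.println(exactlyOneFailed(fulfilled, failed));
        // The same future compared with itself can never differ.
        System.out.println(exactlyOneFailed(failed, failed));
    }
}
```

       Under these assumptions the first call returns `true` and the second returns `false`.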



