zhuzhurk commented on a change in pull request #13181: URL: https://github.com/apache/flink/pull/13181#discussion_r474639086
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -274,6 +277,73 @@ public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, Int assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); } + @Test + public void testSchedulePendingRequestBulkTimeoutCheck() { + TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); + AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker); + + context.allocateSlotsFor(EV1, EV3); + PhysicalSlotRequestBulk bulk = bulkChecker.getBulk(); + + assertThat(bulk.getPendingRequests(), hasSize(2)); + assertThat(bulk.getPendingRequests(), containsInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE)); + assertThat(bulk.getAllocationIdsOfFulfilledRequests(), hasSize(0)); + assertThat(bulkChecker.getTimeout(), is(ALLOCATION_TIMEOUT)); + } + + @Test + public void testRequestFulfilledInBulk() { + TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); + AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker); + + context.allocateSlotsFor(EV1, EV3); + AllocationID allocationId = new AllocationID(); + ResourceProfile pendingSlotResourceProfile = fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, allocationId); + PhysicalSlotRequestBulk bulk = bulkChecker.getBulk(); + + assertThat(bulk.getPendingRequests(), hasSize(1)); + assertThat(bulk.getPendingRequests(), containsInAnyOrder(pendingSlotResourceProfile)); + assertThat(bulk.getAllocationIdsOfFulfilledRequests(), hasSize(1)); + assertThat(bulk.getAllocationIdsOfFulfilledRequests(), containsInAnyOrder(allocationId)); + } + + @Test + public void testRequestBulkCancel() { + TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); + AllocationContext 
context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker); + + // allocate 2 physical slots for 2 groups + List<SlotExecutionVertexAssignment> assignments1 = context.allocateSlotsFor(EV1, EV3); + fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new AllocationID()); + PhysicalSlotRequestBulk bulk1 = bulkChecker.getBulk(); + List<SlotExecutionVertexAssignment> assignments2 = context.allocateSlotsFor(EV2); + // cancelling of (EV1, EV3) releases assignments1 and only one physical slot for EV3 + // the second physical slot is held by sharing EV2 from the next bulk + bulk1.cancel(new Throwable()); + // EV3 needs again a physical slot, therefore there are 3 requests overall + context.allocateSlotsFor(EV1, EV3); + boolean ev1failed = assignments1.get(0).getLogicalSlotFuture().isCompletedExceptionally(); + boolean ev2failed = assignments1.get(0).getLogicalSlotFuture().isCompletedExceptionally(); + + assertThat(context.getSlotProvider().getRequests().values(), hasSize(3)); + // either EV1 or EV3 logical slot future is fulfilled before cancellation + assertThat(ev1failed != ev2failed, is(false)); Review comment: Do you actually want to verify `assertThat(ev1failed != ev2failed, is(true));`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org