Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5090#discussion_r155496482 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java --- @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception { } } + /** + * Tests that unused offered slots are directly used to fulfil pending slot + * requests. + * + * <p>See FLINK-8089 + */ + @Test + public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception { + final SlotPool slotPool = new SlotPool(rpcService, jobId); + + final JobMasterId jobMasterId = JobMasterId.generate(); + final String jobMasterAddress = "foobar"; + final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>(); + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + + resourceManagerGateway.setRequestSlotConsumer( + (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + final SlotRequestID slotRequestId1 = new SlotRequestID(); + final SlotRequestID slotRequestId2 = new SlotRequestID(); + + try { + slotPool.start(jobMasterId, jobMasterAddress); + + final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + + final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class)); + + slotPoolGateway.connectToResourceManager(resourceManagerGateway); + + CompletableFuture<LogicalSlot> slotFuture1 = slotPoolGateway.allocateSlot( + slotRequestId1, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + // wait for the first slot request + final AllocationID allocationId = allocationIdFuture.get(); + + CompletableFuture<LogicalSlot> slotFuture2 = slotPoolGateway.allocateSlot( + slotRequestId2, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + slotPoolGateway.cancelSlotRequest(slotRequestId1); + + try { + // this should fail with a CancellationException + slotFuture1.get(); + fail("The first slot future should have failed because it was cancelled."); + } catch (ExecutionException ee) { + assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException); + } + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); + + // the slot offer should fulfil the second slot request --- End diff -- nit: same here
---