Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5090#discussion_r155496482
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
    @@ -383,6 +386,76 @@ public void 
testSlotRequestCancellationUponFailingRequest() throws Exception {
                }
        }
     
    +   /**
    +    * Tests that unused offered slots are directly used to fulfil pending 
slot
    +    * requests.
    +    *
    +    * <p>See FLINK-8089
    +    */
    +   @Test
    +   public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws 
Exception {
    +           final SlotPool slotPool = new SlotPool(rpcService, jobId);
    +
    +           final JobMasterId jobMasterId = JobMasterId.generate();
    +           final String jobMasterAddress = "foobar";
    +           final CompletableFuture<AllocationID> allocationIdFuture = new 
CompletableFuture<>();
    +           final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
    +
    +           resourceManagerGateway.setRequestSlotConsumer(
    +                   (SlotRequest slotRequest) -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
    +
    +           final SlotRequestID slotRequestId1 = new SlotRequestID();
    +           final SlotRequestID slotRequestId2 = new SlotRequestID();
    +
    +           try {
    +                   slotPool.start(jobMasterId, jobMasterAddress);
    +
    +                   final SlotPoolGateway slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);
    +
    +                   final ScheduledUnit scheduledUnit = new 
ScheduledUnit(mock(Execution.class));
    +
    +                   
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
    +
    +                   CompletableFuture<LogicalSlot> slotFuture1 = 
slotPoolGateway.allocateSlot(
    +                           slotRequestId1,
    +                           scheduledUnit,
    +                           ResourceProfile.UNKNOWN,
    +                           Collections.emptyList(),
    +                           timeout);
    +
    +                   // wait for the first slot request
    +                   final AllocationID allocationId = 
allocationIdFuture.get();
    +
    +                   CompletableFuture<LogicalSlot> slotFuture2 = 
slotPoolGateway.allocateSlot(
    +                           slotRequestId2,
    +                           scheduledUnit,
    +                           ResourceProfile.UNKNOWN,
    +                           Collections.emptyList(),
    +                           timeout);
    +
    +                   slotPoolGateway.cancelSlotRequest(slotRequestId1);
    +
    +                   try {
    +                           // this should fail with a CancellationException
    +                           slotFuture1.get();
    +                           fail("The first slot future should have failed 
because it was cancelled.");
    +                   } catch (ExecutionException ee) {
    +                           
assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof 
CancellationException);
    +                   }
    +
    +                   final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
    +
    +                   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
    +
    +                   
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, 
slotOffer).get());
    +
    +                   // the slot offer should fulfil the second slot request
    --- End diff --
    
    nit: same here


---

Reply via email to