[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r441577695 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java ## @@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception { } } + @Test + public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception { + final List allocations = new ArrayList<>(); + resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId())); + + try (SlotPoolImpl slotPool = setUpSlotPool()) { Review comment: sure, I agree that we can do the overall test cleanup as a separate issue if it is too much for this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r441364374 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java ## @@ -37,13 +41,13 @@ private final LinkedHashMap> aMap; - private final LinkedHashMap bMap; + private final HashMap bMap; Review comment: Alright
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r441359953 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java ## @@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception { } } + @Test + public void testOrphanedAllocationCanBeRemapped() throws Exception { + try (SlotPoolImpl slotPool = createSlotPoolImpl()) { + final List allocationIds = new ArrayList<>(); + resourceManagerGateway.setRequestSlotConsumer( + slotRequest -> allocationIds.add(slotRequest.getAllocationId())); + + final List canceledAllocations = new ArrayList<>(); + resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add); + + setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); + final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); Review comment: Why do we use `Scheduler` to unit test `SlotPoolImpl`? Why not call `SlotPoolImpl` directly, as in `testFailingAllocationFailsRemappedPendingSlotRequests`? 
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java ## @@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception { } } + @Test + public void testOrphanedAllocationCanBeRemapped() throws Exception { + try (SlotPoolImpl slotPool = createSlotPoolImpl()) { + final List allocationIds = new ArrayList<>(); + resourceManagerGateway.setRequestSlotConsumer( + slotRequest -> allocationIds.add(slotRequest.getAllocationId())); + + final List canceledAllocations = new ArrayList<>(); + resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add); + + setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); + final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + + final SlotRequestId slotRequestId1 = new SlotRequestId(); + scheduler.allocateSlot( + slotRequestId1, + new DummyScheduledUnit(), + SlotProfile.noRequirements(), + timeout); Review comment: ```suggestion allocateSlot(scheduler, slotRequestId1); ``` ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java ## @@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception { } } + @Test + public void testOrphanedAllocationCanBeRemapped() throws Exception { + try (SlotPoolImpl slotPool = createSlotPoolImpl()) { + final List allocationIds = new ArrayList<>(); + resourceManagerGateway.setRequestSlotConsumer( + slotRequest -> allocationIds.add(slotRequest.getAllocationId())); + + final List canceledAllocations = new ArrayList<>(); + resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add); Review comment: nit: maybe, not now but if it can be reused also in other tests, it would be nice to have something like an RM harness: ``` class RmHarness { final List allocationIds = new ArrayList<>(); final List canceledAllocations = new ArrayList<>(); RmHarness(resourceManagerGateway) getAllocations getCanceled } 
``` ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java ## @@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception { } } + @Test + public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception { + final List allocations = new ArrayList<>(); + resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId())); + + try (SlotPoolImpl slotPool = setUpSlotPool()) { Review comment: Could we reuse/deduplicate `setUpSlotPool` also in `SlotPoolImplTest`? ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java ## @@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception { }
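The RM harness proposed in the review comment above could be fleshed out roughly as follows. This is only a sketch of the idea, not code from the PR: `FakeResourceManagerGateway` is a hypothetical stand-in for the testing gateway used in the tests, and allocation ids are modeled as plain `String`s instead of Flink's `AllocationID`. The accessor names `getAllocations` and `getCanceled` are taken from the reviewer's outline.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class RmHarnessSketch {

    /** Hypothetical stand-in for the testing RM gateway; only the two hooks used by the test are modeled. */
    static class FakeResourceManagerGateway {
        private Consumer<String> requestSlotConsumer = id -> { };
        private Consumer<String> cancelSlotConsumer = id -> { };

        void setRequestSlotConsumer(Consumer<String> consumer) {
            this.requestSlotConsumer = consumer;
        }

        void setCancelSlotConsumer(Consumer<String> consumer) {
            this.cancelSlotConsumer = consumer;
        }

        // Called by the component under test (the slot pool in the real tests).
        void requestSlot(String allocationId) {
            requestSlotConsumer.accept(allocationId);
        }

        void cancelSlotRequest(String allocationId) {
            cancelSlotConsumer.accept(allocationId);
        }
    }

    /** Records requested and canceled allocations so that tests do not repeat the wiring. */
    static class RmHarness {
        private final List<String> allocationIds = new ArrayList<>();
        private final List<String> canceledAllocations = new ArrayList<>();

        RmHarness(FakeResourceManagerGateway gateway) {
            gateway.setRequestSlotConsumer(allocationIds::add);
            gateway.setCancelSlotConsumer(canceledAllocations::add);
        }

        List<String> getAllocations() {
            return Collections.unmodifiableList(allocationIds);
        }

        List<String> getCanceled() {
            return Collections.unmodifiableList(canceledAllocations);
        }
    }

    public static void main(String[] args) {
        FakeResourceManagerGateway gateway = new FakeResourceManagerGateway();
        RmHarness harness = new RmHarness(gateway);

        gateway.requestSlot("alloc-1");
        gateway.requestSlot("alloc-2");
        gateway.cancelSlotRequest("alloc-1");

        System.out.println("requested: " + harness.getAllocations());
        System.out.println("canceled:  " + harness.getCanceled());
    }
}
```

A test would then construct the harness once and assert on `getAllocations()` and `getCanceled()` instead of declaring the two lists and consumers inline each time.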
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r441346884 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java ## @@ -37,13 +41,13 @@ private final LinkedHashMap> aMap; - private final LinkedHashMap bMap; + private final HashMap bMap; Review comment: Hmm, for me, the order would not be obvious anyway without looking at either the implementation or the Javadoc comment. I am not strictly opposed to the change, but I would avoid it unless it is really needed.
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r441343582 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) { return null; } + private void maybeRemapOrphanedAllocation( + final AllocationID allocationIdOfRequest, + final AllocationID allocationIdOfSlot) { + + final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot) + ? null : allocationIdOfRequest; + + // if the request that initiated the allocation is still pending, it should take over the orphaned allocation + // of the fulfilled request so that it can fail fast if the remapped allocation fails + if (orphanedAllocationId != null) { + final SlotRequestId requestIdOfAllocatedSlot = pendingRequests.getKeyA(allocationIdOfSlot); + if (requestIdOfAllocatedSlot != null) { + final PendingRequest requestOfAllocatedSlot = pendingRequests.getByKeyA(requestIdOfAllocatedSlot); + requestOfAllocatedSlot.setAllocationId(orphanedAllocationId); + + // this re-insertion of initiatedRequestId will not affect its original insertion order + pendingRequests.put(requestIdOfAllocatedSlot, orphanedAllocationId, requestOfAllocatedSlot); + } else { + // cancel the slot request if the orphaned allocation is not remapped to a pending request. + // the request id can be null if the slot is returned by scheduler + resourceManagerGateway.cancelSlotRequest(orphanedAllocationId); Review comment: Alright, so this is something like releasing the slot at the RM at the same time.
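The remapping discussed in the hunk above can be illustrated with a deliberately simplified model. This sketch is not the `SlotPoolImpl` code: ids are plain `String`s, the pending requests live in a single `LinkedHashMap` rather than a `DualKeyLinkedMap`, and an offered slot is always handed to the oldest pending request (the real pool also matches resource profiles). The class name and the `canceledAtRm` list are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrphanedAllocationSketch {

    // Pending request id -> allocation id it is currently waiting for, in request order.
    final Map<String, String> pendingRequests = new LinkedHashMap<>();

    // Allocation ids for which a cancel call would be sent to the resource manager.
    final List<String> canceledAtRm = new ArrayList<>();

    void request(String requestId, String allocationId) {
        pendingRequests.put(requestId, allocationId);
    }

    /** Hands the offered slot to the oldest pending request and returns that request's id (null if none). */
    String offerSlot(String allocationIdOfSlot) {
        Iterator<Map.Entry<String, String>> it = pendingRequests.entrySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        Map.Entry<String, String> oldest = it.next();
        String requestId = oldest.getKey();
        String allocationIdOfRequest = oldest.getValue();
        it.remove();

        maybeRemapOrphanedAllocation(allocationIdOfRequest, allocationIdOfSlot);
        return requestId;
    }

    void maybeRemapOrphanedAllocation(String allocationIdOfRequest, String allocationIdOfSlot) {
        if (allocationIdOfRequest.equals(allocationIdOfSlot)) {
            return; // the request got exactly the allocation it asked for, nothing is orphaned
        }
        String orphanedAllocationId = allocationIdOfRequest;
        String initiator = findRequestWaitingFor(allocationIdOfSlot);
        if (initiator != null) {
            // The request that initiated the offered allocation is still pending:
            // it takes over the orphaned allocation. Re-putting an existing key
            // keeps its position in the LinkedHashMap.
            pendingRequests.put(initiator, orphanedAllocationId);
        } else {
            // Nobody is left to take over the orphaned allocation, so cancel it.
            canceledAtRm.add(orphanedAllocationId);
        }
    }

    private String findRequestWaitingFor(String allocationId) {
        for (Map.Entry<String, String> entry : pendingRequests.entrySet()) {
            if (entry.getValue().equals(allocationId)) {
                return entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        OrphanedAllocationSketch pool = new OrphanedAllocationSketch();
        pool.request("r1", "a1");
        pool.request("r2", "a2");

        // The slot for a2 arrives first but is given to r1; r2 takes over a1.
        System.out.println(pool.offerSlot("a2") + " fulfilled, pending: " + pool.pendingRequests);

        // A slot with an unknown allocation id fulfills r2; a1 is orphaned and canceled.
        System.out.println(pool.offerSlot("a3") + " fulfilled, canceled: " + pool.canceledAtRm);
    }
}
```

In this model the `else` branch corresponds to the reviewer's reading: when no pending request can take over the orphaned allocation, it is released at the RM right away.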
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r440693739 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java ## @@ -22,12 +22,16 @@ import java.util.AbstractCollection; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Set; /** - * Map which stores values under two different indices. + * Map which stores values under two different indices. The mapping of the primary key to the + * value is backed by {@link LinkedHashMap} so that the iteration order over the values and + * the primary key set is the insertion order. Note that there is no contract of the iteration + * order over the secondary key set. Review comment: I would also explicitly document that `@param ` is the primary key. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java ## @@ -37,13 +41,13 @@ private final LinkedHashMap> aMap; - private final LinkedHashMap bMap; + private final HashMap bMap; Review comment: Does it matter for this PR which type `bMap` has? ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java ## @@ -85,4 +85,28 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() { assertThat(map.getByKeyB(1), is(secondValue)); assertThat(map.getByKeyA(2), is(secondValue)); } + + @Test + public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() { + final DualKeyLinkedMap map = new DualKeyLinkedMap<>(2); + + final String value = "foobar"; + map.put(1, 1, value); + map.put(2, 2, value); + + map.put(1, 1, value); + assertThat(map.keySetA().iterator().next(), is(1)); Review comment: I think `values()` are also interesting to check for both added tests. 
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java ## @@ -74,6 +74,20 @@ public V getKeyB(B bKey) { } } + public A getKeyA(B bKey) { Review comment: ```suggestion public A getKeyAByKeyB(B bKey) { ``` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java ## @@ -64,7 +64,7 @@ public V getKeyA(A aKey) { } } - public V getKeyB(B bKey) { + public V getByKeyB(B bKey) { Review comment: ```suggestion public V getValueByKeyB(B bKey) { ``` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java ## @@ -54,7 +54,7 @@ public int size() { return aMap.size(); } - public V getKeyA(A aKey) { + public V getByKeyA(A aKey) { Review comment: ```suggestion public V getValueByKeyA(A aKey) { ``` ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java ## @@ -85,4 +85,28 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() { assertThat(map.getByKeyB(1), is(secondValue)); assertThat(map.getByKeyA(2), is(secondValue)); } + + @Test + public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() { + final DualKeyLinkedMap map = new DualKeyLinkedMap<>(2); + + final String value = "foobar"; + map.put(1, 1, value); + map.put(2, 2, value); + + map.put(1, 1, value); + assertThat(map.keySetA().iterator().next(), is(1)); + } + + @Test + public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithDifferentSecondaryKey() { + final DualKeyLinkedMap map = new DualKeyLinkedMap<>(2); + + final String value = "foobar"; + map.put(1, 1, value); + map.put(2, 2, value); + + map.put(1, 3, value); Review comment: do we also want to check cleanup of key B `3` if it were in the map? 
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) { return null; } + private void maybeRemapOrphanedAllocation( + final AllocationID allocationIdOfRequest, + final AllocationID allocationIdOfSlot) { + + final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot) +
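The points raised in this review (explicit accessor names, checking `values()` as well as `keySetA()`, and cleaning up a secondary key that is already in the map) can be tried out on a small stand-alone model. The class below is a sketch written for illustration and is not the actual `DualKeyLinkedMap`; it only assumes what the Javadoc in the diff states, namely a `LinkedHashMap` keyed by the primary key and a `HashMap` from secondary to primary key. It relies on the documented `LinkedHashMap` behaviour that re-inserting an existing key does not change its insertion position. Null keys are not handled.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class DualKeyMapSketch<A, B, V> {

    // Primary key -> (secondary key, value); iteration follows insertion order of primary keys.
    private final LinkedHashMap<A, Map.Entry<B, V>> aMap = new LinkedHashMap<>();

    // Secondary key -> primary key; no ordering contract.
    private final HashMap<B, A> bMap = new HashMap<>();

    public V put(A aKey, B bKey, V value) {
        // If the secondary key currently belongs to another primary key, drop that entry
        // to keep the mapping between the two key sets one-to-one.
        A previousOwner = bMap.get(bKey);
        if (previousOwner != null && !previousOwner.equals(aKey)) {
            aMap.remove(previousOwner);
        }

        // Overwriting an existing primary key keeps its position in the LinkedHashMap.
        Map.Entry<B, V> old = aMap.put(aKey, new SimpleImmutableEntry<>(bKey, value));
        if (old != null && !Objects.equals(old.getKey(), bKey)) {
            bMap.remove(old.getKey()); // the secondary key the primary key used to point to
        }
        bMap.put(bKey, aKey);
        return old == null ? null : old.getValue();
    }

    public V getValueByKeyA(A aKey) {
        Map.Entry<B, V> entry = aMap.get(aKey);
        return entry == null ? null : entry.getValue();
    }

    public V getValueByKeyB(B bKey) {
        A aKey = bMap.get(bKey);
        return aKey == null ? null : getValueByKeyA(aKey);
    }

    public A getKeyAByKeyB(B bKey) {
        return bMap.get(bKey);
    }

    public Set<A> keySetA() {
        return aMap.keySet();
    }

    /** Values in insertion order of their primary keys. */
    public List<V> values() {
        List<V> result = new ArrayList<>();
        for (Map.Entry<B, V> entry : aMap.values()) {
            result.add(entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        DualKeyMapSketch<Integer, Integer, String> map = new DualKeyMapSketch<>();
        map.put(1, 1, "first");
        map.put(2, 2, "second");
        map.put(3, 3, "third");

        // Re-insert primary key 1 under secondary key 3, which is already in the map.
        map.put(1, 3, "remapped");

        System.out.println(map.keySetA());         // [1, 2]
        System.out.println(map.values());          // [remapped, second]
        System.out.println(map.getValueByKeyB(1)); // null
    }
}
```

In this sketch, re-inserting primary key `1` under secondary key `3` keeps `1` first in both `keySetA()` and `values()`, removes the stale secondary key `1`, and evicts the entry that previously owned secondary key `3`.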
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r438835441 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java ## @@ -37,20 +44,20 @@ private final LinkedHashMap> aMap; - private final LinkedHashMap bMap; + private final HashMap bMap; private transient Collection values; public DualKeyLinkedMap(int initialCapacity) { Review comment: Makes sense to me
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r438824119 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -648,26 +648,8 @@ boolean offerSlot( slotOffer.getResourceProfile(), taskManagerGateway); - // check whether we have request waiting for this slot - PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); Review comment: OK, so FLINK-13165 completes slot requests in order only if the offers come with unknown `AllocationID`s, right? Generally, we expect the RM to keep the `AllocationID` matched to the `SlotRequestId`. I am fine with breaking the `SlotRequestId` -> `AllocationID` tie if there are no known consequences. Eventually, I hope it might even help to simplify `SlotPoolImpl`.
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r435362084 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -648,26 +648,8 @@ boolean offerSlot( slotOffer.getResourceProfile(), taskManagerGateway); - // check whether we have request waiting for this slot - PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); Review comment: There is still the `UnfulfillableSlotRequestException`, which is a fail-fast route taken when the RM finds that a certain request profile cannot be fulfilled by any existing slot and a matching slot cannot be allocated either. As far as I can see, it is relevant for batch, streaming, and bulk alike. I do not know the whole background of this. At first glance, it looks to me like an optimisation that complicates things a bit at the moment. It is probably needed so that we do not wait for a timeout before canceling everything when it is already clear that the allocation can never succeed.
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r435362084 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -648,26 +648,8 @@ boolean offerSlot( slotOffer.getResourceProfile(), taskManagerGateway); - // check whether we have request waiting for this slot - PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); Review comment: There is still the `UnfulfillableSlotRequestException`, which is a fail-fast route taken when the RM finds that a certain request profile cannot be fulfilled by any existing slot and a matching slot cannot be allocated either. As far as I can see, it is relevant for batch, streaming, and bulk alike. I do not know the whole background of this. At first glance, it seems to be a complication, but it is probably needed so that we do not wait for a timeout before canceling everything when it is already clear that the allocation can never succeed.
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r435362084 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -648,26 +648,8 @@ boolean offerSlot( slotOffer.getResourceProfile(), taskManagerGateway); - // check whether we have request waiting for this slot - PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); Review comment: There is still the `UnfulfillableSlotRequestException`, which is a fail-fast route taken when the RM finds that a certain request profile cannot be fulfilled by any existing slot and a matching slot cannot be allocated either. As far as I can see, it is relevant for batch, streaming, and bulk alike. I do not know the whole background of this. At first glance, it seems to be a complication, but it is probably needed so that we do not wait for a timeout before canceling everything when it is already clear that the allocation cannot succeed.
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r435362084 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -648,26 +648,8 @@ boolean offerSlot( slotOffer.getResourceProfile(), taskManagerGateway); - // check whether we have request waiting for this slot - PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); Review comment: There is still the `UnfulfillableSlotRequestException`, which is a fail-fast route taken when the RM finds that a certain request profile cannot be fulfilled by any existing slot and a matching slot cannot be allocated either. As far as I can see, it is relevant for batch, streaming, and bulk alike. I do not know the whole background of this. At first glance, it seems to be a complication, but it might be necessary.
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r430441131 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -112,6 +114,9 @@ /** The requests that are waiting for the resource manager to be connected. */ private final LinkedHashMap waitingForResourceManager; + /** Maps a request to its allocation. */ + private final BiMap requestedAllocations; Review comment: Looking into the implementation of `DualKeyLinkedMap` for `pendingRequests`, it seems we can just remove the first matching `SlotRequestId` and then remap the orphaned `SlotRequestId` to its `AllocationID`. The original insertion ordering in `DualKeyLinkedMap.aMap` should not suffer. If so, we could remove `requestedAllocations`. EDIT: `waitingForResourceManager` -> `pendingRequests`
azagrebin commented on a change in pull request #12278: URL: https://github.com/apache/flink/pull/12278#discussion_r430441131 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -112,6 +114,9 @@ /** The requests that are waiting for the resource manager to be connected. */ private final LinkedHashMap waitingForResourceManager; + /** Maps a request to its allocation. */ + private final BiMap requestedAllocations; Review comment: Looking into the implementation of `DualKeyLinkedMap` for `waitingForResourceManager`, it seems we can just remove the first matching `SlotRequestId` and then remap the orphaned `SlotRequestId` to its `AllocationID`. The original insertion ordering in `DualKeyLinkedMap.aMap` should not suffer. If so, we could remove `requestedAllocations`. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ## @@ -648,26 +648,8 @@ boolean offerSlot( slotOffer.getResourceProfile(), taskManagerGateway); - // check whether we have request waiting for this slot - PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); Review comment: I am not sure about all the consequences of this change for the existing scheduling. I mean that, by accepting the slot offer, we no longer respect the `SlotRequestId` -> `AllocationID` mapping. Would it make sense to keep this behaviour configurable for now, depending on the scheduling strategy? Or is this complication not needed?