[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16247322#comment-16247322 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4937

> There may be allocatedSlots leak in SlotPool
>
>                 Key: FLINK-6434
>                 URL: https://issues.apache.org/jira/browse/FLINK-6434
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>              Labels: flip-6
>             Fix For: 1.5.0
>
> If the allocateSlot() call from Execution to SlotPool times out, the job
> begins to fail over, but the pending request remains in the SlotPool. If a
> new slot then registers with the SlotPool, it may fulfill the outdated
> pending request and be added to allocatedSlots, where it will never be used
> and never be recycled.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
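The leak described in the issue can be reproduced in miniature. The following is only a toy model under stated assumptions: `ToySlotPoolLeak`, its string-keyed maps, and its method signatures are hypothetical stand-ins and not Flink's actual `SlotPool`. It shows how a request that the caller has already given up on can still be fulfilled by a late slot offer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Toy model of the leak; every name here is hypothetical, not Flink's real API. */
class ToySlotPoolLeak {
    // allocation id -> future the caller is (or was) waiting on
    final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();
    // allocation id -> slot that the pool believes is in use
    final Map<String, String> allocatedSlots = new HashMap<>();

    /** Registers a pending request and returns the future that a slot offer will complete. */
    CompletableFuture<String> allocateSlot(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(allocationId, future);
        return future;
    }

    /** A newly registered slot fulfills the matching pending request, if there is one. */
    boolean offerSlot(String allocationId, String slot) {
        CompletableFuture<String> future = pendingRequests.remove(allocationId);
        if (future == null) {
            return false;
        }
        allocatedSlots.put(allocationId, slot);
        future.complete(slot);
        return true;
    }

    public static void main(String[] args) {
        ToySlotPoolLeak pool = new ToySlotPoolLeak();
        pool.allocateSlot("alloc-1");
        // The caller times out here and starts failover, but nothing tells the pool.
        // Later a slot registers and fulfills the stale request:
        pool.offerSlot("alloc-1", "slot-A");
        // The slot is now tracked as allocated although nobody will use or return it.
        System.out.println("allocated: " + pool.allocatedSlots.keySet());
    }
}
```

In this model nothing ever removes `alloc-1` from `allocatedSlots`, which is the situation the issue reports.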
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237426#comment-16237426 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148750995

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -262,23 +263,36 @@ public void disconnectResourceManager() {
         //

         @Override
    -    public CompletableFuture allocateSlot(
    -            ScheduledUnit task,
    --- End diff --

    Because this information is needed for scheduling.

> There may be allocatedSlots leak in SlotPool
>
>                 Key: FLINK-6434
>                 URL: https://issues.apache.org/jira/browse/FLINK-6434
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>            Priority: Major
>              Labels: flip-6
>
> If the allocateSlot() call from Execution to SlotPool times out, the job
> begins to fail over, but the pending request remains in the SlotPool. If a
> new slot then registers with the SlotPool, it may fulfill the outdated
> pending request and be added to allocatedSlots, where it will never be used
> and never be recycled.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237187#comment-16237187 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148718854

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -262,23 +263,36 @@ public void disconnectResourceManager() {
         //

         @Override
    -    public CompletableFuture allocateSlot(
    -            ScheduledUnit task,
    --- End diff --

    Why pass the ScheduledUnit as a parameter here? I think the slot pool interface should be clean: it should only have resource-related parameters, not scheduling-related ones.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236191#comment-16236191 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148599674

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java ---
    @@ -86,10 +85,16 @@
         //

         CompletableFuture allocateSlot(
    -            ScheduledUnit task,
    +            AllocationID allocationID,
                 ResourceProfile resources,
                 Iterable locationPreferences,
                 @RpcTimeout Time timeout);

         void returnAllocatedSlot(Slot slot);
    +
    +    /**
    +     * Cancel a slot allocation.
    +     * This method should be called when the CompletableFuture returned by allocateSlot completed exceptionally.
    --- End diff --

    Params description is missing.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236185#comment-16236185 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148591926

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -262,23 +263,36 @@ public void disconnectResourceManager() {
         //

         @Override
    -    public CompletableFuture allocateSlot(
    -            ScheduledUnit task,
    --- End diff --

    We should not remove the `ScheduledUnit` parameter here.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236180#comment-16236180 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148571514

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
    @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
                 fail("wrong exception: " + e);
             }
         }
    +
    +    @Test
    +    public void testCancelSlotAllocation() throws Exception {
    +        final JobID jid = new JobID();
    +
    +        final SlotPool pool = new SlotPool(
    +                rpcService, jid,
    +                SystemClock.getInstance(),
    +                Time.days(1), Time.days(1),
    +                Time.seconds(3) // this is the timeout for the request tested here
    +        );
    +        pool.start(JobMasterId.generate(), "foobar");
    +        SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
    +
    +        // 1. test the pending request is in waitingResourceManagerRequests
    +        AllocationID allocationID = new AllocationID();
    +        CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        assertEquals(1, pool.getNumOfWaitingForResourceRequests());
    +
    +        pool.cancelSlotAllocation(allocationID);
    +        assertEquals(0, pool.getNumOfWaitingForResourceRequests());
    +
    +        // 2. test the pending request is in pendingRequests
    +        ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
    +        pool.connectToResourceManager(resourceManagerGateway);
    +
    +        AllocationID allocationID2 = new AllocationID();
    +        future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        assertEquals(1, pool.getNumOfPendingRequests());
    +
    +        pool.cancelSlotAllocation(allocationID2);
    +        assertEquals(0, pool.getNumOfPendingRequests());
    +        //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
    +
    +        // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool
    +        AllocationID allocationID3 = new AllocationID();
    +        future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        ResourceID resourceID = ResourceID.generate();
    +        AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE);
    +        slotPoolGateway.registerTaskManager(resourceID);
    +        assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
    +
    +        assertEquals(0, pool.getNumOfPendingRequests());
    +        assertTrue(pool.getAllocatedSlots().contains(allocationID3));
    +
    +        pool.cancelSlotAllocation(allocationID3);
    +        assertFalse(pool.getAllocatedSlots().contains(allocationID3));
    +        assertTrue(pool.getAvailableSlots().contains(allocationID3));
    +    }
    +
    +    @Test
    +    public void testProviderAndOw
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236182#comment-16236182 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148570658

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
    @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
                 fail("wrong exception: " + e);
             }
         }
    +
    +    @Test
    +    public void testCancelSlotAllocation() throws Exception {
    +        final JobID jid = new JobID();
    +
    +        final SlotPool pool = new SlotPool(
    +                rpcService, jid,
    +                SystemClock.getInstance(),
    +                Time.days(1), Time.days(1),
    +                Time.seconds(3) // this is the timeout for the request tested here
    +        );
    +        pool.start(JobMasterId.generate(), "foobar");
    +        SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
    +
    +        // 1. test the pending request is in waitingResourceManagerRequests
    +        AllocationID allocationID = new AllocationID();
    +        CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        assertEquals(1, pool.getNumOfWaitingForResourceRequests());
    +
    +        pool.cancelSlotAllocation(allocationID);
    +        assertEquals(0, pool.getNumOfWaitingForResourceRequests());
    +
    +        // 2. test the pending request is in pendingRequests
    +        ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
    +        pool.connectToResourceManager(resourceManagerGateway);
    +
    +        AllocationID allocationID2 = new AllocationID();
    +        future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        assertEquals(1, pool.getNumOfPendingRequests());
    +
    +        pool.cancelSlotAllocation(allocationID2);
    +        assertEquals(0, pool.getNumOfPendingRequests());
    +        //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
    +
    +        // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool
    --- End diff --

    separate test
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236184#comment-16236184 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148598542

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -1006,7 +1044,13 @@ public boolean returnAllocatedSlot(Slot slot) {

                 Iterable locationPreferences = task.getTaskToExecute().getVertex().getPreferredLocations();

    -            return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
    +            final AllocationID allocationID = new AllocationID();
    +            CompletableFuture slotFuture = gateway.allocateSlot(allocationID, ResourceProfile.UNKNOWN, locationPreferences, timeout);
    +            slotFuture.exceptionally((Throwable failure) -> {
    --- End diff --

    I think `slotFuture.whenComplete` would be a better fit here.
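One way to read the `whenComplete` suggestion: `exceptionally` produces a future whose failure is replaced by the function's return value, whereas `whenComplete` runs a side effect and passes the original result or exception through. A minimal sketch, assuming a plain `String` in place of the slot type and an `AtomicBoolean` flag standing in for a call such as `cancelSlotAllocation` (both are illustrative assumptions, not the code of the pull request):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch; not the code of the pull request. */
class WhenCompleteSketch {
    /**
     * Attaches a cleanup callback that only fires on failure.
     * The 'cancelled' flag stands in for cancelling the allocation in the pool.
     */
    static CompletableFuture<String> withCleanup(
            CompletableFuture<String> slotFuture, AtomicBoolean cancelled) {
        return slotFuture.whenComplete((slot, failure) -> {
            if (failure != null) {
                cancelled.set(true);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        CompletableFuture<String> result = withCleanup(slotFuture, cancelled);

        slotFuture.completeExceptionally(new TimeoutException("timed out"));

        // The cleanup ran, and the failure is still visible to the caller.
        System.out.println("cancelled=" + cancelled.get()
                + ", stillFailed=" + result.isCompletedExceptionally());
    }
}
```

Because the failure is passed through, the caller of the returned future still sees that the allocation failed, while the cleanup happens as a side effect.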
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236187#comment-16236187 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148596041

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -262,23 +263,36 @@ public void disconnectResourceManager() {
         //

         @Override
    -    public CompletableFuture allocateSlot(
    -            ScheduledUnit task,
    +    public CompletableFuture allocateSlot(AllocationID allocationID,
                 ResourceProfile resources,
                 Iterable locationPreferences,
                 Time timeout) {

    -        return internalAllocateSlot(task, resources, locationPreferences);
    +        return internalAllocateSlot(allocationID, resources, locationPreferences);
         }

         @Override
         public void returnAllocatedSlot(Slot slot) {
             internalReturnAllocatedSlot(slot);
         }

    +    @Override
    +    public void cancelSlotAllocation(AllocationID allocationID) {
    +        waitingForResourceManager.remove(allocationID);
    --- End diff --

    We should fail the pending request properly. E.g. check if the slot is in `waitingForResourceManager` or `pendingRequests`. If yes, then remove it and call `failPendingRequest`.
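The suggested cancellation logic could look roughly like the sketch below. The field names `waitingForResourceManager` and `pendingRequests` and the helper name `failPendingRequest` are taken from the review comment, but their types and signatures here are assumptions made for illustration (string ids, bare futures instead of Flink's `PendingRequest`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of "remove, then fail" cancellation; types are simplified. */
class CancelSketch {
    final Map<String, CompletableFuture<String>> waitingForResourceManager = new HashMap<>();
    final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();

    /** Removes the request from whichever map holds it and fails its future. */
    boolean cancelSlotAllocation(String allocationId) {
        CompletableFuture<String> request = waitingForResourceManager.remove(allocationId);
        if (request == null) {
            request = pendingRequests.remove(allocationId);
        }
        if (request == null) {
            return false; // unknown id, or the request was already fulfilled
        }
        failPendingRequest(request,
                new CancellationException("Allocation " + allocationId + " was cancelled"));
        return true;
    }

    /** Assumed signature; completing an already completed future is a no-op. */
    private static void failPendingRequest(CompletableFuture<String> request, Exception cause) {
        request.completeExceptionally(cause);
    }

    public static void main(String[] args) {
        CancelSketch pool = new CancelSketch();
        CompletableFuture<String> future = new CompletableFuture<>();
        pool.pendingRequests.put("alloc-1", future);

        boolean cancelled = pool.cancelSlotAllocation("alloc-1");
        System.out.println("cancelled=" + cancelled
                + ", failed=" + future.isCompletedExceptionally());
    }
}
```

Failing the future, rather than only dropping the map entry, means that anything still waiting on the request is notified instead of hanging.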
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236174#comment-16236174 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148571851

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
    @@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) {
             }
         }

    -    private static ResourceManagerGateway createResourceManagerGatewayMock() {
    +    static ResourceManagerGateway createResourceManagerGatewayMock() {
             ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
             when(resourceManagerGateway
    -            .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
    -            .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
    +            .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
    +            .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
    --- End diff --

    Why not return a proper `CompletableFuture` here?
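To illustrate the question about returning a proper `CompletableFuture`: a real, already completed future behaves predictably when callbacks are chained onto it, which a mocked future does not guarantee. The sketch below avoids any mocking library and uses a hand-written stub; `ToyResourceManagerGateway` and its single `requestSlot(String)` method are hypothetical reductions of the real gateway, chosen only to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: a stub gateway that hands out real futures. */
class StubGatewaySketch {
    /** Heavily reduced stand-in for ResourceManagerGateway (assumed shape). */
    interface ToyResourceManagerGateway {
        CompletableFuture<String> requestSlot(String allocationId);
    }

    /** Stub that acknowledges every request with an already completed future. */
    static ToyResourceManagerGateway acknowledgingGateway() {
        return allocationId -> CompletableFuture.completedFuture("ack:" + allocationId);
    }

    public static void main(String[] args) {
        ToyResourceManagerGateway gateway = acknowledgingGateway();
        // Chained stages run as usual because the future is a real one.
        String reply = gateway.requestSlot("alloc-1")
                .thenApply(String::toUpperCase)
                .join();
        System.out.println(reply);
    }
}
```

With a mocking framework the same idea would amount to returning `CompletableFuture.completedFuture(...)` from the stubbed method instead of a mocked future.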
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236183#comment-16236183 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148571183

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
    @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
                 fail("wrong exception: " + e);
             }
         }
    +
    +    @Test
    +    public void testCancelSlotAllocation() throws Exception {
    +        final JobID jid = new JobID();
    +
    +        final SlotPool pool = new SlotPool(
    +                rpcService, jid,
    +                SystemClock.getInstance(),
    +                Time.days(1), Time.days(1),
    +                Time.seconds(3) // this is the timeout for the request tested here
    +        );
    +        pool.start(JobMasterId.generate(), "foobar");
    +        SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
    +
    +        // 1. test the pending request is in waitingResourceManagerRequests
    +        AllocationID allocationID = new AllocationID();
    +        CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        assertEquals(1, pool.getNumOfWaitingForResourceRequests());
    +
    +        pool.cancelSlotAllocation(allocationID);
    +        assertEquals(0, pool.getNumOfWaitingForResourceRequests());
    +
    +        // 2. test the pending request is in pendingRequests
    +        ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
    +        pool.connectToResourceManager(resourceManagerGateway);
    +
    +        AllocationID allocationID2 = new AllocationID();
    +        future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        assertEquals(1, pool.getNumOfPendingRequests());
    +
    +        pool.cancelSlotAllocation(allocationID2);
    +        assertEquals(0, pool.getNumOfPendingRequests());
    +        //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
    +
    +        // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool
    +        AllocationID allocationID3 = new AllocationID();
    +        future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        ResourceID resourceID = ResourceID.generate();
    +        AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE);
    +        slotPoolGateway.registerTaskManager(resourceID);
    +        assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
    +
    +        assertEquals(0, pool.getNumOfPendingRequests());
    +        assertTrue(pool.getAllocatedSlots().contains(allocationID3));
    +
    +        pool.cancelSlotAllocation(allocationID3);
    +        assertFalse(pool.getAllocatedSlots().contains(allocationID3));
    +        assertTrue(pool.getAvailableSlots().contains(allocationID3));
    +    }
    +
    +    @Test
    +    public void testProviderAndOw
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236170#comment-16236170 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148569315

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -361,9 +374,19 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
         }

         private void checkTimeoutSlotAllocation(AllocationID allocationID) {
    +        removePendingRequestWithException(allocationID, new TimeoutException("Slot allocation request " + allocationID + " timed out"));
    +    }
    +
    +    private void removePendingRequestWithException(AllocationID allocationID, Exception e) {
             PendingRequest request = pendingRequests.remove(allocationID);
    -        if (request != null && !request.getFuture().isDone()) {
    -            request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
    +        if (request != null && (!request.getFuture().isDone() || request.getFuture().isCompletedExceptionally())) {
    +            //TODO: the following line depends on the pr: https://github.com/apache/flink/pull/4887
    +            //if (resourceManagerGateway != null) {
    +            //    resourceManagerGateway.cancelSlotRequest(jobId, jobMasterId, allocationID);
    +            //}
    --- End diff --

    This should be removed for now and added once #4887 has been merged.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236190#comment-16236190 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148602423

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
    @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
                 fail("wrong exception: " + e);
             }
         }
    +
    +    @Test
    +    public void testCancelSlotAllocation() throws Exception {
    +        final JobID jid = new JobID();
    +
    +        final SlotPool pool = new SlotPool(
    +                rpcService, jid,
    +                SystemClock.getInstance(),
    +                Time.days(1), Time.days(1),
    +                Time.seconds(3) // this is the timeout for the request tested here
    +        );
    +        pool.start(JobMasterId.generate(), "foobar");
    +        SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
    +
    +        // 1. test the pending request is in waitingResourceManagerRequests
    +        AllocationID allocationID = new AllocationID();
    +        CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +        try {
    +            future.get(2, TimeUnit.SECONDS);
    +            fail("We expected a AskTimeoutException.");
    +        }
    +        catch (ExecutionException e) {
    +            assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +        }
    +        catch (Exception e) {
    +            fail("wrong exception: " + e);
    +        }
    +
    +        assertEquals(1, pool.getNumOfWaitingForResourceRequests());
    +
    +        pool.cancelSlotAllocation(allocationID);
    +        assertEquals(0, pool.getNumOfWaitingForResourceRequests());
    --- End diff --

    I think the test could look like the following:

    ```
    slotPoolGateway.allocateSlot();

    CompletableFuture numberPendingRequestsFuture = slotPoolGateway.requestNumberPendingRequests();

    assertEquals(1, numberPendingRequestsFuture.get());

    slotPoolGateway.cancelAllocation();

    numberPendingRequestsFuture = slotPoolGateway.requestNumberPendingRequests();

    assertEquals(0, numberPendingRequestsFuture.get());
    ```
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236186#comment-16236186 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148601504

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -645,6 +668,21 @@ AllocatedSlots getAllocatedSlots() {
             return allocatedSlots;
         }

    +    @VisibleForTesting
    +    AvailableSlots getAvailableSlots() {
    +        return availableSlots;
    +    }
    +
    +    @VisibleForTesting
    +    int getNumOfWaitingForResourceRequests() {
    +        return waitingForResourceManager.size();
    +    }
    +
    +    @VisibleForTesting
    +    int getNumOfPendingRequests() {
    +        return pendingRequests.size();
    +    }
    --- End diff --

    I think we should not make internal state easily accessible, because it will usually be modified by the main thread. Also, when checking a certain interleaving, you might be misled into thinking that you can do something like

    ```
    slotPool.asyncAddPendingRequest()
    slotPool.getNumOfPendingRequests() // this returns +1 pending requests
    ```

    This might work, but sometimes it does not, because the concurrent operation has not completed yet. I would like to make concurrent operations explicit, for example by returning a `CompletableFuture getNumberOfPendingRequests`, if at all.
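The point about making concurrency explicit can be sketched as follows. This is not Flink's RPC machinery: a single-threaded `ExecutorService` stands in for the endpoint's main thread, and the class and method shapes are assumptions for illustration. Because both the mutation and the query are submitted to the same single thread, the query is ordered after the mutation, and the caller has to wait on a future to see the answer.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: state is only touched on one thread, queries return futures. */
class MainThreadStateSketch {
    // Single thread standing in for the RPC endpoint's main thread (assumption).
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Only ever read or written from tasks running on mainThread.
    private final Set<String> pendingRequests = new HashSet<>();

    /** Asynchronously records a pending request on the main thread. */
    CompletableFuture<Void> addPendingRequest(String allocationId) {
        return CompletableFuture.runAsync(() -> pendingRequests.add(allocationId), mainThread);
    }

    /** Asynchronous query; it is queued behind all previously submitted operations. */
    CompletableFuture<Integer> getNumberOfPendingRequests() {
        return CompletableFuture.supplyAsync(pendingRequests::size, mainThread);
    }

    void shutdown() {
        mainThread.shutdown();
    }

    public static void main(String[] args) {
        MainThreadStateSketch pool = new MainThreadStateSketch();
        pool.addPendingRequest("alloc-1");
        // No sleep or polling: the query runs after the add on the same thread.
        int pending = pool.getNumberOfPendingRequests().join();
        System.out.println("pending=" + pending);
        pool.shutdown();
    }
}
```

A direct getter called from the test thread would instead race with the queued add and could observe either zero or one pending request.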
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236176#comment-16236176 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570094 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); --- End diff -- Better to check with `instanceOf` I think. 
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236189#comment-16236189 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148602544 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); --- End diff -- It's not guaranteed that `pool.getNumofPendingRequests()` is executed after `pool.cancelSlotAllocation` has been executed. > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > If the call allocateSlot() from Execution to Slotpool timeout, the job will > begin to failover, but the pending request are still in SlotPool, if then a > new slot register to SlotPool, it may be fulfill the outdated pending request > and be added to allocatedSlots, but it will never be used and will never be > recycled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
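One way to make such an assertion deterministic is to route the query through the same single-threaded queue that processes the cancellation. The sketch below illustrates the idea with plain JDK classes; `PoolOrderingSketch` and `TinyPool` are hypothetical, and the sketch assumes (as an illustration only) that the component handles all calls on one thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolOrderingSketch {

    /** Hypothetical stand-in for a component whose state is only touched by one thread. */
    static class TinyPool {
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
        private int pendingRequests; // only read or written on mainThread

        CompletableFuture<Void> addRequest() {
            return CompletableFuture.runAsync(() -> pendingRequests++, mainThread);
        }

        CompletableFuture<Void> cancelRequest() {
            return CompletableFuture.runAsync(() -> pendingRequests--, mainThread);
        }

        /** Queued behind earlier calls, so it observes their effects. */
        CompletableFuture<Integer> numPendingRequests() {
            return CompletableFuture.supplyAsync(() -> pendingRequests, mainThread);
        }

        void shutdown() {
            mainThread.shutdown();
        }
    }

    /** Adds and cancels one request, then reads the counter through the same queue. */
    static int pendingAfterCancel() {
        TinyPool pool = new TinyPool();
        try {
            pool.addRequest();
            pool.cancelRequest();
            return pool.numPendingRequests().join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the single-threaded executor runs tasks in submission order, the read is guaranteed to happen after the cancellation, which a direct field read from the test thread would not be.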
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236171#comment-16236171 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570278 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. test the pending request is in pendingRequests --- End diff -- This should be a separate test. 
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236181#comment-16236181 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570875 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); + + // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool + AllocationID allocationID3 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + ResourceID resourceID = ResourceID.generate(); + AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE); + slotPoolGateway.registerTaskManager(resourceID); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + + assertEquals(0, pool.getNumOfPendingRequests()); + assertTrue(pool.getAllocatedSlots().contains(allocationID3)); + + pool.cancelSlotAllocation(allocationID3); + assertFalse(pool.getAllocatedSlots().contains(allocationID3)); + assertTrue(pool.getAvailableSlots().contains(allocationID3)); + } + + @Test + public 
void testProviderAndOw
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236178#comment-16236178 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148596180

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
     //
     @Override
-    public CompletableFuture allocateSlot(
-            ScheduledUnit task,
+    public CompletableFuture allocateSlot(AllocationID allocationID,
             ResourceProfile resources,
             Iterable locationPreferences,
             Time timeout) {
-        return internalAllocateSlot(task, resources, locationPreferences);
+        return internalAllocateSlot(allocationID, resources, locationPreferences);
     }
     @Override
     public void returnAllocatedSlot(Slot slot) {
         internalReturnAllocatedSlot(slot);
     }
+    @Override
+    public void cancelSlotAllocation(AllocationID allocationID) {
+        waitingForResourceManager.remove(allocationID);
+
+        removePendingRequestWithException(allocationID, new CancellationException("Allocation " + allocationID + " cancelled"));
+
+        if (allocatedSlots.contains(allocationID)) {
+            Slot slot = allocatedSlots.get(allocationID);
--- End diff --

We could avoid the `contains` call by simply calling `get` and then compare against `null`.
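The single-lookup pattern from this comment can be sketched as below. `SingleLookupSketch` and its `String`-keyed map are hypothetical stand-ins for the `allocatedSlots` bookkeeping, and the pattern only works when `null` is never stored as a value.

```java
import java.util.HashMap;
import java.util.Map;

class SingleLookupSketch {

    /** Hypothetical stand-in for the allocatedSlots bookkeeping, keyed by allocation id. */
    private final Map<String, String> allocatedSlots = new HashMap<>();

    void add(String allocationId, String slot) {
        allocatedSlots.put(allocationId, slot);
    }

    /** Releases the slot for the given id; returns false if none was allocated. */
    boolean release(String allocationId) {
        String slot = allocatedSlots.get(allocationId); // one lookup instead of contains + get
        if (slot == null) {
            return false;
        }
        allocatedSlots.remove(allocationId);
        return true;
    }
}
```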
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236188#comment-16236188 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148599978 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { --- End diff -- This is not needed if you add `throws Exception` to the test method. 
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236175#comment-16236175 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570569 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); --- End diff -- should be removed > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > If the call allocateSlot() from Execution to Slotpool timeout, the job will > begin to failover, but the pending request are still in SlotPool, if then a > new slot register to SlotPool, it may be fulfill the outdated pending request > and be added to allocatedSlots, but it will never be used and will never be > recycled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236179#comment-16236179 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570462

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
             fail("wrong exception: " + e);
         }
     }
+
+    @Test
+    public void testCancelSlotAllocation() throws Exception {
+        final JobID jid = new JobID();
+
+        final SlotPool pool = new SlotPool(
+                rpcService, jid,
+                SystemClock.getInstance(),
+                Time.days(1), Time.days(1),
+                Time.seconds(3) // this is the timeout for the request tested here
--- End diff --

Timeout should be lower
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236177#comment-16236177 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148572112

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
@@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) {
         }
     }
-    private static ResourceManagerGateway createResourceManagerGatewayMock() {
+    static ResourceManagerGateway createResourceManagerGatewayMock() {
         ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
--- End diff --

We could think about implementing a `SimpleAckingResourceManagerGateway` for testing purposes. That way we avoid mocking too much.
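A hand-written acknowledging test double of the kind suggested here might look like the sketch below. The `SlotRequestGateway` interface is a hypothetical, heavily reduced stand-in (the real `ResourceManagerGateway` has a different and much larger surface), and `SimpleAckingGateway` is an illustrative name rather than an existing Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AckingGatewaySketch {

    /** Hypothetical, heavily reduced view of a resource manager gateway. */
    interface SlotRequestGateway {
        CompletableFuture<Boolean> requestSlot(String allocationId);

        void cancelSlotRequest(String allocationId);
    }

    /** Hand-written test double: acknowledges every request and records what it saw. */
    static class SimpleAckingGateway implements SlotRequestGateway {
        final List<String> requested = new ArrayList<>();
        final List<String> cancelled = new ArrayList<>();

        @Override
        public CompletableFuture<Boolean> requestSlot(String allocationId) {
            requested.add(allocationId);
            return CompletableFuture.completedFuture(true); // always acknowledge
        }

        @Override
        public void cancelSlotRequest(String allocationId) {
            cancelled.add(allocationId);
        }
    }
}
```

Compared with a mock, the recorded lists make the expected interactions explicit in ordinary assertions, without stubbing each call in every test.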
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236172#comment-16236172 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570387 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); --- End diff -- Timeouts should be lower. > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > If the call allocateSlot() from Execution to Slotpool timeout, the job will > begin to failover, but the pending request are still in SlotPool, if then a > new slot register to SlotPool, it may be fulfill the outdated pending request > and be added to allocatedSlots, but it will never be used and will never be > recycled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236173#comment-16236173 ]

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148592528

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -361,9 +374,19 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
     }
     private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+        removePendingRequestWithException(allocationID, new TimeoutException("Slot allocation request " + allocationID + " timed out"));
+    }
+
+    private void removePendingRequestWithException(AllocationID allocationID, Exception e) {
--- End diff --

maybe we could refactor this method into `failPendingRequest(PendingRequest, Exception)`, then it could be used by `checkTimeoutSlotAllocation` and `checkTimeoutRequestWaitingForResourceManager`
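The proposed refactoring could be sketched as follows: each caller looks up and removes its own request, and a shared `failPendingRequest` only completes the future with the given cause. Everything here is a simplified assumption for illustration; `FailPendingRequestSketch` and its `String` ids are hypothetical, and the `PendingRequest` class is not Flink's.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class FailPendingRequestSketch {

    /** Hypothetical pending request: an id plus the future handed to the caller. */
    static class PendingRequest {
        final String allocationId;
        final CompletableFuture<String> future = new CompletableFuture<>();

        PendingRequest(String allocationId) {
            this.allocationId = allocationId;
        }
    }

    private final Map<String, PendingRequest> pendingRequests = new HashMap<>();

    PendingRequest request(String allocationId) {
        PendingRequest request = new PendingRequest(allocationId);
        pendingRequests.put(allocationId, request);
        return request;
    }

    /** Shared failure path: callers decide which request fails and why. */
    private static void failPendingRequest(PendingRequest request, Exception cause) {
        request.future.completeExceptionally(cause);
    }

    void timeout(String allocationId) {
        PendingRequest request = pendingRequests.remove(allocationId);
        if (request != null) {
            failPendingRequest(request,
                new TimeoutException("Slot allocation request " + allocationId + " timed out"));
        }
    }

    void cancel(String allocationId) {
        PendingRequest request = pendingRequests.remove(allocationId);
        if (request != null) {
            failPendingRequest(request,
                new CancellationException("Allocation " + allocationId + " cancelled"));
        }
    }

    int numPending() {
        return pendingRequests.size();
    }
}
```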
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235157#comment-16235157 ]

ASF GitHub Bot commented on FLINK-6434:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4937

[FLINK-6434] [runtime] cancel slot allocation if request timed out in ProviderAndOwner

## What is the purpose of the change

This PR adds a cancel-slot-allocation protocol between ProviderAndOwner and SlotPool, so that ProviderAndOwner can cancel slot allocations it no longer needs and thereby avoid leaking slots.

## Brief change log

- *Let the ProviderAndOwner generate the allocation id before calling allocateSlot on the slot pool.*
- *If the allocateSlot call times out, ProviderAndOwner cancels the previous allocation in the slot pool.*

## Verifying this change

This change added tests and can be verified as follows:

- *Added a unit test in SlotPoolRpcTest*
- *Modified the existing SlotPoolTest*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-6434

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4937.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4937

commit ab3c599d55847451a1194ba55375207267561a71
Author: shuai.xus
Date: 2017-10-20T09:12:39Z

[FLINK-6434] cancel slot allocation if request timed out in ProviderAndOwner

Summary:
This fixes Flink JIRA #6434.
1. Let the ProviderAndOwner generate the allocation id before calling allocateSlot on the slot pool.
2. If the allocateSlot call times out, ProviderAndOwner cancels the previous allocation in the slot pool.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D323990
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073081#comment-16073081 ]

shuai.xu commented on FLINK-6434:
-

[~till.rohrmann] Sorry, I have not started working on it.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068237#comment-16068237 ]

Till Rohrmann commented on FLINK-6434:
--

Hi [~tiemsn], what's the state of the fix? Are you working on this issue?
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000435#comment-16000435 ]

shuai.xu commented on FLINK-6434:
-

Yes, adding a request id can solve the problem.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000369#comment-16000369 ]

Till Rohrmann commented on FLINK-6434:
--

You're right [~tiemsn]. The problem seems to be that the actual slot allocation has a different lifetime than a concrete slot request. I think we have to introduce a slot request id which can be used to fail a specific slot request. That way we decouple the requests from the actual slots a {{SlotPool}} has available.
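The decoupling described here can be illustrated with a small model in which requests are tracked by their own id and are fulfilled by whichever slot arrives, so cancelling by request id releases the slot regardless of its allocation id. This is a minimal single-threaded sketch under assumed names (`SlotRequestIdSketch`, `String` ids), not the design that was eventually implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class SlotRequestIdSketch {

    private final Deque<String> pendingRequests = new ArrayDeque<>();        // request ids, oldest first
    private final Map<String, String> allocatedByRequest = new HashMap<>();  // request id -> allocation id
    private final Deque<String> availableSlots = new ArrayDeque<>();         // allocation ids of free slots

    /** Registers a request; it is served immediately if a free slot exists. */
    void request(String requestId) {
        String slot = availableSlots.poll();
        if (slot != null) {
            allocatedByRequest.put(requestId, slot);
        } else {
            pendingRequests.add(requestId);
        }
    }

    /** A slot with any allocation id may fulfil the oldest pending request. */
    void offerSlot(String allocationId) {
        String requestId = pendingRequests.poll();
        if (requestId != null) {
            allocatedByRequest.put(requestId, allocationId);
        } else {
            availableSlots.add(allocationId);
        }
    }

    /** Cancels by request id: drops a pending request or frees whatever slot it received. */
    void cancel(String requestId) {
        pendingRequests.remove(requestId);
        String slot = allocatedByRequest.remove(requestId);
        if (slot != null) {
            availableSlots.add(slot);
        }
    }

    int numAllocated() {
        return allocatedByRequest.size();
    }

    int numAvailable() {
        return availableSlots.size();
    }
}
```

In the scenario where a request is fulfilled by a slot carrying a different allocation id, `cancel` still finds the slot through the request id and returns it to the free set, so nothing leaks.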
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000232#comment-16000232 ]

shuai.xu commented on FLINK-6434:

[~till.rohrmann] This does not seem to fix the bug completely. Between allocateSlot(allocationID1) and failAllocation(allocationID1), another free slot with allocationID2 may fulfill the pending request, and allocatedSlots will record it under allocationID2. failAllocation(allocationID1) cannot release it, so the slot is still leaked.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15997961#comment-15997961 ]

Till Rohrmann commented on FLINK-6434:

Thanks for reporting the issue [~tiemsn]. This sounds like a bug and should be fixed. I think we could solve it in the following way: we generate the {{AllocationID}} in {{ProviderAndOwner#allocateSlot}} and pass it to {{SlotPoolGateway#allocateSlot}}. On the returned future we register an exception handler which will call {{SlotPoolGateway#failAllocation}} with the generated {{AllocationID}}. That way we should be able to deal with timeouts on the {{Execution}} side. What do you think?
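The proposal above can be sketched roughly as follows. This is a simplified stand-in, not the actual Flink code: the nested Pool class only imitates the shape of {{SlotPoolGateway#allocateSlot}} and {{SlotPoolGateway#failAllocation}}, java.util.UUID stands in for {{AllocationID}}, and allocateWithCleanup is a hypothetical helper playing the role of {{ProviderAndOwner#allocateSlot}}.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: the caller generates the ID and cleans up on failure.
final class FailAllocationSketch {

    // Stand-in for the pool side; only tracks pending requests by ID.
    static final class Pool {
        final Map<UUID, CompletableFuture<String>> pending = new HashMap<>();

        CompletableFuture<String> allocateSlot(UUID allocationId) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.put(allocationId, future);
            return future;
        }

        void failAllocation(UUID allocationId, Throwable cause) {
            CompletableFuture<String> future = pending.remove(allocationId);
            if (future != null) {
                // No-op if the future has already completed.
                future.completeExceptionally(cause);
            }
        }
    }

    // The ID is generated on the caller side, so the caller can refer to
    // the request later even if it never receives an answer.
    static CompletableFuture<String> allocateWithCleanup(Pool pool) {
        UUID allocationId = UUID.randomUUID();
        CompletableFuture<String> future = pool.allocateSlot(allocationId);
        // Exception handler: on any failure, tell the pool to drop the request.
        future.whenComplete((slot, failure) -> {
            if (failure != null) {
                pool.failAllocation(allocationId, failure);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        CompletableFuture<String> future = allocateWithCleanup(pool);
        // Simulate a timeout observed on the caller side.
        future.completeExceptionally(new TimeoutException("allocation timed out"));
        System.out.println("pending requests left: " + pool.pending.size());
    }
}
```

In this model the timeout is simulated by completing the future exceptionally by hand; in a real RPC setup the failure would come from the call timing out.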
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996823#comment-15996823 ]

Till Rohrmann commented on FLINK-6434:

Thanks for reporting the issue. I think the best way to solve the problem is to check whether we can complete the pending request future. If this is not the case, then we should release the {{SimpleSlot}} and add the {{AllocatedSlot}} to the set of {{availableSlots}} instead of moving it to {{allocatedSlots}}.
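The check described above could look roughly like the following sketch. It is a simplified model rather than Flink's SlotPool: SlotOfferSketch and its methods are hypothetical, slots are plain strings, and the only real API relied on is that CompletableFuture#complete returns false when the future has already been completed (for example by a timeout).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model; not Flink's SlotPool.
final class SlotOfferSketch {
    final Set<String> availableSlots = new HashSet<>();
    final Set<String> allocatedSlots = new HashSet<>();
    final Deque<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();

    CompletableFuture<String> requestSlot() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.add(future);
        return future;
    }

    void offerSlot(String slotId) {
        CompletableFuture<String> pending;
        while ((pending = pendingRequests.poll()) != null) {
            // complete() returns false if the request already timed out or
            // was otherwise completed, so the slot is not booked for it.
            if (pending.complete(slotId)) {
                allocatedSlots.add(slotId);
                return;
            }
        }
        // No live request could take the slot: keep it available.
        availableSlots.add(slotId);
    }

    public static void main(String[] args) {
        SlotOfferSketch pool = new SlotOfferSketch();
        CompletableFuture<String> request = pool.requestSlot();
        // The caller gives up before any slot arrives.
        request.completeExceptionally(new TimeoutException("caller timed out"));
        pool.offerSlot("slot-1");
        System.out.println("available=" + pool.availableSlots
                + " allocated=" + pool.allocatedSlots);
    }
}
```

This only helps when the pool can observe that the request future is already done; it makes no claim about requests whose failure is visible only on the caller's side.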