[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16247322#comment-16247322
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4937


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management
> Reporter: shuai.xu
> Assignee: shuai.xu
> Labels: flip-6
> Fix For: 1.5.0
>
>
> If the allocateSlot() call from Execution to SlotPool times out, the job
> begins to fail over, but the pending request remains in the SlotPool. If a
> new slot then registers with the SlotPool, it may fulfill the outdated
> pending request and be added to allocatedSlots, where it will never be used
> and never be recycled.
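The leak can be reproduced with a toy model of the pool's bookkeeping. This is a sketch under stated assumptions, not Flink's code: `ToySlotPool`, its `String` allocation IDs and its three collections are hypothetical stand-ins for `SlotPool`'s `pendingRequests`, `allocatedSlots` and `availableSlots`. The caller's timeout is simulated by failing the returned future; a later `offerSlot` still moves the slot into `allocatedSlots`, and only an explicit `cancelSlotAllocation` hands it back.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model of the SlotPool bookkeeping; not Flink's real classes.
class ToySlotPool {
    // allocation ID -> future handed to the requester
    final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();
    // allocation ID -> slot ID, for slots assigned to a request
    final Map<String, String> allocatedSlots = new HashMap<>();
    final Set<String> availableSlots = new HashSet<>();

    CompletableFuture<String> allocateSlot(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(allocationId, future);
        return future;
    }

    // A new slot fulfils the matching pending request, even if the
    // requester has already given up on it.
    void offerSlot(String allocationId, String slotId) {
        CompletableFuture<String> request = pendingRequests.remove(allocationId);
        if (request != null) {
            allocatedSlots.put(allocationId, slotId);
            request.complete(slotId); // no-op if the caller's future already failed
        } else {
            availableSlots.add(slotId);
        }
    }

    // The fix: after a failure the caller cancels its allocation, so the pool
    // drops the pending request or returns an already assigned slot.
    void cancelSlotAllocation(String allocationId) {
        CompletableFuture<String> request = pendingRequests.remove(allocationId);
        if (request != null) {
            request.cancel(false);
            return;
        }
        String slotId = allocatedSlots.remove(allocationId);
        if (slotId != null) {
            availableSlots.add(slotId);
        }
    }
}
```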



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237426#comment-16237426
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148750995
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
--- End diff --

Because this information is needed for scheduling.


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237187#comment-16237187
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148718854
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
--- End diff --

Why pass the ScheduledUnit as a parameter here? I think the slot pool interface should be clean: it should only have resource-related parameters, not scheduling-related ones.


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236191#comment-16236191
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148599674
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java ---
@@ -86,10 +85,16 @@
// 

 
CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   AllocationID allocationID,
ResourceProfile resources,
Iterable locationPreferences,
@RpcTimeout Time timeout);
 
void returnAllocatedSlot(Slot slot);
+
+   /**
+* Cancel a slot allocation.
+* This method should be called when the CompletableFuture returned by allocateSlot completed exceptionally.
--- End diff --

The parameter description is missing.
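For the missing parameter description, the Javadoc could read roughly as below. The interface is a hypothetical stand-in so the snippet compiles on its own; a `String` replaces Flink's `AllocationID`, and the wording is a suggestion, not the text that was merged.

```java
// Hypothetical stand-in for the gateway method under review; the real
// SlotPoolGateway method takes Flink's AllocationID instead of a String.
interface SlotAllocationCanceller {

    /**
     * Cancels a slot allocation.
     *
     * <p>This method should be called when the future returned by
     * allocateSlot has completed exceptionally, so that the pool can drop
     * the outdated pending request or release a slot already assigned to it.
     *
     * @param allocationId identifier of the allocation to cancel; this must be
     *                     the same ID that was passed to allocateSlot
     */
    void cancelSlotAllocation(String allocationId);
}
```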


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236185#comment-16236185
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148591926
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
--- End diff --

We should not remove the `ScheduledUnit` parameter here.


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236180#comment-16236180
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148571514
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool
+   AllocationID allocationID3 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   ResourceID resourceID = ResourceID.generate();
+   AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE);
+   slotPoolGateway.registerTaskManager(resourceID);
+   assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+   assertEquals(0, pool.getNumOfPendingRequests());
+   assertTrue(pool.getAllocatedSlots().contains(allocationID3));
+
+   pool.cancelSlotAllocation(allocationID3);
+   assertFalse(pool.getAllocatedSlots().contains(allocationID3));
+   assertTrue(pool.getAvailableSlots().contains(allocationID3));
+   }
+
+   @Test
+   public void testProviderAndOw

[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236182#comment-16236182
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570658
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool
--- End diff --

This should be a separate test.


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236184#comment-16236184
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148598542
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -1006,7 +1044,13 @@ public boolean returnAllocatedSlot(Slot slot) {
Iterable locationPreferences = task.getTaskToExecute().getVertex().getPreferredLocations();
 
-   return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
+   final AllocationID allocationID = new AllocationID();
+   CompletableFuture slotFuture = gateway.allocateSlot(allocationID, ResourceProfile.UNKNOWN, locationPreferences, timeout);
+   slotFuture.exceptionally((Throwable failure) -> {
--- End diff --

I think `slotFuture.whenComplete` would better fit here.
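A sketch of the `whenComplete` variant. `exceptionally` exists to supply a replacement value on failure, whereas here only a side effect is needed (cancel the allocation) and the original outcome should pass through unchanged, which is what `whenComplete` does. `ToyGateway`, `SlotRequester` and the `String` IDs are hypothetical; the real gateway call also takes a `ResourceProfile`, location preferences and a timeout.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical gateway; Flink's SlotPoolGateway uses AllocationID,
// ResourceProfile and an RPC timeout instead of plain strings.
interface ToyGateway {
    CompletableFuture<String> allocateSlot(String allocationId);
    void cancelSlotAllocation(String allocationId);
}

class SlotRequester {
    static CompletableFuture<String> requestSlot(ToyGateway gateway) {
        // The caller creates the ID up front so it can refer to the request later.
        final String allocationId = UUID.randomUUID().toString();
        CompletableFuture<String> slotFuture = gateway.allocateSlot(allocationId);
        // whenComplete runs a side effect and keeps the original outcome,
        // unlike exceptionally, which must produce a replacement value.
        return slotFuture.whenComplete((slot, failure) -> {
            if (failure != null) {
                gateway.cancelSlotAllocation(allocationId);
            }
        });
    }
}
```

Generating the ID on the caller side is also what lets the caller cancel a request it no longer waits for.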


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236187#comment-16236187
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148596041
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   public CompletableFuture allocateSlot(AllocationID allocationID,
ResourceProfile resources,
Iterable locationPreferences,
Time timeout) {
 
-   return internalAllocateSlot(task, resources, locationPreferences);
+   return internalAllocateSlot(allocationID, resources, locationPreferences);
}
 
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
}
 
+   @Override
+   public void cancelSlotAllocation(AllocationID allocationID) {
+   waitingForResourceManager.remove(allocationID);
--- End diff --

We should fail the pending request properly: check whether the request is in `waitingForResourceManager` or `pendingRequests`, and if so, remove it and call `failPendingRequest`.
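One possible shape for the suggested `cancelSlotAllocation`, assuming two maps named after the review comment's `waitingForResourceManager` and `pendingRequests`, keyed by a `String` ID and holding the requesters' futures. The `failPendingRequest` helper here is hypothetical and only completes the future exceptionally; Flink's own helper may do more.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the two request queues named in the review comment.
class ToyPendingRequests {
    final Map<String, CompletableFuture<String>> waitingForResourceManager = new HashMap<>();
    final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();

    void cancelSlotAllocation(String allocationId) {
        // The request can sit in either queue, depending on whether a
        // ResourceManager was connected when it was issued.
        CompletableFuture<String> request = waitingForResourceManager.remove(allocationId);
        if (request == null) {
            request = pendingRequests.remove(allocationId);
        }
        if (request != null) {
            failPendingRequest(request, new CancellationException(
                "Slot allocation " + allocationId + " was cancelled."));
        }
        // Unknown IDs are ignored: the request may already have been served.
    }

    // Hypothetical helper: fails the future so that nobody keeps waiting on it.
    private static void failPendingRequest(CompletableFuture<String> request, Exception cause) {
        request.completeExceptionally(cause);
    }
}
```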


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236174#comment-16236174
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148571851
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
@@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) {
}
}
 
-   private static ResourceManagerGateway createResourceManagerGatewayMock() {
+   static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
when(resourceManagerGateway
-   .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
-   .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
+   .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
+   .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
--- End diff --

Why not return a proper `CompletableFuture` here?
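Instead of `mock(CompletableFuture.class, RETURNS_MOCKS)`, the stub can return an already completed future, so callers that chain on it behave as they would in production. With Mockito this would presumably be `thenReturn(CompletableFuture.completedFuture(Acknowledge.get()))`, assuming `requestSlot` returns a future of Flink's `Acknowledge`. A JDK-only sketch of the same idea, with a hypothetical gateway interface and a `String` standing in for the acknowledgement:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the mocked ResourceManagerGateway#requestSlot.
interface ToyResourceManagerGateway {
    CompletableFuture<String> requestSlot(String slotRequest);
}

class TestingGateways {
    // Returns a real, already completed future rather than a Mockito mock,
    // so thenApply/whenComplete chains on it actually run.
    static ToyResourceManagerGateway acknowledgingGateway() {
        return slotRequest -> CompletableFuture.completedFuture("ack:" + slotRequest);
    }
}
```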


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236183#comment-16236183
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148571183
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool
+   AllocationID allocationID3 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   ResourceID resourceID = ResourceID.generate();
+   AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE);
+   slotPoolGateway.registerTaskManager(resourceID);
+   assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+   assertEquals(0, pool.getNumOfPendingRequests());
+   assertTrue(pool.getAllocatedSlots().contains(allocationID3));
+
+   pool.cancelSlotAllocation(allocationID3);
+   assertFalse(pool.getAllocatedSlots().contains(allocationID3));
+   assertTrue(pool.getAvailableSlots().contains(allocationID3));
+   }
+
+   @Test
+   public void testProviderAndOw

[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236170#comment-16236170
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148569315
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -361,9 +374,19 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
}
 
private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+   removePendingRequestWithException(allocationID, new TimeoutException("Slot allocation request " + allocationID + " timed out"));
+   }
+
+   private void removePendingRequestWithException(AllocationID allocationID, Exception e) {
PendingRequest request = pendingRequests.remove(allocationID);
-   if (request != null && !request.getFuture().isDone()) {
-   request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
+   if (request != null && (!request.getFuture().isDone() || request.getFuture().isCompletedExceptionally())) {
+   //TODO: the following line depends on the pr: https://github.com/apache/flink/pull/4887
+   //if (resourceManagerGateway != null) {
+   //  resourceManagerGateway.cancelSlotRequest(jobId, jobMasterId, allocationID);
+   //}
--- End diff --

This should be removed for now and added back once #4887 has been merged.


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236190#comment-16236190
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148602423
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
--- End diff --

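I think the test could look like the following:

```
// requestNumberPendingRequests() is a proposed gateway method, not an existing one
AllocationID allocationID = new AllocationID();
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(10));

CompletableFuture<Integer> numberPendingRequestsFuture = slotPoolGateway.requestNumberPendingRequests();
assertEquals(1, (int) numberPendingRequestsFuture.get());

slotPoolGateway.cancelSlotAllocation(allocationID);

// reuse the variable instead of redeclaring it
numberPendingRequestsFuture = slotPoolGateway.requestNumberPendingRequests();
assertEquals(0, (int) numberPendingRequestsFuture.get());
```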


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236186#comment-16236186
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148601504
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -645,6 +668,21 @@ AllocatedSlots getAllocatedSlots() {
return allocatedSlots;
}
 
+   @VisibleForTesting
+   AvailableSlots getAvailableSlots() {
+   return availableSlots;
+   }
+
+   @VisibleForTesting
+   int getNumOfWaitingForResourceRequests() {
+   return waitingForResourceManager.size();
+   }
+
+   @VisibleForTesting
+   int getNumOfPendingRequests() {
+   return pendingRequests.size();
+   }
--- End diff --

I think we should not make internal state easily accessible, because it is usually modified by the main thread. Also, when checking a certain interleaving, you might be misled into thinking that you can do something like
```
slotPool.asyncAddPendingRequest()
slotPool.getNumOfPendingRequests() // this returns +1 pending requests
```

This might work, but sometimes it does not, because the concurrent operation has not completed yet. I would like to make concurrent operations explicit, for example by returning a `CompletableFuture` from `getNumberOfPendingRequests`, if we expose this at all.
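The concern can be illustrated with a single-threaded executor standing in for the RPC endpoint's main thread: state is touched only on that thread, and the query is itself scheduled there and returns a `CompletableFuture`, so it cannot overtake mutations submitted before it. `MainThreadConfinedPool` and its methods are hypothetical; this uses plain `java.util.concurrent`, not Flink's RPC framework.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint whose state is confined to one "main thread".
class MainThreadConfinedPool implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Only accessed from mainThread, so no locking is needed.
    private final Map<String, Long> pendingRequests = new HashMap<>();

    // Asynchronous mutation, like an RPC call into the endpoint.
    CompletableFuture<Void> addPendingRequest(String allocationId) {
        return CompletableFuture.runAsync(
            () -> pendingRequests.put(allocationId, System.nanoTime()), mainThread);
    }

    // The query runs on the same thread, after every mutation submitted before
    // it, because a single-thread executor runs tasks in submission order.
    CompletableFuture<Integer> getNumberOfPendingRequests() {
        return CompletableFuture.supplyAsync(pendingRequests::size, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```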


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236176#comment-16236176
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570094
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
--- End diff --

Better to check with `instanceOf` I think.
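A small illustration of why an `instanceof`-style check is usually preferable to comparing `getClass()` for equality: it also accepts subclasses, and it does not throw a `NullPointerException` when the cause is `null`. `AskTimeoutLikeException` and `CauseChecks` are hypothetical stand-ins; in a JUnit 4 test with Hamcrest, the assertion would typically read `assertThat(e.getCause(), instanceOf(AskTimeoutException.class))`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical subclass standing in for a concrete timeout exception type. */
class AskTimeoutLikeException extends TimeoutException {
    AskTimeoutLikeException(String message) {
        super(message);
    }
}

class CauseChecks {
    /** Exact-class comparison, as in the diff: rejects subclasses of the expected type. */
    static boolean causeHasExactClass(ExecutionException e, Class<?> expected) {
        Throwable cause = e.getCause();
        return cause != null && cause.getClass() == expected;
    }

    /** instanceof-style comparison: accepts the expected type and its subclasses; false for null. */
    static boolean causeIsInstanceOf(ExecutionException e, Class<?> expected) {
        return expected.isInstance(e.getCause());
    }

    /** Demo helper: returns the ExecutionException that get() throws for a failed future. */
    static ExecutionException executionExceptionFor(Throwable cause) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        try {
            future.get();
        } catch (ExecutionException e) {
            return e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        throw new IllegalStateException("future unexpectedly completed normally");
    }
}
```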




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236189#comment-16236189
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148602544
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
--- End diff --

It's not guaranteed that `pool.getNumOfPendingRequests()` is executed after 
`pool.cancelSlotAllocation` has been executed.




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236171#comment-16236171
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570278
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
--- End diff --

This should be a separate test.




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236181#comment-16236181
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570875
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool
+   AllocationID allocationID3 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   ResourceID resourceID = ResourceID.generate();
+   AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE);
+   slotPoolGateway.registerTaskManager(resourceID);
+   assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+   assertEquals(0, pool.getNumOfPendingRequests());
+   assertTrue(pool.getAllocatedSlots().contains(allocationID3));
+
+   pool.cancelSlotAllocation(allocationID3);
+   assertFalse(pool.getAllocatedSlots().contains(allocationID3));
+   assertTrue(pool.getAvailableSlots().contains(allocationID3));
+   }
+
+   @Test
+   public void testProviderAndOw

[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236178#comment-16236178
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148596180
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   public CompletableFuture allocateSlot(AllocationID allocationID,
ResourceProfile resources,
Iterable locationPreferences,
Time timeout) {
 
-   return internalAllocateSlot(task, resources, locationPreferences);
+   return internalAllocateSlot(allocationID, resources, locationPreferences);
}
 
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
}
 
+   @Override
+   public void cancelSlotAllocation(AllocationID allocationID) {
+   waitingForResourceManager.remove(allocationID);
+   
+   removePendingRequestWithException(allocationID, new CancellationException("Allocation " + allocationID + " cancelled"));
+
+   if (allocatedSlots.contains(allocationID)) {
+   Slot slot = allocatedSlots.get(allocationID);
--- End diff --

We could avoid the `contains` call by simply calling `get` and then comparing 
the result against `null`.
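A sketch of the single-lookup variant, using a plain `HashMap` inside a hypothetical `CancelSketch` class as a stand-in for the `AllocatedSlots` bookkeeping (slots and allocation IDs are simplified to strings). Comparing the result of `get` against `null` is only equivalent to a `contains` check because `null` is never stored as a value:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the cancellation path in the diff. */
class CancelSketch {
    // Never stores null values, so get(...) == null means "no slot for this allocation ID".
    final Map<String, String> allocatedSlots = new HashMap<>();
    final List<String> availableSlots = new ArrayList<>();

    /** Returns true if a slot was allocated for the ID and has been made available again. */
    boolean cancelSlotAllocation(String allocationId) {
        // One lookup instead of containsKey(...) followed by get(...).
        String slot = allocatedSlots.get(allocationId);
        if (slot == null) {
            return false;
        }
        allocatedSlots.remove(allocationId);
        availableSlots.add(slot);
        return true;
    }
}
```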




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236188#comment-16236188
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148599978
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
--- End diff --

This is not needed if you add `throws Exception` to the test method.
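A sketch of a hypothetical helper, `expectFailureCause`, written in that spirit: it only catches the `ExecutionException` it expects and lets everything else (for example a `TimeoutException` thrown by `get`) propagate to a test method declared with `throws Exception`, where JUnit reports it as an error with its full stack trace:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

class ExpectFailure {
    /**
     * Waits for the future and returns its failure cause if it has the expected type.
     * There is deliberately no catch (Exception e) { fail(...) } block: unexpected
     * exceptions, such as a TimeoutException from get(), propagate to the caller unchanged.
     */
    static <T extends Throwable> T expectFailureCause(
            CompletableFuture<?> future, Class<T> expected, long timeoutMillis) throws Exception {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (expected.isInstance(cause)) {
                return expected.cast(cause);
            }
            throw new AssertionError("Unexpected failure cause: " + cause, cause);
        }
        throw new AssertionError("Expected the future to fail with " + expected.getName());
    }
}
```

In the test from the diff, each `try`/`catch` block could then become a single call such as `expectFailureCause(future, AskTimeoutException.class, 2000)`.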




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236175#comment-16236175
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570569
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
--- End diff --

This should be removed.




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236179#comment-16236179
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570462
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
--- End diff --

The timeout should be lower.




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236177#comment-16236177
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148572112
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) {
}
}
 
-   private static ResourceManagerGateway createResourceManagerGatewayMock() {
+   static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
--- End diff --

We could think about implementing a `SimpleAckingResourceManagerGateway` 
for testing purposes. That way we avoid mocking too much.
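A sketch of what such a hand-written test double could look like. The real `ResourceManagerGateway` interface has many more methods, so this uses a hypothetical, narrowed-down `SlotRequestGateway` interface and a hypothetical `SimpleAckingSlotRequestGateway` that acknowledges every request immediately (a `Boolean` stands in for Flink's acknowledgement type) and records the calls so tests can assert on them without configuring a mock:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, narrowed-down gateway; the real ResourceManagerGateway has many more methods. */
interface SlotRequestGateway {
    CompletableFuture<Boolean> requestSlot(String allocationId);

    void cancelSlotRequest(String allocationId);
}

/** Test double that acknowledges every request immediately and records all calls. */
class SimpleAckingSlotRequestGateway implements SlotRequestGateway {
    // Synchronized lists, since the code under test may call the gateway from another thread.
    private final List<String> requestedAllocations = Collections.synchronizedList(new ArrayList<>());
    private final List<String> cancelledAllocations = Collections.synchronizedList(new ArrayList<>());

    @Override
    public CompletableFuture<Boolean> requestSlot(String allocationId) {
        requestedAllocations.add(allocationId);
        return CompletableFuture.completedFuture(Boolean.TRUE);
    }

    @Override
    public void cancelSlotRequest(String allocationId) {
        cancelledAllocations.add(allocationId);
    }

    /** Returns a snapshot copy of the requested allocation IDs. */
    List<String> getRequestedAllocations() {
        synchronized (requestedAllocations) {
            return new ArrayList<>(requestedAllocations);
        }
    }

    /** Returns a snapshot copy of the cancelled allocation IDs. */
    List<String> getCancelledAllocations() {
        synchronized (cancelledAllocations) {
            return new ArrayList<>(cancelledAllocations);
        }
    }
}
```

A test can then assert on `getCancelledAllocations()` directly instead of relying on Mockito's `verify`.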




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236172#comment-16236172
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570387
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
--- End diff --

Timeouts should be lower.




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236173#comment-16236173
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148592528
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -361,9 +374,19 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
}
 
private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+   removePendingRequestWithException(allocationID, new TimeoutException("Slot allocation request " + allocationID + " timed out"));
+   }
+
+   private void removePendingRequestWithException(AllocationID allocationID, Exception e) {
--- End diff --

Maybe we could refactor this method into 
`failPendingRequest(PendingRequest, Exception)`; then it could be used by both 
`checkTimeoutSlotAllocation` and `checkTimeoutRequestWaitingForResourceManager`.
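A sketch of that refactoring on a simplified model. `PendingRequestSketch`, its `PendingRequest` class, and the string IDs are hypothetical stand-ins for the `SlotPool` internals; the point is that each timeout check removes the request from its own map and then delegates to one shared `failPendingRequest` helper:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Simplified, hypothetical model of the SlotPool request bookkeeping. */
class PendingRequestSketch {
    static final class PendingRequest {
        final String allocationId;
        final CompletableFuture<String> future = new CompletableFuture<>();

        PendingRequest(String allocationId) {
            this.allocationId = allocationId;
        }
    }

    // Requests waiting for a slot.
    final Map<String, PendingRequest> pendingRequests = new HashMap<>();
    // Requests waiting for a ResourceManager connection.
    final Map<String, PendingRequest> waitingForResourceManager = new HashMap<>();

    PendingRequest addPendingRequest(String allocationId) {
        PendingRequest request = new PendingRequest(allocationId);
        pendingRequests.put(allocationId, request);
        return request;
    }

    PendingRequest addWaitingRequest(String allocationId) {
        PendingRequest request = new PendingRequest(allocationId);
        waitingForResourceManager.put(allocationId, request);
        return request;
    }

    /** The shared helper: fails the request's future with the given exception. */
    private static void failPendingRequest(PendingRequest request, Exception cause) {
        request.future.completeExceptionally(cause);
    }

    void checkTimeoutSlotAllocation(String allocationId) {
        PendingRequest request = pendingRequests.remove(allocationId);
        if (request != null) {
            failPendingRequest(request,
                new TimeoutException("Slot allocation request " + allocationId + " timed out"));
        }
    }

    void checkTimeoutRequestWaitingForResourceManager(String allocationId) {
        PendingRequest request = waitingForResourceManager.remove(allocationId);
        if (request != null) {
            // Hypothetical message text for this illustration.
            failPendingRequest(request, new TimeoutException(
                "Slot allocation request " + allocationId + " timed out waiting for a ResourceManager"));
        }
    }
}
```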




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235157#comment-16235157
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4937

[FLINK-6434] [runtime] cancel slot allocation if request timed out in ProviderAndOwner


## What is the purpose of the change

This PR adds a protocol for cancelling slot allocations between ProviderAndOwner 
and SlotPool, so that ProviderAndOwner can cancel slot allocations that are no 
longer needed and thereby avoid leaking slots.

## Brief change log

  - *Let the ProviderAndOwner generate the allocation ID before calling allocateSlot on the slot pool.*
  - *If the allocateSlot call times out, ProviderAndOwner cancels the previous allocation at the slot pool.*
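The protocol described in the change log can be sketched with a toy model. `ToySlotPool`, `ToyProviderAndOwner`, and the use of `UUID` as the allocation ID are assumptions for illustration, not Flink's `SlotPool` API. The caller generates the allocation ID before requesting a slot; if the request times out, it cancels the allocation by that ID, so a slot that arrives later is returned to the available set instead of leaking into the allocated set:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy slot pool keyed by caller-generated allocation IDs (hypothetical, not Flink's SlotPool). */
class ToySlotPool {
    final Map<UUID, CompletableFuture<UUID>> pendingRequests = new HashMap<>();
    final Set<UUID> allocatedSlots = new HashSet<>();
    final Set<UUID> availableSlots = new HashSet<>();

    synchronized CompletableFuture<UUID> allocateSlot(UUID allocationId) {
        CompletableFuture<UUID> future = new CompletableFuture<>();
        pendingRequests.put(allocationId, future);
        return future;
    }

    /** A slot offered for an allocation ID fulfills the matching pending request, if there is one. */
    synchronized void offerSlot(UUID allocationId) {
        CompletableFuture<UUID> request = pendingRequests.remove(allocationId);
        if (request != null) {
            allocatedSlots.add(allocationId);
            request.complete(allocationId);
        } else {
            availableSlots.add(allocationId);
        }
    }

    /** Drops a still-pending request, or returns an already-allocated slot to the available set. */
    synchronized void cancelSlotAllocation(UUID allocationId) {
        CompletableFuture<UUID> request = pendingRequests.remove(allocationId);
        if (request != null) {
            request.completeExceptionally(
                new CancellationException("Allocation " + allocationId + " cancelled"));
        }
        if (allocatedSlots.remove(allocationId)) {
            availableSlots.add(allocationId);
        }
    }
}

class ToyProviderAndOwner {
    /** Generates the allocation ID up front so that a timed-out request can be cancelled by ID. */
    static UUID allocateOrCancel(ToySlotPool pool, long timeoutMillis) throws Exception {
        UUID allocationId = UUID.randomUUID();
        CompletableFuture<UUID> future = pool.allocateSlot(allocationId);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pool.cancelSlotAllocation(allocationId);
            throw e;
        }
    }
}
```

Without the `cancelSlotAllocation` call, a slot offered after the timeout would complete the stale request and stay in `allocatedSlots` with no owner, which is the leak described in FLINK-6434.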

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests in SlotPoolRpcTest*
  - *Modified the existing SlotPoolTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-6434

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4937.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4937


commit ab3c599d55847451a1194ba55375207267561a71
Author: shuai.xus 
Date:   2017-10-20T09:12:39Z

[FLINK-6434] cancel slot allocation if request timed out in ProviderAndOwner

Summary:
This fixes FLINK-6434.
1. Let the ProviderAndOwner generate the allocation ID before calling 
allocateSlot on the slot pool.
2. If the allocateSlot call times out, ProviderAndOwner cancels the previous 
allocation at the slot pool.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D323990






[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-07-03 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073081#comment-16073081
 ] 

shuai.xu commented on FLINK-6434:
-

[~till.rohrmann] Sorry, I have not started working on it yet.



[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-06-29 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068237#comment-16068237
 ] 

Till Rohrmann commented on FLINK-6434:
--

Hi [~tiemsn], what's the state of the fix? Are you working on this issue?



[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-08 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000435#comment-16000435
 ] 

shuai.xu commented on FLINK-6434:
-

Yes, adding a request id can solve the problem.



[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-08 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000369#comment-16000369
 ] 

Till Rohrmann commented on FLINK-6434:
--

You're right [~tiemsn]. The problem seems to be that the actual slot allocation 
has a different lifetime than a concrete slot request. I think we have to 
introduce a slot request id which can be used to fail a specific slot request. 
That way we decouple the requests from the actual slots a {{SlotPool}} has 
available.
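A minimal sketch of this idea, assuming hypothetical names (`RequestIdSlotPool`, `allocateSlot(UUID)`, `offerSlot`, `cancelSlotRequest`) and a `UUID` as the slot request id; the real SlotPool API differs. Because allocated slots are recorded under the request id rather than under the slot's own allocation id, cancelling a request releases whichever slot ended up serving it.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not Flink's SlotPool: bookkeeping is keyed by a
// caller-generated request id, independent of which slot serves the request.
class RequestIdSlotPool {
    // request id -> waiting future, in arrival order
    final Map<UUID, CompletableFuture<String>> pendingRequests = new LinkedHashMap<>();
    // request id -> slot currently assigned to that request
    final Map<UUID, String> allocatedSlots = new HashMap<>();
    final Queue<String> availableSlots = new ArrayDeque<>();

    CompletableFuture<String> allocateSlot(UUID requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        String free = availableSlots.poll();
        if (free != null) {
            allocatedSlots.put(requestId, free);
            future.complete(free);
        } else {
            pendingRequests.put(requestId, future);
        }
        return future;
    }

    // A newly offered slot serves the oldest pending request, if any.
    void offerSlot(String slotId) {
        Iterator<Map.Entry<UUID, CompletableFuture<String>>> it =
            pendingRequests.entrySet().iterator();
        if (!it.hasNext()) {
            availableSlots.add(slotId);
            return;
        }
        Map.Entry<UUID, CompletableFuture<String>> oldest = it.next();
        UUID requestId = oldest.getKey();
        CompletableFuture<String> future = oldest.getValue();
        it.remove();
        allocatedSlots.put(requestId, slotId);
        future.complete(slotId);
    }

    // Cancelling by request id works whether the request is still pending or
    // has already been served by some (possibly unexpected) slot.
    void cancelSlotRequest(UUID requestId) {
        CompletableFuture<String> pending = pendingRequests.remove(requestId);
        if (pending != null) {
            pending.cancel(false);
            return;
        }
        String slot = allocatedSlots.remove(requestId);
        if (slot != null) {
            availableSlots.add(slot);
        }
    }
}
```

For example, if request `r1` is served by `slot-2` before `cancelSlotRequest(r1)` arrives, the cancel still finds `slot-2` under `r1` and moves it back to `availableSlots`.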



[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-07 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000232#comment-16000232
 ] 

shuai.xu commented on FLINK-6434:
-

[~till.rohrmann] This does not seem to fix the bug completely: between 
allocateSlot(allocationID1) and failAllocation(allocationID1), another free slot 
with allocationID2 may fulfill the pending request, and allocatedSlots then 
records it under allocationID2. failAllocation(allocationID1) cannot release it, 
so the slot is still leaked.
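The race described above can be reproduced with a toy pool whose bookkeeping is keyed by allocation id. All names here (`AllocationIdSlotPool`, `offerFreeSlot`, the string ids) are hypothetical; the sketch only models the ordering problem, not Flink's actual data structures.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the race, not Flink's SlotPool: pending requests are
// keyed by the requested allocation id, allocated slots by the id of the slot
// that actually served the request.
class AllocationIdSlotPool {
    final Map<String, CompletableFuture<String>> pendingRequests = new LinkedHashMap<>();
    final Set<String> allocatedSlots = new HashSet<>();

    CompletableFuture<String> allocateSlot(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(allocationId, future);
        return future;
    }

    // A free slot with its own allocation id serves the oldest pending request.
    // (If nothing is pending, this toy simply ignores the slot.)
    void offerFreeSlot(String slotAllocationId) {
        Iterator<CompletableFuture<String>> it = pendingRequests.values().iterator();
        if (!it.hasNext()) {
            return;
        }
        CompletableFuture<String> oldest = it.next();
        it.remove();
        allocatedSlots.add(slotAllocationId); // recorded under the slot's id
        oldest.complete(slotAllocationId);
    }

    void failAllocation(String allocationId) {
        CompletableFuture<String> pending = pendingRequests.remove(allocationId);
        if (pending != null) {
            pending.cancel(false);
        }
        allocatedSlots.remove(allocationId); // misses slots recorded under another id
    }
}
```

Calling `allocateSlot("allocationID1")`, then `offerFreeSlot("allocationID2")`, then `failAllocation("allocationID1")` leaves `allocationID2` in `allocatedSlots`, because the failure call looks under the wrong key.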



[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-05 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15997961#comment-15997961
 ] 

Till Rohrmann commented on FLINK-6434:
--

Thanks for reporting the issue [~tiemsn]. This sounds like a bug and should be 
fixed.

I think we could solve it the following way: We generate the {{AllocationID}} 
in {{ProviderAndOwner#allocateSlot}} and pass it to 
{{SlotPoolGateway#allocateSlot}}. On the returned future we register an 
exception handler which will call {{SlotPoolGateway#failAllocation}} with the 
generated {{AllocationID}}. That way we should be able to deal with timeouts on 
the {{Execution}} side. What do you think?
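The caller-side part of this proposal can be sketched as follows. This assumes Java 9+ for `CompletableFuture#orTimeout`, models `AllocationID` as a `UUID`, and uses hypothetical simplified types (`SlotPoolGatewaySketch`, `ProviderAndOwnerSketch`) whose signatures are not Flink's.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified gateway; the real SlotPoolGateway methods take
// different parameters. AllocationID is modelled as a UUID.
interface SlotPoolGatewaySketch {
    CompletableFuture<String> allocateSlot(UUID allocationId);
    void failAllocation(UUID allocationId, Throwable cause);
}

class ProviderAndOwnerSketch {
    private final SlotPoolGatewaySketch gateway;
    private final long timeoutMillis;

    ProviderAndOwnerSketch(SlotPoolGatewaySketch gateway, long timeoutMillis) {
        this.gateway = gateway;
        this.timeoutMillis = timeoutMillis;
    }

    CompletableFuture<String> allocateSlot() {
        // 1. Generate the id on the caller side, before calling the pool.
        UUID allocationId = UUID.randomUUID();
        return gateway.allocateSlot(allocationId)
            // 2. Model the RPC timeout (JDK 9+).
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            // 3. On any failure, tell the pool to drop the request for this id.
            //    The stage returned by whenComplete completes only after this runs.
            .whenComplete((slot, failure) -> {
                if (failure != null) {
                    gateway.failAllocation(allocationId, failure);
                }
            });
    }
}
```

`whenComplete` is used rather than `exceptionally` so that the returned future still fails with the original timeout instead of swallowing it. In a real RPC setup the timeout fires on the caller's side, which is why the pool has to be told explicitly.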



[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-04 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996823#comment-15996823
 ] 

Till Rohrmann commented on FLINK-6434:
--

Thanks for reporting the issue. I think the best way to solve the problem is to 
check whether we can complete the pending request future. If this is not the 
case, then we should release the {{SimpleSlot}} and add the {{AllocatedSlot}} 
to the set of {{availableSlots}} instead of moving it to {{allocatedSlots}}.
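This suggestion can be sketched on the pool side using the boolean returned by `CompletableFuture#complete`, which is `false` when the future was already completed (for instance exceptionally by a timeout). Names are hypothetical and slots are plain strings; the sketch assumes the requester's timeout completes the same future the pool holds.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not Flink's SlotPool; slots are plain strings.
class CheckingSlotPool {
    final Queue<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();
    final Set<String> allocatedSlots = new HashSet<>();
    final Set<String> availableSlots = new HashSet<>();

    CompletableFuture<String> requestSlot() {
        CompletableFuture<String> request = new CompletableFuture<>();
        pendingRequests.add(request);
        return request;
    }

    void offerSlot(String slotId) {
        CompletableFuture<String> request = pendingRequests.poll();
        // complete() returns false if the future was already completed,
        // e.g. exceptionally by a timeout.
        if (request != null && request.complete(slotId)) {
            allocatedSlots.add(slotId);
        } else {
            // Nobody is waiting for this slot any more: keep it reusable.
            availableSlots.add(slotId);
        }
    }
}
```

A request whose future has already failed therefore no longer pulls a slot into `allocatedSlots`; the slot lands in `availableSlots` and can serve the next request.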
