HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246860798
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -512,4 +517,53 @@ public void testOnContainerCompleted() throws Exception {
                        });
                }};
        }
+
+       /**
+        *      Tests that YarnResourceManager rejects all pending slot requests when the
+        *      maximum number of failed containers is hit.
+        */
+       @Test
+       public void testOnContainersAllocatedWithFailure() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                                       rmServices.slotManager.registerSlotRequest(
+                                               new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+                                       return null;
+                               });
+
+                               // wait for the registerSlotRequest completion
+                               registerSlotRequestFuture.get();
+
+                               // Callback from YARN when container is allocated.
+                               Container disconnectedContainer1 = mockContainer("container1", 1234, 1);
+                               resourceManager.onContainersAllocated(ImmutableList.of(disconnectedContainer1));
+                               verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                               verify(mockNMClient).startContainer(eq(disconnectedContainer1), any(ContainerLaunchContext.class));
+
+                               ResourceID connectedTM = new ResourceID(disconnectedContainer1.getId().toString());
+
+                               resourceManager.registerTaskExecutor("container1", connectedTM, 1234,
+                                       hardwareDescription, Time.seconds(10L));
+
+                               // force the task manager to unregister
+                               resourceManager.disconnectTaskManager(connectedTM, new TimeoutException());
+
+                               // request second slot
+                               registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                                       rmServices.slotManager.registerSlotRequest(
+                                               new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
 
 Review comment:
   Good suggestion. But it is the SlotPool's PendingRequest that will be completed exceptionally, not this slot request instance, right?
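The behavior described in the Javadoc above, together with the distinction raised in the review comment, can be illustrated with a minimal, self-contained Java sketch. It is a hypothetical model, not Flink's implementation: the classes Request, PendingRequest and CappedResourceManager, and the methods registerRequest and onContainerFailed, are invented for illustration and only loosely stand in for SlotRequest, the SlotPool's pending request and YarnResourceManager. The assumption it encodes is that the request message itself carries no future, so a rejection is signalled by completing the requester-side future exceptionally once the failure cap is reached (treated here as "failed >= max").

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model, not Flink code: all class and method names are invented for illustration.
public class FailureCapSketch {

    // Immutable request message (loosely analogous to SlotRequest); it holds no future.
    static final class Request {
        final String allocationId;

        Request(String allocationId) {
            this.allocationId = allocationId;
        }
    }

    // Requester-side bookkeeping (loosely analogous to a SlotPool pending request);
    // the future is what the requester actually waits on.
    static final class PendingRequest {
        final Request request;
        final CompletableFuture<String> slotFuture = new CompletableFuture<>();

        PendingRequest(Request request) {
            this.request = request;
        }
    }

    // Counts failed containers and rejects pending requests once the (assumed) cap is reached.
    static final class CappedResourceManager {
        private final int maxFailedContainers;
        private final List<PendingRequest> pending = new ArrayList<>();
        private int failedContainers;

        CappedResourceManager(int maxFailedContainers) {
            this.maxFailedContainers = maxFailedContainers;
        }

        private boolean capReached() {
            return failedContainers >= maxFailedContainers;
        }

        private static Exception rejection() {
            return new IllegalStateException("Maximum number of failed containers reached");
        }

        // Queues a request, or fails it immediately if the cap has already been reached.
        PendingRequest registerRequest(Request request) {
            PendingRequest p = new PendingRequest(request);
            if (capReached()) {
                p.slotFuture.completeExceptionally(rejection());
            } else {
                pending.add(p);
            }
            return p;
        }

        // Called when a container or TaskManager is lost; on hitting the cap,
        // every queued request's future is completed exceptionally.
        void onContainerFailed() {
            failedContainers++;
            if (capReached()) {
                Exception cause = rejection();
                for (PendingRequest p : pending) {
                    p.slotFuture.completeExceptionally(cause);
                }
                pending.clear();
            }
        }
    }

    public static void main(String[] args) {
        CappedResourceManager rm = new CappedResourceManager(1);
        PendingRequest first = rm.registerRequest(new Request("alloc-1"));
        rm.onContainerFailed(); // cap of 1 reached: the queued request is rejected
        PendingRequest second = rm.registerRequest(new Request("alloc-2"));
        System.out.println(first.slotFuture.isCompletedExceptionally());  // true
        System.out.println(second.slotFuture.isCompletedExceptionally()); // true
        System.out.println(first.request.allocationId);                   // alloc-1
    }
}
```

In this model the request message stays untouched after rejection; only the requester's future fails. Whether Flink's SlotManager and SlotPool are wired exactly this way should be checked against the PR itself.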

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
