[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-02 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386798964
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, 
Map
 
    @Override
    public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-       // We are not interested in getting container status
+       // We fetch the status of the container from the previous attempts.
+       if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   @TaoYang526
   Are you suggesting that calling `NMClientAsync.getContainerStatusAsync` on a 
`NEW` container might result in `onGetContainerStatusError` on some Hadoop 
versions and `onContainerStatusReceived` on others?
   
   If that is the case, I think we can have a common method that releases the 
container and removes it from the worker node map. It should be called from 
both `onContainerStatusReceived`, under `if (containerStatus.getState() == 
ContainerState.NEW)`, and `onGetContainerStatusError`.
   
   One more question: in `onGetContainerStatusError`, how do we know whether a 
container is `NEW` or whether there is some other problem?
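
   A minimal sketch of the shared-handler idea, using stand-in types instead of 
the real Hadoop and Flink classes. The class name, the `State` enum, the string 
container IDs, and `releaseAndForget` are all hypothetical names chosen for 
illustration; the real callbacks take `ContainerId`, `ContainerStatus`, and 
`Throwable` arguments.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in sketch; these are not the real Flink or Hadoop classes. */
class RecoveredContainerSketch {
    /** Hypothetical mirror of the container states discussed above. */
    enum State { NEW, RUNNING, COMPLETE }

    // Hypothetical stand-ins: container id -> worker, and the ids handed back to YARN.
    final Map<String, String> workerNodeMap = new HashMap<>();
    final List<String> releasedContainers = new ArrayList<>();

    void onContainerStatusReceived(String containerId, State state) {
        if (state == State.NEW) {
            releaseAndForget(containerId);
        }
    }

    void onGetContainerStatusError(String containerId, Throwable error) {
        // Assumption: every status-query failure is treated like a NEW container.
        // Whether that is safe is exactly the open question in the comment above.
        releaseAndForget(containerId);
    }

    /** The common method: release the container and drop it from the worker node map. */
    private void releaseAndForget(String containerId) {
        if (workerNodeMap.remove(containerId) != null) {
            releasedContainers.add(containerId);
        }
    }
}
```

   Both callbacks funnel into one private method, so the release logic lives in 
a single place regardless of which callback a given Hadoop version triggers.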


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-01 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386187320
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+   private BiFunction 
getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+   throw new UnsupportedOperationException("No 
getContainerStatusAsync function has been set.");
 
 Review comment:
   > This function should only be invoked explicitly.
   
   I don't see how this is necessary. We should not assume how the RM uses this 
method.






[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-01 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386172732
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -446,8 +459,8 @@ public void testStopWorker() throws Exception {
 
unregisterAndReleaseFuture.get();
 
-   verify(mockNMClient).stopContainerAsync(any(ContainerId.class), any(NodeId.class));
    verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+   testingContainerStopFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
 
 Review comment:
   This should come before the previous line, because containers are first 
stopped, then released.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-01 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386173738
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -528,6 +553,40 @@ public void testOnStartContainerError() throws Exception {
}};
}
 
+   @Test
+   public void testHandleContainersFromPreviousAttempt() throws Exception {
+   new Context() {{
+   runTest(() -> {
+   Container runningCountainer = 
mockContainer("container", 1234, 1, resourceManager.getContainerResource());
+   Container newContainer = 
mockContainer("container", 1234, 2, resourceManager.getContainerResource());
+
+   
resourceManager.getTestingNMClientAsync().setGetContainerStatusAsyncBiFunction((containerId,
 nodeId) -> {
+   if 
(containerId.equals(runningCountainer.getId())) {
+   return 
ContainerStatus.newInstance(containerId, ContainerState.RUNNING, "", 0);
+   } else {
+   return 
ContainerStatus.newInstance(containerId, ContainerState.NEW, "", 0);
+   }
+   });
+
+   List containersFromPreviousAttempt = 
new ArrayList<>();
+   containersFromPreviousAttempt.add(newContainer);
+   
containersFromPreviousAttempt.add(runningCountainer);
+   RegisterApplicationMasterResponse testResponse 
= RegisterApplicationMasterResponse.newInstance(
+   resourceManager.getContainerResource(),
+   resourceManager.getContainerResource(),
+   Collections.emptyMap(),
+   null,
+   containersFromPreviousAttempt,
+   "",
+   Collections.emptyList());
+   
resourceManager.getContainersFromPreviousAttempts(testResponse);
+
+   
verify(mockResourceManagerClient).releaseAssignedContainer(newContainer.getId());
+   assertEquals(1, 
resourceManager.getWorkerNodeMap().size());
 
 Review comment:
   In addition to the size, we can also verify that the worker in the worker 
node map is the running container.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-01 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386170128
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+   private BiFunction 
getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+   throw new UnsupportedOperationException("No 
getContainerStatusAsync function has been set.");
 
 Review comment:
   I think we should try to have a default function that does not throw any 
exception if possible.
   
   We can change the return type of this function to 
`Optional`, return `Optional.empty()` by default, and in 
`getContainerStatusAsync` we check it and only invoke the `callbackHandler` if 
the function return value is not empty.
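
   A minimal sketch of that suggestion, assuming `String` stand-ins for 
`ContainerId`, `NodeId`, and `ContainerStatus`, and a plain `BiConsumer` in 
place of the real callback handler. The class name and constructor are 
hypothetical; only the field and method names follow the diff above.

```java
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/** Stand-in sketch; String replaces ContainerId, NodeId and ContainerStatus. */
class OptionalStatusClientSketch {
    // Hypothetical stand-in for the callback handler's onContainerStatusReceived.
    private final BiConsumer<String, String> onContainerStatusReceived;

    // Default returns Optional.empty(): nothing is thrown and no callback fires.
    private volatile BiFunction<String, String, Optional<String>> getContainerStatusAsyncBiFunction =
            (containerId, nodeId) -> Optional.empty();

    OptionalStatusClientSketch(BiConsumer<String, String> onContainerStatusReceived) {
        this.onContainerStatusReceived = onContainerStatusReceived;
    }

    void setGetContainerStatusAsyncBiFunction(
            BiFunction<String, String, Optional<String>> function) {
        this.getContainerStatusAsyncBiFunction = function;
    }

    void getContainerStatusAsync(String containerId, String nodeId) {
        // Invoke the callback only when the function produced a status.
        getContainerStatusAsyncBiFunction
                .apply(containerId, nodeId)
                .ifPresent(status -> onContainerStatusReceived.accept(containerId, status));
    }
}
```

   With this shape, a test that never sets the function simply sees no status 
callback, instead of an `UnsupportedOperationException` thrown from inside the 
resource manager.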




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386071286
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
 
+   void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+       this.testingNMClientAsync = testingNMClientAsync;
+   }
 
 Review comment:
   Regarding your concern:
   - In most cases, people can simply set the functions before starting the RM, 
i.e., before `runTest`.
   - When a function needs to change during the test, one can use a 
`CompletableFuture` to make sure the change happens after the calls to the 
previous function, similar to what we do with 
`rpcService.(re)setRpcGatewayFutureFunction` in 
`ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor`.
   - Even if someone gets this wrong, it should only affect that particular 
test case, which is easy to locate. No production code or other test cases will 
be affected.
   
   I agree that adding a builder class for `Context` could also be an option. 
However, I think overwriting the `TestingNMClientAsync` functions might be a 
better solution, because it has the following advantages over adding a builder 
for `Context`:
   - It narrows down the scope of what we need to customize in a test case. We 
only need to provide a function that defines the NM client behavior (exactly 
what needs to be customized for the test case), instead of the entire 
`TestingNMClientAsync`.
   - It provides the flexibility of changing the NM client behavior during the 
test, which might not be needed at the moment but might be in the future.
   - Currently we define the test cases in the double brace initialization of 
the `Context` class, which is very readable. I believe the double brace 
initialization has to directly follow the constructor call, which means adding 
a builder to `Context` would prevent us from using it.
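
   To illustrate the last point, here is a small sketch of double brace 
initialization with a hypothetical, much simplified `Context` that only records 
events. The real `Context` in `YarnResourceManagerTest` sets up and tears down 
the resource manager instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in sketch; this Context only records what happens around the test body. */
class DoubleBraceSketch {
    static class Context {
        final List<String> events = new ArrayList<>();

        void runTest(Runnable testMethod) {
            events.add("start RM");
            testMethod.run();
            events.add("stop RM");
        }
    }

    static List<String> demo() {
        // The outer braces declare an anonymous subclass of Context; the inner
        // braces are its instance initializer, which runs right after the
        // Context constructor. That is why the block must follow `new Context()`.
        Context context = new Context() {{
            runTest(() -> events.add("test body"));
        }};
        return context.events;
    }
}
```

   A class body can only be attached to a `new` expression, so an object 
returned from a builder's `build()` method could not be followed by such a 
block.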




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386034735
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -321,9 +322,16 @@ void runTest(RunnableWithException testMethod) throws 
Exception {
}
}
 
-   void verifyContainerHasBeenStarted(Container testingContainer) {
+   void verifyContainerHasBeenStarted(Container testingContainer, List startedContainerIds) throws Exception {
        verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
-       verify(mockNMClient, VERIFICATION_TIMEOUT).startContainerAsync(eq(testingContainer), any(ContainerLaunchContext.class));
+       // Wait the call of
+       for (int i = 0; i < TIMEOUT.toMilliseconds(); i += 1000) {
+           if (startedContainerIds.contains(testingContainer.getId())) {
+               return;
+           }
+           Thread.sleep(1000);
+       }
+       throw new Exception("The container has not been start before timeout.");
 
 Review comment:
   I think there are several benefits to using `CompletableFuture`:
   - You don't have to handle thread safety yourself.
   - You don't have to write the loop and sleep yourself.
   - Better readability.
   - Probably better performance, compared to the loop and sleep.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386034363
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
 
+   void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+       this.testingNMClientAsync = testingNMClientAsync;
+   }
 
 Review comment:
   I'm not sure about that. There are quite a few fields in `Context`. If we 
add every field that needs to be customized to the `Context` constructor 
parameters, the constructor will soon become complex, or we will have to 
maintain lots of constructors with different parameter combinations.
   
   I think the key point of making the `Consumer` and `Function` of 
`TestingNMClientAsync` settable is that we can use the same default 
`TestingNMClientAsync` created in `Context`, but still be able to define its 
behavior in the test case. Being able to change its behavior is only an extra 
benefit.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-28 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385998488
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -528,6 +563,43 @@ public void testOnStartContainerError() throws Exception {
}};
}
 
+   @Test
+   public void testStartWithContainerFromPreviousAttempt() throws 
Exception {
 
 Review comment:
   minor:
   It's a bit confusing what "start with" means in the test case name.
   I would call it something like "testHandleContainersFromPreviousAttempt".




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-28 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385994939
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+   private final BiFunction 
getContainerStatusAsyncBiFunction;
+   private final BiConsumer 
startContainerAsyncBiConsumer;
+   private final BiConsumer 
stopContainerAsyncBiConsumer;
+
+   private TestingNMClientAsync(
+   CallbackHandler callbackHandler,
+   BiFunction 
getContainerStatusAsyncBiFunction,
+   BiConsumer 
startContainerAsyncBiConsumer,
+   BiConsumer 
stopContainerAsyncBiConsumer) {
+   super(callbackHandler);
+   this.getContainerStatusAsyncBiFunction = 
getContainerStatusAsyncBiFunction;
+   this.startContainerAsyncBiConsumer = 
startContainerAsyncBiConsumer;
+   this.stopContainerAsyncBiConsumer = 
stopContainerAsyncBiConsumer;
+   }
+
+   @Override
+   public void startContainerAsync(Container container, 
ContainerLaunchContext containerLaunchContext) {
+   startContainerAsyncBiConsumer.accept(container, 
containerLaunchContext);
+   }
+
+   @Override
+   public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+   stopContainerAsyncBiConsumer.accept(containerId, nodeId);
+   }
+
+   @Override
+   public void getContainerStatusAsync(ContainerId containerId, NodeId 
nodeId) {
+   callbackHandler.onContainerStatusReceived(containerId, 
getContainerStatusAsyncBiFunction.apply(containerId, nodeId));
+   }
+
+   public static Builder newBuilder(NMClientAsync.CallbackHandler 
callbackHandler) {
+   return new Builder(callbackHandler);
+   }
+
+   /**
+* Builder for {@link TestingNMClientAsync}.
+*/
+   public static class Builder {
+   private BiFunction 
getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+   throw new UnsupportedOperationException("No 
getContainerStatusAsync function has been set.");
+   };
+   private BiConsumer 
startContainerAsyncBiConsumer = (container, containerLaunchContext) -> {};
+   private BiConsumer 
stopContainerAsyncBiConsumer = (containerId, nodeId) -> {};
+   private NMClientAsync.CallbackHandler callbackHandler;
 
 Review comment:
   nit: this could be `final`.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-28 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385998143
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -321,9 +322,16 @@ void runTest(RunnableWithException testMethod) throws 
Exception {
}
}
 
-   void verifyContainerHasBeenStarted(Container testingContainer) {
+   void verifyContainerHasBeenStarted(Container testingContainer, List startedContainerIds) throws Exception {
        verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
-       verify(mockNMClient, VERIFICATION_TIMEOUT).startContainerAsync(eq(testingContainer), any(ContainerLaunchContext.class));
+       // Wait the call of
+       for (int i = 0; i < TIMEOUT.toMilliseconds(); i += 1000) {
+           if (startedContainerIds.contains(testingContainer.getId())) {
+               return;
+           }
+           Thread.sleep(1000);
+       }
+       throw new Exception("The container has not been start before timeout.");
 
 Review comment:
   I see a thread safety issue here. The collection `startedContainerIds` is 
concurrently accessed by this verification method and 
`TestingNMClientAsync#startContainerAsyncBiConsumer`. It's probably fine at the 
moment, since the verification method does not make any modifications. But I 
think it would still be good to avoid this.
   
   In the test case, we could create a `CompletableFuture` for each container 
and complete it in `TestingNMClientAsync#startContainerAsyncBiConsumer`. In the 
verification method, we can then use `CompletableFuture#get(timeout, unit)` to 
verify whether the container has been started.
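
   A minimal sketch of the future-based verification, assuming `String` 
stand-ins for `Container` and `ContainerLaunchContext`. The class and the 
helper names `startedWithin` and `demo` are hypothetical; only 
`CompletableFuture#get(timeout, unit)` and `complete` are real JDK calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Stand-in sketch; String replaces Container and ContainerLaunchContext. */
class StartFutureSketch {
    /** Returns true if the future completes before the timeout, false otherwise. */
    static boolean startedWithin(CompletableFuture<Void> started, long timeout, TimeUnit unit) {
        try {
            started.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    static boolean demo() {
        CompletableFuture<Void> containerStarted = new CompletableFuture<>();
        // The consumer the testing NM client would call in startContainerAsync.
        BiConsumer<String, String> startContainerAsyncBiConsumer =
                (container, launchContext) -> containerStarted.complete(null);

        // Simulate the resource manager starting the container on another thread.
        new Thread(() -> startContainerAsyncBiConsumer.accept("container_1", "ctx")).start();

        // Blocks until the consumer has run or the timeout expires; no polling loop.
        return startedWithin(containerStarted, 5, TimeUnit.SECONDS);
    }
}
```

   The future carries the cross-thread handoff itself, so the test needs 
neither a shared collection nor a sleep loop.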




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-28 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385996792
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
 
+   void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+       this.testingNMClientAsync = testingNMClientAsync;
+   }
 
 Review comment:
   I'm not sure about setting the `TestingNMClientAsync` in this way.
   It requires `setTestingNMClientAsync` to be called before 
`createAndStartNodeManagerClient`, in other words before the RM is started, 
which is implicit and hard to maintain.
   
   I would suggest overwriting the behavior of `TestingNMClientAsync` rather 
than overwriting the object itself. To be specific, we could make the following 
changes.
   1. Add a field `TestingNMClientAsync` in `Context`, so we can access it in 
the test cases.
   2. Create `TestingNMClientAsync` in `Context` and pass it into 
`TestingYarnResourceManager`.
   3. Make `Consumer` and `Function` in `TestingNMClientAsync` settable, so we 
can define/change its behavior during tests.
   
   In this way, we only need to define the NM client behaviors before using 
them, which I believe is more straightforward.
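
   The three steps could look roughly like the sketch below. All classes here 
are simplified, hypothetical stand-ins (with `String` in place of `Container` 
and `ContainerLaunchContext`), not the actual test classes from the PR.

```java
import java.util.function.BiConsumer;

/** Stand-in sketch of the three steps; none of these are the real test classes. */
class SettableClientSketch {
    /** Step 3: a testing client whose behavior can be swapped at any time. */
    static class TestingClient {
        private volatile BiConsumer<String, String> startContainerAsyncBiConsumer =
                (container, launchContext) -> {};

        void setStartContainerAsyncBiConsumer(BiConsumer<String, String> consumer) {
            this.startContainerAsyncBiConsumer = consumer;
        }

        void startContainerAsync(String container, String launchContext) {
            startContainerAsyncBiConsumer.accept(container, launchContext);
        }
    }

    /** Hypothetical resource manager that receives the client at construction. */
    static class ResourceManager {
        private final TestingClient nodeManagerClient;

        ResourceManager(TestingClient nodeManagerClient) {
            this.nodeManagerClient = nodeManagerClient;
        }

        void startWorker(String container) {
            nodeManagerClient.startContainerAsync(container, "launch-context");
        }
    }

    /** Steps 1 and 2: the context owns the client and passes it to the RM. */
    static class Context {
        final TestingClient testingNMClientAsync = new TestingClient();
        final ResourceManager resourceManager = new ResourceManager(testingNMClientAsync);
    }
}
```

   Because the resource manager holds the same client instance for its whole 
lifetime, a test only has to set the consumer before the call it wants to 
observe, not before the resource manager is created.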




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-28 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385995255
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -178,7 +179,7 @@ public void teardown() throws Exception {
 
static class TestingYarnResourceManager extends YarnResourceManager {
AMRMClientAsync 
mockResourceManagerClient;
-   NMClientAsync mockNMClient;
+   TestingNMClientAsync testingNMClientAsync;
 
 Review comment:
   I think we can keep it declared as `NMClientAsync`. That ensures we use the 
exact same set of public interfaces in the tests as in the production code.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-28 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385996883
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -178,7 +179,7 @@ public void teardown() throws Exception {
 
static class TestingYarnResourceManager extends YarnResourceManager {
AMRMClientAsync 
mockResourceManagerClient;
-   NMClientAsync mockNMClient;
+   TestingNMClientAsync testingNMClientAsync;
 
 Review comment:
   Let's make these fields `final`, to avoid changing them after the 
`TestingYarnResourceManager` is created.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385530896
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -528,6 +558,31 @@ public void testOnStartContainerError() throws Exception {
}};
}
 
+   @Test
+   public void testStartWithContainerFromPreviousAttempt() throws 
Exception {
+   new Context() {{
+   runTest(() -> {
+   Container runningCountainer = 
mockContainer("container", 1234, 1, Resource.newInstance(1024, 1));
+   Container newContainer = 
mockContainer("container", 1234, 2, Resource.newInstance(1024, 1));
+   resourceManager.getWorkerNodeMap().put(new 
ResourceID(runningCountainer.getId().toString()), new 
YarnWorkerNode(runningCountainer));
+   resourceManager.getWorkerNodeMap().put(new 
ResourceID(newContainer.getId().toString()), new YarnWorkerNode(newContainer));
+   testingNMClientAsync.containerStatuses.add(
+   
ContainerStatus.newInstance(runningCountainer.getId(), ContainerState.RUNNING, 
"", 0));
+   testingNMClientAsync.containerStatuses.add(
+   
ContainerStatus.newInstance(newContainer.getId(), ContainerState.NEW, "", 0));
+
+   CompletableFuture 
requestContainerStatusFuture = resourceManager.runInMainThread(() -> {
+   
testingNMClientAsync.getContainerStatusAsync(runningCountainer.getId(), 
runningCountainer.getNodeId());
+   
testingNMClientAsync.getContainerStatusAsync(newContainer.getId(), 
newContainer.getNodeId());
+   return null;
+   });
 
 Review comment:
   I think we can make `getContainersFromPreviousAttempts` visible for testing, 
and call it on the main thread with a custom 
`RegisterApplicationMasterResponse`.
   
   The purpose of this test case is to verify that `YarnResourceManager` properly 
handles recovered containers, which includes querying their status from the Yarn NM 
and handling them according to the received status. Currently, it only covers the 
second part of the workflow.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385527439
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -454,6 +456,11 @@ public void onError(Throwable error) {
onFatalError(error);
}
 
+   @VisibleForTesting
+   Map getWorkerNodeMap() {
+   return workerNodeMap;
 
 Review comment:
   ```suggestion
return Collections.unmodifiableMap(workerNodeMap);
   ```
   We should always be careful when exposing non-primitive fields, such as `Map` and 
`List`. Even when they are declared `final`, their contents can still be changed.
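
To illustrate the point about `final` fields, here is a minimal, self-contained sketch. The `Registry` class and its method names are hypothetical stand-ins and are not part of the Flink code base; only `Collections.unmodifiableMap` is taken from the suggestion above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableViewExample {

    /** Hypothetical stand-in for a class that owns a mutable map, similar to workerNodeMap. */
    static class Registry {
        // `final` fixes the reference only; the map contents remain mutable.
        private final Map<String, String> workers = new HashMap<>();

        void register(String id, String host) {
            workers.put(id, host);
        }

        // Leaky accessor: callers receive the internal map itself.
        Map<String, String> getWorkersLeaky() {
            return workers;
        }

        // Safer accessor: callers receive a read-only view of the internal map.
        Map<String, String> getWorkers() {
            return Collections.unmodifiableMap(workers);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("container_1", "host-a");

        // The leaky accessor lets outside code wipe the internal state.
        registry.getWorkersLeaky().clear();
        System.out.println("size after leaky clear: " + registry.getWorkers().size());

        registry.register("container_2", "host-b");
        try {
            // The read-only view rejects any attempt to modify it.
            registry.getWorkers().clear();
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only view rejected the modification");
        }
        System.out.println("size after rejected clear: " + registry.getWorkers().size());
    }
}
```

Note that the view is read-only for callers but still reflects later changes made by the owning class, so a test that needs to populate the map would have to do so through the owner rather than through the getter.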




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385519116
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -464,7 +471,15 @@ public void onContainerStarted(ContainerId containerId, 
Map
 
@Override
public void onContainerStatusReceived(ContainerId containerId, 
ContainerStatus containerStatus) {
-   // We are not interested in getting container status
+   // We fetch the status of the container from the previous 
attempts.
+   if (containerStatus.getState() == ContainerState.NEW) {
+   // If the status is "NEW", it means that the container 
is allocated but not be started yet.
+   // We need to release it.
+   log.warn("The container {} from the previous attempt 
did not start. Released.", containerId);
 
 Review comment:
   ```suggestion
log.info("Releasing container {} from the previous 
attempt. No TaskExecutor started inside.", containerId);
   ```




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385519373
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -464,7 +471,15 @@ public void onContainerStarted(ContainerId containerId, 
Map
 
@Override
public void onContainerStatusReceived(ContainerId containerId, 
ContainerStatus containerStatus) {
-   // We are not interested in getting container status
+   // We fetch the status of the container from the previous 
attempts.
+   if (containerStatus.getState() == ContainerState.NEW) {
+   // If the status is "NEW", it means that the container 
is allocated but not be started yet.
+   // We need to release it.
+   log.warn("The container {} from the previous attempt 
did not start. Released.", containerId);
 
 Review comment:
   I think an INFO-level log message should be enough. This is not causing any 
problems.




[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385525529
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -176,9 +179,37 @@ public void teardown() throws Exception {
}
}
 
+   static class TestingNMClientAsync extends NMClientAsync {
+
+   public List containerStatuses = new 
ArrayList<>();
+
+   protected TestingNMClientAsync(CallbackHandler callbackHandler) 
{
+   super(callbackHandler);
+   }
+
+   @Override
+   public void startContainerAsync(Container container, 
ContainerLaunchContext containerLaunchContext) {
+   // Do nothing.
+   }
+
+   @Override
+   public void stopContainerAsync(ContainerId containerId, NodeId 
nodeId) {
+   // Do nothing.
+   }
+
+   @Override
+   public void getContainerStatusAsync(ContainerId containerId, 
NodeId nodeId) {
+   for (ContainerStatus containerStatus: 
containerStatuses) {
+   if 
(containerStatus.getContainerId().equals(containerId)) {
+   
callbackHandler.onContainerStatusReceived(containerId, containerStatus);
+   }
+   }
+   }
+   }
 
 Review comment:
   Let's make this class more general by using `Function` and `Consumer` to 
define its behaviors. Something like the following, taking 
`getContainerStatusAsync` as an example.
   ```
   static class TestingNMClientAsync extends NMClientAsync {
       // ...
   
       private final BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncFunction;
   
       // ...
   
       public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
           callbackHandler.onContainerStatusReceived(containerId,
               getContainerStatusAsyncFunction.apply(containerId, nodeId));
       }
   
       // ...
   }
   ```
   
   We can use a builder class to create this class, allowing custom `Function` 
and `Consumer` instances to be set. If the code for this class grows too much, we 
can also put it in a separate file.
   
   There are several benefits to doing this.
   - It allows defining per-test-case behavior, which makes it easier to reuse 
this class in the future.
   - It avoids using `mock`, `spy` and `verify`.
   - It avoids having a publicly accessible `containerStatuses`.
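
The builder idea described above could look roughly like the following sketch. To keep it self-contained it does not use the Hadoop types: `TestingClient`, `StatusCallback`, and the plain `String` identifiers and statuses are hypothetical stand-ins for `NMClientAsync`, its callback handler, `ContainerId`/`NodeId`, and `ContainerStatus`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

public class TestingClientSketch {

    /** Hypothetical stand-in for the NM client callback handler. */
    interface StatusCallback {
        void onStatusReceived(String containerId, String status);
    }

    /** Test double whose behavior is injected per test case instead of being hard-coded. */
    static class TestingClient {
        private final StatusCallback callback;
        private final BiFunction<String, String, String> getStatusFunction;
        private final BiConsumer<String, String> stopConsumer;

        private TestingClient(
                StatusCallback callback,
                BiFunction<String, String, String> getStatusFunction,
                BiConsumer<String, String> stopConsumer) {
            this.callback = callback;
            this.getStatusFunction = getStatusFunction;
            this.stopConsumer = stopConsumer;
        }

        void getContainerStatusAsync(String containerId, String nodeId) {
            // Delegate to the injected function and forward the result to the callback.
            callback.onStatusReceived(containerId, getStatusFunction.apply(containerId, nodeId));
        }

        void stopContainerAsync(String containerId, String nodeId) {
            stopConsumer.accept(containerId, nodeId);
        }

        static Builder builder() {
            return new Builder();
        }

        static class Builder {
            // Defaults keep tests short when a behavior is irrelevant to them.
            private BiFunction<String, String, String> getStatusFunction = (c, n) -> "RUNNING";
            private BiConsumer<String, String> stopConsumer = (c, n) -> { };

            Builder setGetStatusFunction(BiFunction<String, String, String> function) {
                this.getStatusFunction = function;
                return this;
            }

            Builder setStopConsumer(BiConsumer<String, String> consumer) {
                this.stopConsumer = consumer;
                return this;
            }

            TestingClient build(StatusCallback callback) {
                return new TestingClient(callback, getStatusFunction, stopConsumer);
            }
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        List<String> stopped = new ArrayList<>();

        // Each test case supplies only the behavior it cares about.
        TestingClient client = TestingClient.builder()
                .setGetStatusFunction((containerId, nodeId) -> containerId.equals("c2") ? "NEW" : "RUNNING")
                .setStopConsumer((containerId, nodeId) -> stopped.add(containerId))
                .build((containerId, status) -> received.add(containerId + "=" + status));

        client.getContainerStatusAsync("c1", "node");
        client.getContainerStatusAsync("c2", "node");
        client.stopContainerAsync("c2", "node");

        System.out.println(received);
        System.out.println(stopped);
    }
}
```

Because the recorded calls live in lists owned by the test, assertions can be made on them directly, without `verify` and without exposing any state on the test double itself.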

