[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-03 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386859531
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map
 
@Override
public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-   // We are not interested in getting container status
+   // We fetch the status of the container from the previous attempts.
+   if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   I just had an offline discussion with @xintongsong and @TaoYang526.
   
   The key point of this issue is that we need to release the containers that were not started in the previous attempt or are no longer usable. When we try to get the status of those containers:
   - If it goes into `onContainerStatusReceived`, the container was started in the previous attempt. Regardless of its state, such containers will eventually be released by the existing `onStartContainerError` or `onContainersCompleted`, so there is no need to handle them here.
   - If it goes into `onGetContainerStatusError`, the container was not started in the previous attempt, or it is unavailable for another reason, such as a lost NodeManager (NM). In such cases, the container is not usable, and we need to release it and remove it from the `workerNodeMap`.
   
   So I will move the release logic from here to `onGetContainerStatusError`.
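   The rule above can be sketched as a minimal, self-contained model. This is not the actual `YarnResourceManager` code: container IDs are plain `String`s, and `RecoveredContainerTracker`, the `releaser` callback and the simplified `workerNodeMap` are hypothetical stand-ins for the YARN and Flink types. It only shows the split: a received status needs no handling, while a status error triggers a release and removal from `workerNodeMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of the recovery handling discussed above.
// Container IDs are plain strings instead of YARN's ContainerId.
class RecoveredContainerTracker {
    // Stand-in for the resource manager's workerNodeMap (container ID -> worker).
    final Map<String, String> workerNodeMap = new HashMap<>();
    // Stand-in for asking YARN to release a container (assumed abstraction).
    private final Consumer<String> releaser;

    RecoveredContainerTracker(Consumer<String> releaser) {
        this.releaser = releaser;
    }

    // Status received: the container was started in the previous attempt.
    // Whatever its state, the existing completion/error callbacks clean it up.
    void onContainerStatusReceived(String containerId, String state) {
        // Intentionally a no-op.
    }

    // Status query failed: the container was never started in the previous
    // attempt, or it is otherwise unusable (e.g. its NodeManager was lost).
    void onGetContainerStatusError(String containerId, Throwable cause) {
        workerNodeMap.remove(containerId);
        releaser.accept(containerId);
    }
}
```

   Only the error path touches `workerNodeMap` and the release callback; containers whose status was received are left to the existing lifecycle callbacks.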


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-03 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386859531
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map
 
@Override
public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-   // We are not interested in getting container status
+   // We fetch the status of the container from the previous attempts.
+   if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   I just had an offline discussion with Xintong and Taoyang.
   
   The key point of this issue is that we need to release the containers that were not started in the previous attempt or are no longer usable. When we try to get the status of those containers:
   - If it goes into `onContainerStatusReceived`, the container was started in the previous attempt. Regardless of its state, such containers will eventually be released by the existing `onStartContainerError` or `onContainersCompleted`, so there is no need to handle them here.
   - If it goes into `onGetContainerStatusError`, the container was not started in the previous attempt, or it is unavailable for another reason, such as a lost NodeManager (NM). In such cases, the container is not usable, and we need to release it and remove it from the `workerNodeMap`.
   
   So I will move the release logic from here to `onGetContainerStatusError`.




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-01 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386175617
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+   private BiFunction getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+   throw new UnsupportedOperationException("No getContainerStatusAsync function has been set.");
 
 Review comment:
   I'm not sure about that. This function should only be invoked explicitly. If someone needs to invoke it in a test case, they need to define its behavior as well.
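   The "fail loudly unless configured" pattern argued for above can be sketched with a hypothetical test double. `FakeStatusClient` and its setter are illustrative names only; the real `TestingNMClientAsync` extends Hadoop's `NMClientAsync` and works with YARN's `ContainerId` and `NodeId`, which are replaced by `String`s here.

```java
import java.util.function.BiConsumer;

// Hypothetical test double: an operation a test never configured fails
// immediately instead of silently doing nothing.
class FakeStatusClient {
    // Default behavior throws, so an unexpected call is reported at once.
    private volatile BiConsumer<String, String> getContainerStatusAsyncConsumer =
        (containerId, nodeId) -> {
            throw new UnsupportedOperationException(
                "No getContainerStatusAsync function has been set.");
        };

    // A test that expects the call must define the behavior explicitly.
    void setGetContainerStatusAsyncConsumer(BiConsumer<String, String> consumer) {
        this.getContainerStatusAsyncConsumer = consumer;
    }

    void getContainerStatusAsync(String containerId, String nodeId) {
        getContainerStatusAsyncConsumer.accept(containerId, nodeId);
    }
}
```

   A silent no-op default could let a test pass while the code under test calls an operation the test author never anticipated; the throwing default turns that into a visible failure.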




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-03-01 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386164345
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -178,7 +179,7 @@ public void teardown() throws Exception {
 
static class TestingYarnResourceManager extends YarnResourceManager {
AMRMClientAsync mockResourceManagerClient;
-   NMClientAsync mockNMClient;
+   TestingNMClientAsync testingNMClientAsync;
 
 Review comment:
   Since we construct this field in `TestingYarnResourceManager`, I think there is no need to keep declaring it as `NMClientAsync`. Besides, we now need to access its setter functions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386073340
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
 
+   void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+   this.testingNMClientAsync = testingNMClientAsync;
+   }
 
 Review comment:
   Agreed. We can leverage `CompletableFuture`.
   
   Regarding creating `TestingNMClientAsync` in `Context`: we need to create a `TestingYarnResourceManager` first and then pass it as the `callbackHandler` to the constructor of `TestingNMClientAsync`.
   So I prefer to create `TestingNMClientAsync` in the constructor of `TestingYarnResourceManager` and add a getter function to `TestingYarnResourceManager`. WDYT?
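   The construction order proposed above can be sketched as follows: the resource manager is itself the callback handler, so it creates the testing client in its own constructor, passes `this`, and exposes the client through a getter. `StartCallbackHandler`, `TestingClient` and `TestingResourceManager` are hypothetical, heavily simplified stand-ins for `NMClientAsync.CallbackHandler`, `TestingNMClientAsync` and `TestingYarnResourceManager`.

```java
// Hypothetical stand-in for NMClientAsync.CallbackHandler.
interface StartCallbackHandler {
    void onContainerStarted(String containerId);
}

// Hypothetical stand-in for TestingNMClientAsync: it needs its handler
// at construction time.
class TestingClient {
    private final StartCallbackHandler callbackHandler;

    TestingClient(StartCallbackHandler callbackHandler) {
        this.callbackHandler = callbackHandler;
    }

    void startContainerAsync(String containerId) {
        // A real async client would call back from another thread;
        // this sketch calls back directly.
        callbackHandler.onContainerStarted(containerId);
    }
}

// Hypothetical stand-in for TestingYarnResourceManager, which is itself
// the callback handler.
class TestingResourceManager implements StartCallbackHandler {
    // Declared with the concrete testing type so tests can reach its
    // test-only methods without casting.
    private final TestingClient testingClient;
    int startedCount;

    TestingResourceManager() {
        // The handler (this) must exist before the client can be built,
        // so the client is created here rather than in the test Context.
        this.testingClient = new TestingClient(this);
    }

    TestingClient getTestingClient() {
        return testingClient;
    }

    @Override
    public void onContainerStarted(String containerId) {
        startedCount++;
    }
}
```

   Passing `this` out of a constructor is usually discouraged; it is tolerable in this sketch because `TestingClient` only stores the reference and does not call back during construction.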




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386067195
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
 
+   void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+   this.testingNMClientAsync = testingNMClientAsync;
+   }
 
 Review comment:
   My concern with defining the behavior in the test case is:
   - If someone sets the `startContainerAsync` behavior, which is invoked asynchronously, multiple times during a test, it may cause subtle issues that are hard to debug.
   
   Regarding the constructor of `Context`, I think we could solve that by adding a builder class.
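   A builder, as suggested above, lets each test override only the pieces it cares about instead of growing the `Context` constructor's parameter list, and it fixes the behavior once at construction time. The sketch is hypothetical: `TestContext`, its fields and the default values are illustrative only and do not reflect the actual `YarnResourceManagerTest.Context`.

```java
import java.util.function.BiConsumer;

// Hypothetical, simplified test context whose behavior is fixed at
// construction time (final fields, no setters).
class TestContext {
    final int numberOfTaskManagers;
    final BiConsumer<String, String> startContainerAsyncConsumer;

    private TestContext(Builder builder) {
        this.numberOfTaskManagers = builder.numberOfTaskManagers;
        this.startContainerAsyncConsumer = builder.startContainerAsyncConsumer;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Illustrative defaults used when a test does not override them.
        private int numberOfTaskManagers = 1;
        private BiConsumer<String, String> startContainerAsyncConsumer =
            (containerId, nodeId) -> { };

        Builder setNumberOfTaskManagers(int numberOfTaskManagers) {
            this.numberOfTaskManagers = numberOfTaskManagers;
            return this;
        }

        Builder setStartContainerAsyncConsumer(BiConsumer<String, String> consumer) {
            this.startContainerAsyncConsumer = consumer;
            return this;
        }

        TestContext build() {
            return new TestContext(this);
        }
    }
}
```

   Because the built context exposes no setters, an asynchronously invoked behavior such as `startContainerAsync` cannot be swapped halfway through a test, which addresses the debugging concern above.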


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386066649
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -321,9 +322,16 @@ void runTest(RunnableWithException testMethod) throws Exception {
}
}
 
-   void verifyContainerHasBeenStarted(Container testingContainer) {
+   void verifyContainerHasBeenStarted(Container testingContainer, List startedContainerIds) throws Exception {
verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
-   verify(mockNMClient, VERIFICATION_TIMEOUT).startContainerAsync(eq(testingContainer), any(ContainerLaunchContext.class));
+   // Wait the call of
+   for (int i = 0; i < TIMEOUT.toMilliseconds(); i += 1000) {
+   if (startedContainerIds.contains(testingContainer.getId())) {
+   return;
+   }
+   Thread.sleep(1000);
+   }
+   throw new Exception("The container has not been start before timeout.");
 
 Review comment:
   Makes sense to me. Thanks for the explanation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386029438
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -321,9 +322,16 @@ void runTest(RunnableWithException testMethod) throws Exception {
}
}
 
-   void verifyContainerHasBeenStarted(Container testingContainer) {
+   void verifyContainerHasBeenStarted(Container testingContainer, List startedContainerIds) throws Exception {
verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
-   verify(mockNMClient, VERIFICATION_TIMEOUT).startContainerAsync(eq(testingContainer), any(ContainerLaunchContext.class));
+   // Wait the call of
+   for (int i = 0; i < TIMEOUT.toMilliseconds(); i += 1000) {
+   if (startedContainerIds.contains(testingContainer.getId())) {
+   return;
+   }
+   Thread.sleep(1000);
+   }
+   throw new Exception("The container has not been start before timeout.");
 
 Review comment:
   Yes, you are right. I think we could fix it by using `CopyOnWriteArrayList`. Is there any extra benefit to leveraging the `CompletableFuture` mechanism?
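   The two options discussed above can be compared side by side in a hypothetical sketch (`StartedContainerWaiter` and its methods are illustrative names; container IDs are `String`s). The first keeps the polling loop but records IDs in a `CopyOnWriteArrayList`, so writes from the callback thread are safely visible to the polling test thread. The second completes a `CompletableFuture` from the callback, so the test blocks with `get(timeout, unit)` and wakes up as soon as the call happens.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class StartedContainerWaiter {
    // Option 1: thread-safe list written by the async callback, polled by the test.
    final List<String> startedContainerIds = new CopyOnWriteArrayList<>();
    // Option 2: future completed by the async callback, awaited by the test.
    final CompletableFuture<String> startContainerFuture = new CompletableFuture<>();

    // Called from the (hypothetical) testing client's async start path.
    void onStartContainerAsync(String containerId) {
        startedContainerIds.add(containerId);
        startContainerFuture.complete(containerId);
    }

    // Polls until the ID shows up or the timeout elapses.
    void awaitByPolling(String containerId, long timeoutMillis) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!startedContainerIds.contains(containerId)) {
            if (System.nanoTime() - deadline >= 0) {
                throw new Exception("The container has not been started before timeout.");
            }
            Thread.sleep(10L);
        }
    }

    // Blocks until the callback fires; throws TimeoutException if it never does.
    String awaitByFuture(long timeoutMillis) throws Exception {
        return startContainerFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

   With the future, the test does not depend on a fixed sleep step, and a missed call surfaces as a `TimeoutException`. A single future only records the first start, though, so a thread-safe list may still be handier when several containers are expected.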


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386028701
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
 
+   void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+   this.testingNMClientAsync = testingNMClientAsync;
+   }
 
 Review comment:
   Currently, I don't see the benefit of, or need for, changing the behavior during tests. Besides, some of its functions are called asynchronously. I prefer to add `TestingNMClientAsync` to the parameter list of the constructor of `Context`. WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386028701
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
 
+   void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+   this.testingNMClientAsync = testingNMClientAsync;
+   }
 
 Review comment:
   Currently, I don't see the benefit of, or need for, changing the behavior during tests. Besides, some of its functions are called asynchronously. I prefer to add `TestingNMClientAsync` as a parameter of the constructor of `Context`. WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-29 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386027835
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -178,7 +179,7 @@ public void teardown() throws Exception {
 
static class TestingYarnResourceManager extends YarnResourceManager {
AMRMClientAsync mockResourceManagerClient;
-   NMClientAsync mockNMClient;
+   TestingNMClientAsync testingNMClientAsync;
 
 Review comment:
   Good catch. It could be `NMClientAsync` in the current implementation.




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385533680
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -176,9 +179,37 @@ public void teardown() throws Exception {
}
}
 
+   static class TestingNMClientAsync extends NMClientAsync {
+
+   public List containerStatuses = new ArrayList<>();
+
+   protected TestingNMClientAsync(CallbackHandler callbackHandler) {
+   super(callbackHandler);
+   }
+
+   @Override
+   public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
+   // Do nothing.
+   }
+
+   @Override
+   public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+   // Do nothing.
+   }
+
+   @Override
+   public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
+   for (ContainerStatus containerStatus: containerStatuses) {
+   if (containerStatus.getContainerId().equals(containerId)) {
+   callbackHandler.onContainerStatusReceived(containerId, containerStatus);
+   }
+   }
+   }
+   }
 
 Review comment:
   Good point!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services