Author: kasha
Date: Sat Jul 19 00:20:16 2014
New Revision: 1611841
URL: http://svn.apache.org/r1611841
Log:
YARN-2244. FairScheduler missing handling of containers for unknown application
attempts. (Anubhav Dhoot via kasha)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1611841&r1=1611840&r2=1611841&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Jul 19 00:20:16 2014
@@ -53,6 +53,9 @@ Release 2.6.0 - UNRELEASED
after RM recovery but before scheduler learns about apps and app-attempts.
(Jian He via vinodkv)
+ YARN-2244. FairScheduler missing handling of containers for unknown
+ application attempts. (Anubhav Dhoot via kasha)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1611841&r1=1611840&r2=1611841&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Sat Jul 19 00:20:16 2014
@@ -123,6 +123,23 @@ public abstract class AbstractYarnSchedu
return maximumAllocation;
}
+ protected void containerLaunchedOnNode(ContainerId containerId,
+ SchedulerNode node) {
+ // Get the application for the finished container
+ SchedulerApplicationAttempt application = getCurrentAttemptForContainer
+ (containerId);
+ if (application == null) {
+ LOG.info("Unknown application "
+ + containerId.getApplicationAttemptId().getApplicationId()
+ + " launched container " + containerId + " on node: " + node);
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+ return;
+ }
+
+ application.containerLaunchedOnNode(containerId, node.getNodeID());
+ }
+
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication<T> app =
applications.get(applicationAttemptId.getApplicationId());
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1611841&r1=1611840&r2=1611841&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sat Jul 19 00:20:16 2014
@@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -866,21 +865,6 @@ public class CapacityScheduler extends
}
- private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
- // Get the application for the finished container
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
@Override
public void handle(SchedulerEvent event) {
switch(event.getType()) {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1611841&r1=1611840&r2=1611841&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Sat Jul 19 00:20:16 2014
@@ -929,22 +929,6 @@ public class FairScheduler extends
}
/**
- * Process a container which has launched on a node, as reported by the node.
- */
- private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
- // Get the application for the finished container
- FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
- /**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm) {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1611841&r1=1611840&r2=1611841&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sat Jul 19 00:20:16 2014
@@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -831,23 +830,6 @@ public class FifoScheduler extends
}
}
- private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
- // Get the application for the finished container
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- // Some unknown container sneaked into the system. Kill it.
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
@Lock(FifoScheduler.class)
private synchronized void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1611841&r1=1611840&r2=1611841&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Sat Jul 19 00:20:16 2014
@@ -232,20 +232,7 @@ public class TestApplicationCleanup {
containerStatuses.put(app.getApplicationId(), containerStatusList);
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
- dispatcher.await();
- List<ContainerId> contsToClean = resp.getContainersToCleanup();
- int cleanedConts = contsToClean.size();
- waitCount = 0;
- while (cleanedConts < 1 && waitCount++ < 200) {
- LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
- Thread.sleep(100);
- resp = nm1.nodeHeartbeat(true);
- dispatcher.await();
- contsToClean = resp.getContainersToCleanup();
- cleanedConts += contsToClean.size();
- }
- LOG.info("Got cleanup for " + contsToClean.get(0));
- Assert.assertEquals(1, cleanedConts);
+ waitForContainerCleanup(dispatcher, nm1, resp);
// Now to test the case when RM already gave cleanup, and NM suddenly
// realizes that the container is running.
@@ -258,26 +245,36 @@ public class TestApplicationCleanup {
containerStatuses.put(app.getApplicationId(), containerStatusList);
resp = nm1.nodeHeartbeat(containerStatuses, true);
- dispatcher.await();
- contsToClean = resp.getContainersToCleanup();
- cleanedConts = contsToClean.size();
// The cleanup list won't be instantaneous as it is given out by scheduler
// and not RMNodeImpl.
- waitCount = 0;
- while (cleanedConts < 1 && waitCount++ < 200) {
- LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
- Thread.sleep(100);
- resp = nm1.nodeHeartbeat(true);
+ waitForContainerCleanup(dispatcher, nm1, resp);
+
+ rm.stop();
+ }
+
+ protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+ NodeHeartbeatResponse resp) throws Exception {
+ int waitCount = 0, cleanedConts = 0;
+ List<ContainerId> contsToClean;
+ do {
dispatcher.await();
contsToClean = resp.getContainersToCleanup();
cleanedConts += contsToClean.size();
+ if (cleanedConts >= 1) {
+ break;
+ }
+ Thread.sleep(100);
+ resp = nm.nodeHeartbeat(true);
+ } while(waitCount++ < 200);
+
+ if (contsToClean.isEmpty()) {
+ LOG.error("Failed to get any containers to cleanup");
+ } else {
+ LOG.info("Got cleanup for " + contsToClean.get(0));
}
- LOG.info("Got cleanup for " + contsToClean.get(0));
Assert.assertEquals(1, cleanedConts);
-
- rm.stop();
}
-
+
private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
throws Exception {
while (true) {
@@ -400,6 +397,58 @@ public class TestApplicationCleanup {
rm2.stop();
}
+ @SuppressWarnings("resource")
+ @Test (timeout = 60000)
+ public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
+ Exception {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ // start RM
+ final DrainDispatcher dispatcher = new DrainDispatcher();
+ MockRM rm1 = new MockRM(conf, memStore) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // create app and launch the AM
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = launchAM(app0, rm1, nm1);
+ nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+ rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+ // start new RM
+ final DrainDispatcher dispatcher2 = new DrainDispatcher();
+ MockRM rm2 = new MockRM(conf, memStore) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher2;
+ }
+ };
+ rm2.start();
+
+ // nm1 register to rm2, and do a heartbeat
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+ rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+ // Add unknown container for application unknown to scheduler
+ NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
+ .getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+ waitForContainerCleanup(dispatcher2, nm1, response);
+
+ rm1.stop();
+ rm2.stop();
+ }
+
public static void main(String[] args) throws Exception {
TestApplicationCleanup t = new TestApplicationCleanup();
t.testAppCleanup();