Repository: hadoop
Updated Branches:
  refs/heads/trunk cd976b263 -> 7114baddb


YARN-4051. ContainerKillEvent lost when container is still recovering and 
application finishes. Contributed by sandflee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7114badd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7114badd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7114badd

Branch: refs/heads/trunk
Commit: 7114baddb627628a54cdab77f68504332a5a0e28
Parents: cd976b2
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Thu Mar 16 09:30:10 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Thu Mar 16 09:30:10 2017 -0500

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  | 57 +++++++++++++++++---
 .../application/ApplicationImpl.java            |  4 +-
 .../containermanager/container/Container.java   |  2 +
 .../container/ContainerImpl.java                |  8 +++
 .../TestContainerManagerRecovery.java           | 18 ++++---
 .../nodemanager/webapp/MockContainer.java       |  5 ++
 6 files changed, 76 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7114badd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 9f1655f..f3fe8cc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -402,8 +402,9 @@ public class ContainerManagerImpl extends CompositeService 
implements
     LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
         + " with exit code " + rcs.getExitCode());
 
-    if (context.getApplications().containsKey(appId)) {
-      recoverActiveContainer(launchContext, token, rcs);
+    Application app = context.getApplications().get(appId);
+    if (app != null) {
+      recoverActiveContainer(app, launchContext, token, rcs);
       if (rcs.getRecoveryType() == RecoveredContainerType.KILL) {
         dispatcher.getEventHandler().handle(
             new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED,
@@ -423,7 +424,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
    * Recover a running container.
    */
   @SuppressWarnings("unchecked")
-  protected void recoverActiveContainer(
+  protected void recoverActiveContainer(Application app,
       ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
       RecoveredContainerState rcs) throws IOException {
     Credentials credentials = YarnServerSecurityUtils.parseCredentials(
@@ -431,8 +432,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
     Container container = new ContainerImpl(getConfig(), dispatcher,
         launchContext, credentials, metrics, token, context, rcs);
     context.getContainers().put(token.getContainerID(), container);
-    dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(
-        container));
+    app.handle(new ApplicationContainerInitEvent(container));
   }
 
   private void waitForRecoveredContainers() throws InterruptedException {
@@ -1286,6 +1286,10 @@ public class ContainerManagerImpl extends 
CompositeService implements
           + " is not handled by this NodeManager");
       }
     } else {
+      if (container.isRecovering()) {
+        throw new NMNotYetReadyException("Container " + containerIDStr
+            + " is recovering, try later");
+      }
       context.getNMStateStore().storeContainerKilled(containerID);
       container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER,
           "Container killed by the ApplicationMaster.");
@@ -1455,6 +1459,21 @@ public class ContainerManagerImpl extends 
CompositeService implements
               + " FINISH_APPS event");
           continue;
         }
+
+        boolean shouldDropEvent = false;
+        for (Container container : app.getContainers().values()) {
+          if (container.isRecovering()) {
+            LOG.info("drop FINISH_APPS event to " + appID + " because "
+                + "container " + container.getContainerId()
+                + " is recovering");
+            shouldDropEvent = true;
+            break;
+          }
+        }
+        if (shouldDropEvent) {
+          continue;
+        }
+
         String diagnostic = "";
         if (appsFinishedEvent.getReason() == 
CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
           diagnostic = "Application killed on shutdown";
@@ -1469,10 +1488,32 @@ public class ContainerManagerImpl extends 
CompositeService implements
     case FINISH_CONTAINERS:
       CMgrCompletedContainersEvent containersFinishedEvent =
           (CMgrCompletedContainersEvent) event;
-      for (ContainerId container : containersFinishedEvent
+      for (ContainerId containerId : containersFinishedEvent
           .getContainersToCleanup()) {
-          this.dispatcher.getEventHandler().handle(
-              new ContainerKillEvent(container,
+        ApplicationId appId =
+            containerId.getApplicationAttemptId().getApplicationId();
+        Application app = this.context.getApplications().get(appId);
+        if (app == null) {
+          LOG.warn("couldn't find app " + appId + " while processing"
+              + " FINISH_CONTAINERS event");
+          continue;
+        }
+
+        Container container = app.getContainers().get(containerId);
+        if (container == null) {
+          LOG.warn("couldn't find container " + containerId
+              + " while processing FINISH_CONTAINERS event");
+          continue;
+        }
+
+        if (container.isRecovering()) {
+          LOG.info("drop FINISH_CONTAINERS event to " + containerId
+              + " because container is recovering");
+          continue;
+        }
+
+        this.dispatcher.getEventHandler().handle(
+              new ContainerKillEvent(containerId,
                   ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
                   "Container Killed by ResourceManager"));
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7114badd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 112b43a..444581c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -20,8 +20,8 @@ package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -89,7 +89,7 @@ public class ApplicationImpl implements Application {
   private LogAggregationContext logAggregationContext;
 
   Map<ContainerId, Container> containers =
-      new HashMap<ContainerId, Container>();
+      new ConcurrentHashMap<>();
 
   /**
    * The timestamp when the log aggregation has started for this application.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7114badd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 8004f33..bd3f06d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -92,4 +92,6 @@ public interface Container extends 
EventHandler<ContainerEvent> {
   void sendLaunchEvent();
 
   void sendKillEvent(int exitStatus, String description);
+
+  boolean isRecovering();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7114badd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index cae30cd..055e12c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -1756,4 +1756,12 @@ public class ContainerImpl implements Container {
   public void commitUpgrade() {
     this.reInitContext = null;
   }
+
+  @Override
+  public boolean isRecovering() {
+    boolean isRecovering = (
+        recoveredStatus != RecoveredContainerStatus.REQUESTED &&
+        getContainerState() == ContainerState.NEW);
+    return isRecovering;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7114badd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 31546f1..ff19e9b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -86,6 +86,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -248,8 +249,8 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     // simulate application completion
     List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
     finishedApps.add(appId);
-    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
-        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    app.handle(new ApplicationFinishEvent(
+        appId, "Application killed by ResourceManager"));
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
 
     // restart and verify app is marked for finishing
@@ -263,8 +264,8 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     assertNotNull(app);
     // no longer saving FINISH_APP event in NM stateStore,
     // simulate by resending FINISH_APP event
-    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
-        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    app.handle(new ApplicationFinishEvent(
+        appId, "Application killed by ResourceManager"));
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
     assertTrue(context.getApplicationACLsManager().checkAccess(
         UserGroupInformation.createRemoteUser(modUser),
@@ -335,8 +336,8 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     // simulate application completion
     List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
     finishedApps.add(appId);
-    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
-        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    app.handle(new ApplicationFinishEvent(
+        appId, "Application killed by ResourceManager"));
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
 
     app.handle(new ApplicationEvent(app.getAppId(),
@@ -357,8 +358,9 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
 
     // no longer saving FINISH_APP event in NM stateStore,
     // simulate by resending FINISH_APP event
-    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
-        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    app.handle(new ApplicationFinishEvent(
+        appId, "Application killed by ResourceManager"));
+
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
     // TODO need to figure out why additional APPLICATION_RESOURCES_CLEANEDUP
     // is needed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7114badd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 686a0d9..022baea 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -230,4 +230,9 @@ public class MockContainer implements Container {
   public void sendKillEvent(int exitStatus, String description) {
 
   }
+
+  @Override
+  public boolean isRecovering() {
+    return false;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to