Author: vinodkv
Date: Thu Jul 25 04:16:37 2013
New Revision: 1506816

URL: http://svn.apache.org/r1506816
Log:
YARN-688. Fixed NodeManager to properly cleanup containers when it is shut 
down. Contributed by Jian He.
svn merge --ignore-ancestry -c 1506814 ../../trunk/

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt?rev=1506816&r1=1506815&r2=1506816&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt Thu 
Jul 25 04:16:37 2013
@@ -717,6 +717,14 @@ Release 2.1.0-beta - 2013-07-02
     YARN-875. Application can hang if AMRMClientAsync callback thread has
     exception (Xuan Gong via bikas)
 
+    YARN-461. Fair scheduler should not accept apps with empty string queue 
name. 
+    (ywskycn via tucu)
+
+    YARN-968. RM admin commands don't work. (vinodkv via kihwal)
+
+    YARN-688. Fixed NodeManager to properly cleanup containers when it is shut
+    down. (Jian He via vinodkv)
+
   BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
 
     YARN-158. Yarn creating package-info.java must not depend on sh.
@@ -782,8 +790,6 @@ Release 2.1.0-beta - 2013-07-02
     YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan 
Liu
     via cnauroth)
 
-    YARN-968. RM admin commands don't work. (vinodkv via kihwal)
-
 Release 2.0.5-alpha - 06/06/2013
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1506816&r1=1506815&r2=1506816&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 (original)
+++ 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 Thu Jul 25 04:16:37 2013
@@ -229,6 +229,15 @@ public class NodeManager extends Composi
     return "NodeManager";
   }
 
+  protected void shutDown() {
+    new Thread() {
+      @Override
+      public void run() {
+        NodeManager.this.stop();
+      }
+    }.start();
+  }
+
   protected void resyncWithRM() {
     //we do not want to block dispatcher thread here
     new Thread() {
@@ -265,6 +274,8 @@ public class NodeManager extends Composi
       while (!containers.isEmpty()
           && System.currentTimeMillis() - waitStartTime < 
waitForContainersOnShutdownMillis) {
         try {
+          //To remove done containers in NM context
+          nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
           Thread.sleep(1000);
         } catch (InterruptedException ex) {
           LOG.warn("Interrupted while sleeping on container kill on shutdown",
@@ -276,7 +287,6 @@ public class NodeManager extends Composi
       while (!containers.isEmpty()) {
         try {
           Thread.sleep(1000);
-          //to remove done containers from the map
           nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
         } catch (InterruptedException ex) {
           LOG.warn("Interrupted while sleeping on container kill on resync",
@@ -409,7 +419,7 @@ public class NodeManager extends Composi
   public void handle(NodeManagerEvent event) {
     switch (event.getType()) {
     case SHUTDOWN:
-      stop();
+      shutDown();
       break;
     case RESYNC:
       resyncWithRM();

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1506816&r1=1506815&r2=1506816&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 (original)
+++ 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 Thu Jul 25 04:16:37 2013
@@ -385,7 +385,7 @@ public class NodeStatusUpdaterImpl exten
             }
             if (response.getNodeAction() == NodeAction.RESYNC) {
               LOG.warn("Node is out of sync with ResourceManager,"
-                  + " hence rebooting.");
+                  + " hence resyncing.");
               LOG.warn("Message from ResourceManager: "
                   + response.getDiagnosticsMessage());
               // Invalidate the RMIdentifier while resync
@@ -418,6 +418,7 @@ public class NodeStatusUpdaterImpl exten
                 new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
             throw new YarnRuntimeException(e);
           } catch (Throwable e) {
+
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             LOG.error("Caught exception in status-updater", e);

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1506816&r1=1506815&r2=1506816&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
 (original)
+++ 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
 Thu Jul 25 04:16:37 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.no
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -97,8 +99,12 @@ public class TestNodeStatusUpdater {
   }
 
   static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
-  static final Path basedir =
-      new Path("target", TestNodeStatusUpdater.class.getName());
+  static final File basedir =
+      new File("target", TestNodeStatusUpdater.class.getName());
+  static final File nmLocalDir = new File(basedir, "nm0");
+  static final File tmpDir = new File(basedir, "tmpDir");
+  static final File remoteLogsDir = new File(basedir, "remotelogs");
+  static final File logsDir = new File(basedir, "logs");
   private static final RecordFactory recordFactory = RecordFactoryProvider
       .getRecordFactory(null);
 
@@ -110,9 +116,14 @@ public class TestNodeStatusUpdater {
   private NodeManager nm;
   private boolean containerStatusBackupSuccessfully = true;
   private List<ContainerStatus> completedContainerStatusList = new 
ArrayList<ContainerStatus>();
+  private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
 
   @Before
   public void setUp() {
+    nmLocalDir.mkdirs();
+    tmpDir.mkdirs();
+    logsDir.mkdirs();
+    remoteLogsDir.mkdirs();
     conf = createNMConfig();
   }
 
@@ -121,6 +132,7 @@ public class TestNodeStatusUpdater {
     this.registeredNodes.clear();
     heartBeatID = 0;
     ServiceOperations.stop(nm);
+    assertionFailedInThread.set(false);
     DefaultMetricsSystem.shutdown();
   }
 
@@ -442,6 +454,13 @@ public class TestNodeStatusUpdater {
     protected void serviceStop() throws Exception {
       super.serviceStop();
       isStopped = true;
+      ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
+      .containermanager.container.Container> containers =
+          getNMContext().getContainers();
+      // ensure that containers are empty
+      if(!containers.isEmpty()) {
+        assertionFailedInThread.set(true);
+      }
       syncBarrier.await(10000, TimeUnit.MILLISECONDS);
     }
   }
@@ -723,7 +742,7 @@ public class TestNodeStatusUpdater {
   @After
   public void deleteBaseDir() throws IOException {
     FileContext lfs = FileContext.getLocalFSFileContext();
-    lfs.delete(basedir, true);
+    lfs.delete(new Path(basedir.getPath()), true);
   }
 
   @Test
@@ -1095,7 +1114,7 @@ public class TestNodeStatusUpdater {
 
   @Test(timeout = 200000)
   public void testNodeStatusUpdaterRetryAndNMShutdown() 
-      throws InterruptedException {
+      throws Exception {
     final long connectionWaitSecs = 1;
     final long connectionRetryIntervalSecs = 1;
     YarnConfiguration conf = createNMConfig();
@@ -1104,14 +1123,23 @@ public class TestNodeStatusUpdater {
     conf.setLong(YarnConfiguration
         .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
         connectionRetryIntervalSecs);
+    conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
     CyclicBarrier syncBarrier = new CyclicBarrier(2);
     nm = new MyNodeManager2(syncBarrier, conf);
     nm.init(conf);
     nm.start();
+    // start a container
+    ContainerId cId = TestNodeManagerShutdown.createContainerId();
+    FileContext localFS = FileContext.getLocalFSFileContext();
+    TestNodeManagerShutdown.startContainer(nm, cId, localFS, nmLocalDir,
+      new File("start_file.txt"));
+
     try {
       syncBarrier.await(10000, TimeUnit.MILLISECONDS);
     } catch (Exception e) {
     }
+    Assert.assertFalse("Containers not cleaned up when NM stopped",
+      assertionFailedInThread.get());
     Assert.assertTrue(((MyNodeManager2) nm).isStopped);
     Assert.assertTrue("calculate heartBeatCount based on" +
         " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
@@ -1229,15 +1257,13 @@ public class TestNodeStatusUpdater {
 
   private YarnConfiguration createNMConfig() {
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
     conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345");
     conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346");
-    conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
-        .getPath());
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
-        "remotelogs").toUri().getPath());
-    conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0")
-        .toUri().getPath());
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+      remoteLogsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
     return conf;
   }
   

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1506816&r1=1506815&r2=1506816&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
 (original)
+++ 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
 Thu Jul 25 04:16:37 2013
@@ -253,7 +253,7 @@ public class ResourceTrackerService exte
     RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
     if (rmNode == null) {
       /* node does not exist */
-      String message = "Node not found rebooting " + 
remoteNodeStatus.getNodeId();
+      String message = "Node not found resyncing " + 
remoteNodeStatus.getNodeId();
       LOG.info(message);
       resync.setDiagnosticsMessage(message);
       return resync;


Reply via email to