Repository: hadoop
Updated Branches:
  refs/heads/branch-2 f894eefec -> a72dcb9ca


YARN-6078. Containers stuck in Localizing state. Contributed by Billie Rinaldi.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a72dcb9c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a72dcb9c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a72dcb9c

Branch: refs/heads/branch-2
Commit: a72dcb9cad7df7f7236092f5b86446c2ef4ea874
Parents: f894eef
Author: Junping Du <junping...@apache.org>
Authored: Mon Nov 13 18:22:30 2017 -0800
Committer: Junping Du <junping...@apache.org>
Committed: Mon Nov 13 18:22:30 2017 -0800

----------------------------------------------------------------------
 .../localizer/ResourceLocalizationService.java  |  40 ++++++
 .../TestResourceLocalizationService.java        | 144 +++++++++++++++++++
 2 files changed, 184 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a72dcb9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 29fc747..28a27a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskValidator;
 import org.apache.hadoop.util.DiskValidatorFactory;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
@@ -808,6 +809,7 @@ public class ResourceLocalizationService extends CompositeService
           return; // ignore; already gone
         }
         privLocalizers.remove(locId);
+        LOG.info("Interrupting localizer for " + locId);
         localizer.interrupt();
       }
     }
@@ -1189,6 +1191,44 @@ public class ResourceLocalizationService extends CompositeService
     }
 
     @Override
+    public void interrupt() {
+      // Looks through Shell.getAllShells() for a live process whose waiting
+      // thread equals this object. If one is found it is destroyed and
+      // super.interrupt() is skipped; otherwise super.interrupt() is called.
+      boolean killedChild = false;
+      try {
+        for (Shell candidate : Shell.getAllShells()) {
+          try {
+            // Skip shells that are not waited on by this object or whose
+            // process is missing or has already exited.
+            if (candidate.getWaitingThread() == null ||
+                !candidate.getWaitingThread().equals(this) ||
+                candidate.getProcess() == null ||
+                !processIsAlive(candidate.getProcess())) {
+              continue;
+            }
+            LOG.info("Destroying localization shell process for " +
+                localizerId);
+            candidate.getProcess().destroy();
+            killedChild = true;
+            // Only the first matching shell is destroyed.
+            break;
+          } catch (Exception e) {
+            // Best effort: log the failure and keep scanning the other shells.
+            LOG.warn("Failed to destroy localization shell process for " +
+                localizerId, e);
+          }
+        }
+      } finally {
+        // Runs even if the scan itself threw.
+        if (!killedChild) {
+          super.interrupt();
+        }
+      }
+    }
+
+    /**
+     * Reports whether {@code p} is still running, judged by whether
+     * {@link Process#exitValue()} throws
+     * {@link IllegalThreadStateException}.
+     *
+     * @param p the process to probe
+     * @return {@code true} if no exit value is available yet,
+     *         {@code false} if the process has already exited
+     */
+    private boolean processIsAlive(Process p) {
+      boolean alive = false;
+      try {
+        // Returns normally only once the process has terminated.
+        p.exitValue();
+      } catch (IllegalThreadStateException stillRunning) {
+        // this means the process is still alive
+        alive = true;
+      }
+      return alive;
+    }
+
+    @Override
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       Path nmPrivateCTokensPath = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a72dcb9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 6e59500..aecf742 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -66,6 +66,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
 import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
@@ -1197,6 +1198,149 @@ public class TestResourceLocalizationService {
     }
   }
 
+  /**
+   * Executor test double whose {@code startLocalizer} runs a long
+   * {@code sleep} through a {@link Shell.ShellCommandExecutor} and tracks how
+   * many such calls are in flight, with busy-wait helpers for the test.
+   */
+  private static class DummyShellExecutor extends DefaultContainerExecutor {
+    // Incremented on entry to startLocalizer and decremented just before it
+    // returns normally.
+    private final AtomicInteger numLocalizers = new AtomicInteger(0);
+
+    /**
+     * Runs {@code sleep 300} in a shell. An IOException from the shell is
+     * the expected outcome and is only printed; if the command completes
+     * instead, {@code Assert.fail} is invoked.
+     */
+    @Override
+    public void startLocalizer(LocalizerStartContext ctx) throws IOException,
+        InterruptedException {
+      numLocalizers.incrementAndGet();
+      String[] sleepCommand = {"bash", "-c", "sleep 300"};
+      Shell.ShellCommandExecutor sleeper =
+          new Shell.ShellCommandExecutor(sleepCommand);
+      try {
+        sleeper.execute();
+        Assert.fail("Shell finished without being interrupted");
+      } catch (IOException e) {
+        System.out.println("Got expected exception executing shell " +
+            e.toString());
+      }
+      // Not reached when Assert.fail throws, so the count stays raised then.
+      numLocalizers.decrementAndGet();
+    }
+
+    /** Spins, yielding, until the localizer count equals {@code num}. */
+    private void waitForLocalizers(int num) {
+      while (true) {
+        if (numLocalizers.get() == num) {
+          return;
+        }
+        Thread.yield();
+      }
+    }
+
+    /**
+     * Spins, yielding, until {@code Shell.getAllShells()} holds exactly
+     * {@code num} entries.
+     */
+    private void waitForShellCount(int num) {
+      while (true) {
+        if (Shell.getAllShells().size() == num) {
+          return;
+        }
+        Thread.yield();
+      }
+    }
+  }
+
+  /**
+   * Starts localization of one PRIVATE resource for a container using
+   * {@link DummyShellExecutor}, then sends a
+   * {@link ContainerLocalizationCleanupEvent} for that container and waits
+   * until the shell count and the localizer count are back to zero and the
+   * container's LocalizerRunner is no longer alive. The waits are unbounded
+   * loops, so a regression shows up as the 60 second test timeout.
+   */
+  @Test(timeout = 60000)
+  @SuppressWarnings("unchecked")
+  public void testShellDestroyedOnContainerKill() throws Exception {
+    // Configure a single NM local dir, "0", under basedir.
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[1];
+    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+    sDirs[0] = localDirs.get(0).toString();
+
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+
+    // The executor under test: its startLocalizer blocks in a shell command.
+    DummyShellExecutor exec = new DummyShellExecutor();
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+
+    DeletionService delServiceReal = new DeletionService(exec);
+    DeletionService delService = spy(delServiceReal);
+    delService.init(new Configuration());
+    delService.start();
+
+    DrainDispatcher dispatcher = getDispatcher(conf);
+    ResourceLocalizationService rawService = new ResourceLocalizationService(
+        dispatcher, exec, delService, dirsHandler, nmContext, metrics);
+
+    // Spy on the service so server creation and the local file context can
+    // be replaced with test fixtures.
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration
+        .class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPermission =
+        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+    // Paths are built from the local dir string with its "file:" prefix
+    // stripped.
+    final Path userDir =
+        new Path(sDirs[0].substring("file:".length()),
+            ContainerLocalizer.USERCACHE);
+    final Path fileDir =
+        new Path(sDirs[0].substring("file:".length()),
+            ContainerLocalizer.FILECACHE);
+    final Path sysDir =
+        new Path(sDirs[0].substring("file:".length()),
+            ResourceLocalizationService.NM_PRIVATE_DIR);
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+            defaultPermission, "", "", new Path(sDirs[0]));
+    final FileStatus nmFs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+            nmPermission, "", "", sysDir);
+
+    // Stub getFileStatus on spylfs: the user cache and file cache dirs get
+    // the default-permission status, any other path the NM-private one.
+    doAnswer(new Answer<FileStatus>() {
+      @Override
+      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args.length > 0) {
+          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
+            return fs;
+          }
+        }
+        return nmFs;
+      }
+    }).when(spylfs).getFileStatus(isA(Path.class));
+
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      // Mock application owned by "user0" plus one mock container (id 42).
+      final Application app = mock(Application.class);
+      final ApplicationId appId =
+          BuilderUtils.newApplicationId(314159265358979L, 3);
+      String user = "user0";
+      when(app.getUser()).thenReturn(user);
+      when(app.getAppId()).thenReturn(appId);
+      List<LocalResource> resources = initializeLocalizer(appId);
+      LocalResource resource1 = resources.get(0);
+      final Container c1 = getMockContainer(appId, 42, "user0");
+
+      EventHandler<ApplicationEvent> applicationBus =
+          getApplicationBus(dispatcher);
+      EventHandler<ContainerEvent> containerBus = getContainerBus(dispatcher);
+      initApp(spyService, applicationBus, app, appId, dispatcher);
+
+      // Send localization request for container c1.
+      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+          new HashMap<>();
+      List<LocalResourceRequest> privateResourceList =
+          new ArrayList<>();
+      privateResourceList.add(req1);
+      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+      spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
+
+      // Wait for localizer of container c1 to begin.
+      exec.waitForLocalizers(1);
+      exec.waitForShellCount(1);
+      // Grab the runner before cleanup so its liveness can be polled below.
+      LocalizerRunner localizerRunner =
+          spyService.getLocalizerRunner(c1.getContainerId().toString());
+
+      // Container c1 is killed which leads to cleanup
+      spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
+      dispatcher.await();
+
+      // Wait for localizer of container c1 to stop.
+      exec.waitForShellCount(0);
+      exec.waitForLocalizers(0);
+
+      // Check that the thread is no longer running
+      while (localizerRunner.isAlive()) {
+        Thread.sleep(10);
+      }
+    } finally {
+      spyService.stop();
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
+
   private DrainDispatcher getDispatcher(Configuration config) {
     DrainDispatcher dispatcher = new DrainDispatcher();
     dispatcher.init(config);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to