Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 13b256ed2 -> c43cc8fd6


YARN-2902. Killing a container that is localizing can orphan resources in the 
DOWNLOADING state. Contributed by Varun Saxena
(cherry picked from commit e2267de2076245bd9857f6a30e3c731df017fef8)

Conflicts:

        hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
        hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c43cc8fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c43cc8fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c43cc8fd

Branch: refs/heads/branch-2.7
Commit: c43cc8fd67017207457f4e7b98fb89747391e780
Parents: 13b256e
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Oct 29 16:48:57 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Oct 29 16:48:57 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../nodemanager/DefaultContainerExecutor.java   |   9 +-
 .../nodemanager/DockerContainerExecutor.java    |   9 +-
 .../localizer/LocalResourcesTrackerImpl.java    |  10 +
 .../localizer/ResourceLocalizationService.java  |  13 +
 .../impl/container-executor.c                   |   9 +-
 .../test/test-container-executor.c              |  11 +
 .../TestLocalResourcesTrackerImpl.java          |   6 +-
 .../TestResourceLocalizationService.java        | 295 ++++++++++++++++++-
 9 files changed, 351 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b865af4..0421e41 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -94,6 +94,9 @@ Release 2.7.2 - UNRELEASED
     YARN-4041. Slow delegation token renewal can severely prolong RM recovery
     (Sunil G via jlowe)
 
+    YARN-2902. Killing a container that is localizing can orphan resources in
+    the DOWNLOADING state (Varun Saxena via jlowe)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index e0ecea3..b5120f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
@@ -464,8 +465,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     for (Path baseDir : baseDirs) {
       Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
       LOG.info("Deleting path : " + del);
-      if (!lfs.delete(del, true)) {
-        LOG.warn("delete returned false for path: [" + del + "]");
+      try {
+        if (!lfs.delete(del, true)) {
+          LOG.warn("delete returned false for path: [" + del + "]");
+        }
+      } catch (FileNotFoundException e) {
+        continue;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
index c854173..323c683f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -408,8 +409,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
     for (Path baseDir : baseDirs) {
       Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
       LOG.info("Deleting path : " + del);
-      if (!lfs.delete(del, true)) {
-        LOG.warn("delete returned false for path: [" + del + "]");
+      try {
+        if (!lfs.delete(del, true)) {
+          LOG.warn("delete returned false for path: [" + del + "]");
+        }
+      } catch (FileNotFoundException e) {
+        continue;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 8f209e7..f9346d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -173,6 +173,16 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
     rsrc.handle(event);
 
+    // Remove the resource if its downloading and its reference count has
+    // become 0 after RELEASE. This maybe because a container was killed while
+    // localizing and no other container is referring to the resource.
+    if (event.getType() == ResourceEventType.RELEASE) {
+      if (rsrc.getState() == ResourceState.DOWNLOADING &&
+          rsrc.getRefCount() <= 0) {
+        removeResource(req);
+      }
+    }
+
     if (event.getType() == ResourceEventType.LOCALIZED) {
       if (rsrc.getLocalPath() != null) {
         try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 1ba6b01..95f55a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -1135,9 +1135,22 @@ public class ResourceLocalizationService extends CompositeService
           dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent(
               cId, null, exception.getMessage()));
         }
+        List<Path> paths = new ArrayList<Path>();
         for (LocalizerResourceRequestEvent event : scheduled.values()) {
+          // This means some resources were in downloading state. Schedule
+          // deletion task for localization dir and tmp dir used for downloading
+          Path locRsrcPath = event.getResource().getLocalPath();
+          if (locRsrcPath != null) {
+            Path locRsrcDirPath = locRsrcPath.getParent();
+            paths.add(locRsrcDirPath);
+            paths.add(new Path(locRsrcDirPath + "_tmp"));
+          }
           event.getResource().unlock();
         }
+        if (!paths.isEmpty()) {
+          delService.delete(context.getUser(),
+              null, paths.toArray(new Path[paths.size()]));
+        }
         delService.delete(null, nmPrivateCTokensPath, new Path[] {});
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
index 1c214c6..580422cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
@@ -1368,8 +1368,13 @@ int delete_as_user(const char *user,
     char* full_path = NULL;
     struct stat sb;
     if (stat(*ptr, &sb) != 0) {
-      fprintf(LOGFILE, "Could not stat %s\n", *ptr);
-      return -1;
+      if (errno == ENOENT) {
+        // Ignore missing dir. Continue deleting other directories.
+        continue;
+      } else {
+        fprintf(LOGFILE, "Could not stat %s - %s\n", *ptr, strerror(errno));
+        return -1;
+      }
     }
     if (!S_ISDIR(sb.st_mode)) {
       if (!subDirEmptyStr) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
index be6cc49..4ac6527 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
@@ -408,6 +408,17 @@ void test_delete_user() {
     exit(1);
   }
 
+  sprintf(buffer, "%s", app_dir);
+  char missing_dir[20];
+  strcpy(missing_dir, "/some/missing/dir");
+  char * dirs_with_missing[] = {missing_dir, buffer, 0};
+  ret = delete_as_user(yarn_username, "" , dirs_with_missing);
+  printf("%d" , ret);
+  if (access(buffer, R_OK) == 0) {
+    printf("FAIL: directory not deleted\n");
+    exit(1);
+  }
+
   sprintf(buffer, "%s/local-1/usercache/%s", TEST_ROOT, yarn_username);
   if (access(buffer, R_OK) != 0) {
     printf("FAIL: directory missing before test\n");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index ef59499..bd33213 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -138,12 +138,12 @@ public class TestLocalResourcesTrackerImpl {
       tracker.handle(rel21Event);
 
       dispatcher.await();
-      verifyTrackedResourceCount(tracker, 2);
+      verifyTrackedResourceCount(tracker, 1);
 
       // Verify resource with non zero ref count is not removed.
       Assert.assertEquals(2, lr1.getRefCount());
       Assert.assertFalse(tracker.remove(lr1, mockDelService));
-      verifyTrackedResourceCount(tracker, 2);
+      verifyTrackedResourceCount(tracker, 1);
 
       // Localize resource1
       ResourceLocalizedEvent rle =
@@ -158,7 +158,7 @@ public class TestLocalResourcesTrackerImpl {
 
       // Verify resources in state LOCALIZED with ref-count=0 is removed.
       Assert.assertTrue(tracker.remove(lr1, mockDelService));
-      verifyTrackedResourceCount(tracker, 1);
+      verifyTrackedResourceCount(tracker, 0);
     } finally {
       if (dispatcher != null) {
         dispatcher.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 5823bae..7cbbce7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -43,6 +43,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -100,6 +102,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -149,9 +152,12 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
+import org.mockito.internal.matchers.VarargMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Sets;
+
 public class TestResourceLocalizationService {
 
   static final Path basedir =
@@ -476,16 +482,14 @@ public class TestResourceLocalizationService {
         Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
         pubRsrcs.remove(lr.getRequest());
       }
-      Assert.assertEquals(0, pubRsrcs.size());
-      Assert.assertEquals(2, pubRsrcCount);
+      Assert.assertEquals(2, pubRsrcs.size());
+      Assert.assertEquals(0, pubRsrcCount);
 
       appRsrcCount = 0;
       for (LocalizedResource lr : appTracker) {
         appRsrcCount++;
-        Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
-        Assert.assertEquals(appReq, lr.getRequest());
       }
-      Assert.assertEquals(1, appRsrcCount);
+      Assert.assertEquals(0, appRsrcCount);
     } finally {
       dispatcher.stop();
       delService.stop();
@@ -1056,6 +1060,287 @@ public class TestResourceLocalizationService {
     }
   }
 
+  private static class DownloadingPathsMatcher extends ArgumentMatcher<Path[]>
+      implements VarargMatcher {
+    static final long serialVersionUID = 0;
+
+    private transient Set<Path> matchPaths;
+
+    DownloadingPathsMatcher(Set<Path> matchPaths) {
+      this.matchPaths = matchPaths;
+    }
+
+    @Override
+    public boolean matches(Object varargs) {
+      Path[] downloadingPaths = (Path[]) varargs;
+      if (matchPaths.size() != downloadingPaths.length) {
+        return false;
+      }
+      for (Path downloadingPath : downloadingPaths) {
+        if (!matchPaths.contains(downloadingPath)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private void readObject(ObjectInputStream os) throws NotSerializableException {
+      throw new NotSerializableException(this.getClass().getName());
+    }
+  }
+
+  private static class DummyExecutor extends DefaultContainerExecutor {
+    private volatile boolean stopLocalization = false;
+    @Override
+    public void startLocalizer(Path nmPrivateContainerTokensPath,
+        InetSocketAddress nmAddr, String user, String appId, String locId,
+        LocalDirsHandlerService dirsHandler) throws IOException,
+        InterruptedException {
+      while (!stopLocalization) {
+        Thread.yield();
+      }
+    }
+    void setStopLocalization() {
+      stopLocalization = true;
+    }
+  }
+
+  @Test(timeout = 20000)
+  @SuppressWarnings("unchecked")
+  public void testDownloadingResourcesOnContainerKill() throws Exception {
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[1];
+    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+    sDirs[0] = localDirs.get(0).toString();
+
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+
+    DummyExecutor exec = new DummyExecutor();
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+
+    DeletionService delServiceReal = new DeletionService(exec);
+    DeletionService delService = spy(delServiceReal);
+    delService.init(new Configuration());
+    delService.start();
+
+    ResourceLocalizationService rawService = new ResourceLocalizationService(
+        dispatcher, exec, delService, dirsHandler, nmContext);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPermission =
+        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+    final Path userDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ContainerLocalizer.USERCACHE);
+    final Path fileDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ContainerLocalizer.FILECACHE);
+    final Path sysDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ResourceLocalizationService.NM_PRIVATE_DIR);
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "", new Path(sDirs[0]));
+    final FileStatus nmFs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          nmPermission, "", "", sysDir);
+
+    doAnswer(new Answer<FileStatus>() {
+      @Override
+      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args.length > 0) {
+          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
+            return fs;
+          }
+        }
+        return nmFs;
+      }
+    }).when(spylfs).getFileStatus(isA(Path.class));
+
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      final Application app = mock(Application.class);
+      final ApplicationId appId =
+          BuilderUtils.newApplicationId(314159265358979L, 3);
+      String user = "user0";
+      when(app.getUser()).thenReturn(user);
+      when(app.getAppId()).thenReturn(appId);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      ArgumentMatcher<ApplicationEvent> matchesAppInit =
+        new ArgumentMatcher<ApplicationEvent>() {
+          @Override
+          public boolean matches(Object o) {
+            ApplicationEvent evt = (ApplicationEvent) o;
+            return evt.getType() == ApplicationEventType.APPLICATION_INITED
+              && appId == evt.getApplicationID();
+          }
+        };
+      dispatcher.await();
+      verify(applicationBus).handle(argThat(matchesAppInit));
+
+      // Initialize localizer.
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+      final Container c1 = getMockContainer(appId, 42, "user0");
+      final Container c2 = getMockContainer(appId, 43, "user0");
+      FSDataOutputStream out =
+        new FSDataOutputStream(new DataOutputBuffer(), null);
+      doReturn(out).when(spylfs).createInternal(isA(Path.class),
+          isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
+          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
+          anyBoolean());
+      final LocalResource resource1 = getPrivateMockedResource(r);
+      LocalResource resource2 = null;
+      do {
+        resource2 = getPrivateMockedResource(r);
+      } while (resource2 == null || resource2.equals(resource1));
+      LocalResource resource3 = null;
+      do {
+        resource3 = getPrivateMockedResource(r);
+      } while (resource3 == null || resource3.equals(resource1)
+          || resource3.equals(resource2));
+
+      // Send localization requests for container c1 and c2.
+      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+      final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+      final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+        new HashMap<LocalResourceVisibility,
+                    Collection<LocalResourceRequest>>();
+      List<LocalResourceRequest> privateResourceList =
+          new ArrayList<LocalResourceRequest>();
+      privateResourceList.add(req1);
+      privateResourceList.add(req2);
+      privateResourceList.add(req3);
+      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+      spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
+
+      final LocalResourceRequest req1_1 = new LocalResourceRequest(resource2);
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs1 =
+        new HashMap<LocalResourceVisibility,
+                    Collection<LocalResourceRequest>>();
+      List<LocalResourceRequest> privateResourceList1 =
+          new ArrayList<LocalResourceRequest>();
+      privateResourceList1.add(req1_1);
+      rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1);
+      spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
+
+      dispatcher.await();
+      final String containerIdStr = c1.getContainerId().toString();
+      // Heartbeats from container localizer
+      LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
+      LocalizerStatus stat = mock(LocalizerStatus.class);
+      when(stat.getLocalizerId()).thenReturn(containerIdStr);
+      when(rsrc1success.getResource()).thenReturn(resource1);
+      when(rsrc2pending.getResource()).thenReturn(resource2);
+      when(rsrc1success.getLocalSize()).thenReturn(4344L);
+      URL locPath = getPath("/some/path");
+      when(rsrc1success.getLocalPath()).thenReturn(locPath);
+      when(rsrc1success.getStatus()).
+          thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrc2pending.getStatus()).
+          thenReturn(ResourceStatusType.FETCH_PENDING);
+
+      when(stat.getResources())
+        .thenReturn(Collections.<LocalResourceStatus>emptyList())
+        .thenReturn(Collections.singletonList(rsrc1success))
+        .thenReturn(Collections.singletonList(rsrc2pending))
+        .thenReturn(Collections.singletonList(rsrc2pending))
+        .thenReturn(Collections.<LocalResourceStatus>emptyList());
+
+      // First heartbeat which schedules first resource.
+      LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+      // Second heartbeat which reports first resource as success.
+      // Second resource is scheduled.
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+      final String locPath1 = response.getResourceSpecs().get(0).
+          getDestinationDirectory().getFile();
+
+      // Third heartbeat which reports second resource as pending.
+      // Third resource is scheduled.
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+      final String locPath2 = response.getResourceSpecs().get(0).
+          getDestinationDirectory().getFile();
+
+      // Container c1 is killed which leads to cleanup
+      spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
+
+      // This heartbeat will indicate to container localizer to die as localizer
+      // runner has stopped.
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+      exec.setStopLocalization();
+      dispatcher.await();
+      // verify container notification
+      ArgumentMatcher<ContainerEvent> successContainerLoc =
+        new ArgumentMatcher<ContainerEvent>() {
+          @Override
+          public boolean matches(Object o) {
+            ContainerEvent evt = (ContainerEvent) o;
+            return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
+              && c1.getContainerId() == evt.getContainerID();
+          }
+        };
+      // Only one resource gets localized for container c1.
+      verify(containerBus).handle(argThat(successContainerLoc));
+
+      Set<Path> paths =
+          Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
+              new Path(locPath2), new Path(locPath2 + "_tmp"));
+      // Verify if downloading resources were submitted for deletion.
+      verify(delService).delete(eq(user),
+          (Path) eq(null), argThat(new DownloadingPathsMatcher(paths)));
+
+      LocalResourcesTracker tracker = spyService.getLocalResourcesTracker(
+          LocalResourceVisibility.PRIVATE, "user0", appId);
+      // Container c1 was killed but this resource was localized before kill
+      // hence its not removed despite ref cnt being 0.
+      LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
+      assertNotNull(rsrc1);
+      assertEquals(rsrc1.getState(), ResourceState.LOCALIZED);
+      assertEquals(rsrc1.getRefCount(), 0);
+
+      // Container c1 was killed but this resource is referenced by container c2
+      // as well hence its ref cnt is 1.
+      LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
+      assertNotNull(rsrc2);
+      assertEquals(rsrc2.getState(), ResourceState.DOWNLOADING);
+      assertEquals(rsrc2.getRefCount(), 1);
+
+      // As container c1 was killed and this resource was not referenced by any
+      // other container, hence its removed.
+      LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
+      assertNull(rsrc3);
+    } finally {
+      spyService.stop();
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testPublicResourceInitializesLocalDir() throws Exception {

Reply via email to