Repository: hadoop Updated Branches: refs/heads/branch-2.7 13b256ed2 -> c43cc8fd6
YARN-2902. Killing a container that is localizing can orphan resources in the DOWNLOADING state. Contributed by Varun Saxena (cherry picked from commit e2267de2076245bd9857f6a30e3c731df017fef8) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c43cc8fd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c43cc8fd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c43cc8fd Branch: refs/heads/branch-2.7 Commit: c43cc8fd67017207457f4e7b98fb89747391e780 Parents: 13b256e Author: Jason Lowe <jl...@apache.org> Authored: Thu Oct 29 16:48:57 2015 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Thu Oct 29 16:48:57 2015 +0000 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../nodemanager/DefaultContainerExecutor.java | 9 +- .../nodemanager/DockerContainerExecutor.java | 9 +- .../localizer/LocalResourcesTrackerImpl.java | 10 + .../localizer/ResourceLocalizationService.java | 13 + .../impl/container-executor.c | 9 +- .../test/test-container-executor.c | 11 + .../TestLocalResourcesTrackerImpl.java | 6 +- .../TestResourceLocalizationService.java | 295 ++++++++++++++++++- 9 files changed, 351 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b865af4..0421e41 
100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -94,6 +94,9 @@ Release 2.7.2 - UNRELEASED YARN-4041. Slow delegation token renewal can severely prolong RM recovery (Sunil G via jlowe) + YARN-2902. Killing a container that is localizing can orphan resources in + the DOWNLOADING state (Varun Saxena via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index e0ecea3..b5120f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import java.io.DataOutputStream; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.net.InetSocketAddress; @@ -464,8 +465,12 @@ public class DefaultContainerExecutor extends ContainerExecutor { for (Path baseDir : baseDirs) { Path del = subDir == null ? 
baseDir : new Path(baseDir, subDir); LOG.info("Deleting path : " + del); - if (!lfs.delete(del, true)) { - LOG.warn("delete returned false for path: [" + del + "]"); + try { + if (!lfs.delete(del, true)) { + LOG.warn("delete returned false for path: [" + del + "]"); + } + } catch (FileNotFoundException e) { + continue; } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java index c854173..323c683f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; @@ -408,8 +409,12 @@ public class DockerContainerExecutor extends ContainerExecutor { for (Path baseDir : baseDirs) { Path del = subDir == null ? 
baseDir : new Path(baseDir, subDir); LOG.info("Deleting path : " + del); - if (!lfs.delete(del, true)) { - LOG.warn("delete returned false for path: [" + del + "]"); + try { + if (!lfs.delete(del, true)) { + LOG.warn("delete returned false for path: [" + del + "]"); + } + } catch (FileNotFoundException e) { + continue; } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 8f209e7..f9346d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -173,6 +173,16 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { rsrc.handle(event); + // Remove the resource if its downloading and its reference count has + // become 0 after RELEASE. This maybe because a container was killed while + // localizing and no other container is referring to the resource. 
+ if (event.getType() == ResourceEventType.RELEASE) { + if (rsrc.getState() == ResourceState.DOWNLOADING && + rsrc.getRefCount() <= 0) { + removeResource(req); + } + } + if (event.getType() == ResourceEventType.LOCALIZED) { if (rsrc.getLocalPath() != null) { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 1ba6b01..95f55a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -1135,9 +1135,22 @@ public class ResourceLocalizationService extends CompositeService dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent( cId, null, exception.getMessage())); } + List<Path> paths = new ArrayList<Path>(); for (LocalizerResourceRequestEvent event : scheduled.values()) { + // This means some resources were in downloading state. 
Schedule + // deletion task for localization dir and tmp dir used for downloading + Path locRsrcPath = event.getResource().getLocalPath(); + if (locRsrcPath != null) { + Path locRsrcDirPath = locRsrcPath.getParent(); + paths.add(locRsrcDirPath); + paths.add(new Path(locRsrcDirPath + "_tmp")); + } event.getResource().unlock(); } + if (!paths.isEmpty()) { + delService.delete(context.getUser(), + null, paths.toArray(new Path[paths.size()])); + } delService.delete(null, nmPrivateCTokensPath, new Path[] {}); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c index 1c214c6..580422cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c @@ -1368,8 +1368,13 @@ int delete_as_user(const char *user, char* full_path = NULL; struct stat sb; if (stat(*ptr, &sb) != 0) { - fprintf(LOGFILE, "Could not stat %s\n", *ptr); - return -1; + if (errno == ENOENT) { + // Ignore missing dir. Continue deleting other directories. 
+ continue; + } else { + fprintf(LOGFILE, "Could not stat %s - %s\n", *ptr, strerror(errno)); + return -1; + } } if (!S_ISDIR(sb.st_mode)) { if (!subDirEmptyStr) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c index be6cc49..4ac6527 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c @@ -408,6 +408,17 @@ void test_delete_user() { exit(1); } + sprintf(buffer, "%s", app_dir); + char missing_dir[20]; + strcpy(missing_dir, "/some/missing/dir"); + char * dirs_with_missing[] = {missing_dir, buffer, 0}; + ret = delete_as_user(yarn_username, "" , dirs_with_missing); + printf("%d" , ret); + if (access(buffer, R_OK) == 0) { + printf("FAIL: directory not deleted\n"); + exit(1); + } + sprintf(buffer, "%s/local-1/usercache/%s", TEST_ROOT, yarn_username); if (access(buffer, R_OK) != 0) { printf("FAIL: directory missing before test\n"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index ef59499..bd33213 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -138,12 +138,12 @@ public class TestLocalResourcesTrackerImpl { tracker.handle(rel21Event); dispatcher.await(); - verifyTrackedResourceCount(tracker, 2); + verifyTrackedResourceCount(tracker, 1); // Verify resource with non zero ref count is not removed. Assert.assertEquals(2, lr1.getRefCount()); Assert.assertFalse(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 2); + verifyTrackedResourceCount(tracker, 1); // Localize resource1 ResourceLocalizedEvent rle = @@ -158,7 +158,7 @@ public class TestLocalResourcesTrackerImpl { // Verify resources in state LOCALIZED with ref-count=0 is removed. 
Assert.assertTrue(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 1); + verifyTrackedResourceCount(tracker, 0); } finally { if (dispatcher != null) { dispatcher.stop(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c43cc8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 5823bae..7cbbce7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -43,6 +43,8 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.io.NotSerializableException; +import java.io.ObjectInputStream; import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.net.URI; @@ -100,6 +102,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; 
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; @@ -149,9 +152,12 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; +import org.mockito.internal.matchers.VarargMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.collect.Sets; + public class TestResourceLocalizationService { static final Path basedir = @@ -476,16 +482,14 @@ public class TestResourceLocalizationService { Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount()); pubRsrcs.remove(lr.getRequest()); } - Assert.assertEquals(0, pubRsrcs.size()); - Assert.assertEquals(2, pubRsrcCount); + Assert.assertEquals(2, pubRsrcs.size()); + Assert.assertEquals(0, pubRsrcCount); appRsrcCount = 0; for (LocalizedResource lr : appTracker) { appRsrcCount++; - Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount()); - Assert.assertEquals(appReq, lr.getRequest()); } - Assert.assertEquals(1, appRsrcCount); + Assert.assertEquals(0, appRsrcCount); } finally { dispatcher.stop(); delService.stop(); @@ -1056,6 +1060,287 @@ public class TestResourceLocalizationService { } } + private static class DownloadingPathsMatcher extends ArgumentMatcher<Path[]> + implements VarargMatcher { + static final long serialVersionUID = 0; + + private transient Set<Path> matchPaths; + + DownloadingPathsMatcher(Set<Path> matchPaths) { + this.matchPaths = matchPaths; + } + + @Override + public boolean matches(Object varargs) { + Path[] downloadingPaths = (Path[]) varargs; + if (matchPaths.size() != downloadingPaths.length) { + return false; + } + for (Path downloadingPath : downloadingPaths) { + if (!matchPaths.contains(downloadingPath)) { + return false; + } + } + return true; + } + + private void readObject(ObjectInputStream 
os) throws NotSerializableException { + throw new NotSerializableException(this.getClass().getName()); + } + } + + private static class DummyExecutor extends DefaultContainerExecutor { + private volatile boolean stopLocalization = false; + @Override + public void startLocalizer(Path nmPrivateContainerTokensPath, + InetSocketAddress nmAddr, String user, String appId, String locId, + LocalDirsHandlerService dirsHandler) throws IOException, + InterruptedException { + while (!stopLocalization) { + Thread.yield(); + } + } + void setStopLocalization() { + stopLocalization = true; + } + } + + @Test(timeout = 20000) + @SuppressWarnings("unchecked") + public void testDownloadingResourcesOnContainerKill() throws Exception { + List<Path> localDirs = new ArrayList<Path>(); + String[] sDirs = new String[1]; + localDirs.add(lfs.makeQualified(new Path(basedir, 0 + ""))); + sDirs[0] = localDirs.get(0).toString(); + + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler<ContainerEvent> containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + DummyExecutor exec = new DummyExecutor(); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + DeletionService delServiceReal = new DeletionService(exec); + DeletionService delService = spy(delServiceReal); + delService.init(new Configuration()); + delService.start(); + + ResourceLocalizationService rawService = new ResourceLocalizationService( + dispatcher, exec, delService, dirsHandler, nmContext); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + 
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPermission = + ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + final Path userDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.USERCACHE); + final Path fileDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.FILECACHE); + final Path sysDir = + new Path(sDirs[0].substring("file:".length()), + ResourceLocalizationService.NM_PRIVATE_DIR); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", new Path(sDirs[0])); + final FileStatus nmFs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + nmPermission, "", "", sysDir); + + doAnswer(new Answer<FileStatus>() { + @Override + public FileStatus answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + if (args.length > 0) { + if (args[0].equals(userDir) || args[0].equals(fileDir)) { + return fs; + } + } + return nmFs; + } + }).when(spylfs).getFileStatus(isA(Path.class)); + + try { + spyService.init(conf); + spyService.start(); + + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + String user = "user0"; + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + ArgumentMatcher<ApplicationEvent> matchesAppInit = + new ArgumentMatcher<ApplicationEvent>() { + @Override + public boolean matches(Object o) { + ApplicationEvent evt = (ApplicationEvent) o; + return evt.getType() == ApplicationEventType.APPLICATION_INITED + && appId == evt.getApplicationID(); + } + }; + dispatcher.await(); + 
verify(applicationBus).handle(argThat(matchesAppInit)); + + // Initialize localizer. + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + final Container c1 = getMockContainer(appId, 42, "user0"); + final Container c2 = getMockContainer(appId, 43, "user0"); + FSDataOutputStream out = + new FSDataOutputStream(new DataOutputBuffer(), null); + doReturn(out).when(spylfs).createInternal(isA(Path.class), + isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), + anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), + anyBoolean()); + final LocalResource resource1 = getPrivateMockedResource(r); + LocalResource resource2 = null; + do { + resource2 = getPrivateMockedResource(r); + } while (resource2 == null || resource2.equals(resource1)); + LocalResource resource3 = null; + do { + resource3 = getPrivateMockedResource(r); + } while (resource3 == null || resource3.equals(resource1) + || resource3.equals(resource2)); + + // Send localization requests for container c1 and c2. 
+ final LocalResourceRequest req1 = new LocalResourceRequest(resource1); + final LocalResourceRequest req2 = new LocalResourceRequest(resource2); + final LocalResourceRequest req3 = new LocalResourceRequest(resource3); + Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs = + new HashMap<LocalResourceVisibility, + Collection<LocalResourceRequest>>(); + List<LocalResourceRequest> privateResourceList = + new ArrayList<LocalResourceRequest>(); + privateResourceList.add(req1); + privateResourceList.add(req2); + privateResourceList.add(req3); + rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); + spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs)); + + final LocalResourceRequest req1_1 = new LocalResourceRequest(resource2); + Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs1 = + new HashMap<LocalResourceVisibility, + Collection<LocalResourceRequest>>(); + List<LocalResourceRequest> privateResourceList1 = + new ArrayList<LocalResourceRequest>(); + privateResourceList1.add(req1_1); + rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1); + spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1)); + + dispatcher.await(); + final String containerIdStr = c1.getContainerId().toString(); + // Heartbeats from container localizer + LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class); + LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class); + LocalizerStatus stat = mock(LocalizerStatus.class); + when(stat.getLocalizerId()).thenReturn(containerIdStr); + when(rsrc1success.getResource()).thenReturn(resource1); + when(rsrc2pending.getResource()).thenReturn(resource2); + when(rsrc1success.getLocalSize()).thenReturn(4344L); + URL locPath = getPath("/some/path"); + when(rsrc1success.getLocalPath()).thenReturn(locPath); + when(rsrc1success.getStatus()). + thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(rsrc2pending.getStatus()). 
+ thenReturn(ResourceStatusType.FETCH_PENDING); + + when(stat.getResources()) + .thenReturn(Collections.<LocalResourceStatus>emptyList()) + .thenReturn(Collections.singletonList(rsrc1success)) + .thenReturn(Collections.singletonList(rsrc2pending)) + .thenReturn(Collections.singletonList(rsrc2pending)) + .thenReturn(Collections.<LocalResourceStatus>emptyList()); + + // First heartbeat which schedules first resource. + LocalizerHeartbeatResponse response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + + // Second heartbeat which reports first resource as success. + // Second resource is scheduled. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + final String locPath1 = response.getResourceSpecs().get(0). + getDestinationDirectory().getFile(); + + // Third heartbeat which reports second resource as pending. + // Third resource is scheduled. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + final String locPath2 = response.getResourceSpecs().get(0). + getDestinationDirectory().getFile(); + + // Container c1 is killed which leads to cleanup + spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs)); + + // This heartbeat will indicate to container localizer to die as localizer + // runner has stopped. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); + + exec.setStopLocalization(); + dispatcher.await(); + // verify container notification + ArgumentMatcher<ContainerEvent> successContainerLoc = + new ArgumentMatcher<ContainerEvent>() { + @Override + public boolean matches(Object o) { + ContainerEvent evt = (ContainerEvent) o; + return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED + && c1.getContainerId() == evt.getContainerID(); + } + }; + // Only one resource gets localized for container c1. 
+ verify(containerBus).handle(argThat(successContainerLoc)); + + Set<Path> paths = + Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"), + new Path(locPath2), new Path(locPath2 + "_tmp")); + // Verify if downloading resources were submitted for deletion. + verify(delService).delete(eq(user), + (Path) eq(null), argThat(new DownloadingPathsMatcher(paths))); + + LocalResourcesTracker tracker = spyService.getLocalResourcesTracker( + LocalResourceVisibility.PRIVATE, "user0", appId); + // Container c1 was killed but this resource was localized before kill + // hence it's not removed despite ref cnt being 0. + LocalizedResource rsrc1 = tracker.getLocalizedResource(req1); + assertNotNull(rsrc1); + assertEquals(rsrc1.getState(), ResourceState.LOCALIZED); + assertEquals(rsrc1.getRefCount(), 0); + + // Container c1 was killed but this resource is referenced by container c2 + // as well hence its ref cnt is 1. + LocalizedResource rsrc2 = tracker.getLocalizedResource(req2); + assertNotNull(rsrc2); + assertEquals(rsrc2.getState(), ResourceState.DOWNLOADING); + assertEquals(rsrc2.getRefCount(), 1); + + // As container c1 was killed and this resource was not referenced by any + // other container, hence it's removed. + LocalizedResource rsrc3 = tracker.getLocalizedResource(req3); + assertNull(rsrc3); + } finally { + spyService.stop(); + dispatcher.stop(); + delService.stop(); + } + } + @Test @SuppressWarnings("unchecked") public void testPublicResourceInitializesLocalDir() throws Exception {