Repository: hadoop Updated Branches: refs/heads/branch-2 fd7ba56f6 -> 3820bf055
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3820bf05/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index ab86a18..8a5441a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -58,6 +59,8 @@ import org.junit.Assert; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -105,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; @@ -136,12 +140,19 @@ public class TestLogAggregationService extends BaseContainerManagerTest { super(); this.remoteRootLogDir.mkdir(); } + + DrainDispatcher dispatcher; + EventHandler<ApplicationEvent> appEventHandler; @Override + @SuppressWarnings("unchecked") public void setup() throws IOException { super.setup(); NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555); ((NMContext)context).setNodeId(nodeId); + dispatcher = createDispatcher(); + appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); } @Override @@ -149,10 +160,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { super.tearDown(); createContainerExecutor().deleteAsUser(user, new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {}); + dispatcher.await(); + dispatcher.stop(); + dispatcher.close(); } @Test - @SuppressWarnings("unchecked") public void testLocalFileDeletionAfterUpload() throws Exception { this.delSrvc = new DeletionService(createContainerExecutor()); delSrvc = spy(delSrvc); @@ -161,10 +174,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - DrainDispatcher dispatcher = createDispatcher(); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); @@ -236,16 +245,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test - @SuppressWarnings("unchecked") public void testNoContainerOnNode() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - DrainDispatcher dispatcher = createDispatcher(); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler); @@ -285,6 +289,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { }; checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID"); dispatcher.stop(); + logAggregationService.close(); } @Test @@ -294,6 +299,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); + String[] fileNames = new String[] { "stdout", "stderr", "syslog" }; DrainDispatcher dispatcher = createDispatcher(); EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); @@ -432,17 +438,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test - @SuppressWarnings("unchecked") public void testVerifyAndCreateRemoteDirsFailure() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - DrainDispatcher dispatcher = createDispatcher(); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); @@ -456,8 +457,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.start(); // Now try to start an application - ApplicationId appId = BuilderUtils.newApplicationId( - System.currentTimeMillis(), (int)Math.random()); + ApplicationId appId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), + (int) (Math.random() * 1000)); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, @@ -475,8 +477,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Mockito.reset(logAggregationService); // Now try to start another one - ApplicationId appId2 = BuilderUtils.newApplicationId( - System.currentTimeMillis(), (int)Math.random()); + ApplicationId appId2 = + BuilderUtils.newApplicationId(System.currentTimeMillis(), + (int) (Math.random() * 1000)); File appLogDir = new File(localLogDir, ConverterUtils.toString(appId2)); appLogDir.mkdir(); @@ -578,6 +581,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); + aggSvc.stop(); + aggSvc.close(); } @Test @@ -588,19 +593,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest { localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - - DrainDispatcher dispatcher = createDispatcher(); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - + LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); logAggregationService.init(this.conf); logAggregationService.start(); - ApplicationId appId = BuilderUtils.newApplicationId( - System.currentTimeMillis(), (int)Math.random()); + ApplicationId appId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), + (int) (Math.random() * 1000)); doThrow(new YarnRuntimeException("KABOOM!")) .when(logAggregationService).initAppAggregator( eq(appId), eq(user), any(Credentials.class), @@ -634,26 +636,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test - @SuppressWarnings("unchecked") public void testLogAggregationCreateDirsFailsWithoutKillingNM() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - - DrainDispatcher dispatcher = createDispatcher(); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - + LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); logAggregationService.init(this.conf); logAggregationService.start(); - ApplicationId appId = BuilderUtils.newApplicationId( - System.currentTimeMillis(), (int)Math.random()); + ApplicationId appId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), + (int) (Math.random() * 1000)); Exception e = new RuntimeException("KABOOM!"); doThrow(e) .when(logAggregationService).createAppDir(any(String.class), @@ -905,18 +903,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test(timeout=20000) - @SuppressWarnings("unchecked") public void testStopAfterError() throws Exception { DeletionService delSrvc = mock(DeletionService.class); // get the AppLogAggregationImpl thread to crash LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class); when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException()); - - DrainDispatcher dispatcher = createDispatcher(); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - + LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, delSrvc, mockedDirSvc); @@ -930,20 +923,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); + logAggregationService.close(); } @Test - @SuppressWarnings("unchecked") public void testLogAggregatorCleanup() throws Exception { DeletionService delSrvc = mock(DeletionService.class); // get the AppLogAggregationImpl thread to crash LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class); - DrainDispatcher dispatcher = createDispatcher(); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, delSrvc, mockedDirSvc); @@ -964,6 +953,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } Assert.assertEquals("Log aggregator failed to cleanup!", 0, logAggregationService.getNumAggregators()); + logAggregationService.stop(); + logAggregationService.close(); } @SuppressWarnings("unchecked") @@ -1039,6 +1030,72 @@ public class TestLogAggregationService extends BaseContainerManagerTest { return sb.toString(); } + /* + * Test to make sure we handle cases where the directories we get back from + * the LocalDirsHandler may have issues including the log dir not being + * present as well as other issues. The test uses helper functions from + * TestNonAggregatingLogHandler. + */ + @Test + public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception { + + // setup conf and services + DeletionService mockDelService = mock(DeletionService.class); + File[] localLogDirs = + TestNonAggregatingLogHandler.getLocalLogDirFiles(this.getClass() + .getName(), 7); + final List<String> localLogDirPaths = + new ArrayList<String>(localLogDirs.length); + for (int i = 0; i < localLogDirs.length; i++) { + localLogDirPaths.add(localLogDirs[i].getAbsolutePath()); + } + + String localLogDirsString = StringUtils.join(localLogDirPaths, ","); + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500); + + ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(application1, 1); + + this.dirsHandler = new LocalDirsHandlerService(); + LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class); + + LogAggregationService logAggregationService = + spy(new LogAggregationService(dispatcher, this.context, mockDelService, + mockDirsHandler)); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + FileContext lfs = FileContext.getFileContext(spylfs, conf); + doReturn(lfs).when(logAggregationService).getLocalFileContext( + isA(Configuration.class)); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + TestNonAggregatingLogHandler.runMockedFailedDirs(logAggregationService, + application1, user, mockDelService, mockDirsHandler, conf, spylfs, lfs, + localLogDirs); + + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); + verify(logAggregationService).closeFileSystems( + any(UserGroupInformation.class)); + + ApplicationEvent expectedEvents[] = + new ApplicationEvent[] { + new ApplicationEvent(appAttemptId.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent(appAttemptId.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) }; + + checkEvents(appEventHandler, expectedEvents, true, "getType", + "getApplicationID"); + } + @Test (timeout = 50000) @SuppressWarnings("unchecked") public void testLogAggregationServiceWithPatterns() throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3820bf05/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index 300ca28..d0f6472 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -19,15 +19,36 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.NotSerializableException; +import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -45,25 +66,52 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; import org.mockito.exceptions.verification.WantedButNotInvoked; +import org.mockito.internal.matchers.VarargMatcher; public class TestNonAggregatingLogHandler { + + DeletionService mockDelService; + Configuration conf; + DrainDispatcher dispatcher; + EventHandler<ApplicationEvent> appEventHandler; + String user = "testuser"; + ApplicationId appId; + ApplicationAttemptId appAttemptId; + ContainerId container11; + LocalDirsHandlerService dirsHandler; + + @Before + @SuppressWarnings("unchecked") + public void setup() { + mockDelService = mock(DeletionService.class); + conf = new YarnConfiguration(); + dispatcher = createDispatcher(conf); + appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + appId = BuilderUtils.newApplicationId(1234, 1); + appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1); + container11 = BuilderUtils.newContainerId(appAttemptId, 1); + dirsHandler = new LocalDirsHandlerService(); + } + + @After + public void tearDown() throws IOException { + dirsHandler.stop(); + dirsHandler.close(); + dispatcher.await(); + dispatcher.stop(); + dispatcher.close(); + } @Test - @SuppressWarnings("unchecked") - public void testLogDeletion() { - DeletionService delService = mock(DeletionService.class); - Configuration conf = new YarnConfiguration(); - String user = "testuser"; - - File[] localLogDirs = new File[2]; - localLogDirs[0] = - new File("target", this.getClass().getName() + "-localLogDir0") - .getAbsoluteFile(); - localLogDirs[1] = - new File("target", this.getClass().getName() + "-localLogDir1") - .getAbsoluteFile(); + public void testLogDeletion() throws IOException { + File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2); String localLogDirsString = localLogDirs[0].getAbsolutePath() + "," + localLogDirs[1].getAbsolutePath(); @@ -72,72 +120,50 @@ public class TestNonAggregatingLogHandler { conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l); - DrainDispatcher dispatcher = createDispatcher(conf); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - - LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); - ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1); - ApplicationAttemptId appAttemptId1 = - BuilderUtils.newApplicationAttemptId(appId1, 1); - ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); + NonAggregatingLogHandler rawLogHandler = + new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler); + NonAggregatingLogHandler logHandler = spy(rawLogHandler); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + FileContext lfs = FileContext.getFileContext(spylfs, conf); + doReturn(lfs).when(logHandler) + .getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", + new Path(localLogDirs[0].getAbsolutePath())); + doReturn(fs).when(spylfs).getFileStatus(isA(Path.class)); - NonAggregatingLogHandler logHandler = - new NonAggregatingLogHandler(dispatcher, delService, dirsHandler); logHandler.init(conf); logHandler.start(); - logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null, + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); - logHandler.handle(new LogHandlerAppFinishedEvent(appId1)); + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); Path[] localAppLogDirs = new Path[2]; localAppLogDirs[0] = - new Path(localLogDirs[0].getAbsolutePath(), appId1.toString()); + new Path(localLogDirs[0].getAbsolutePath(), appId.toString()); localAppLogDirs[1] = - new Path(localLogDirs[1].getAbsolutePath(), appId1.toString()); + new Path(localLogDirs[1].getAbsolutePath(), appId.toString()); - // 5 seconds for the delete which is a separate thread. - long verifyStartTime = System.currentTimeMillis(); - WantedButNotInvoked notInvokedException = null; - boolean matched = false; - while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) { - try { - verify(delService).delete(eq(user), (Path) eq(null), - eq(localAppLogDirs[0]), eq(localAppLogDirs[1])); - matched = true; - } catch (WantedButNotInvoked e) { - notInvokedException = e; - try { - Thread.sleep(50l); - } catch (InterruptedException i) { - } - } - } - if (!matched) { - throw notInvokedException; + testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs); + logHandler.close(); + for (int i = 0; i < localLogDirs.length; i++) { + FileUtils.deleteDirectory(localLogDirs[i]); } } @Test - @SuppressWarnings("unchecked") - public void testDelayedDelete() { - DeletionService delService = mock(DeletionService.class); - Configuration conf = new YarnConfiguration(); - String user = "testuser"; - - File[] localLogDirs = new File[2]; - localLogDirs[0] = - new File("target", this.getClass().getName() + "-localLogDir0") - .getAbsoluteFile(); - localLogDirs[1] = - new File("target", this.getClass().getName() + "-localLogDir1") - .getAbsoluteFile(); + public void testDelayedDelete() throws IOException { + File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2); String localLogDirsString = localLogDirs[0].getAbsolutePath() + "," + localLogDirs[1].getAbsolutePath(); @@ -148,42 +174,36 @@ public class TestNonAggregatingLogHandler { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS); - DrainDispatcher dispatcher = createDispatcher(conf); - EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - - LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); - ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1); - ApplicationAttemptId appAttemptId1 = - BuilderUtils.newApplicationAttemptId(appId1, 1); - ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); - NonAggregatingLogHandler logHandler = - new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService, + new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService, dirsHandler); logHandler.init(conf); logHandler.start(); - logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null, + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); - logHandler.handle(new LogHandlerAppFinishedEvent(appId1)); + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); Path[] localAppLogDirs = new Path[2]; localAppLogDirs[0] = - new Path(localLogDirs[0].getAbsolutePath(), appId1.toString()); + new Path(localLogDirs[0].getAbsolutePath(), appId.toString()); localAppLogDirs[1] = - new Path(localLogDirs[1].getAbsolutePath(), appId1.toString()); + new Path(localLogDirs[1].getAbsolutePath(), appId.toString()); ScheduledThreadPoolExecutor mockSched = ((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched; verify(mockSched).schedule(any(Runnable.class), eq(10800l), eq(TimeUnit.SECONDS)); + logHandler.close(); + for (int i = 0; i < localLogDirs.length; i++) { + FileUtils.deleteDirectory(localLogDirs[i]); + } } @Test @@ -202,25 +222,25 @@ public class TestNonAggregatingLogHandler { verify(logHandler.mockSched) .awaitTermination(eq(10l), eq(TimeUnit.SECONDS)); verify(logHandler.mockSched).shutdownNow(); + logHandler.close(); + aggregatingLogHandler.close(); } @Test - public void testHandlingApplicationFinishedEvent() { - Configuration conf = new Configuration(); - LocalDirsHandlerService dirsService = new LocalDirsHandlerService(); + public void testHandlingApplicationFinishedEvent() throws IOException { DeletionService delService = new DeletionService(null); NonAggregatingLogHandler aggregatingLogHandler = new NonAggregatingLogHandler(new InlineDispatcher(), delService, - dirsService); + dirsHandler); - dirsService.init(conf); - dirsService.start(); + dirsHandler.init(conf); + dirsHandler.start(); delService.init(conf); delService.start(); aggregatingLogHandler.init(conf); aggregatingLogHandler.start(); - ApplicationId appId = BuilderUtils.newApplicationId(1234, 1); + // It should NOT throw RejectedExecutionException aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId)); aggregatingLogHandler.stop(); @@ -228,6 +248,7 @@ public class TestNonAggregatingLogHandler { // It should NOT throw RejectedExecutionException after stopping // handler service. aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId)); + aggregatingLogHandler.close(); } private class NonAggregatingLogHandlerWithMockExecutor extends @@ -255,4 +276,201 @@ public class TestNonAggregatingLogHandler { dispatcher.start(); return dispatcher; } + + /* + * Test to ensure that we handle the cleanup of directories that may not have + * the application log dirs we're trying to delete or may have other problems. + * Test creates 7 log dirs, and fails the directory check for 4 of them and + * then checks to ensure we tried to delete only the ones that passed the + * check. + */ + @Test + public void testFailedDirLogDeletion() throws Exception { + + File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 7); + final List<String> localLogDirPaths = + new ArrayList<String>(localLogDirs.length); + for (int i = 0; i < localLogDirs.length; i++) { + localLogDirPaths.add(localLogDirs[i].getAbsolutePath()); + } + + String localLogDirsString = StringUtils.join(localLogDirPaths, ","); + + conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l); + + LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class); + + NonAggregatingLogHandler rawLogHandler = + new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler); + NonAggregatingLogHandler logHandler = spy(rawLogHandler); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + FileContext lfs = FileContext.getFileContext(spylfs, conf); + doReturn(lfs).when(logHandler) + .getLocalFileContext(isA(Configuration.class)); + logHandler.init(conf); + logHandler.start(); + runMockedFailedDirs(logHandler, appId, user, mockDelService, + mockDirsHandler, conf, spylfs, lfs, localLogDirs); + logHandler.close(); + } + + /** + * Function to run a log handler with directories failing the getFileStatus + * call. The function accepts the log handler, setup the mocks to fail with + * specific exceptions and ensures the deletion service has the correct calls. + * + * @param logHandler the logHandler implementation to test + * + * @param appId the application id that we wish when sending events to the log + * handler + * + * @param user the user name to use + * + * @param mockDelService a mock of the DeletionService which we will verify + * the delete calls against + * + * @param dirsHandler a spy or mock on the LocalDirsHandler service used to + * when creating the logHandler. It needs to be a spy so that we can intercept + * the getAllLogDirs() call. + * + * @param conf the configuration used + * + * @param spylfs a spy on the AbstractFileSystem object used when creating lfs + * + * @param lfs the FileContext object to be used to mock the getFileStatus() + * calls + * + * @param localLogDirs list of the log dirs to run the test against, must have + * at least 7 entries + */ + public static void runMockedFailedDirs(LogHandler logHandler, + ApplicationId appId, String user, DeletionService mockDelService, + LocalDirsHandlerService dirsHandler, Configuration conf, + AbstractFileSystem spylfs, FileContext lfs, File[] localLogDirs) + throws Exception { + Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); + if (localLogDirs.length < 7) { + throw new IllegalArgumentException( + "Argument localLogDirs must be at least of length 7"); + } + Path[] localAppLogDirPaths = new Path[localLogDirs.length]; + for (int i = 0; i < localAppLogDirPaths.length; i++) { + localAppLogDirPaths[i] = + new Path(localLogDirs[i].getAbsolutePath(), appId.toString()); + } + final List<String> localLogDirPaths = + new ArrayList<String>(localLogDirs.length); + for (int i = 0; i < localLogDirs.length; i++) { + localLogDirPaths.add(localLogDirs[i].getAbsolutePath()); + } + + // setup mocks + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", + new Path(localLogDirs[0].getAbsolutePath())); + doReturn(fs).when(spylfs).getFileStatus(isA(Path.class)); + doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup(); + + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls)); + + // test case where some dirs have the log dir to delete + // mock some dirs throwing various exceptions + // verify deletion happens only on the others + Mockito.doThrow(new FileNotFoundException()).when(spylfs) + .getFileStatus(eq(localAppLogDirPaths[0])); + doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[1])); + Mockito.doThrow(new AccessControlException()).when(spylfs) + .getFileStatus(eq(localAppLogDirPaths[2])); + doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[3])); + Mockito.doThrow(new IOException()).when(spylfs) + .getFileStatus(eq(localAppLogDirPaths[4])); + Mockito.doThrow(new UnsupportedFileSystemException("test")).when(spylfs) + .getFileStatus(eq(localAppLogDirPaths[5])); + doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[6])); + + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); + + testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirPaths[1], + localAppLogDirPaths[3], localAppLogDirPaths[6]); + + return; + } + + static class DeletePathsMatcher extends ArgumentMatcher<Path[]> implements + VarargMatcher { + + // to get rid of serialization warning + static final long serialVersionUID = 0; + + private transient Path[] matchPaths; + + DeletePathsMatcher(Path... matchPaths) { + this.matchPaths = matchPaths; + } + + @Override + public boolean matches(Object varargs) { + return new EqualsBuilder().append(matchPaths, varargs).isEquals(); + } + + // function to get rid of FindBugs warning + private void readObject(ObjectInputStream os) throws NotSerializableException { + throw new NotSerializableException(this.getClass().getName()); + } + } + + /** + * Function to verify that the DeletionService object received the right + * requests. + * + * @param delService the DeletionService mock which we verify against + * + * @param user the user name to use when verifying the deletion + * + * @param timeout amount in milliseconds to wait before we decide the calls + * didn't come through + * + * @param matchPaths the paths to match in the delete calls + * + * @throws WantedButNotInvoked if the calls could not be verified + */ + static void testDeletionServiceCall(DeletionService delService, String user, + long timeout, Path... matchPaths) { + + long verifyStartTime = System.currentTimeMillis(); + WantedButNotInvoked notInvokedException = null; + boolean matched = false; + while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) { + try { + verify(delService).delete(eq(user), (Path) eq(null), + Mockito.argThat(new DeletePathsMatcher(matchPaths))); + matched = true; + } catch (WantedButNotInvoked e) { + notInvokedException = e; + try { + Thread.sleep(50l); + } catch (InterruptedException i) { + } + } + } + if (!matched) { + throw notInvokedException; + } + return; + } + + public static File[] getLocalLogDirFiles(String name, int number) { + File[] dirs = new File[number]; + for (int i = 0; i < dirs.length; i++) { + dirs[i] = new File("target", name + "-localLogDir" + i).getAbsoluteFile(); + } + return dirs; + } }