Repository: hadoop
Updated Branches:
  refs/heads/branch-2 fd7ba56f6 -> 3820bf055


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3820bf05/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index ab86a18..8a5441a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMap;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -58,6 +59,8 @@ import org.junit.Assert;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -105,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -136,12 +140,19 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     super();
     this.remoteRootLogDir.mkdir();
   }
+  
+  DrainDispatcher dispatcher;
+  EventHandler<ApplicationEvent> appEventHandler;
 
   @Override
+  @SuppressWarnings("unchecked")
   public void setup() throws IOException {
     super.setup();
     NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
     ((NMContext)context).setNodeId(nodeId);
+    dispatcher = createDispatcher();
+    appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
   }
 
   @Override
@@ -149,10 +160,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     super.tearDown();
     createContainerExecutor().deleteAsUser(user,
         new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {});
+    dispatcher.await();
+    dispatcher.stop();
+    dispatcher.close();
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testLocalFileDeletionAfterUpload() throws Exception {
     this.delSrvc = new DeletionService(createContainerExecutor());
     delSrvc = spy(delSrvc);
@@ -161,10 +174,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
     
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-    
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
@@ -236,16 +245,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testNoContainerOnNode() throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
     
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-    
     LogAggregationService logAggregationService =
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler);
@@ -285,6 +289,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     };
     checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
     dispatcher.stop();
+    logAggregationService.close();
   }
 
   @Test
@@ -294,6 +299,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
+    
     String[] fileNames = new String[] { "stdout", "stderr", "syslog" };
     DrainDispatcher dispatcher = createDispatcher();
     EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
@@ -432,17 +438,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
   
   @Test
-  @SuppressWarnings("unchecked")
   public void testVerifyAndCreateRemoteDirsFailure()
       throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
     
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-    
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
@@ -456,8 +457,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.start();
     
     // Now try to start an application
-    ApplicationId appId = BuilderUtils.newApplicationId(
-        System.currentTimeMillis(), (int)Math.random());
+    ApplicationId appId =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(),
+          (int) (Math.random() * 1000));
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
         this.user, null,
         ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
@@ -475,8 +477,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     Mockito.reset(logAggregationService);
     
     // Now try to start another one
-    ApplicationId appId2 = BuilderUtils.newApplicationId(
-        System.currentTimeMillis(), (int)Math.random());
+    ApplicationId appId2 =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(),
+          (int) (Math.random() * 1000));
     File appLogDir =
         new File(localLogDir, ConverterUtils.toString(appId2));
     appLogDir.mkdir();
@@ -578,6 +581,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
         ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
     verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
+    aggSvc.stop();
+    aggSvc.close();
   }
 
   @Test
@@ -588,19 +593,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
+    
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
     logAggregationService.init(this.conf);
     logAggregationService.start();
 
-    ApplicationId appId = BuilderUtils.newApplicationId(
-        System.currentTimeMillis(), (int)Math.random());
+    ApplicationId appId =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(),
+          (int) (Math.random() * 1000));
     doThrow(new YarnRuntimeException("KABOOM!"))
       .when(logAggregationService).initAppAggregator(
           eq(appId), eq(user), any(Credentials.class),
@@ -634,26 +636,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testLogAggregationCreateDirsFailsWithoutKillingNM()
       throws Exception {
     
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-    
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-    
+        
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
     logAggregationService.init(this.conf);
     logAggregationService.start();
     
-    ApplicationId appId = BuilderUtils.newApplicationId(
-        System.currentTimeMillis(), (int)Math.random());
+    ApplicationId appId =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(),
+          (int) (Math.random() * 1000));
     Exception e = new RuntimeException("KABOOM!");
     doThrow(e)
       .when(logAggregationService).createAppDir(any(String.class),
@@ -905,18 +903,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
 
   @Test(timeout=20000)
-  @SuppressWarnings("unchecked")
   public void testStopAfterError() throws Exception {
     DeletionService delSrvc = mock(DeletionService.class);
 
     // get the AppLogAggregationImpl thread to crash
     LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
     when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
-
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
+    
     LogAggregationService logAggregationService =
         new LogAggregationService(dispatcher, this.context, delSrvc,
                                   mockedDirSvc);
@@ -930,20 +923,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
+    logAggregationService.close();
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testLogAggregatorCleanup() throws Exception {
     DeletionService delSrvc = mock(DeletionService.class);
 
     // get the AppLogAggregationImpl thread to crash
     LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
 
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
     LogAggregationService logAggregationService =
         new LogAggregationService(dispatcher, this.context, delSrvc,
                                   mockedDirSvc);
@@ -964,6 +953,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     }
     Assert.assertEquals("Log aggregator failed to cleanup!", 0,
         logAggregationService.getNumAggregators());
+    logAggregationService.stop();
+    logAggregationService.close();
   }
   
   @SuppressWarnings("unchecked")
@@ -1039,6 +1030,72 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     return sb.toString();
   }
 
+  /*
+   * Test to make sure we handle cases where the directories we get back from
+   * the LocalDirsHandler may have issues, including the log dir not being
+   * present, as well as other issues. The test uses helper functions from
+   * TestNonAggregatingLogHandler.
+   */
+  @Test
+  public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception {
+
+    // setup conf and services
+    DeletionService mockDelService = mock(DeletionService.class);
+    File[] localLogDirs =
+        TestNonAggregatingLogHandler.getLocalLogDirFiles(this.getClass()
+          .getName(), 7);
+    final List<String> localLogDirPaths =
+        new ArrayList<String>(localLogDirs.length);
+    for (int i = 0; i < localLogDirs.length; i++) {
+      localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
+    }
+
+    String localLogDirsString = StringUtils.join(localLogDirPaths, ",");
+
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+      this.remoteRootLogDir.getAbsolutePath());
+    this.conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(application1, 1);
+
+    this.dirsHandler = new LocalDirsHandlerService();
+    LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
+
+    LogAggregationService logAggregationService =
+        spy(new LogAggregationService(dispatcher, this.context, mockDelService,
+          mockDirsHandler));
+    AbstractFileSystem spylfs =
+        spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doReturn(lfs).when(logAggregationService).getLocalFileContext(
+      isA(Configuration.class));
+
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    TestNonAggregatingLogHandler.runMockedFailedDirs(logAggregationService,
+      application1, user, mockDelService, mockDirsHandler, conf, spylfs, lfs,
+      localLogDirs);
+
+    logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
+    verify(logAggregationService).closeFileSystems(
+      any(UserGroupInformation.class));
+
+    ApplicationEvent expectedEvents[] =
+        new ApplicationEvent[] {
+            new ApplicationEvent(appAttemptId.getApplicationId(),
+              ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+            new ApplicationEvent(appAttemptId.getApplicationId(),
+              ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) };
+
+    checkEvents(appEventHandler, expectedEvents, true, "getType",
+      "getApplicationID");
+  }
+
   @Test (timeout = 50000)
   @SuppressWarnings("unchecked")
   public void testLogAggregationServiceWithPatterns() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3820bf05/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
index 300ca28..d0f6472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -19,15 +19,36 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -45,25 +66,52 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.mockito.exceptions.verification.WantedButNotInvoked;
+import org.mockito.internal.matchers.VarargMatcher;
 
 public class TestNonAggregatingLogHandler {
+  
+  DeletionService mockDelService;
+  Configuration conf;
+  DrainDispatcher dispatcher;
+  EventHandler<ApplicationEvent> appEventHandler;
+  String user = "testuser";
+  ApplicationId appId;
+  ApplicationAttemptId appAttemptId;
+  ContainerId container11;
+  LocalDirsHandlerService dirsHandler;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setup() {
+    mockDelService = mock(DeletionService.class);
+    conf = new YarnConfiguration();
+    dispatcher = createDispatcher(conf);
+    appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+    appId = BuilderUtils.newApplicationId(1234, 1);
+    appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
+    container11 = BuilderUtils.newContainerId(appAttemptId, 1);
+    dirsHandler = new LocalDirsHandlerService();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    dirsHandler.stop();
+    dirsHandler.close();
+    dispatcher.await();
+    dispatcher.stop();
+    dispatcher.close();
+  }  
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void testLogDeletion() {
-    DeletionService delService = mock(DeletionService.class);
-    Configuration conf = new YarnConfiguration();
-    String user = "testuser";
-
-    File[] localLogDirs = new File[2];
-    localLogDirs[0] =
-        new File("target", this.getClass().getName() + "-localLogDir0")
-            .getAbsoluteFile();
-    localLogDirs[1] =
-        new File("target", this.getClass().getName() + "-localLogDir1")
-            .getAbsoluteFile();
+  public void testLogDeletion() throws IOException {
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
     String localLogDirsString =
         localLogDirs[0].getAbsolutePath() + ","
             + localLogDirs[1].getAbsolutePath();
@@ -72,72 +120,50 @@ public class TestNonAggregatingLogHandler {
     conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
 
-    DrainDispatcher dispatcher = createDispatcher(conf);
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
-    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
     dirsHandler.init(conf);
 
-    ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
-    ApplicationAttemptId appAttemptId1 =
-        BuilderUtils.newApplicationAttemptId(appId1, 1);
-    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
+    NonAggregatingLogHandler rawLogHandler =
+        new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler);
+    NonAggregatingLogHandler logHandler = spy(rawLogHandler);
+    AbstractFileSystem spylfs =
+        spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doReturn(lfs).when(logHandler)
+      .getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "",
+          new Path(localLogDirs[0].getAbsolutePath()));
+    doReturn(fs).when(spylfs).getFileStatus(isA(Path.class));
 
-    NonAggregatingLogHandler logHandler =
-        new NonAggregatingLogHandler(dispatcher, delService, dirsHandler);
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
         ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
 
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
 
-    logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
     Path[] localAppLogDirs = new Path[2];
     localAppLogDirs[0] =
-        new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
+        new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
     localAppLogDirs[1] =
-        new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
+        new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
 
-    // 5 seconds for the delete which is a separate thread.
-    long verifyStartTime = System.currentTimeMillis();
-    WantedButNotInvoked notInvokedException = null;
-    boolean matched = false;
-    while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) {
-      try {
-        verify(delService).delete(eq(user), (Path) eq(null),
-            eq(localAppLogDirs[0]), eq(localAppLogDirs[1]));
-        matched = true;
-      } catch (WantedButNotInvoked e) {
-        notInvokedException = e;
-        try {
-          Thread.sleep(50l);
-        } catch (InterruptedException i) {
-        }
-      }
-    }
-    if (!matched) {
-      throw notInvokedException;
+    testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs);
+    logHandler.close();
+    for (int i = 0; i < localLogDirs.length; i++) {
+      FileUtils.deleteDirectory(localLogDirs[i]);
     }
   }
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void testDelayedDelete() {
-    DeletionService delService = mock(DeletionService.class);
-    Configuration conf = new YarnConfiguration();
-    String user = "testuser";
-
-    File[] localLogDirs = new File[2];
-    localLogDirs[0] =
-        new File("target", this.getClass().getName() + "-localLogDir0")
-            .getAbsoluteFile();
-    localLogDirs[1] =
-        new File("target", this.getClass().getName() + "-localLogDir1")
-            .getAbsoluteFile();
+  public void testDelayedDelete() throws IOException {
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
     String localLogDirsString =
         localLogDirs[0].getAbsolutePath() + ","
             + localLogDirs[1].getAbsolutePath();
@@ -148,42 +174,36 @@ public class TestNonAggregatingLogHandler {
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
             YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
 
-    DrainDispatcher dispatcher = createDispatcher(conf);
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
-    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
     dirsHandler.init(conf);
 
-    ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
-    ApplicationAttemptId appAttemptId1 =
-        BuilderUtils.newApplicationAttemptId(appId1, 1);
-    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
-
     NonAggregatingLogHandler logHandler =
-        new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService,
+        new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService,
                                                      dirsHandler);
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
         ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
 
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
 
-    logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
     Path[] localAppLogDirs = new Path[2];
     localAppLogDirs[0] =
-        new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
+        new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
     localAppLogDirs[1] =
-        new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
+        new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
 
     ScheduledThreadPoolExecutor mockSched =
         ((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched;
 
     verify(mockSched).schedule(any(Runnable.class), eq(10800l),
         eq(TimeUnit.SECONDS));
+    logHandler.close();
+    for (int i = 0; i < localLogDirs.length; i++) {
+      FileUtils.deleteDirectory(localLogDirs[i]);
+    }
   }
   
   @Test
@@ -202,25 +222,25 @@ public class TestNonAggregatingLogHandler {
     verify(logHandler.mockSched)
         .awaitTermination(eq(10l), eq(TimeUnit.SECONDS));
     verify(logHandler.mockSched).shutdownNow();
+    logHandler.close();
+    aggregatingLogHandler.close();
   }
 
   @Test
-  public void testHandlingApplicationFinishedEvent() {
-    Configuration conf = new Configuration();
-    LocalDirsHandlerService dirsService  = new LocalDirsHandlerService();
+  public void testHandlingApplicationFinishedEvent() throws IOException {
     DeletionService delService = new DeletionService(null);
     NonAggregatingLogHandler aggregatingLogHandler =
         new NonAggregatingLogHandler(new InlineDispatcher(),
             delService,
-            dirsService);
+            dirsHandler);
 
-    dirsService.init(conf);
-    dirsService.start();
+    dirsHandler.init(conf);
+    dirsHandler.start();
     delService.init(conf);
     delService.start();
     aggregatingLogHandler.init(conf);
     aggregatingLogHandler.start();
-    ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
+    
     // It should NOT throw RejectedExecutionException
     aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
     aggregatingLogHandler.stop();
@@ -228,6 +248,7 @@ public class TestNonAggregatingLogHandler {
     // It should NOT throw RejectedExecutionException after stopping
     // handler service.
     aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
+    aggregatingLogHandler.close();
   }
 
   private class NonAggregatingLogHandlerWithMockExecutor extends
@@ -255,4 +276,201 @@ public class TestNonAggregatingLogHandler {
     dispatcher.start();
     return dispatcher;
   }
+  
+  /**
+   * Test to ensure that we handle the cleanup of directories that may not have
+   * the application log dirs we're trying to delete or may have other
+   * problems. Test creates 7 log dirs, and fails the directory check for 4 of
+   * them and then checks to ensure we tried to delete only the ones that
+   * passed the check.
+   *
+   * @throws Exception if setting up the handler or the verification fails
+   */
+  @Test
+  public void testFailedDirLogDeletion() throws Exception {
+
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 7);
+    final List<String> localLogDirPaths =
+        new ArrayList<String>(localLogDirs.length);
+    for (File localLogDir : localLogDirs) {
+      localLogDirPaths.add(localLogDir.getAbsolutePath());
+    }
+
+    String localLogDirsString = StringUtils.join(localLogDirPaths, ",");
+
+    conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0L);
+
+    LocalDirsHandlerService mockDirsHandler =
+        mock(LocalDirsHandlerService.class);
+
+    NonAggregatingLogHandler rawLogHandler =
+        new NonAggregatingLogHandler(dispatcher, mockDelService,
+            mockDirsHandler);
+    NonAggregatingLogHandler logHandler = spy(rawLogHandler);
+
+    // Route the handler's local file context through a spied file system so
+    // that runMockedFailedDirs() can stub the getFileStatus() calls.
+    AbstractFileSystem spylfs =
+        spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doReturn(lfs).when(logHandler)
+      .getLocalFileContext(isA(Configuration.class));
+    logHandler.init(conf);
+    logHandler.start();
+    try {
+      runMockedFailedDirs(logHandler, appId, user, mockDelService,
+        mockDirsHandler, conf, spylfs, lfs, localLogDirs);
+    } finally {
+      // Always release the started handler, even when verification fails, so
+      // a failing run does not leak it into subsequent tests.
+      logHandler.close();
+    }
+  }
+  
+  /**
+   * Drives a log handler through an application start/finish cycle while the
+   * getFileStatus() call fails for some of the application log directories,
+   * then checks that the deletion service was asked to delete only the
+   * directories whose status check succeeded.
+   *
+   * @param logHandler the logHandler implementation to test
+   *
+   * @param appId the application id used in the events sent to the log
+   * handler
+   *
+   * @param user the user name to use
+   *
+   * @param mockDelService a mock of the DeletionService which the delete
+   * calls are verified against
+   *
+   * @param dirsHandler a spy or mock of the LocalDirsHandlerService used when
+   * creating the logHandler; its getLogDirsForCleanup() call is stubbed here
+   *
+   * @param conf the configuration used
+   *
+   * @param spylfs a spy on the AbstractFileSystem object used when creating
+   * lfs; its getFileStatus() calls are stubbed here
+   *
+   * @param lfs the FileContext object whose umask is applied to the stubbed
+   * directory status
+   *
+   * @param localLogDirs list of the log dirs to run the test against, must
+   * have at least 7 entries
+   *
+   * @throws IllegalArgumentException if fewer than 7 log dirs are supplied
+   */
+  public static void runMockedFailedDirs(LogHandler logHandler,
+      ApplicationId appId, String user, DeletionService mockDelService,
+      LocalDirsHandlerService dirsHandler, Configuration conf,
+      AbstractFileSystem spylfs, FileContext lfs, File[] localLogDirs)
+      throws Exception {
+    final int dirCount = localLogDirs.length;
+    if (dirCount < 7) {
+      throw new IllegalArgumentException(
+        "Argument localLogDirs must be at least of length 7");
+    }
+
+    // For every log dir collect its absolute path and the path of the
+    // application's sub-directory inside it.
+    final Path[] appLogDirs = new Path[dirCount];
+    final List<String> logDirPaths = new ArrayList<String>(dirCount);
+    for (int idx = 0; idx < dirCount; idx++) {
+      final String absolutePath = localLogDirs[idx].getAbsolutePath();
+      logDirPaths.add(absolutePath);
+      appLogDirs[idx] = new Path(absolutePath, appId.toString());
+    }
+
+    // setup mocks: by default every status lookup reports a directory
+    final FsPermission dirPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    final Path firstLogDir = new Path(localLogDirs[0].getAbsolutePath());
+    final FileStatus dirStatus =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          dirPermission, "", "", firstLogDir);
+    doReturn(dirStatus).when(spylfs).getFileStatus(isA(Path.class));
+    doReturn(logDirPaths).when(dirsHandler).getLogDirsForCleanup();
+
+    final Map<ApplicationAccessType, String> appAcls =
+        new HashMap<ApplicationAccessType, String>();
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
+      ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls));
+
+    // Dirs 0, 2, 4 and 5 fail the status check with different exceptions;
+    // dirs 1, 3 and 6 pass it, so deletion must be requested for those only.
+    Mockito.doThrow(new FileNotFoundException())
+      .when(spylfs).getFileStatus(eq(appLogDirs[0]));
+    doReturn(dirStatus)
+      .when(spylfs).getFileStatus(eq(appLogDirs[1]));
+    Mockito.doThrow(new AccessControlException())
+      .when(spylfs).getFileStatus(eq(appLogDirs[2]));
+    doReturn(dirStatus)
+      .when(spylfs).getFileStatus(eq(appLogDirs[3]));
+    Mockito.doThrow(new IOException())
+      .when(spylfs).getFileStatus(eq(appLogDirs[4]));
+    Mockito.doThrow(new UnsupportedFileSystemException("test"))
+      .when(spylfs).getFileStatus(eq(appLogDirs[5]));
+    doReturn(dirStatus)
+      .when(spylfs).getFileStatus(eq(appLogDirs[6]));
+
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+
+    testDeletionServiceCall(mockDelService, user, 5000, appLogDirs[1],
+      appLogDirs[3], appLogDirs[6]);
+  }
+
+  static class DeletePathsMatcher extends ArgumentMatcher<Path[]> implements
+      VarargMatcher {
+    
+    // to get rid of serialization warning
+    static final long serialVersionUID = 0;
+
+    private transient Path[] matchPaths;
+
+    DeletePathsMatcher(Path... matchPaths) {
+      this.matchPaths = matchPaths;
+    }
+
+    @Override
+    public boolean matches(Object varargs) {
+      return new EqualsBuilder().append(matchPaths, varargs).isEquals();
+    }
+
+    // function to get rid of FindBugs warning
+    private void readObject(ObjectInputStream os) throws 
NotSerializableException {
+      throw new NotSerializableException(this.getClass().getName());
+    }
+  }
+
+  /**
+   * Function to verify that the DeletionService object received the right
+   * requests.
+   * 
+   * @param delService the DeletionService mock which we verify against
+   * 
+   * @param user the user name to use when verifying the deletion
+   * 
+   * @param timeout amount in milliseconds to wait before we decide the calls
+   * didn't come through
+   * 
+   * @param matchPaths the paths to match in the delete calls
+   * 
+   * @throws WantedButNotInvoked if the calls could not be verified
+   */
+  static void testDeletionServiceCall(DeletionService delService, String user,
+      long timeout, Path... matchPaths) {
+
+    long verifyStartTime = System.currentTimeMillis();
+    WantedButNotInvoked notInvokedException = null;
+    boolean matched = false;
+    while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) 
{
+      try {
+        verify(delService).delete(eq(user), (Path) eq(null),
+          Mockito.argThat(new DeletePathsMatcher(matchPaths)));
+        matched = true;
+      } catch (WantedButNotInvoked e) {
+        notInvokedException = e;
+        try {
+          Thread.sleep(50l);
+        } catch (InterruptedException i) {
+        }
+      }
+    }
+    if (!matched) {
+      throw notInvokedException;
+    }
+    return;
+  }
+
+  public static File[] getLocalLogDirFiles(String name, int number) {
+    File[] dirs = new File[number];
+    for (int i = 0; i < dirs.length; i++) {
+      dirs[i] = new File("target", name + "-localLogDir" + 
i).getAbsoluteFile();
+    }
+    return dirs;
+  }
 }

Reply via email to