Author: vinodkv
Date: Tue Jul 16 23:31:54 2013
New Revision: 1503944

URL: http://svn.apache.org/r1503944
Log:
YARN-661. Fixed NM to cleanup users' local directories correctly when starting 
up. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1503942 ../../trunk/

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
    
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
    
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt?rev=1503944&r1=1503943&r2=1503944&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt Tue 
Jul 16 23:31:54 2013
@@ -21,6 +21,9 @@ Release 2.1.1-beta - UNRELEASED
     YARN-523. Modified a test-case to validate container diagnostics on
     localization failures. (Jian He via vinodkv)
 
+    YARN-661. Fixed NM to cleanup users' local directories correctly when
+    starting up. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.1.0-beta - 2013-07-02
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1503944&r1=1503943&r2=1503944&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
 (original)
+++ 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
 Tue Jul 16 23:31:54 2013
@@ -18,23 +18,30 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class DeletionService extends AbstractService {
@@ -42,7 +49,8 @@ public class DeletionService extends Abs
   private int debugDelay;
   private final ContainerExecutor exec;
   private ScheduledThreadPoolExecutor sched;
-  private final FileContext lfs = getLfs();
+  private static final FileContext lfs = getLfs();
+
   static final FileContext getLfs() {
     try {
       return FileContext.getLocalFSFileContext();
@@ -68,11 +76,23 @@ public class DeletionService extends Abs
   public void delete(String user, Path subDir, Path... baseDirs) {
     // TODO if parent owned by NM, rename within parent inline
     if (debugDelay != -1) {
-      sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay,
-          TimeUnit.SECONDS);
+      if (baseDirs == null || baseDirs.length == 0) {
+        sched.schedule(new FileDeletionTask(this, user, subDir, null),
+          debugDelay, TimeUnit.SECONDS);
+      } else {
+        sched.schedule(
+          new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
+          debugDelay, TimeUnit.SECONDS);
+      }
     }
   }
-
+  
+  public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
+    if (debugDelay != -1) {
+      sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
+    }
+  }
+  
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     ThreadFactory tf = new ThreadFactoryBuilder()
@@ -118,46 +138,184 @@ public class DeletionService extends Abs
     return getServiceState() == STATE.STOPPED && sched.isTerminated();
   }
 
-  private class FileDeletion implements Runnable {
-    final String user;
-    final Path subDir;
-    final Path[] baseDirs;
-    FileDeletion(String user, Path subDir, Path[] baseDirs) {
+  public static class FileDeletionTask implements Runnable {
+    private final String user;
+    private final Path subDir;
+    private final List<Path> baseDirs;
+    private final AtomicInteger numberOfPendingPredecessorTasks;
+    private final Set<FileDeletionTask> successorTaskSet;
+    private final DeletionService delService;
+    // By default all tasks start with success=true; however, if any
+    // predecessor task fails, this flag is set to false in
+    // fileDeletionTaskFinished().
+    private boolean success;
+    
+    private FileDeletionTask(DeletionService delService, String user,
+        Path subDir, List<Path> baseDirs) {
+      this.delService = delService;
       this.user = user;
       this.subDir = subDir;
       this.baseDirs = baseDirs;
+      this.successorTaskSet = new HashSet<FileDeletionTask>();
+      this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
+      success = true;
+    }
+    
+    /**
+     * increments and returns pending predecessor task count
+     */
+    public int incrementAndGetPendingPredecessorTasks() {
+      return numberOfPendingPredecessorTasks.incrementAndGet();
+    }
+    
+    /**
+     * decrements and returns pending predecessor task count
+     */
+    public int decrementAndGetPendingPredecessorTasks() {
+      return numberOfPendingPredecessorTasks.decrementAndGet();
     }
+    
+    @VisibleForTesting
+    public String getUser() {
+      return this.user;
+    }
+    
+    @VisibleForTesting
+    public Path getSubDir() {
+      return this.subDir;
+    }
+    
+    @VisibleForTesting
+    public List<Path> getBaseDirs() {
+      return this.baseDirs;
+    }
+    
+    public synchronized void setSuccess(boolean success) {
+      this.success = success;
+    }
+    
+    public synchronized boolean getSucess() {
+      return this.success;
+    }
+    
     @Override
     public void run() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this);
+      }
+      boolean error = false;
       if (null == user) {
-        if (baseDirs == null || baseDirs.length == 0) {
+        if (baseDirs == null || baseDirs.size() == 0) {
           LOG.debug("NM deleting absolute path : " + subDir);
           try {
             lfs.delete(subDir, true);
           } catch (IOException e) {
+            error = true;
             LOG.warn("Failed to delete " + subDir);
           }
-          return;
-        }
-        for (Path baseDir : baseDirs) {
-          Path del = subDir == null? baseDir : new Path(baseDir, subDir);
-          LOG.debug("NM deleting path : " + del);
-          try {
-            lfs.delete(del, true);
-          } catch (IOException e) {
-            LOG.warn("Failed to delete " + subDir);
+        } else {
+          for (Path baseDir : baseDirs) {
+            Path del = subDir == null? baseDir : new Path(baseDir, subDir);
+            LOG.debug("NM deleting path : " + del);
+            try {
+              lfs.delete(del, true);
+            } catch (IOException e) {
+              error = true;
+              LOG.warn("Failed to delete " + subDir);
+            }
           }
         }
       } else {
         try {
           LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
-          exec.deleteAsUser(user, subDir, baseDirs);
+          if (baseDirs == null || baseDirs.size() == 0) {
+            delService.exec.deleteAsUser(user, subDir, (Path[])null);
+          } else {
+            delService.exec.deleteAsUser(user, subDir,
+              baseDirs.toArray(new Path[0]));
+          }
         } catch (IOException e) {
+          error = true;
           LOG.warn("Failed to delete as user " + user, e);
         } catch (InterruptedException e) {
+          error = true;
           LOG.warn("Failed to delete as user " + user, e);
         }
       }
+      if (error) {
+        setSuccess(!error);        
+      }
+      fileDeletionTaskFinished();
     }
+
+    @Override
+    public String toString() {
+      StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
+      sb.append("  user : ").append(this.user);
+      sb.append("  subDir : ").append(
+        subDir == null ? "null" : subDir.toString());
+      sb.append("  baseDir : ");
+      if (baseDirs == null || baseDirs.size() == 0) {
+        sb.append("null");
+      } else {
+        for (Path baseDir : baseDirs) {
+          sb.append(baseDir.toString()).append(',');
+        }
+      }
+      return sb.toString();
+    }
+    
+    /**
+     * If there is a task dependency between say tasks 1,2,3 such that
+     * task2 and task3 can be started only after task1 then we should define
+     * task2 and task3 as successor tasks for task1.
+     * Note: task dependencies should be defined prior to scheduling the tasks.
+     * @param successorTask
+     */
+    public synchronized void addFileDeletionTaskDependency(
+        FileDeletionTask successorTask) {
+      if (successorTaskSet.add(successorTask)) {
+        successorTask.incrementAndGetPendingPredecessorTasks();
+      }
+    }
+    
+    /*
+     * This is called when
+     * 1) Current file deletion task ran and finished.
+     * 2) A predecessor task calls it directly, without running this task,
+     * when this task has been marked success = false by a failed predecessor.
+     */
+    private synchronized void fileDeletionTaskFinished() {
+      Iterator<FileDeletionTask> successorTaskI =
+          this.successorTaskSet.iterator();
+      while (successorTaskI.hasNext()) {
+        FileDeletionTask successorTask = successorTaskI.next();
+        if (!success) {
+          successorTask.setSuccess(success);
+        }
+        int count = successorTask.decrementAndGetPendingPredecessorTasks();
+        if (count == 0) {
+          if (successorTask.getSucess()) {
+            successorTask.delService.scheduleFileDeletionTask(successorTask);
+          } else {
+            successorTask.fileDeletionTaskFinished();
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Helper method to create file deletion task. To be used only if we need
+   * a way to define dependencies between deletion tasks.
+   * @param user user on whose behalf this task is supposed to run
+   * @param subDir sub directory as required in 
+   * {@link DeletionService#delete(String, Path, Path...)}
+   * @param baseDirs base directories as required in
+   * {@link DeletionService#delete(String, Path, Path...)}
+   */
+  public FileDeletionTask createFileDeletionTask(String user, Path subDir,
+      Path[] baseDirs) {
+    return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
   }
-}
+}
\ No newline at end of file

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1503944&r1=1503943&r2=1503944&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
 (original)
+++ 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
 Tue Jul 16 23:31:54 2013
@@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import 
org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -1094,7 +1095,8 @@ public class ResourceLocalizationService
         try {
           if (status.getPath().getName().matches(".*" +
               ContainerLocalizer.USERCACHE + "_DEL_.*")) {
-            cleanUpFilesFromSubDir(lfs, del, status.getPath());
+            LOG.info("usercache path : " + status.getPath().toString());
+            cleanUpFilesPerUserDir(lfs, del, status.getPath());
           } else if (status.getPath().getName()
               .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
               ||
@@ -1111,17 +1113,28 @@ public class ResourceLocalizationService
     }
   }
 
-  private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
-      Path dirPath) throws IOException {
-    RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath);
-    if (fileStatus != null) {
-      while (fileStatus.hasNext()) {
-        FileStatus status = fileStatus.next();
+  private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
+      Path userDirPath) throws IOException {
+    RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
+    FileDeletionTask dependentDeletionTask =
+        del.createFileDeletionTask(null, userDirPath, new Path[] {});
+    if (userDirStatus != null) {
+      List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
+      while (userDirStatus.hasNext()) {
+        FileStatus status = userDirStatus.next();
         String owner = status.getOwner();
-        del.delete(owner, status.getPath(), new Path[] {});
+        FileDeletionTask deletionTask =
+            del.createFileDeletionTask(owner, null,
+              new Path[] { status.getPath() });
+        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+        deletionTasks.add(deletionTask);
       }
+      for (FileDeletionTask task : deletionTasks) {
+        del.scheduleFileDeletionTask(task);
+      }
+    } else {
+      del.scheduleFileDeletionTask(dependentDeletionTask);
     }
-    del.delete(null, dirPath, new Path[] {});
   }
 
 }

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1503944&r1=1503943&r2=1503944&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
 (original)
+++ 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
 Tue Jul 16 23:31:54 2013
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,16 +33,11 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
-
+import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import static org.junit.Assert.*;
-
 public class TestDeletionService {
 
   private static final FileContext lfs = getLfs();
@@ -210,4 +210,79 @@ public class TestDeletionService {
     }
     assertTrue(del.isTerminated());
   }
+  
+  @Test (timeout=60000)
+  public void testFileDeletionTaskDependency() throws Exception {
+    FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor();
+    Configuration conf = new Configuration();
+    exec.setConf(conf);
+    DeletionService del = new DeletionService(exec);
+    del.init(conf);
+    del.start();
+    
+    try {
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      System.out.println("SEED: " + seed);
+      List<Path> dirs = buildDirs(r, base, 2);
+      createDirs(new Path("."), dirs);
+      
+      // first we will try to delete sub directories which are present. This
+      // should then trigger parent directory to be deleted.
+      List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
+      
+      FileDeletionTask dependentDeletionTask =
+          del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
+      List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
+      for (Path subDir : subDirs) {
+        FileDeletionTask deletionTask =
+            del.createFileDeletionTask(null, null, new Path[] { subDir });
+        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+        deletionTasks.add(deletionTask);
+      }
+      for (FileDeletionTask task : deletionTasks) {
+        del.scheduleFileDeletionTask(task);
+      }
+
+      int msecToWait = 20 * 1000;
+      while (msecToWait > 0 && (lfs.util().exists(dirs.get(0)))) {
+        Thread.sleep(100);
+        msecToWait -= 100;
+      }
+      assertFalse(lfs.util().exists(dirs.get(0)));
+      
+
+      // Now we will try to delete sub directories; one of the deletion task we
+      // will mark as failure and then parent directory should not be deleted.
+      subDirs = buildDirs(r, dirs.get(1), 2);
+      subDirs.add(new Path(dirs.get(1), "absentFile"));
+      
+      dependentDeletionTask =
+          del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
+      deletionTasks = new ArrayList<FileDeletionTask>();
+      for (Path subDir : subDirs) {
+        FileDeletionTask deletionTask =
+            del.createFileDeletionTask(null, null, new Path[] { subDir });
+        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+        deletionTasks.add(deletionTask);
+      }
+      // marking one of the tasks as a failure.
+      deletionTasks.get(2).setSuccess(false);
+      for (FileDeletionTask task : deletionTasks) {
+        del.scheduleFileDeletionTask(task);
+      }
+
+      msecToWait = 20 * 1000;
+      while (msecToWait > 0
+          && (lfs.util().exists(subDirs.get(0)) || lfs.util().exists(
+            subDirs.get(1)))) {
+        Thread.sleep(100);
+        msecToWait -= 100;
+      }
+      assertTrue(lfs.util().exists(dirs.get(1)));
+    } finally {
+      del.stop();
+    }
+  }
 }

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1503944&r1=1503943&r2=1503944&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
 (original)
+++ 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
 Tue Jul 16 23:31:54 2013
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -69,8 +70,8 @@ import org.mockito.ArgumentMatcher;
 
 public class TestNodeManagerReboot {
 
-  static final File basedir =
-      new File("target", TestNodeManagerReboot.class.getName());
+  static final File basedir = new File("target",
+    TestNodeManagerReboot.class.getName());
   static final File logsDir = new File(basedir, "logs");
   static final File nmLocalDir = new File(basedir, "nm0");
   static final File localResourceDir = new File(basedir, "resource");
@@ -100,7 +101,8 @@ public class TestNodeManagerReboot {
     nm = new MyNodeManager();
     nm.start();
 
-    final ContainerManagementProtocol containerManager = 
nm.getContainerManager();
+    final ContainerManagementProtocol containerManager =
+        nm.getContainerManager();
 
     // create files under fileCache
     createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 
100);
@@ -112,16 +114,13 @@ public class TestNodeManagerReboot {
     ContainerId cId = createContainerId();
 
     URL localResourceUri =
-        ConverterUtils.getYarnUrlFromPath(localFS
-            .makeQualified(new Path(localResourceDir.getAbsolutePath())));
+        ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path(
+          localResourceDir.getAbsolutePath())));
 
     LocalResource localResource =
-        Records.newRecord(LocalResource.class);
-    localResource.setResource(localResourceUri);
-    localResource.setSize(-1);
-    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
-    localResource.setType(LocalResourceType.FILE);
-    localResource.setTimestamp(localResourceDir.lastModified());
+        LocalResource.newInstance(localResourceUri, LocalResourceType.FILE,
+          LocalResourceVisibility.APPLICATION, -1,
+          localResourceDir.lastModified());
     String destinationFile = "dest_file";
     Map<String, LocalResource> localResources =
         new HashMap<String, LocalResource>();
@@ -129,7 +128,7 @@ public class TestNodeManagerReboot {
     containerLaunchContext.setLocalResources(localResources);
     List<String> commands = new ArrayList<String>();
     containerLaunchContext.setCommands(commands);
-    
+
     final StartContainerRequest startRequest =
         Records.newRecord(StartContainerRequest.class);
     startRequest.setContainerLaunchContext(containerLaunchContext);
@@ -137,8 +136,9 @@ public class TestNodeManagerReboot {
     startRequest.setContainerToken(TestContainerManager.createContainerToken(
       cId, 0, nodeId, destinationFile, nm.getNMContext()
         .getContainerTokenSecretManager()));
-    final UserGroupInformation currentUser = UserGroupInformation
-        .createRemoteUser(cId.getApplicationAttemptId().toString());
+    final UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(cId.getApplicationAttemptId()
+          .toString());
     NMTokenIdentifier nmIdentifier =
         new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 
123);
     currentUser.addTokenIdentifier(nmIdentifier);
@@ -170,27 +170,31 @@ public class TestNodeManagerReboot {
 
     Assert.assertEquals(ContainerState.DONE, container.getContainerState());
 
-    Assert.assertTrue(
-        "The container should create a subDir named currentUser: " + user +
-            "under localDir/usercache",
+    Assert
+      .assertTrue(
+        "The container should create a subDir named currentUser: " + user
+            + "under localDir/usercache",
         numOfLocalDirs(nmLocalDir.getAbsolutePath(),
-            ContainerLocalizer.USERCACHE) > 0);
+          ContainerLocalizer.USERCACHE) > 0);
 
-    Assert.assertTrue("There should be files or Dirs under nm_private when " +
-        "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+    Assert.assertTrue(
+      "There should be files or Dirs under nm_private when "
+          + "container is launched",
+      numOfLocalDirs(nmLocalDir.getAbsolutePath(),
         ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
 
     // restart the NodeManager
     nm.stop();
     nm = new MyNodeManager();
-    nm.start();    
+    nm.start();
 
     numTries = 0;
-    while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
-        .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
-        ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir
-        .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
-        > 0) && numTries < MAX_TRIES) {
+    while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+      ContainerLocalizer.USERCACHE) > 0
+        || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+          ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(
+      nmLocalDir.getAbsolutePath(), 
ResourceLocalizationService.NM_PRIVATE_DIR) > 0)
+        && numTries < MAX_TRIES) {
       try {
         Thread.sleep(500);
       } catch (InterruptedException ex) {
@@ -199,21 +203,27 @@ public class TestNodeManagerReboot {
       numTries++;
     }
 
-    Assert.assertTrue("After NM reboots, all local files should be deleted",
-        numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
-            .USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
-            ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
-            .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
-              == 0);
+    Assert
+      .assertTrue(
+        "After NM reboots, all local files should be deleted",
+        numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+          ContainerLocalizer.USERCACHE) == 0
+            && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+              ContainerLocalizer.FILECACHE) == 0
+            && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+              ResourceLocalizationService.NM_PRIVATE_DIR) == 0);
     verify(delService, times(1)).delete(
-        (String) isNull(),
-        argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
-            + "_DEL_")));
+      (String) isNull(),
+      argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
+          + "_DEL_")));
     verify(delService, times(1)).delete((String) isNull(),
-        argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
-    verify(delService, times(1)).delete((String) isNull(),
-        argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_")));
-
+      argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
+    verify(delService, times(1)).scheduleFileDeletionTask(
+      argThat(new FileDeletionInclude(user, null,
+        new String[] { destinationFile })));
+    verify(delService, times(1)).scheduleFileDeletionTask(
+      argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
+          + "_DEL_", new String[] {})));
   }
 
   private int numOfLocalDirs(String localDir, String localSubDir) {
@@ -238,7 +248,8 @@ public class TestNodeManagerReboot {
 
   private ContainerId createContainerId() {
     ApplicationId appId = ApplicationId.newInstance(0, 0);
-    ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(appId, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
     ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
     return containerId;
   }
@@ -253,8 +264,8 @@ public class TestNodeManagerReboot {
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-      MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
-          context, dispatcher, healthChecker, metrics);
+      MockNodeStatusUpdater myNodeStatusUpdater =
+          new MockNodeStatusUpdater(context, dispatcher, healthChecker, 
metrics);
       return myNodeStatusUpdater;
     }
 
@@ -288,4 +299,58 @@ public class TestNodeManagerReboot {
       return ((Path) o).getName().indexOf(part) != -1;
     }
   }
+  
+  class FileDeletionInclude extends ArgumentMatcher<FileDeletionTask> {
+    final String user;
+    final String subDirIncludes;
+    final String[] baseDirIncludes;
+    
+    public FileDeletionInclude(String user, String subDirIncludes,
+        String [] baseDirIncludes) {
+      this.user = user;
+      this.subDirIncludes = subDirIncludes;
+      this.baseDirIncludes = baseDirIncludes;
+    }
+    
+    @Override
+    public boolean matches(Object o) {
+      FileDeletionTask fd = (FileDeletionTask)o;
+      if (fd.getUser() == null && user != null) {
+        return false;
+      } else if (fd.getUser() != null && user == null) {
+        return false;
+      } else if (fd.getUser() != null && user != null) {
+        return fd.getUser().equals(user);
+      }
+      if (!comparePaths(fd.getSubDir(), subDirIncludes)) {
+        return false;
+      }
+      if (baseDirIncludes == null && fd.getBaseDirs() != null) {
+        return false;
+      } else if (baseDirIncludes != null && fd.getBaseDirs() == null ) {
+        return false;
+      } else if (baseDirIncludes != null && fd.getBaseDirs() != null) {
+        if (baseDirIncludes.length != fd.getBaseDirs().size()) {
+          return false;
+        }
+        for (int i =0 ; i < baseDirIncludes.length; i++) {
+          if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+    
+    public boolean comparePaths(Path p1, String p2) {
+      if (p1 == null && p2 != null){
+        return false;
+      } else if (p1 != null && p2 == null) {
+        return false;
+      } else if (p1 != null && p2 != null ){
+        return p1.toUri().getPath().contains(p2.toString());
+      }
+      return true;
+    }
+  }
 }


Reply via email to