Repository: hadoop
Updated Branches:
  refs/heads/branch-2 b799ea764 -> b806201f0


Revert "YARN-5641. Localizer leaves behind tarballs after container is complete. Contributed by Eric Badger"

This reverts commit 45407acaed05284ad897d04f4575ab0a4ee7a88b.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b806201f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b806201f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b806201f

Branch: refs/heads/branch-2
Commit: b806201f0a1fc641ea82315acb390adcebfc68f3
Parents: b799ea7
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 26 14:42:22 2017 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 26 14:42:22 2017 +0000

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/util/Shell.java |  52 +--
 .../java/org/apache/hadoop/util/TestShell.java  |   4 +-
 .../localizer/ContainerLocalizer.java           |  53 +---
 .../localizer/TestContainerLocalizer.java       | 317 ++++---------------
 4 files changed, 78 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b806201f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index 7bc1ea2..55f92d6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -27,9 +27,7 @@ import java.io.InterruptedIOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.WeakHashMap;
@@ -52,8 +50,8 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class Shell {
-  private static final Map<Shell, Object> CHILD_SHELLS =
-      Collections.synchronizedMap(new WeakHashMap<Shell, Object>());
+  private static final Map <Process, Object> CHILD_PROCESSES =
+      Collections.synchronizedMap(new WeakHashMap<Process, Object>());
   public static final Logger LOG = LoggerFactory.getLogger(Shell.class);
 
   /**
@@ -822,7 +820,6 @@ public abstract class Shell {
   private File dir;
   private Process process; // sub process used to execute the command
   private int exitCode;
-  private Thread waitingThread;
 
   /** Flag to indicate whether or not the script has finished executing. */
   private final AtomicBoolean completed = new AtomicBoolean(false);
@@ -927,9 +924,7 @@ public abstract class Shell {
     } else {
       process = builder.start();
     }
-
-    waitingThread = Thread.currentThread();
-    CHILD_SHELLS.put(this, null);
+    CHILD_PROCESSES.put(process, null);
 
     if (timeOutInterval > 0) {
       timeOutTimer = new Timer("Shell command timeout");
@@ -1026,8 +1021,7 @@ public abstract class Shell {
         LOG.warn("Error while closing the error stream", ioe);
       }
       process.destroy();
-      waitingThread = null;
-      CHILD_SHELLS.remove(this);
+      CHILD_PROCESSES.remove(process);
       lastTime = Time.monotonicNow();
     }
   }
@@ -1075,15 +1069,6 @@ public abstract class Shell {
     return exitCode;
   }
 
-  /** get the thread that is waiting on this instance of <code>Shell</code>.
-   * @return the thread that ran runCommand() that spawned this shell
-   * or null if no thread is waiting for this shell to complete
-   */
-  public Thread getWaitingThread() {
-    return waitingThread;
-  }
-
-
   /**
    * This is an IOException with exit code added.
    */
@@ -1337,27 +1322,20 @@ public abstract class Shell {
   }
 
   /**
-   * Static method to destroy all running <code>Shell</code> processes.
-   * Iterates through a map of all currently running <code>Shell</code>
-   * processes and destroys them one by one. This method is thread safe
+   * Static method to destroy all running <code>Shell</code> processes
+   * Iterates through a list of all currently running <code>Shell</code>
+   * processes and destroys them one by one. This method is thread safe and
+   * is intended to be used in a shutdown hook.
    */
-  public static void destroyAllShellProcesses() {
-    synchronized (CHILD_SHELLS) {
-      for (Shell shell : CHILD_SHELLS.keySet()) {
-        if (shell.getProcess() != null) {
-          shell.getProcess().destroy();
+  public static void destroyAllProcesses() {
+    synchronized (CHILD_PROCESSES) {
+      for (Process key : CHILD_PROCESSES.keySet()) {
+        Process process = key;
+        if (key != null) {
+          process.destroy();
         }
       }
-      CHILD_SHELLS.clear();
-    }
-  }
-
-  /**
-   * Static method to return a Set of all <code>Shell</code> objects.
-   */
-  public static Set<Shell> getAllShells() {
-    synchronized (CHILD_SHELLS) {
-      return new HashSet<>(CHILD_SHELLS.keySet());
+      CHILD_PROCESSES.clear();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b806201f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
index 7e53883..2707573 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
@@ -479,7 +479,7 @@ public class TestShell extends Assert {
   }
 
   @Test(timeout=120000)
-  public void testDestroyAllShellProcesses() throws Throwable {
+  public void testShellKillAllProcesses() throws Throwable {
     Assume.assumeFalse(WINDOWS);
     StringBuffer sleepCommand = new StringBuffer();
     sleepCommand.append("sleep 200");
@@ -524,7 +524,7 @@ public class TestShell extends Assert {
       }
     }, 10, 10000);
 
-    Shell.destroyAllShellProcesses();
+    Shell.destroyAllProcesses();
     shexc1.getProcess().waitFor();
     shexc2.getProcess().waitFor();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b806201f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
index 613c0a9..04be631 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
@@ -24,13 +24,10 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
@@ -56,7 +53,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskValidator;
 import org.apache.hadoop.util.DiskValidatorFactory;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -79,8 +75,6 @@ import org.apache.hadoop.yarn.util.FSDownload;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import static org.apache.hadoop.util.Shell.getAllShells;
-
 public class ContainerLocalizer {
 
   static final Log LOG = LogFactory.getLog(ContainerLocalizer.class);
@@ -107,9 +101,6 @@ public class ContainerLocalizer {
   private final String appCacheDirContextName;
   private final DiskValidator diskValidator;
 
-  private Set<Thread> localizingThreads =
-      Collections.synchronizedSet(new HashSet<Thread>());
-
   public ContainerLocalizer(FileContext lfs, String user, String appId,
       String localizerId, List<Path> localDirs,
       RecordFactory recordFactory) throws IOException {
@@ -187,14 +178,13 @@ public class ContainerLocalizer {
       exec = createDownloadThreadPool();
       CompletionService<Path> ecs = createCompletionService(exec);
       localizeFiles(nodeManager, ecs, ugi);
+      return;
     } catch (Throwable e) {
       throw new IOException(e);
     } finally {
       try {
         if (exec != null) {
-          exec.shutdown();
-          destroyShellProcesses(getAllShells());
-          exec.awaitTermination(10, TimeUnit.SECONDS);
+          exec.shutdownNow();
         }
         LocalDirAllocator.removeContext(appCacheDirContextName);
       } finally {
@@ -212,34 +202,10 @@ public class ContainerLocalizer {
     return new ExecutorCompletionService<Path>(exec);
   }
 
-  class FSDownloadWrapper extends FSDownload {
-
-    FSDownloadWrapper(FileContext files, UserGroupInformation ugi,
-        Configuration conf, Path destDirPath, LocalResource resource) {
-      super(files, ugi, conf, destDirPath, resource);
-    }
-
-    @Override
-    public Path call() throws Exception {
-      Thread currentThread = Thread.currentThread();
-      localizingThreads.add(currentThread);
-      try {
-        return doDownloadCall();
-      } finally {
-        localizingThreads.remove(currentThread);
-      }
-    }
-
-    Path doDownloadCall() throws Exception {
-      return super.call();
-    }
-
-  }
-
   Callable<Path> download(Path path, LocalResource rsrc,
       UserGroupInformation ugi) throws IOException {
     diskValidator.checkStatus(new File(path.toUri().getRawPath()));
-    return new FSDownloadWrapper(lfs, ugi, conf, path, rsrc);
+    return new FSDownload(lfs, ugi, conf, path, rsrc);
   }
 
   static long getEstimatedSize(LocalResource rsrc) {
@@ -397,7 +363,6 @@ public class ContainerLocalizer {
 
   public static void main(String[] argv) throws Throwable {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
-    int nRet = 0;
     // usage: $0 user appId locId host port app_log_dir user_dir [user_dir]*
     // let $x = $x/usercache for $local.dir
     // MKDIR $x/$user/appcache/$appid
@@ -434,9 +399,7 @@ public class ContainerLocalizer {
       // space in both DefaultCE and LCE cases
       e.printStackTrace(System.out);
       LOG.error("Exception in main:", e);
-      nRet = -1;
-    } finally {
-      System.exit(nRet);
+      System.exit(-1);
     }
   }
 
@@ -473,12 +436,4 @@ public class ContainerLocalizer {
       lfs.setPermission(dirPath, perms);
     }
   }
-
-  private void destroyShellProcesses(Set<Shell> shells) {
-    for (Shell shell : shells) {
-      if(localizingThreads.contains(shell.getWaitingThread())) {
-        shell.getProcess().destroy();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b806201f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
index 10eb239..fac7086 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
@@ -17,8 +17,6 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
-import static junit.framework.TestCase.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -27,7 +25,6 @@ import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -48,9 +45,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -65,9 +60,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -84,7 +76,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -101,18 +92,18 @@ public class TestContainerLocalizer {
   static final InetSocketAddress nmAddr =
       new InetSocketAddress("foobar", 8040);
 
+  private AbstractFileSystem spylfs;
+  private Random random;
+  private List<Path> localDirs;
+  private Path tokenPath;
+  private LocalizationProtocol nmProxy;
 
   @Test
   public void testMain() throws Exception {
-    ContainerLocalizerWrapper wrapper = new ContainerLocalizerWrapper();
+    FileContext fs = FileContext.getLocalFSFileContext();
+    spylfs = spy(fs.getDefaultFileSystem());
     ContainerLocalizer localizer =
-        wrapper.setupContainerLocalizerForTest();
-    Random random = wrapper.random;
-    List<Path> localDirs = wrapper.localDirs;
-    Path tokenPath = wrapper.tokenPath;
-    LocalizationProtocol nmProxy = wrapper.nmProxy;
-    AbstractFileSystem spylfs = wrapper.spylfs;
-    mockOutDownloads(localizer);
+        setupContainerLocalizerForTest();
 
     // verify created cache
     List<Path> privCacheList = new ArrayList<Path>();
@@ -140,7 +131,7 @@ public class TestContainerLocalizer {
     ResourceLocalizationSpec rsrcD =
         getMockRsrc(random, LocalResourceVisibility.PRIVATE,
           privCacheList.get(0));
-
+    
     when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
             Collections.singletonList(rsrcA)))
@@ -211,10 +202,10 @@ public class TestContainerLocalizer {
 
   @Test(timeout = 15000)
   public void testMainFailure() throws Exception {
-    ContainerLocalizerWrapper wrapper = new ContainerLocalizerWrapper();
-    ContainerLocalizer localizer = wrapper.setupContainerLocalizerForTest();
-    LocalizationProtocol nmProxy = wrapper.nmProxy;
-    mockOutDownloads(localizer);
+
+    FileContext fs = FileContext.getLocalFSFileContext();
+    spylfs = spy(fs.getDefaultFileSystem());
+    ContainerLocalizer localizer = setupContainerLocalizerForTest();
 
     // Assume the NM heartbeat fails say because of absent tokens.
     when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenThrow(
@@ -232,11 +223,9 @@ public class TestContainerLocalizer {
   @Test
   @SuppressWarnings("unchecked")
   public void testLocalizerTokenIsGettingRemoved() throws Exception {
-    ContainerLocalizerWrapper wrapper = new ContainerLocalizerWrapper();
-    ContainerLocalizer localizer = wrapper.setupContainerLocalizerForTest();
-    Path tokenPath = wrapper.tokenPath;
-    AbstractFileSystem spylfs = wrapper.spylfs;
-    mockOutDownloads(localizer);
+    FileContext fs = FileContext.getLocalFSFileContext();
+    spylfs = spy(fs.getDefaultFileSystem());
+    ContainerLocalizer localizer = setupContainerLocalizerForTest();
     doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
         any(CompletionService.class), any(UserGroupInformation.class));
     localizer.runLocalization(nmAddr);
@@ -248,10 +237,10 @@ public class TestContainerLocalizer {
   public void testContainerLocalizerClosesFilesystems() throws Exception {
 
     // verify filesystems are closed when localizer doesn't fail
-    ContainerLocalizerWrapper wrapper = new ContainerLocalizerWrapper();
+    FileContext fs = FileContext.getLocalFSFileContext();
+    spylfs = spy(fs.getDefaultFileSystem());
 
-    ContainerLocalizer localizer = wrapper.setupContainerLocalizerForTest();
-    mockOutDownloads(localizer);
+    ContainerLocalizer localizer = setupContainerLocalizerForTest();
     doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
         any(CompletionService.class), any(UserGroupInformation.class));
     verify(localizer, never()).closeFileSystems(
@@ -260,8 +249,10 @@ public class TestContainerLocalizer {
     localizer.runLocalization(nmAddr);
     verify(localizer).closeFileSystems(any(UserGroupInformation.class));
 
+    spylfs = spy(fs.getDefaultFileSystem());
+
     // verify filesystems are closed when localizer fails
-    localizer = wrapper.setupContainerLocalizerForTest();
+    localizer = setupContainerLocalizerForTest();
     doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
         any(LocalizationProtocol.class), any(CompletionService.class),
         any(UserGroupInformation.class));
@@ -275,102 +266,41 @@ public class TestContainerLocalizer {
     }
   }
 
-  @Test
-  public void testMultipleLocalizers() throws Exception {
-    FakeContainerLocalizerWrapper testA = new FakeContainerLocalizerWrapper();
-    FakeContainerLocalizerWrapper testB = new FakeContainerLocalizerWrapper();
-
-    final FakeContainerLocalizer localizerA = testA.init();
-    final FakeContainerLocalizer localizerB = testB.init();
-
-    // run localization
-    Thread threadA = new Thread() {
-      @Override
-      public void run() {
-        try {
-          localizerA.runLocalization(nmAddr);
-        } catch (Exception e) {
-          LOG.warn(e);
-        }
-      }
-    };
-    Thread threadB = new Thread() {
-      @Override
-      public void run() {
-        try {
-          localizerB.runLocalization(nmAddr);
-        } catch (Exception e) {
-          LOG.warn(e);
-        }
-      }
-    };
-    ShellCommandExecutor shexcA = null;
-    ShellCommandExecutor shexcB = null;
-    try {
-      threadA.start();
-      threadB.start();
-
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          FakeContainerLocalizer.FakeLongDownload downloader =
-              localizerA.getDownloader();
-          return downloader != null && downloader.getShexc() != null &&
-              downloader.getShexc().getProcess() != null;
-        }
-      }, 10, 30000);
-
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          FakeContainerLocalizer.FakeLongDownload downloader =
-              localizerB.getDownloader();
-          return downloader != null && downloader.getShexc() != null &&
-              downloader.getShexc().getProcess() != null;
-        }
-      }, 10, 30000);
-
-      shexcA = localizerA.getDownloader().getShexc();
-      shexcB = localizerB.getDownloader().getShexc();
-
-      assertTrue("Localizer A process not running, but should be",
-          shexcA.getProcess().isAlive());
-      assertTrue("Localizer B process not running, but should be",
-          shexcB.getProcess().isAlive());
-
-      // Stop heartbeat from giving anymore resources to download
-      testA.heartbeatResponse++;
-      testB.heartbeatResponse++;
-
-      // Send DIE to localizerA. This should kill its subprocesses
-      testA.heartbeatResponse++;
-
-      threadA.join();
-      shexcA.getProcess().waitFor(10000, TimeUnit.MILLISECONDS);
-
-      assertFalse("Localizer A process is still running, but shouldn't be",
-          shexcA.getProcess().isAlive());
-      assertTrue("Localizer B process not running, but should be",
-          shexcB.getProcess().isAlive());
-
-    } finally {
-      // Make sure everything gets cleaned up
-      // Process A should already be dead
-      shexcA.getProcess().destroy();
-      shexcB.getProcess().destroy();
-      shexcA.getProcess().waitFor(10000, TimeUnit.MILLISECONDS);
-      shexcB.getProcess().waitFor(10000, TimeUnit.MILLISECONDS);
-
-      threadA.join();
-      // Send DIE to localizer B
-      testB.heartbeatResponse++;
-      threadB.join();
+  @SuppressWarnings("unchecked") // mocked generics
+  private ContainerLocalizer setupContainerLocalizerForTest()
+      throws Exception {
+    // don't actually create dirs
+    doNothing().when(spylfs).mkdir(
+        isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+    Configuration conf = new Configuration();
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    localDirs = new ArrayList<Path>();
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
     }
-  }
+    RecordFactory mockRF = getMockLocalizerRecordFactory();
+    ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
+        appId, containerId, localDirs, mockRF);
+    ContainerLocalizer localizer = spy(concreteLoc);
+
+    // return credential stream instead of opening local file
+    random = new Random();
+    long seed = random.nextLong();
+    System.out.println("SEED: " + seed);
+    random.setSeed(seed);
+    DataInputBuffer appTokens = createFakeCredentials(random, 10);
+    tokenPath =
+      lfs.makeQualified(new Path(
+            String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
+                containerId)));
+    doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
+        ).when(spylfs).open(tokenPath);
+    nmProxy = mock(LocalizationProtocol.class);
+    doReturn(nmProxy).when(localizer).getProxy(nmAddr);
+    doNothing().when(localizer).sleep(anyInt());
+    
 
-
-
-  private void mockOutDownloads(ContainerLocalizer localizer) {
     // return result instantly for deterministic test
     ExecutorService syncExec = mock(ExecutorService.class);
     CompletionService<Path> cs = mock(CompletionService.class);
@@ -388,6 +318,8 @@ public class TestContainerLocalizer {
         });
     doReturn(syncExec).when(localizer).createDownloadThreadPool();
     doReturn(cs).when(localizer).createCompletionService(syncExec);
+
+    return localizer;
   }
 
   static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
@@ -431,141 +363,6 @@ public class TestContainerLocalizer {
     }
   }
 
-  class FakeContainerLocalizer extends ContainerLocalizer  {
-    private FakeLongDownload downloader;
-
-    FakeContainerLocalizer(FileContext lfs, String user, String appId,
-        String localizerId, List<Path> localDirs,
-        RecordFactory recordFactory) throws IOException {
-      super(lfs, user, appId, localizerId, localDirs, recordFactory);
-    }
-
-    FakeLongDownload getDownloader() {
-      return downloader;
-    }
-
-    @Override
-    Callable<Path> download(Path path, LocalResource rsrc,
-        UserGroupInformation ugi) throws IOException {
-      downloader = new FakeLongDownload(Mockito.mock(FileContext.class), ugi,
-          new Configuration(), path, rsrc);
-      return downloader;
-    }
-
-    class FakeLongDownload extends ContainerLocalizer.FSDownloadWrapper {
-      private final Path localPath;
-      private Shell.ShellCommandExecutor shexc;
-      FakeLongDownload(FileContext files, UserGroupInformation ugi,
-          Configuration conf, Path destDirPath, LocalResource resource) {
-        super(files, ugi, conf, destDirPath, resource);
-        this.localPath = new Path("file:///localcache");
-      }
-
-      Shell.ShellCommandExecutor getShexc() {
-        return shexc;
-      }
-
-      @Override
-      public Path doDownloadCall() throws IOException {
-        String sleepCommand = "sleep 30";
-        String[] shellCmd = {"bash", "-c", sleepCommand};
-        shexc = new Shell.ShellCommandExecutor(shellCmd);
-        shexc.execute();
-
-        return localPath;
-      }
-    }
-  }
-
-  class ContainerLocalizerWrapper {
-    AbstractFileSystem spylfs;
-    Random random;
-    List<Path> localDirs;
-    Path tokenPath;
-    LocalizationProtocol nmProxy;
-
-    @SuppressWarnings("unchecked") // mocked generics
-    FakeContainerLocalizer setupContainerLocalizerForTest()
-        throws Exception {
-
-      FileContext fs = FileContext.getLocalFSFileContext();
-      spylfs = spy(fs.getDefaultFileSystem());
-      // don't actually create dirs
-      doNothing().when(spylfs).mkdir(
-          isA(Path.class), isA(FsPermission.class), anyBoolean());
-
-      Configuration conf = new Configuration();
-      FileContext lfs = FileContext.getFileContext(spylfs, conf);
-      localDirs = new ArrayList<Path>();
-      for (int i = 0; i < 4; ++i) {
-        localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
-      }
-      RecordFactory mockRF = getMockLocalizerRecordFactory();
-      FakeContainerLocalizer concreteLoc = new FakeContainerLocalizer(lfs,
-          appUser, appId, containerId, localDirs, mockRF);
-      FakeContainerLocalizer localizer = spy(concreteLoc);
-
-      // return credential stream instead of opening local file
-      random = new Random();
-      long seed = random.nextLong();
-      System.out.println("SEED: " + seed);
-      random.setSeed(seed);
-      DataInputBuffer appTokens = createFakeCredentials(random, 10);
-      tokenPath =
-        lfs.makeQualified(new Path(
-              String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
-                  containerId)));
-      doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
-          ).when(spylfs).open(tokenPath);
-      nmProxy = mock(LocalizationProtocol.class);
-      doReturn(nmProxy).when(localizer).getProxy(nmAddr);
-      doNothing().when(localizer).sleep(anyInt());
-
-      return localizer;
-    }
-
-  }
-
-  class FakeContainerLocalizerWrapper extends ContainerLocalizerWrapper{
-    private int heartbeatResponse = 0;
-    public FakeContainerLocalizer init() throws Exception {
-      FileContext fs = FileContext.getLocalFSFileContext();
-      FakeContainerLocalizer localizer = setupContainerLocalizerForTest();
-
-      // verify created cache
-      List<Path> privCacheList = new ArrayList<Path>();
-      for (Path p : localDirs) {
-        Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE),
-            appUser);
-        Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
-        privCacheList.add(privcache);
-      }
-
-      final ResourceLocalizationSpec rsrc = getMockRsrc(random,
-          LocalResourceVisibility.PRIVATE, privCacheList.get(0));
-
-      // mock heartbeat responses from NM
-      doAnswer(new Answer<MockLocalizerHeartbeatResponse>() {
-        @Override
-        public MockLocalizerHeartbeatResponse answer(
-            InvocationOnMock invocationOnMock) throws Throwable {
-          if(heartbeatResponse == 0) {
-            return new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
-                Collections.singletonList(rsrc));
-          } else if (heartbeatResponse < 2) {
-            return new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
-                Collections.<ResourceLocalizationSpec>emptyList());
-          } else {
-            return new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
-                null);
-          }
-        }
-      }).when(nmProxy).heartbeat(isA(LocalizerStatus.class));
-
-      return localizer;
-    }
-  }
-
   static RecordFactory getMockLocalizerRecordFactory() {
     RecordFactory mockRF = mock(RecordFactory.class);
     when(mockRF.newRecordInstance(same(LocalResourceStatus.class)))


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to