Repository: hadoop
Updated Branches:
  refs/heads/trunk 2c1469036 -> cfba35505


HADOOP-11827. Speed-up distcp buildListing() using threadpool (Zoran Dimitrijevic via raviprak)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cfba3550
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cfba3550
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cfba3550

Branch: refs/heads/trunk
Commit: cfba355052df15f8eb6cc9b8e90e2d8492bec7d7
Parents: 2c14690
Author: Ravi Prakash <ravip...@altiscale.com>
Authored: Tue Apr 21 16:43:02 2015 -0700
Committer: Ravi Prakash <ravip...@altiscale.com>
Committed: Tue Apr 21 16:49:37 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../apache/hadoop/tools/DistCpConstants.java    |   4 +
 .../apache/hadoop/tools/DistCpOptionSwitch.java |   9 +-
 .../org/apache/hadoop/tools/DistCpOptions.java  |  27 +++
 .../org/apache/hadoop/tools/OptionsParser.java  |  12 ++
 .../apache/hadoop/tools/SimpleCopyListing.java  | 169 +++++++++++++++---
 .../hadoop/tools/util/ProducerConsumer.java     | 177 +++++++++++++++++++
 .../apache/hadoop/tools/util/WorkReport.java    |  78 ++++++++
 .../apache/hadoop/tools/util/WorkRequest.java   |  53 ++++++
 .../hadoop/tools/util/WorkRequestProcessor.java |  38 ++++
 .../apache/hadoop/tools/TestCopyListing.java    |  20 ++-
 .../apache/hadoop/tools/TestIntegration.java    |  17 ++
 .../apache/hadoop/tools/TestOptionsParser.java  |  42 +++++
 .../hadoop/tools/util/TestProducerConsumer.java | 109 ++++++++++++
 14 files changed, 728 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 02066b6..a6814f8 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -499,6 +499,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-11819. HttpServerFunctionalTest#prepareTestWebapp should create web
     app directory if it does not exist. (Rohith via vinayakumarb)
 
+    HADOOP-11827. Speed-up distcp buildListing() using threadpool
+    (Zoran Dimitrijevic via raviprak)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index a1af2af..7ecb6ce 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -23,6 +23,9 @@ package org.apache.hadoop.tools;
  */
 public class DistCpConstants {
 
+  /* Default number of threads to use for building file listing */
+  public static final int DEFAULT_LISTSTATUS_THREADS = 1;
+
   /* Default number of maps to use for DistCp */
   public static final int DEFAULT_MAPS = 20;
 
@@ -47,6 +50,7 @@ public class DistCpConstants {
   public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
   public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
   public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
+  public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
   public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
   public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
   public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index e9c7d46..f90319d 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -83,7 +83,14 @@ public enum DistCpOptionSwitch {
   SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
       new Option("mapredSslConf", true, "Configuration for ssl config file" +
           ", to use with hftps://")),
-
+  /**
+   * Number of threads for building source file listing (before map-reduce
+   * phase, max one listStatus per thread at a time).
+   */
+  NUM_LISTSTATUS_THREADS(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
+      new Option("numListstatusThreads", true, "Number of threads to " +
+          "use for building file listing (max " +
+          DistCpOptions.maxNumListstatusThreads + ").")),
   /**
    * Max number of maps to use during copy. DistCp will split work
    * as equally as possible among these maps

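For reference, a hedged usage sketch of the new switch (cluster addresses and
paths are hypothetical, not part of this patch):

  hadoop distcp -numListstatusThreads 20 hdfs://nn1:8020/src hdfs://nn2:8020/dst

The Option is declared with hasArg=true, so the switch takes a single integer;
values above DistCpOptions.maxNumListstatusThreads are clamped rather than
rejected (see setNumListstatusThreads in the DistCpOptions diff below).
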
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 709e583..d8f3ff7 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -44,6 +44,8 @@ public class DistCpOptions {
   private boolean blocking = true;
   private boolean useDiff = false;
 
+  public static final int maxNumListstatusThreads = 40;
+  private int numListstatusThreads = 0;  // Indicates that flag is not set.
   private int maxMaps = DistCpConstants.DEFAULT_MAPS;
   private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
 
@@ -124,6 +126,7 @@ public class DistCpOptions {
       this.overwrite = that.overwrite;
       this.skipCRC = that.skipCRC;
       this.blocking = that.blocking;
+      this.numListstatusThreads = that.numListstatusThreads;
       this.maxMaps = that.maxMaps;
       this.mapBandwidth = that.mapBandwidth;
       this.sslConfigurationFile = that.getSslConfigurationFile();
@@ -312,6 +315,30 @@ public class DistCpOptions {
     this.skipCRC = skipCRC;
   }
 
+  /** Get the number of threads to use for listStatus
+   *
+   * @return Number of threads to do listStatus
+   */
+  public int getNumListstatusThreads() {
+    return numListstatusThreads;
+  }
+
+  /** Set the number of threads to use for listStatus. We allow max 40
+   *  threads. Setting numThreads to zero signifies we should use the value
+   *  from conf properties.
+   *
+   * @param numThreads - Number of threads
+   */
+  public void setNumListstatusThreads(int numThreads) {
+    if (numThreads > maxNumListstatusThreads) {
+      this.numListstatusThreads = maxNumListstatusThreads;
+    } else if (numThreads > 0) {
+      this.numListstatusThreads = numThreads;
+    } else {
+      this.numListstatusThreads = 0;
+    }
+  }
+
   /** Get the max number of maps to use for this copy
    *
    * @return Max number of maps

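A minimal sketch of the clamping semantics of setNumListstatusThreads(),
assuming nothing beyond the accessors added above (demoClamping is a
hypothetical helper, not part of the patch):

  // Hedged sketch: exercises only the setter/getter added in this patch.
  void demoClamping(DistCpOptions options) {
    options.setNumListstatusThreads(100);            // above maxNumListstatusThreads (40)...
    assert options.getNumListstatusThreads() == 40;  // ...silently clamped to the max
    options.setNumListstatusThreads(-5);             // non-positive values...
    assert options.getNumListstatusThreads() == 0;   // ...treated as "flag not set"
    options.setNumListstatusThreads(12);
    assert options.getNumListstatusThreads() == 12;  // in-range values kept as-is
  }
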
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index a3a76ef..1729479 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -185,6 +185,18 @@ public class OptionsParser {
           getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
     }
 
+    if (command.hasOption(DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
+      try {
+        Integer numThreads = Integer.parseInt(getVal(command,
+              DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
+        option.setNumListstatusThreads(numThreads);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "Number of liststatus threads is invalid: " + getVal(command,
+                DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
+      }
+    }
+
     if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
       try {
         Integer maps = Integer.parseInt(

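A quick sketch of the parsing behavior added above, mirroring how maxMaps is
handled (the URIs are placeholders):

  // Hedged sketch: a valid value flows into DistCpOptions; a malformed value
  // raises IllegalArgumentException wrapping the NumberFormatException.
  DistCpOptions opts = OptionsParser.parse(new String[] {
      "--numListstatusThreads", "12",
      "hdfs://localhost:8020/source", "hdfs://localhost:8020/target"});
  assert opts.getNumListstatusThreads() == 12;
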
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index e8a23aa..b9ba099 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -29,13 +29,17 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.tools.util.ProducerConsumer;
+import org.apache.hadoop.tools.util.WorkReport;
+import org.apache.hadoop.tools.util.WorkRequest;
+import org.apache.hadoop.tools.util.WorkRequestProcessor;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.*;
-import java.util.Stack;
+import java.util.ArrayList;
 
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -50,7 +54,10 @@ public class SimpleCopyListing extends CopyListing {
   private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
 
   private long totalPaths = 0;
+  private long totalDirs = 0;
   private long totalBytesToCopy = 0;
+  private int numListstatusThreads = 1;
+  private final int maxRetries = 3;
 
   /**
    * Protected constructor, to initialize configuration.
@@ -61,6 +68,16 @@ public class SimpleCopyListing extends CopyListing {
    */
   protected SimpleCopyListing(Configuration configuration, Credentials credentials) {
     super(configuration, credentials);
+    numListstatusThreads = getConf().getInt(
+        DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
+        DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
+  }
+
+  @VisibleForTesting
+  protected SimpleCopyListing(Configuration configuration, Credentials credentials,
+                              int numListstatusThreads) {
+    super(configuration, credentials);
+    this.numListstatusThreads = numListstatusThreads;
   }
 
   @Override
@@ -160,6 +177,10 @@ public class SimpleCopyListing extends CopyListing {
   @VisibleForTesting
   public void doBuildListing(SequenceFile.Writer fileListWriter,
       DistCpOptions options) throws IOException {
+    if (options.getNumListstatusThreads() > 0) {
+      numListstatusThreads = options.getNumListstatusThreads();
+    }
+
     try {
       for (Path path: options.getSourcePaths()) {
         FileSystem sourceFS = path.getFileSystem(getConf());
@@ -181,6 +202,7 @@ public class SimpleCopyListing extends CopyListing {
               sourcePathRoot, options);
         }
         if (explore) {
+          ArrayList<FileStatus> sourceDirs = new ArrayList<FileStatus>();
           for (FileStatus sourceStatus: sourceFiles) {
             if (LOG.isDebugEnabled()) {
              LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
@@ -195,15 +217,18 @@ public class SimpleCopyListing extends CopyListing {
 
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Traversing source dir: " + sourceStatus.getPath());
+                LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
               }
-              traverseDirectory(fileListWriter, sourceFS, sourceStatus,
-                                sourcePathRoot, options);
+              sourceDirs.add(sourceStatus);
             }
           }
+          traverseDirectory(fileListWriter, sourceFS, sourceDirs,
+                            sourcePathRoot, options);
         }
       }
       fileListWriter.close();
+      printStats();
+      LOG.info("Build file listing completed.");
       fileListWriter = null;
     } finally {
       IOUtils.cleanup(LOG, fileListWriter);
@@ -275,43 +300,135 @@ public class SimpleCopyListing extends CopyListing {
             SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
   }
 
-  private static FileStatus[] getChildren(FileSystem fileSystem,
-                                         FileStatus parent) throws IOException {
-    return fileSystem.listStatus(parent.getPath());
+  /*
+   *  Private class to implement WorkRequestProcessor interface. It processes
+   *  each directory (represented by FileStatus item) and returns a list of all
+   *  file-system objects in that directory (files and directories). In case of
+   *  retriable exceptions it increments retry counter and returns the same
+   *  directory for later retry.
+   */
+  private static class FileStatusProcessor
+      implements WorkRequestProcessor<FileStatus, FileStatus[]> {
+    private FileSystem fileSystem;
+
+    public FileStatusProcessor(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+    }
+
+    /*
+     *  Processor for FileSystem.listStatus().
+     *
+     *  @param workRequest  Input work item that contains FileStatus item which
+     *                      is a parent directory we want to list.
+     *  @return Outputs WorkReport<FileStatus[]> with a list of objects in the
+     *          directory (array of objects, empty if parent directory is
+     *          empty). In case of an intermittent exception we increment the
+     *          retry counter and return a list containing the parent directory.
+     */
+    public WorkReport<FileStatus[]> processItem(
+        WorkRequest<FileStatus> workRequest) {
+      FileStatus parent = workRequest.getItem();
+      int retry = workRequest.getRetry();
+      WorkReport<FileStatus[]> result = null;
+      try {
+        if (retry > 0) {
+          int sleepSeconds = 2;
+          for (int i = 1; i < retry; i++) {
+            sleepSeconds *= 2;
+          }
+          try {
+            Thread.sleep(1000 * sleepSeconds);
+          } catch (InterruptedException ie) {
+            LOG.debug("Interrupted while sleeping in exponential backoff.");
+          }
+        }
+        result = new WorkReport<FileStatus[]>(
+            fileSystem.listStatus(parent.getPath()), 0, true);
+      } catch (FileNotFoundException fnf) {
+        LOG.error("FileNotFoundException in listStatus: " +
+                  fnf.getMessage());
+        result = new WorkReport<FileStatus[]>(new FileStatus[0], 0, true, fnf);
+      } catch (Exception e) {
+        LOG.error("Exception in listStatus. Will send for retry.");
+        FileStatus[] parentList = new FileStatus[1];
+        parentList[0] = parent;
+        result = new WorkReport<FileStatus[]>(parentList, retry + 1, false, e);
+      }
+      return result;
+    }
+  }
+
+  private void printStats() {
+    LOG.info("Paths (files+dirs) cnt = " + totalPaths +
+             "; dirCnt = " + totalDirs);
+  }
+
+  private void maybePrintStats() {
+    if (totalPaths % 100000 == 0) {
+      printStats();
+    }
   }
 
   private void traverseDirectory(SequenceFile.Writer fileListWriter,
                                  FileSystem sourceFS,
-                                 FileStatus sourceStatus,
+                                 ArrayList<FileStatus> sourceDirs,
                                  Path sourcePathRoot,
                                  DistCpOptions options)
                                  throws IOException {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
-    Stack<FileStatus> pathStack = new Stack<FileStatus>();
-    pathStack.push(sourceStatus);
 
-    while (!pathStack.isEmpty()) {
-      for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
-        }
-        CopyListingFileStatus childCopyListingStatus =
-          DistCpUtils.toCopyListingFileStatus(sourceFS, child,
-            preserveAcls && child.isDirectory(),
-            preserveXAttrs && child.isDirectory(),
-            preserveRawXattrs && child.isDirectory());
-        writeToFileListing(fileListWriter, childCopyListingStatus,
-             sourcePathRoot, options);
-        if (child.isDirectory()) {
+    assert numListstatusThreads > 0;
+    LOG.debug("Starting thread pool of " + numListstatusThreads +
+              " listStatus workers.");
+    ProducerConsumer<FileStatus, FileStatus[]> workers =
+        new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+    for (int i = 0; i < numListstatusThreads; i++) {
+      workers.addWorker(
+          new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf())));
+    }
+
+    for (FileStatus status : sourceDirs) {
+      workers.put(new WorkRequest<FileStatus>(status, 0));
+      maybePrintStats();
+    }
+
+    while (workers.hasWork()) {
+      try {
+        WorkReport<FileStatus[]> workResult = workers.take();
+        int retry = workResult.getRetry();
+        for (FileStatus child: workResult.getItem()) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Traversing into source dir: " + child.getPath());
+            LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
+          }
+          if (retry == 0) {
+            CopyListingFileStatus childCopyListingStatus =
+              DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                preserveAcls && child.isDirectory(),
+                preserveXAttrs && child.isDirectory(),
+                preserveRawXattrs && child.isDirectory());
+            writeToFileListing(fileListWriter, childCopyListingStatus,
+                 sourcePathRoot, options);
+          }
+          if (retry < maxRetries) {
+            if (child.isDirectory()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Traversing into source dir: " + child.getPath());
+              }
+              workers.put(new WorkRequest<FileStatus>(child, retry));
+              maybePrintStats();
+            }
+          } else {
+            LOG.error("Giving up on " + child.getPath() +
+                      " after " + retry + " retries.");
           }
-          pathStack.push(child);
         }
+      } catch (InterruptedException ie) {
+        LOG.error("Could not get item from childQueue. Retrying...");
       }
     }
+    workers.shutdown();
   }
 
   private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
@@ -351,6 +468,8 @@ public class SimpleCopyListing extends CopyListing {
 
     if (!fileStatus.isDirectory()) {
       totalBytesToCopy += fileStatus.getLen();
+    } else {
+      totalDirs++;
     }
     totalPaths++;
   }

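The retry path in FileStatusProcessor.processItem() backs off exponentially
before re-issuing listStatus; a small sketch of that schedule, using the same
arithmetic as the loop above:

  // Hedged sketch of the backoff computed above: retry 1 -> 2s, retry 2 -> 4s.
  // With maxRetries = 3, traverseDirectory() re-queues a failed directory at
  // retry counts 1 and 2 and gives up once the count reaches 3.
  static int backoffSeconds(int retry) {
    int sleepSeconds = 2;
    for (int i = 1; i < retry; i++) {
      sleepSeconds *= 2;  // doubles for each prior failed attempt
    }
    return sleepSeconds;  // only consulted when retry > 0
  }
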
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
new file mode 100644
index 0000000..3dad4e3
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.util.WorkReport;
+import org.apache.hadoop.tools.util.WorkRequest;
+import org.apache.hadoop.tools.util.WorkRequestProcessor;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * ProducerConsumer class encapsulates input and output queues and a
+ * thread-pool of Workers that loop on the WorkRequest<T> inputQueue; for each
+ * consumed WorkRequest a Worker invokes WorkRequestProcessor.processItem()
+ * and puts the resulting WorkReport<R> on the outputQueue.
+ */
+public class ProducerConsumer<T, R> {
+  private Log LOG = LogFactory.getLog(ProducerConsumer.class);
+  private LinkedBlockingQueue<WorkRequest<T>> inputQueue;
+  private LinkedBlockingQueue<WorkReport<R>> outputQueue;
+  private ExecutorService executor;
+  private AtomicInteger workCnt;
+
+  /**
+   *  ProducerConsumer maintains input and output queues and a thread-pool of
+   *  workers.
+   *
+   *  @param numThreads   Size of thread-pool to execute Workers.
+   */
+  public ProducerConsumer(int numThreads) {
+    this.inputQueue = new LinkedBlockingQueue<WorkRequest<T>>();
+    this.outputQueue = new LinkedBlockingQueue<WorkReport<R>>();
+    executor = Executors.newFixedThreadPool(numThreads);
+    workCnt = new AtomicInteger(0);
+  }
+
+  /**
+   *  Add another worker that will consume WorkRequest<T> items from input
+   *  queue, process each item using supplied processor, and for every
+   *  processed item output WorkReport<R> to output queue.
+   *
+   *  @param processor  Processor implementing WorkRequestProcessor interface.
+   *
+   */
+  public void addWorker(WorkRequestProcessor<T, R> processor) {
+    executor.execute(new Worker(processor));
+  }
+
+  /**
+   *  Shutdown ProducerConsumer worker thread-pool without waiting for
+   *  completion of any pending work.
+   */
+  public void shutdown() {
+    executor.shutdown();
+  }
+
+  /**
+   *  Returns number of pending ProducerConsumer items (submitted to input
+   *  queue for processing via put() method but not yet consumed by take()
+   *  or blockingTake()).
+   *
+   *  @return  Number of items in ProducerConsumer (either pending for
+   *           processing or waiting to be consumed).
+   */
+  public int getWorkCnt() {
+    return workCnt.get();
+  }
+
+  /**
+   *  Returns true if there are items in ProducerConsumer that are either
+   *  pending for processing or waiting to be consumed.
+   *
+   *  @return  True if there were more items put() to ProducerConsumer than
+   *           consumed by take() or blockingTake().
+   */
+  public boolean hasWork() {
+    return workCnt.get() > 0;
+  }
+
+  /**
+   *  Blocking put workRequest to ProducerConsumer input queue.
+   *
+   *  @param  workRequest  WorkRequest<T> item to be processed.
+   */
+  public void put(WorkRequest<T> workRequest) {
+    boolean isDone = false;
+    while (!isDone) {
+      try {
+        inputQueue.put(workRequest);
+        workCnt.incrementAndGet();
+        isDone = true;
+      } catch (InterruptedException ie) {
+        LOG.error("Could not put workRequest into inputQueue. Retrying...");
+      }
+    }
+  }
+
+  /**
+   *  Blocking take from ProducerConsumer output queue that can be interrupted.
+   *
+   *  @return  WorkReport<R> item returned by processor's processItem().
+   */
+  public WorkReport<R> take() throws InterruptedException {
+    WorkReport<R> report = outputQueue.take();
+    workCnt.decrementAndGet();
+    return report;
+  }
+
+  /**
+   *  Blocking take from ProducerConsumer output queue (catches exceptions and
+   *  retries forever).
+   *
+   *  @return  WorkReport<R> item returned by processor's processItem().
+   */
+  public WorkReport<R> blockingTake() {
+    while (true) {
+      try {
+        WorkReport<R> report = outputQueue.take();
+        workCnt.decrementAndGet();
+        return report;
+      } catch (InterruptedException ie) {
+        LOG.debug("Retrying in blockingTake...");
+      }
+    }
+  }
+
+  private class Worker implements Runnable {
+    private WorkRequestProcessor<T, R> processor;
+
+    public Worker(WorkRequestProcessor<T, R> processor) {
+      this.processor = processor;
+    }
+
+    public void run() {
+      while (true) {
+        try {
+          WorkRequest<T> work = inputQueue.take();
+          WorkReport<R> result = processor.processItem(work);
+
+          boolean isDone = false;
+          while (!isDone) {
+            try {
+              outputQueue.put(result);
+              isDone = true;
+            } catch (InterruptedException ie) {
+              LOG.debug("Could not put report into outputQueue. Retrying...");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.debug("Interrupted while waiting for request from inputQueue.");
+        }
+      }
+    }
+  }
+}

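A minimal end-to-end sketch of the ProducerConsumer contract (the doubling
processor is made up for illustration; SimpleCopyListing.traverseDirectory()
drives the same put()/hasWork()/take() cycle with FileStatusProcessor):

  // Hedged usage sketch: one worker thread that doubles integers.
  ProducerConsumer<Integer, Integer> pc =
      new ProducerConsumer<Integer, Integer>(1);
  pc.addWorker(new WorkRequestProcessor<Integer, Integer>() {
    public WorkReport<Integer> processItem(WorkRequest<Integer> request) {
      return new WorkReport<Integer>(request.getItem() * 2, 0, true);
    }
  });
  pc.put(new WorkRequest<Integer>(21));
  while (pc.hasWork()) {
    WorkReport<Integer> report = pc.blockingTake();  // retries through interrupts
    System.out.println(report.getItem());            // prints 42
  }
  pc.shutdown();
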
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java
new file mode 100644
index 0000000..91c9805
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+/**
+ *  WorkReport<T> is a simple container for a result item of class T, its
+ *  retry counter, a success flag, and an optional exception captured while
+ *  processing the corresponding work request.
+ */
+public class WorkReport<T> {
+  private T item;
+  private final boolean success;
+  private final int retry;
+  private final Exception exception;
+
+  /**
+   *  @param  item       Object representing work report.
+   *  @param  retry      Number of unsuccessful attempts to process work.
+   *  @param  success    Indicates whether work was successfully completed.
+   */
+  public WorkReport(T item, int retry, boolean success) {
+    this(item, retry, success, null);
+  }
+
+  /**
+   *  @param  item       Object representing work report.
+   *  @param  retry      Number of unsuccessful attempts to process work.
+   *  @param  success    Indicates whether work was successfully completed.
+   *  @param  exception  Exception thrown while processing work.
+   */
+  public WorkReport(T item, int retry, boolean success, Exception exception) {
+    this.item = item;
+    this.retry = retry;
+    this.success = success;
+    this.exception = exception;
+  }
+
+  public T getItem() {
+    return item;
+  }
+
+  /**
+   *  @return True if the work was processed successfully.
+   */
+  public boolean getSuccess() {
+    return success;
+  }
+
+  /**
+   *  @return  Number of unsuccessful attempts to process work.
+   */
+  public int getRetry() {
+    return retry;
+  }
+
+  /**
+   *  @return  Exception thrown while processing work.
+   */
+  public Exception getException() {
+    return exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java
new file mode 100644
index 0000000..339a3ab
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+/**
+ *  WorkRequest<T> is a simple container for an item of class T and its
+ *  corresponding retry counter that indicates how many times this item
+ *  was previously attempted to be processed.
+ */
+public class WorkRequest<T> {
+  private int retry;
+  private T item;
+
+  public WorkRequest(T item) {
+    this(item, 0);
+  }
+
+  /**
+   *  @param  item   Object representing WorkRequest input data.
+   *  @param  retry  Number of previous attempts to process this work request.
+   */
+  public WorkRequest(T item, int retry) {
+    this.item = item;
+    this.retry = retry;
+  }
+
+  public T getItem() {
+    return item;
+  }
+
+  /**
+   *  @return  Number of previous attempts to process this work request.
+   */
+  public int getRetry() {
+    return retry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
new file mode 100644
index 0000000..b6d8a09
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.tools.util.WorkReport;
+import org.apache.hadoop.tools.util.WorkRequest;
+
+/**
+ *  Interface for ProducerConsumer worker loop.
+ *
+ */
+public interface WorkRequestProcessor<T, R> {
+
+  /**
+   * Work processor.
+   *
+   * @param   workRequest  Input work item.
+   * @return  Outputs WorkReport after processing workRequest item.
+   *
+   */
+  public WorkReport<R> processItem(WorkRequest<T> workRequest);
+}

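A hedged sketch of an implementation honoring the retry contract used by this
patch (FileStatusProcessor in SimpleCopyListing follows the same shape; the
string-parsing task here is invented for illustration):

  // Hypothetical processor: parses strings into ints, flagging failures for retry.
  class ParseProcessor implements WorkRequestProcessor<String, Integer> {
    public WorkReport<Integer> processItem(WorkRequest<String> request) {
      try {
        int value = Integer.parseInt(request.getItem());
        return new WorkReport<Integer>(value, 0, true);  // success: retry count 0
      } catch (NumberFormatException e) {
        // Bump the retry counter so the consumer can re-queue or give up.
        return new WorkReport<Integer>(null, request.getRetry() + 1, false, e);
      }
    }
  }
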
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
index d8f7e0b..8381c1b 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
@@ -32,6 +32,9 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.junit.Test;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -40,9 +43,12 @@ import org.junit.AfterClass;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.List;
+import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
+@RunWith(value = Parameterized.class)
 public class TestCopyListing extends SimpleCopyListing {
   private static final Log LOG = LogFactory.getLog(TestCopyListing.class);
 
@@ -63,9 +69,15 @@ public class TestCopyListing extends SimpleCopyListing {
       cluster.shutdown();
     }
   }
-  
-  public TestCopyListing() {
-    super(config, CREDENTIALS);
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 }, { 20 } };
+    return Arrays.asList(data);
+  }
+
+  public TestCopyListing(int numListstatusThreads) {
+    super(config, CREDENTIALS, numListstatusThreads);
   }
 
   protected TestCopyListing(Configuration configuration) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
index 67d885a..5726342 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
@@ -30,14 +30,19 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
+@RunWith(value = Parameterized.class)
 public class TestIntegration {
   private static final Log LOG = LogFactory.getLog(TestIntegration.class);
 
@@ -46,6 +51,17 @@ public class TestIntegration {
   private static Path listFile;
   private static Path target;
   private static String root;
+  private int numListstatusThreads;
+
+  public TestIntegration(int numListstatusThreads) {
+    this.numListstatusThreads = numListstatusThreads;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 } };
+    return Arrays.asList(data);
+  }
 
   private static Configuration getConf() {
     Configuration conf = new Configuration();
@@ -597,6 +613,7 @@ public class TestIntegration {
     options.setDeleteMissing(delete);
     options.setOverwrite(overwrite);
     options.setTargetPathExists(targetExists);
+    options.setNumListstatusThreads(numListstatusThreads);
     try {
       new DistCp(getConf(), options).execute();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index cc9da33..6eddfb2 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -304,6 +304,48 @@ public class TestOptionsParser {
   }
 
   @Test
+  public void testParseNumListstatusThreads() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    // If command line argument isn't set, we expect .getNumListstatusThreads
+    // option to be zero (so that we know when to override conf properties).
+    Assert.assertEquals(0, options.getNumListstatusThreads());
+
+    options = OptionsParser.parse(new String[] {
+        "--numListstatusThreads",
+        "12",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(12, options.getNumListstatusThreads());
+
+    options = OptionsParser.parse(new String[] {
+        "--numListstatusThreads",
+        "0",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(0, options.getNumListstatusThreads());
+
+    try {
+      OptionsParser.parse(new String[] {
+          "--numListstatusThreads",
+          "hello",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Non-numeric numListstatusThreads parsed");
+    } catch (IllegalArgumentException ignore) { }
+
+    // Excessively large values are clamped to the maximum.
+    options = OptionsParser.parse(new String[] {
+        "--numListstatusThreads",
+        "100",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(DistCpOptions.maxNumListstatusThreads,
+                        options.getNumListstatusThreads());
+  }
+
+  @Test
   public void testSourceListing() {
     DistCpOptions options = OptionsParser.parse(new String[] {
         "-f",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
new file mode 100644
index 0000000..de0fcfd
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.tools.util.ProducerConsumer;
+import org.apache.hadoop.tools.util.WorkReport;
+import org.apache.hadoop.tools.util.WorkRequest;
+import org.apache.hadoop.tools.util.WorkRequestProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.Exception;
+import java.lang.Integer;
+
+public class TestProducerConsumer {
+  public class CopyProcessor implements WorkRequestProcessor<Integer, Integer> {
+    public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
+      Integer item = new Integer(workRequest.getItem());
+      return new WorkReport<Integer>(item, 0, true);
+    }
+  }
+
+  public class ExceptionProcessor implements WorkRequestProcessor<Integer, Integer> {
+    @SuppressWarnings("null")
+    public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
+      try {
+        Integer item = null;
+        item.intValue(); // Throw NULL pointer exception.
+
+        // We should never be here (null pointer exception above)
+        return new WorkReport<Integer>(item, 0, true);
+      } catch (Exception e) {
+        Integer item = new Integer(workRequest.getItem());
+        return new WorkReport<Integer>(item, 1, false, e);
+      }
+    }
+  }
+
+  @Test
+  public void testSimpleProducerConsumer() {
+    ProducerConsumer<Integer, Integer> worker =
+        new ProducerConsumer<Integer, Integer>(1);
+    worker.addWorker(new CopyProcessor());
+    worker.put(new WorkRequest<Integer>(42));
+    try {
+      WorkReport<Integer> report = worker.take();
+      Assert.assertEquals(42, report.getItem().intValue());
+    } catch (InterruptedException ie) {
+      Assert.assertTrue(false);
+    }
+  }
+
+  @Test
+  public void testMultipleProducerConsumer() {
+    ProducerConsumer<Integer, Integer> workers =
+        new ProducerConsumer<Integer, Integer>(10);
+    for (int i = 0; i < 10; i++) {
+      workers.addWorker(new CopyProcessor());
+    }
+
+    int sum = 0;
+    int numRequests = 2000;
+    for (int i = 0; i < numRequests; i++) {
+      workers.put(new WorkRequest<Integer>(i + 42));
+      sum += i + 42;
+    }
+
+    int numReports = 0;
+    while (workers.getWorkCnt() > 0) {
+      WorkReport<Integer> report = workers.blockingTake();
+      sum -= report.getItem().intValue();
+      numReports++;
+    }
+    Assert.assertEquals(0, sum);
+    Assert.assertEquals(numRequests, numReports);
+  }
+
+  @Test
+  public void testExceptionProducerConsumer() {
+    ProducerConsumer<Integer, Integer> worker =
+        new ProducerConsumer<Integer, Integer>(1);
+    worker.addWorker(new ExceptionProcessor());
+    worker.put(new WorkRequest<Integer>(42));
+    try {
+      WorkReport<Integer> report = worker.take();
+      Assert.assertEquals(42, report.getItem().intValue());
+      Assert.assertFalse(report.getSuccess());
+      Assert.assertNotNull(report.getException());
+    } catch (InterruptedException ie) {
+      Assert.assertTrue(false);
+    }
+  }
+}
