Repository: hadoop Updated Branches: refs/heads/trunk 2c1469036 -> cfba35505
HADOOP-11827. Speed-up distcp buildListing() using threadpool (Zoran Dimitrijevic via raviprak) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cfba3550 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cfba3550 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cfba3550 Branch: refs/heads/trunk Commit: cfba355052df15f8eb6cc9b8e90e2d8492bec7d7 Parents: 2c14690 Author: Ravi Prakash <ravip...@altiscale.com> Authored: Tue Apr 21 16:43:02 2015 -0700 Committer: Ravi Prakash <ravip...@altiscale.com> Committed: Tue Apr 21 16:49:37 2015 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../apache/hadoop/tools/DistCpConstants.java | 4 + .../apache/hadoop/tools/DistCpOptionSwitch.java | 9 +- .../org/apache/hadoop/tools/DistCpOptions.java | 27 +++ .../org/apache/hadoop/tools/OptionsParser.java | 12 ++ .../apache/hadoop/tools/SimpleCopyListing.java | 169 +++++++++++++++--- .../hadoop/tools/util/ProducerConsumer.java | 177 +++++++++++++++++++ .../apache/hadoop/tools/util/WorkReport.java | 78 ++++++++ .../apache/hadoop/tools/util/WorkRequest.java | 53 ++++++ .../hadoop/tools/util/WorkRequestProcessor.java | 38 ++++ .../apache/hadoop/tools/TestCopyListing.java | 20 ++- .../apache/hadoop/tools/TestIntegration.java | 17 ++ .../apache/hadoop/tools/TestOptionsParser.java | 42 +++++ .../hadoop/tools/util/TestProducerConsumer.java | 109 ++++++++++++ 14 files changed, 728 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 02066b6..a6814f8 100644 --- 
a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -499,6 +499,9 @@ Release 2.8.0 - UNRELEASED HADOOP-11819. HttpServerFunctionalTest#prepareTestWebapp should create web app directory if it does not exist. (Rohith via vinayakumarb) + HADOOP-11827. Speed-up distcp buildListing() using threadpool + (Zoran Dimitrijevic via raviprak) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index a1af2af..7ecb6ce 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -23,6 +23,9 @@ package org.apache.hadoop.tools; */ public class DistCpConstants { + /* Default number of threads to use for building file listing */ + public static final int DEFAULT_LISTSTATUS_THREADS = 1; + /* Default number of maps to use for DistCp */ public static final int DEFAULT_MAPS = 20; @@ -47,6 +50,7 @@ public class DistCpConstants { public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders"; public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source"; public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource"; + public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads"; public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps"; public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing"; public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy"; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index e9c7d46..f90319d 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -83,7 +83,14 @@ public enum DistCpOptionSwitch { SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF, new Option("mapredSslConf", true, "Configuration for ssl config file" + ", to use with hftps://")), - + /** + * Number of threads for building source file listing (before map-reduce + * phase, max one listStatus per thread at a time). + */ + NUM_LISTSTATUS_THREADS(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS, + new Option("numListstatusThreads", true, "Number of threads to " + + "use for building file listing (max " + + DistCpOptions.maxNumListstatusThreads + ").")), /** * Max number of maps to use during copy. 
DistCp will split work * as equally as possible among these maps http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 709e583..d8f3ff7 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -44,6 +44,8 @@ public class DistCpOptions { private boolean blocking = true; private boolean useDiff = false; + public static final int maxNumListstatusThreads = 40; + private int numListstatusThreads = 0; // Indicates that flag is not set. private int maxMaps = DistCpConstants.DEFAULT_MAPS; private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; @@ -124,6 +126,7 @@ public class DistCpOptions { this.overwrite = that.overwrite; this.skipCRC = that.skipCRC; this.blocking = that.blocking; + this.numListstatusThreads = that.numListstatusThreads; this.maxMaps = that.maxMaps; this.mapBandwidth = that.mapBandwidth; this.sslConfigurationFile = that.getSslConfigurationFile(); @@ -312,6 +315,30 @@ public class DistCpOptions { this.skipCRC = skipCRC; } + /** Get the number of threads to use for listStatus + * + * @return Number of threads to do listStatus + */ + public int getNumListstatusThreads() { + return numListstatusThreads; + } + + /** Set the number of threads to use for listStatus. We allow max 40 + * threads. Setting numThreads to zero signifies we should use the value + * from conf properties.
+ * + * @param numThreads - Number of threads + */ + public void setNumListstatusThreads(int numThreads) { + if (numThreads > maxNumListstatusThreads) { + this.numListstatusThreads = maxNumListstatusThreads; + } else if (numThreads > 0) { + this.numListstatusThreads = numThreads; + } else { + this.numListstatusThreads = 0; + } + } + /** Get the max number of maps to use for this copy * * @return Max number of maps http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index a3a76ef..1729479 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -185,6 +185,18 @@ public class OptionsParser { getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch())); } + if (command.hasOption(DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) { + try { + Integer numThreads = Integer.parseInt(getVal(command, + DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim()); + option.setNumListstatusThreads(numThreads); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Number of liststatus threads is invalid: " + getVal(command, + DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e); + } + } + if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) { try { Integer maps = Integer.parseInt( http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index e8a23aa..b9ba099 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -29,13 +29,17 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.util.DistCpUtils; +import org.apache.hadoop.tools.util.ProducerConsumer; +import org.apache.hadoop.tools.util.WorkReport; +import org.apache.hadoop.tools.util.WorkRequest; +import org.apache.hadoop.tools.util.WorkRequestProcessor; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.security.Credentials; import com.google.common.annotations.VisibleForTesting; import java.io.*; -import java.util.Stack; +import java.util.ArrayList; import static org.apache.hadoop.tools.DistCpConstants .HDFS_RESERVED_RAW_DIRECTORY_NAME; @@ -50,7 +54,10 @@ public class SimpleCopyListing extends CopyListing { private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class); private long totalPaths = 0; + private long totalDirs = 0; private long totalBytesToCopy = 0; + private int numListstatusThreads = 1; + private final int maxRetries = 3; /** * Protected constructor, to initialize configuration. 
@@ -61,6 +68,16 @@ public class SimpleCopyListing extends CopyListing { */ protected SimpleCopyListing(Configuration configuration, Credentials credentials) { super(configuration, credentials); + numListstatusThreads = getConf().getInt( + DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS, + DistCpConstants.DEFAULT_LISTSTATUS_THREADS); + } + + @VisibleForTesting + protected SimpleCopyListing(Configuration configuration, Credentials credentials, + int numListstatusThreads) { + super(configuration, credentials); + this.numListstatusThreads = numListstatusThreads; } @Override @@ -160,6 +177,10 @@ public class SimpleCopyListing extends CopyListing { @VisibleForTesting public void doBuildListing(SequenceFile.Writer fileListWriter, DistCpOptions options) throws IOException { + if (options.getNumListstatusThreads() > 0) { + numListstatusThreads = options.getNumListstatusThreads(); + } + try { for (Path path: options.getSourcePaths()) { FileSystem sourceFS = path.getFileSystem(getConf()); @@ -181,6 +202,7 @@ public class SimpleCopyListing extends CopyListing { sourcePathRoot, options); } if (explore) { + ArrayList<FileStatus> sourceDirs = new ArrayList<FileStatus>(); for (FileStatus sourceStatus: sourceFiles) { if (LOG.isDebugEnabled()) { LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy."); @@ -195,15 +217,18 @@ public class SimpleCopyListing extends CopyListing { if (sourceStatus.isDirectory()) { if (LOG.isDebugEnabled()) { - LOG.debug("Traversing source dir: " + sourceStatus.getPath()); + LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath()); } - traverseDirectory(fileListWriter, sourceFS, sourceStatus, - sourcePathRoot, options); + sourceDirs.add(sourceStatus); } } + traverseDirectory(fileListWriter, sourceFS, sourceDirs, + sourcePathRoot, options); } } fileListWriter.close(); + printStats(); + LOG.info("Build file listing completed."); fileListWriter = null; } finally { IOUtils.cleanup(LOG, fileListWriter); @@ -275,43 +300,135 @@ 
public class SimpleCopyListing extends CopyListing { SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)); } - private static FileStatus[] getChildren(FileSystem fileSystem, - FileStatus parent) throws IOException { - return fileSystem.listStatus(parent.getPath()); + /* + * Private class to implement WorkRequestProcessor interface. It processes + * each directory (represented by FileStatus item) and returns a list of all + * file-system objects in that directory (files and directories). In case of + * retriable exceptions it increments retry counter and returns the same + * directory for later retry. + */ + private static class FileStatusProcessor + implements WorkRequestProcessor<FileStatus, FileStatus[]> { + private FileSystem fileSystem; + + public FileStatusProcessor(FileSystem fileSystem) { + this.fileSystem = fileSystem; + } + + /* + * Processor for FileSystem.listStatus(). + * + * @param workRequest Input work item that contains FileStatus item which + * is a parent directory we want to list. + * @return Outputs WorkReport<FileStatus[]> with a list of objects in the + * directory (array of objects, empty if parent directory is + * empty). In case of an intermittent exception we increment retry + * counter and return the list containing the parent directory.
+ */ + public WorkReport<FileStatus[]> processItem( + WorkRequest<FileStatus> workRequest) { + FileStatus parent = workRequest.getItem(); + int retry = workRequest.getRetry(); + WorkReport<FileStatus[]> result = null; + try { + if (retry > 0) { + int sleepSeconds = 2; + for (int i = 1; i < retry; i++) { + sleepSeconds *= 2; + } + try { + Thread.sleep(1000 * sleepSeconds); + } catch (InterruptedException ie) { + LOG.debug("Interrupted while sleeping in exponential backoff."); + } + } + result = new WorkReport<FileStatus[]>( + fileSystem.listStatus(parent.getPath()), 0, true); + } catch (FileNotFoundException fnf) { + LOG.error("FileNotFoundException exception in listStatus: " + + fnf.getMessage()); + result = new WorkReport<FileStatus[]>(new FileStatus[0], 0, true, fnf); + } catch (Exception e) { + LOG.error("Exception in listStatus. Will send for retry."); + FileStatus[] parentList = new FileStatus[1]; + parentList[0] = parent; + result = new WorkReport<FileStatus[]>(parentList, retry + 1, false, e); + } + return result; + } + } + + private void printStats() { + LOG.info("Paths (files+dirs) cnt = " + totalPaths + + "; dirCnt = " + totalDirs); + } + + private void maybePrintStats() { + if (totalPaths % 100000 == 0) { + printStats(); + } } private void traverseDirectory(SequenceFile.Writer fileListWriter, FileSystem sourceFS, - FileStatus sourceStatus, + ArrayList<FileStatus> sourceDirs, Path sourcePathRoot, DistCpOptions options) throws IOException { final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL); final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR); final boolean preserveRawXattrs = options.shouldPreserveRawXattrs(); - Stack<FileStatus> pathStack = new Stack<FileStatus>(); - pathStack.push(sourceStatus); - while (!pathStack.isEmpty()) { - for (FileStatus child: getChildren(sourceFS, pathStack.pop())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recording source-path: " + child.getPath() + " for copy."); - } - 
CopyListingFileStatus childCopyListingStatus = - DistCpUtils.toCopyListingFileStatus(sourceFS, child, - preserveAcls && child.isDirectory(), - preserveXAttrs && child.isDirectory(), - preserveRawXattrs && child.isDirectory()); - writeToFileListing(fileListWriter, childCopyListingStatus, - sourcePathRoot, options); - if (child.isDirectory()) { + assert numListstatusThreads > 0; + LOG.debug("Starting thread pool of " + numListstatusThreads + + " listStatus workers."); + ProducerConsumer<FileStatus, FileStatus[]> workers = + new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads); + for (int i = 0; i < numListstatusThreads; i++) { + workers.addWorker( + new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()))); + } + + for (FileStatus status : sourceDirs) { + workers.put(new WorkRequest<FileStatus>(status, 0)); + maybePrintStats(); + } + + while (workers.hasWork()) { + try { + WorkReport<FileStatus[]> workResult = workers.take(); + int retry = workResult.getRetry(); + for (FileStatus child: workResult.getItem()) { if (LOG.isDebugEnabled()) { - LOG.debug("Traversing into source dir: " + child.getPath()); + LOG.debug("Recording source-path: " + child.getPath() + " for copy."); + } + if (retry == 0) { + CopyListingFileStatus childCopyListingStatus = + DistCpUtils.toCopyListingFileStatus(sourceFS, child, + preserveAcls && child.isDirectory(), + preserveXAttrs && child.isDirectory(), + preserveRawXattrs && child.isDirectory()); + writeToFileListing(fileListWriter, childCopyListingStatus, + sourcePathRoot, options); + } + if (retry < maxRetries) { + if (child.isDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Traversing into source dir: " + child.getPath()); + } + workers.put(new WorkRequest<FileStatus>(child, retry)); + maybePrintStats(); + } + } else { + LOG.error("Giving up on " + child.getPath() + + " after " + retry + " retries."); } - pathStack.push(child); } + } catch (InterruptedException ie) { + LOG.error("Could not get item from 
childQueue. Retrying..."); } } + workers.shutdown(); } private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, @@ -351,6 +468,8 @@ public class SimpleCopyListing extends CopyListing { if (!fileStatus.isDirectory()) { totalBytesToCopy += fileStatus.getLen(); + } else { + totalDirs++; } totalPaths++; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java new file mode 100644 index 0000000..3dad4e3 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.tools.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.tools.util.WorkReport; +import org.apache.hadoop.tools.util.WorkRequest; +import org.apache.hadoop.tools.util.WorkRequestProcessor; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * ProducerConsumer class encapsulates input and output queues and a + * thread-pool of Workers that loop on WorkRequest<T> inputQueue and for each + * consumed WorkRequest Workers invoke WorkRequestProcessor.processItem() + * and output resulting WorkReport<R> to the outputQueue. + */ +public class ProducerConsumer<T, R> { + private Log LOG = LogFactory.getLog(ProducerConsumer.class); + private LinkedBlockingQueue<WorkRequest<T>> inputQueue; + private LinkedBlockingQueue<WorkReport<R>> outputQueue; + private ExecutorService executor; + private AtomicInteger workCnt; + + /** + * ProducerConsumer maintains input and output queues and a thread-pool of + * workers. + * + * @param numThreads Size of thread-pool to execute Workers. + */ + public ProducerConsumer(int numThreads) { + this.inputQueue = new LinkedBlockingQueue<WorkRequest<T>>(); + this.outputQueue = new LinkedBlockingQueue<WorkReport<R>>(); + executor = Executors.newFixedThreadPool(numThreads); + workCnt = new AtomicInteger(0); + } + + /** + * Add another worker that will consume WorkRequest<T> items from input + * queue, process each item using supplied processor, and for every + * processed item output WorkReport<R> to output queue. + * + * @param processor Processor implementing WorkRequestProcessor interface. 
+ * + */ + public void addWorker(WorkRequestProcessor<T, R> processor) { + executor.execute(new Worker(processor)); + } + + /** + * Shutdown ProducerConsumer worker thread-pool without waiting for + * completion of any pending work. + */ + public void shutdown() { + executor.shutdown(); + } + + /** + * Returns number of pending ProducerConsumer items (submitted to input + * queue for processing via put() method but not yet consumed by take() + * or blockingTake()). + * + * @return Number of items in ProducerConsumer (either pending for + * processing or waiting to be consumed). + */ + public int getWorkCnt() { + return workCnt.get(); + } + + /** + * Returns true if there are items in ProducerConsumer that are either + * pending for processing or waiting to be consumed. + * + * @return True if there were more items put() to ProducerConsumer than + * consumed by take() or blockingTake(). + */ + public boolean hasWork() { + return workCnt.get() > 0; + } + + /** + * Blocking put workRequest to ProducerConsumer input queue. + * + * @param workRequest WorkRequest<T> item to be processed. + */ + public void put(WorkRequest<T> workRequest) { + boolean isDone = false; + while (!isDone) { + try { + inputQueue.put(workRequest); + workCnt.incrementAndGet(); + isDone = true; + } catch (InterruptedException ie) { + LOG.error("Could not put workRequest into inputQueue. Retrying..."); + } + } + } + + /** + * Blocking take from ProducerConsumer output queue that can be interrupted. + * + * @return WorkReport<R> item returned by processor's processItem(). + */ + public WorkReport<R> take() throws InterruptedException { + WorkReport<R> report = outputQueue.take(); + workCnt.decrementAndGet(); + return report; + } + + /** + * Blocking take from ProducerConsumer output queue (catches exceptions and + * retries forever). + * + * @return WorkReport<R> item returned by processor's processItem().
+ */ + public WorkReport<R> blockingTake() { + while (true) { + try { + WorkReport<R> report = outputQueue.take(); + workCnt.decrementAndGet(); + return report; + } catch (InterruptedException ie) { + LOG.debug("Retrying in blockingTake..."); + } + } + } + + private class Worker implements Runnable { + private WorkRequestProcessor<T, R> processor; + + public Worker(WorkRequestProcessor<T, R> processor) { + this.processor = processor; + } + + public void run() { + while (true) { + try { + WorkRequest<T> work = inputQueue.take(); + WorkReport<R> result = processor.processItem(work); + + boolean isDone = false; + while (!isDone) { + try { + outputQueue.put(result); + isDone = true; + } catch (InterruptedException ie) { + LOG.debug("Could not put report into outputQueue. Retrying..."); + } + } + } catch (InterruptedException ie) { + LOG.debug("Interrupted while waiting for request from inputQueue."); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java new file mode 100644 index 0000000..91c9805 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.util; + +/** + * WorkReport<T> is a simple container for items of class T and its + * corresponding retry counter that indicates how many times this item + * was previously attempted to be processed. + */ +public class WorkReport<T> { + private T item; + private final boolean success; + private final int retry; + private final Exception exception; + + /** + * @param item Object representing work report. + * @param retry Number of unsuccessful attempts to process work. + * @param success Indicates whether work was successfully completed. + */ + public WorkReport(T item, int retry, boolean success) { + this(item, retry, success, null); + } + + /** + * @param item Object representing work report. + * @param retry Number of unsuccessful attempts to process work. + * @param success Indicates whether work was successfully completed. + * @param exception Exception thrown while processing work. + */ + public WorkReport(T item, int retry, boolean success, Exception exception) { + this.item = item; + this.retry = retry; + this.success = success; + this.exception = exception; + } + + public T getItem() { + return item; + } + + /** + * @return True if the work was processed successfully. + */ + public boolean getSuccess() { + return success; + } + + /** + * @return Number of unsuccessful attempts to process work. + */ + public int getRetry() { + return retry; + } + + /** + * @return Exception thrown while processing work. 
+ */ + public Exception getException() { + return exception; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java new file mode 100644 index 0000000..339a3ab --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.util; + +/** + * WorkRequest<T> is a simple container for items of class T and its + * corresponding retry counter that indicates how many times this item + * was previously attempted to be processed. + */ +public class WorkRequest<T> { + private int retry; + private T item; + + public WorkRequest(T item) { + this(item, 0); + } + + /** + * @param item Object representing WorkRequest input data. + * @param retry Number of previous attempts to process this work request. 
+ */ + public WorkRequest(T item, int retry) { + this.item = item; + this.retry = retry; + } + + public T getItem() { + return item; + } + + /** + * @return Number of previous attempts to process this work request. + */ + public int getRetry() { + return retry; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java new file mode 100644 index 0000000..b6d8a09 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.util; + +import org.apache.hadoop.tools.util.WorkReport; +import org.apache.hadoop.tools.util.WorkRequest; + +/** + * Interface for ProducerConsumer worker loop. + * + */ +public interface WorkRequestProcessor<T, R> { + + /** + * Work processor. 
+ * + * @param workRequest Input work item. + * @return Outputs WorkReport after processing workRequest item. + * + */ + public WorkReport<R> processItem(WorkRequest<T> workRequest); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java index d8f7e0b..8381c1b 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java @@ -32,6 +32,9 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.junit.Test; import org.junit.Assert; import org.junit.BeforeClass; @@ -40,9 +43,12 @@ import org.junit.AfterClass; import java.io.File; import java.io.IOException; import java.io.OutputStream; -import java.util.List; +import java.util.Arrays; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +@RunWith(value = Parameterized.class) public class TestCopyListing extends SimpleCopyListing { private static final Log LOG = LogFactory.getLog(TestCopyListing.class); @@ -63,9 +69,15 @@ public class TestCopyListing extends SimpleCopyListing { cluster.shutdown(); } } - - public TestCopyListing() { - super(config, CREDENTIALS); + + @Parameters + public static Collection<Object[]> data() { + Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 }, { 20} }; + return Arrays.asList(data); + } + + public TestCopyListing(int numListstatusThreads) { + 
super(config, CREDENTIALS, numListstatusThreads); } protected TestCopyListing(Configuration configuration) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java index 67d885a..5726342 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java @@ -30,14 +30,19 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.tools.util.TestDistCpUtils; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.junit.Test; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +@RunWith(value = Parameterized.class) public class TestIntegration { private static final Log LOG = LogFactory.getLog(TestIntegration.class); @@ -46,6 +51,17 @@ public class TestIntegration { private static Path listFile; private static Path target; private static String root; + private int numListstatusThreads; + + public TestIntegration(int numListstatusThreads) { + this.numListstatusThreads = numListstatusThreads; + } + + @Parameters + public static Collection<Object[]> data() { + Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 } }; + return Arrays.asList(data); + } private static Configuration getConf() { Configuration conf = new Configuration(); @@ -597,6 +613,7 @@ public class TestIntegration { options.setDeleteMissing(delete); options.setOverwrite(overwrite); 
options.setTargetPathExists(targetExists); + options.setNumListstatusThreads(numListstatusThreads); try { new DistCp(getConf(), options).execute(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index cc9da33..6eddfb2 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -304,6 +304,48 @@ public class TestOptionsParser { } @Test + public void testParseNumListstatusThreads() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + // If command line argument isn't set, we expect .getNumListstatusThreads + // option to be zero (so that we know when to override conf properties). 
+ Assert.assertEquals(0, options.getNumListstatusThreads()); + + options = OptionsParser.parse(new String[] { + "--numListstatusThreads", + "12", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(12, options.getNumListstatusThreads()); + + options = OptionsParser.parse(new String[] { + "--numListstatusThreads", + "0", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(0, options.getNumListstatusThreads()); + + try { + OptionsParser.parse(new String[] { + "--numListstatusThreads", + "hello", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.fail("Non numberic numListstatusThreads parsed"); + } catch (IllegalArgumentException ignore) { } + + // Thread counts above the maximum are capped at the maximum. + options = OptionsParser.parse(new String[] { + "--numListstatusThreads", + "100", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(DistCpOptions.maxNumListstatusThreads, + options.getNumListstatusThreads()); + } + + @Test public void testSourceListing() { DistCpOptions options = OptionsParser.parse(new String[] { "-f", http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfba3550/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java new file mode 100644 index 0000000..de0fcfd --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.util; + +import org.apache.hadoop.tools.util.ProducerConsumer; +import org.apache.hadoop.tools.util.WorkReport; +import org.apache.hadoop.tools.util.WorkRequest; +import org.apache.hadoop.tools.util.WorkRequestProcessor; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.Exception; +import java.lang.Integer; + +public class TestProducerConsumer { + public class CopyProcessor implements WorkRequestProcessor<Integer, Integer> { + public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) { + Integer item = new Integer(workRequest.getItem()); + return new WorkReport<Integer>(item, 0, true); + } + } + + public class ExceptionProcessor implements WorkRequestProcessor<Integer, Integer> { + @SuppressWarnings("null") + public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) { + try { + Integer item = null; + item.intValue(); // Throw NULL pointer exception. 
+ + // We should never be here (null pointer exception above) + return new WorkReport<Integer>(item, 0, true); + } catch (Exception e) { + Integer item = new Integer(workRequest.getItem()); + return new WorkReport<Integer>(item, 1, false, e); + } + } + } + + @Test + public void testSimpleProducerConsumer() { + ProducerConsumer<Integer, Integer> worker = + new ProducerConsumer<Integer, Integer>(1); + worker.addWorker(new CopyProcessor()); + worker.put(new WorkRequest<Integer>(42)); + try { + WorkReport<Integer> report = worker.take(); + Assert.assertEquals(42, report.getItem().intValue()); + } catch (InterruptedException ie) { + Assert.assertTrue(false); + } + } + + @Test + public void testMultipleProducerConsumer() { + ProducerConsumer<Integer, Integer> workers = + new ProducerConsumer<Integer, Integer>(10); + for (int i = 0; i < 10; i++) { + workers.addWorker(new CopyProcessor()); + } + + int sum = 0; + int numRequests = 2000; + for (int i = 0; i < numRequests; i++) { + workers.put(new WorkRequest<Integer>(i + 42)); + sum += i + 42; + } + + int numReports = 0; + while (workers.getWorkCnt() > 0) { + WorkReport<Integer> report = workers.blockingTake(); + sum -= report.getItem().intValue(); + numReports++; + } + Assert.assertEquals(0, sum); + Assert.assertEquals(numRequests, numReports); + } + + @Test + public void testExceptionProducerConsumer() { + ProducerConsumer<Integer, Integer> worker = + new ProducerConsumer<Integer, Integer>(1); + worker.addWorker(new ExceptionProcessor()); + worker.put(new WorkRequest<Integer>(42)); + try { + WorkReport<Integer> report = worker.take(); + Assert.assertEquals(42, report.getItem().intValue()); + Assert.assertFalse(report.getSuccess()); + Assert.assertNotNull(report.getException()); + } catch (InterruptedException ie) { + Assert.assertTrue(false); + } + } +}