HADOOP-14267. Make DistCpOptions immutable. Contributed by Mingliang Liu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/26172a94
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/26172a94
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/26172a94

Branch: refs/heads/HDFS-7240
Commit: 26172a94d6431e70d7fe15d66be9a7e195f79f60
Parents: 73835c7
Author: Mingliang Liu <[email protected]>
Authored: Thu Jun 23 00:21:49 2016 -0700
Committer: Mingliang Liu <[email protected]>
Committed: Fri Mar 31 20:04:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/tools/CopyListing.java     |  38 +-
 .../java/org/apache/hadoop/tools/DistCp.java     | 106 +--
 .../org/apache/hadoop/tools/DistCpContext.java   | 198 ++++
 .../apache/hadoop/tools/DistCpOptionSwitch.java  |   2 +-
 .../org/apache/hadoop/tools/DistCpOptions.java   | 925 +++++++++----
 .../org/apache/hadoop/tools/DistCpSync.java      |  42 +-
 .../hadoop/tools/FileBasedCopyListing.java       |  12 +-
 .../apache/hadoop/tools/GlobbedCopyListing.java  |  17 +-
 .../org/apache/hadoop/tools/OptionsParser.java   | 299 ++----
 .../apache/hadoop/tools/SimpleCopyListing.java   | 115 +--
 .../hadoop/tools/mapred/CopyCommitter.java       |  15 +-
 .../apache/hadoop/tools/util/DistCpUtils.java    |   8 +-
 .../apache/hadoop/tools/TestCopyListing.java     |  51 +-
 .../apache/hadoop/tools/TestDistCpOptions.java   | 500 ++++++++++
 .../org/apache/hadoop/tools/TestDistCpSync.java  |  68 +-
 .../hadoop/tools/TestDistCpSyncReverseBase.java  |  44 +-
 .../apache/hadoop/tools/TestDistCpViewFs.java    |  10 +-
 .../hadoop/tools/TestFileBasedCopyListing.java   |   9 +-
 .../hadoop/tools/TestGlobbedCopyListing.java     |  11 +-
 .../apache/hadoop/tools/TestIntegration.java     |  20 +-
 .../apache/hadoop/tools/TestOptionsParser.java   |  81 +-
 .../contract/AbstractContractDistCpTest.java     |   6 +-
 .../hadoop/tools/mapred/TestCopyCommitter.java   |  34 +-
 .../mapred/TestUniformSizeInputFormat.java       |  15 +-
 .../mapred/lib/TestDynamicInputFormat.java       |  17 +-
 25 files changed, 1574 insertions(+), 1069 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
index 9ebf9d2..908b558 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
@@ -77,41 +77,41 @@ public abstract class CopyListing extends Configured {
    *     TARGET IS DIR : Key-"/file1", Value-FileStatus(/tmp/file1)
    *
    * @param pathToListFile - Output file where the listing would be stored
-   * @param options - Input options to distcp
+   * @param distCpContext - distcp context associated with input options
    * @throws IOException - Exception if any
    */
   public final void buildListing(Path pathToListFile,
-      DistCpOptions options) throws IOException {
-    validatePaths(options);
-    doBuildListing(pathToListFile, options);
+      DistCpContext distCpContext) throws IOException {
+    validatePaths(distCpContext);
+    doBuildListing(pathToListFile, distCpContext);
     Configuration config = getConf();
     config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, pathToListFile.toString());
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy()); config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths()); - validateFinalListing(pathToListFile, options); + validateFinalListing(pathToListFile, distCpContext); LOG.info("Number of paths in the copy list: " + this.getNumberOfPaths()); } /** * Validate input and output paths * - * @param options - Input options + * @param distCpContext - Distcp context * @throws InvalidInputException If inputs are invalid * @throws IOException any Exception with FS */ - protected abstract void validatePaths(DistCpOptions options) + protected abstract void validatePaths(DistCpContext distCpContext) throws IOException, InvalidInputException; /** * The interface to be implemented by sub-classes, to create the source/target file listing. * @param pathToListFile Path on HDFS where the listing file is written. - * @param options Input Options for DistCp (indicating source/target paths.) + * @param distCpContext - Distcp context * @throws IOException Thrown on failure to create the listing file. */ protected abstract void doBuildListing(Path pathToListFile, - DistCpOptions options) throws IOException; + DistCpContext distCpContext) throws IOException; /** * Return the total bytes that distCp should copy for the source paths @@ -135,17 +135,17 @@ public abstract class CopyListing extends Configured { * If preserving XAttrs, checks that file system can support XAttrs. * * @param pathToListFile - path listing build by doBuildListing - * @param options - Input options to distcp + * @param context - Distcp context with associated input options * @throws IOException - Any issues while checking for duplicates and throws * @throws DuplicateFileException - if there are duplicates */ - private void validateFinalListing(Path pathToListFile, DistCpOptions options) + private void validateFinalListing(Path pathToListFile, DistCpContext context) throws DuplicateFileException, IOException { Configuration config = getConf(); FileSystem fs = pathToListFile.getFileSystem(config); - final boolean splitLargeFile = options.splitLargeFile(); + final boolean splitLargeFile = context.splitLargeFile(); // When splitLargeFile is enabled, we don't randomize the copylist // earlier, so we don't do the sorting here. 
For a file that has @@ -188,7 +188,7 @@ public abstract class CopyListing extends Configured { } } reader.getCurrentValue(lastFileStatus); - if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { + if (context.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config); URI lastFsUri = lastFs.getUri(); if (!aclSupportCheckFsSet.contains(lastFsUri)) { @@ -196,7 +196,7 @@ public abstract class CopyListing extends Configured { aclSupportCheckFsSet.add(lastFsUri); } } - if (options.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) { + if (context.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) { FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config); URI lastFsUri = lastFs.getUri(); if (!xAttrSupportCheckFsSet.contains(lastFsUri)) { @@ -210,7 +210,7 @@ public abstract class CopyListing extends Configured { lastChunkOffset = lastFileStatus.getChunkOffset(); lastChunkLength = lastFileStatus.getChunkLength(); } - if (options.shouldUseDiff() && LOG.isDebugEnabled()) { + if (context.shouldUseDiff() && LOG.isDebugEnabled()) { LOG.debug("Copy list entry " + idx + ": " + lastFileStatus.getPath().toUri().getPath()); idx++; @@ -253,14 +253,12 @@ public abstract class CopyListing extends Configured { * Public Factory method with which the appropriate CopyListing implementation may be retrieved. * @param configuration The input configuration. * @param credentials Credentials object on which the FS delegation tokens are cached - * @param options The input Options, to help choose the appropriate CopyListing Implementation. + * @param context Distcp context with associated input options * @return An instance of the appropriate CopyListing implementation. * @throws java.io.IOException - Exception if any */ public static CopyListing getCopyListing(Configuration configuration, - Credentials credentials, - DistCpOptions options) - throws IOException { + Credentials credentials, DistCpContext context) throws IOException { String copyListingClassName = configuration.get(DistCpConstants. CONF_LABEL_COPY_LISTING_CLASS, ""); Class<? 
extends CopyListing> copyListingClass; @@ -270,7 +268,7 @@ public abstract class CopyListing extends Configured { CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class, CopyListing.class); } else { - if (options.getSourceFileListing() == null) { + if (context.getSourceFileListing() == null) { copyListingClass = GlobbedCopyListing.class; } else { copyListingClass = FileBasedCopyListing.class; http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 8c2fa24..df9c328 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -21,6 +21,7 @@ package org.apache.hadoop.tools; import java.io.IOException; import java.util.Random; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -35,7 +36,6 @@ import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobSubmissionFiles; -import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.CopyListing.*; import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.CopyOutputFormat; @@ -66,7 +66,9 @@ public class DistCp extends Configured implements Tool { static final Log LOG = LogFactory.getLog(DistCp.class); - private DistCpOptions inputOptions; + @VisibleForTesting + DistCpContext context; + private Path metaFolder; private static final String PREFIX = "_distcp"; @@ -79,15 +81,14 @@ public class DistCp extends Configured implements Tool { private FileSystem jobFS; private void prepareFileListing(Job job) throws Exception { - if (inputOptions.shouldUseSnapshotDiff()) { + if (context.shouldUseSnapshotDiff()) { // When "-diff" or "-rdiff" is passed, do sync() first, then // create copyListing based on snapshot diff. - DistCpSync distCpSync = new DistCpSync(inputOptions, getConf()); + DistCpSync distCpSync = new DistCpSync(context, getConf()); if (distCpSync.sync()) { createInputFileListingWithDiff(job, distCpSync); } else { - throw new Exception("DistCp sync failed, input options: " - + inputOptions); + throw new Exception("DistCp sync failed, input options: " + context); } } else { // When no "-diff" or "-rdiff" is passed, create copyListing @@ -99,16 +100,19 @@ public class DistCp extends Configured implements Tool { /** * Public Constructor. Creates DistCp object with specified input-parameters. * (E.g. source-paths, target-location, etc.) - * @param inputOptions Options (indicating source-paths, target-location.) - * @param configuration The Hadoop configuration against which the Copy-mapper must run. 
+ * @param configuration configuration against which the Copy-mapper must run + * @param inputOptions Immutable options * @throws Exception */ - public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception { + public DistCp(Configuration configuration, DistCpOptions inputOptions) + throws Exception { Configuration config = new Configuration(configuration); config.addResource(DISTCP_DEFAULT_XML); config.addResource(DISTCP_SITE_XML); setConf(config); - this.inputOptions = inputOptions; + if (inputOptions != null) { + this.context = new DistCpContext(inputOptions); + } this.metaFolder = createMetaFolderPath(); } @@ -134,10 +138,10 @@ public class DistCp extends Configured implements Tool { } try { - inputOptions = (OptionsParser.parse(argv)); - setOptionsForSplitLargeFile(); + context = new DistCpContext(OptionsParser.parse(argv)); + checkSplitLargeFile(); setTargetPathExists(); - LOG.info("Input Options: " + inputOptions); + LOG.info("Input Options: " + context); } catch (Throwable e) { LOG.error("Invalid arguments: ", e); System.err.println("Invalid arguments: " + e.getMessage()); @@ -173,9 +177,11 @@ public class DistCp extends Configured implements Tool { * @throws Exception */ public Job execute() throws Exception { + Preconditions.checkState(context != null, + "The DistCpContext should have been created before running DistCp!"); Job job = createAndSubmitJob(); - if (inputOptions.shouldBlock()) { + if (context.shouldBlock()) { waitForJobCompletion(job); } return job; @@ -186,7 +192,7 @@ public class DistCp extends Configured implements Tool { * @return The mapreduce job object that has been submitted */ public Job createAndSubmitJob() throws Exception { - assert inputOptions != null; + assert context != null; assert getConf() != null; Job job = null; try { @@ -230,53 +236,36 @@ public class DistCp extends Configured implements Tool { * for the benefit of CopyCommitter */ private void setTargetPathExists() throws IOException { - Path target = inputOptions.getTargetPath(); + Path target = context.getTargetPath(); FileSystem targetFS = target.getFileSystem(getConf()); boolean targetExists = targetFS.exists(target); - inputOptions.setTargetPathExists(targetExists); + context.setTargetPathExists(targetExists); getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists); } /** - * Check if concat is supported by fs. - * Throws UnsupportedOperationException if not. + * Check splitting large files is supported and populate configs. */ - private void checkConcatSupport(FileSystem fs) { + private void checkSplitLargeFile() throws IOException { + if (!context.splitLargeFile()) { + return; + } + + final Path target = context.getTargetPath(); + final FileSystem targetFS = target.getFileSystem(getConf()); try { Path[] src = null; Path tgt = null; - fs.concat(tgt, src); + targetFS.concat(tgt, src); } catch (UnsupportedOperationException use) { throw new UnsupportedOperationException( DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + - " is not supported since the target file system doesn't" + - " support concat.", use); + " is not supported since the target file system doesn't" + + " support concat.", use); } catch (Exception e) { // Ignore other exception } - } - - /** - * Set up needed options for splitting large files. 
- */ - private void setOptionsForSplitLargeFile() throws IOException { - if (!inputOptions.splitLargeFile()) { - return; - } - Path target = inputOptions.getTargetPath(); - FileSystem targetFS = target.getFileSystem(getConf()); - checkConcatSupport(targetFS); - - LOG.info("Enabling preserving blocksize since " - + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed."); - inputOptions.preserve(FileAttribute.BLOCKSIZE); - - LOG.info("Set " + - DistCpOptionSwitch.APPEND.getSwitch() - + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() - + " is passed."); - inputOptions.setAppend(false); LOG.info("Set " + DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES @@ -286,7 +275,6 @@ public class DistCp extends Configured implements Tool { DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false); } - /** * Create Job object for submitting it, with all the configuration * @@ -300,7 +288,7 @@ public class DistCp extends Configured implements Tool { jobName += ": " + userChosenName; Job job = Job.getInstance(getConf()); job.setJobName(jobName); - job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions)); + job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), context)); job.setJarByClass(CopyMapper.class); configureOutputFormat(job); @@ -311,9 +299,9 @@ public class DistCp extends Configured implements Tool { job.setOutputFormatClass(CopyOutputFormat.class); job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false"); job.getConfiguration().set(JobContext.NUM_MAPS, - String.valueOf(inputOptions.getMaxMaps())); + String.valueOf(context.getMaxMaps())); - inputOptions.appendToConf(job.getConfiguration()); + context.appendToConf(job.getConfiguration()); return job; } @@ -325,18 +313,20 @@ public class DistCp extends Configured implements Tool { */ private void configureOutputFormat(Job job) throws IOException { final Configuration configuration = job.getConfiguration(); - Path targetPath = inputOptions.getTargetPath(); + Path targetPath = context.getTargetPath(); FileSystem targetFS = targetPath.getFileSystem(configuration); targetPath = targetPath.makeQualified(targetFS.getUri(), targetFS.getWorkingDirectory()); - if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { + if (context.shouldPreserve( + DistCpOptions.FileAttribute.ACL)) { DistCpUtils.checkFileSystemAclSupport(targetFS); } - if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) { + if (context.shouldPreserve( + DistCpOptions.FileAttribute.XATTR)) { DistCpUtils.checkFileSystemXAttrSupport(targetFS); } - if (inputOptions.shouldAtomicCommit()) { - Path workDir = inputOptions.getAtomicWorkPath(); + if (context.shouldAtomicCommit()) { + Path workDir = context.getAtomicWorkPath(); if (workDir == null) { workDir = targetPath.getParent(); } @@ -353,7 +343,7 @@ public class DistCp extends Configured implements Tool { } CopyOutputFormat.setCommitDirectory(job, targetPath); - Path logPath = inputOptions.getLogPath(); + Path logPath = context.getLogPath(); if (logPath == null) { logPath = new Path(metaFolder, "_logs"); } else { @@ -374,8 +364,8 @@ public class DistCp extends Configured implements Tool { protected Path createInputFileListing(Job job) throws IOException { Path fileListingPath = getFileListingPath(); CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(), - job.getCredentials(), inputOptions); - copyListing.buildListing(fileListingPath, inputOptions); + job.getCredentials(), context); + copyListing.buildListing(fileListingPath, 
context); return fileListingPath; } @@ -391,7 +381,7 @@ public class DistCp extends Configured implements Tool { Path fileListingPath = getFileListingPath(); CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(), job.getCredentials(), distCpSync); - copyListing.buildListing(fileListingPath, inputOptions); + copyListing.buildListing(fileListingPath, context); return fileListingPath; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java new file mode 100644 index 0000000..c34005e --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.DistCpOptions.FileAttribute; + +import java.util.List; +import java.util.Set; + +/** + * This is the context of the distcp at runtime. + * + * It has the immutable {@link DistCpOptions} and mutable runtime status. + */ [email protected] [email protected] +public class DistCpContext { + private final DistCpOptions options; + + /** The source paths can be set at runtime via snapshots. */ + private List<Path> sourcePaths; + + /** This is a derived field, it's initialized in the beginning of distcp. */ + private boolean targetPathExists = true; + + /** Indicate that raw.* xattrs should be preserved if true. */ + private boolean preserveRawXattrs = false; + + public DistCpContext(DistCpOptions options) { + this.options = options; + this.sourcePaths = options.getSourcePaths(); + } + + public void setSourcePaths(List<Path> sourcePaths) { + this.sourcePaths = sourcePaths; + } + + /** + * @return the sourcePaths. Please note this method does not directly delegate + * to the {@link #options}. 
+ */ + public List<Path> getSourcePaths() { + return sourcePaths; + } + + public Path getSourceFileListing() { + return options.getSourceFileListing(); + } + + public Path getTargetPath() { + return options.getTargetPath(); + } + + public boolean shouldAtomicCommit() { + return options.shouldAtomicCommit(); + } + + public boolean shouldSyncFolder() { + return options.shouldSyncFolder(); + } + + public boolean shouldDeleteMissing() { + return options.shouldDeleteMissing(); + } + + public boolean shouldIgnoreFailures() { + return options.shouldIgnoreFailures(); + } + + public boolean shouldOverwrite() { + return options.shouldOverwrite(); + } + + public boolean shouldAppend() { + return options.shouldAppend(); + } + + public boolean shouldSkipCRC() { + return options.shouldSkipCRC(); + } + + public boolean shouldBlock() { + return options.shouldBlock(); + } + + public boolean shouldUseDiff() { + return options.shouldUseDiff(); + } + + public boolean shouldUseRdiff() { + return options.shouldUseRdiff(); + } + + public boolean shouldUseSnapshotDiff() { + return options.shouldUseSnapshotDiff(); + } + + public String getFromSnapshot() { + return options.getFromSnapshot(); + } + + public String getToSnapshot() { + return options.getToSnapshot(); + } + + public final String getFiltersFile() { + return options.getFiltersFile(); + } + + public int getNumListstatusThreads() { + return options.getNumListstatusThreads(); + } + + public int getMaxMaps() { + return options.getMaxMaps(); + } + + public float getMapBandwidth() { + return options.getMapBandwidth(); + } + + public Set<FileAttribute> getPreserveAttributes() { + return options.getPreserveAttributes(); + } + + public boolean shouldPreserve(FileAttribute attribute) { + return options.shouldPreserve(attribute); + } + + public boolean shouldPreserveRawXattrs() { + return preserveRawXattrs; + } + + public void setPreserveRawXattrs(boolean preserveRawXattrs) { + this.preserveRawXattrs = preserveRawXattrs; + } + + public Path getAtomicWorkPath() { + return options.getAtomicWorkPath(); + } + + public Path getLogPath() { + return options.getLogPath(); + } + + public String getCopyStrategy() { + return options.getCopyStrategy(); + } + + public int getBlocksPerChunk() { + return options.getBlocksPerChunk(); + } + + public final boolean splitLargeFile() { + return options.getBlocksPerChunk() > 0; + } + + public void setTargetPathExists(boolean targetPathExists) { + this.targetPathExists = targetPathExists; + } + + public boolean isTargetPathExists() { + return targetPathExists; + } + + public void appendToConf(Configuration conf) { + options.appendToConf(conf); + } + + @Override + public String toString() { + return options.toString() + + ", sourcePaths=" + sourcePaths + + ", targetPathExists=" + targetPathExists + + ", preserveRawXattrs" + preserveRawXattrs; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index ced9b54..81abb7d 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -81,7 +81,7 @@ public enum DistCpOptionSwitch { 
NUM_LISTSTATUS_THREADS(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS, new Option("numListstatusThreads", true, "Number of threads to " + "use for building file listing (max " + - DistCpOptions.maxNumListstatusThreads + ").")), + DistCpOptions.MAX_NUM_LISTSTATUS_THREADS + ").")), /** * Max number of maps to use during copy. DistCp will split work * as equally as possible among these maps http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 9822d83..97ae0c4 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -18,43 +18,88 @@ package org.apache.hadoop.tools; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.tools.util.DistCpUtils; +import java.util.Collections; import java.util.EnumSet; -import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Set; /** * The Options class encapsulates all DistCp options. - * These may be set from command-line (via the OptionsParser) - * or may be set manually. + * + * When you add a new option, please: + * - Add the field along with javadoc in DistCpOptions and its Builder + * - Add setter method in the {@link Builder} class + * + * This class is immutable. */ -public class DistCpOptions { - - private boolean atomicCommit = false; - private boolean syncFolder = false; - private boolean deleteMissing = false; - private boolean ignoreFailures = false; - private boolean overwrite = false; - private boolean append = false; - private boolean skipCRC = false; - private boolean blocking = true; +public final class DistCpOptions { + private static final Logger LOG = LoggerFactory.getLogger(Builder.class); + public static final int MAX_NUM_LISTSTATUS_THREADS = 40; + + /** File path (hdfs:// or file://) that contains the list of actual files to + * copy. + */ + private final Path sourceFileListing; + + /** List of source-paths (including wildcards) to be copied to target. */ + private final List<Path> sourcePaths; + + /** Destination path for the dist-copy. */ + private final Path targetPath; + + /** Whether data need to be committed automatically. */ + private final boolean atomicCommit; + + /** the work path for atomic commit. If null, the work + * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath). */ + private final Path atomicWorkPath; + + /** Whether source and target folder contents be sync'ed up. */ + private final boolean syncFolder; + + /** Whether files only present in target should be deleted. */ + private boolean deleteMissing; + + /** Whether failures during copy be ignored. */ + private final boolean ignoreFailures; + + /** Whether files should always be overwritten on target. */ + private final boolean overwrite; + + /** Whether we want to append new data to target files. This is valid only + * with update option and CRC is not skipped. 
*/ + private final boolean append; + + /** Whether checksum comparison should be skipped while determining if source + * and destination files are identical. */ + private final boolean skipCRC; + + /** Whether to run blocking or non-blocking. */ + private final boolean blocking; + // When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1 // to s2) of source cluster to the target cluster to sync target cluster with // the source cluster. Referred to as "Fdiff" in the code. // It's required that s2 is newer than s1. - private boolean useDiff = false; + private final boolean useDiff; // When "-rdiff s2 s1 src tgt" is passed, apply reversed snapshot diff (from // s2 to s1) of target cluster to the target cluster, so to make target // cluster go back to s1. Referred to as "Rdiff" in the code. // It's required that s2 is newer than s1, and src and tgt have exact same // content at their s1, if src is not the same as tgt. - private boolean useRdiff = false; + private final boolean useRdiff; // For both -diff and -rdiff, given the example command line switches, two // steps are taken: @@ -66,44 +111,53 @@ public class DistCpOptions { // could be the tgt itself (HDFS-9820). // - public static final int maxNumListstatusThreads = 40; - private int numListstatusThreads = 0; // Indicates that flag is not set. - private int maxMaps = DistCpConstants.DEFAULT_MAPS; - private float mapBandwidth = 0; // Indicates that we should use the default. - - private String copyStrategy = DistCpConstants.UNIFORMSIZE; + private final String fromSnapshot; + private final String toSnapshot; - private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class); + /** The path to a file containing a list of paths to filter out of copy. */ + private final String filtersFile; - private boolean preserveRawXattrs; + /** Path where output logs are stored. If not specified, it will use the + * default value JobStagingDir/_logs and delete upon job completion. */ + private final Path logPath; - private Path atomicWorkPath; + /** Set the copy strategy to use. Should map to a strategy implementation + * in distp-default.xml. */ + private final String copyStrategy; - private Path logPath; + /** per map bandwidth in MB. */ + private final float mapBandwidth; - private Path sourceFileListing; - private List<Path> sourcePaths; + /** The number of threads to use for listStatus. We allow max + * {@link #MAX_NUM_LISTSTATUS_THREADS} threads. Setting numThreads to zero + * signify we should use the value from conf properties. */ + private final int numListstatusThreads; - private String fromSnapshot; - private String toSnapshot; + /** The max number of maps to use for copy. */ + private final int maxMaps; - private Path targetPath; - - /** - * The path to a file containing a list of paths to filter out of the copy. - */ - private String filtersFile; - - // targetPathExist is a derived field, it's initialized in the - // beginning of distcp. - private boolean targetPathExists = true; + /** File attributes that need to be preserved. */ + private final EnumSet<FileAttribute> preserveStatus; // Size of chunk in number of blocks when splitting large file into chunks // to copy in parallel. Default is 0 and file are not splitted. - private int blocksPerChunk = 0; + private final int blocksPerChunk; - public static enum FileAttribute{ - REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES; + /** + * File attributes for preserve. 
+ * + * Each enum entry uses the first char as its symbol. + */ + public enum FileAttribute { + REPLICATION, // R + BLOCKSIZE, // B + USER, // U + GROUP, // G + PERMISSION, // P + CHECKSUMTYPE, // C + ACL, // A + XATTR, // X + TIMES; // T public static FileAttribute getAttribute(char symbol) { for (FileAttribute attribute : values()) { @@ -115,187 +169,86 @@ public class DistCpOptions { } } - /** - * Constructor, to initialize source/target paths. - * @param sourcePaths List of source-paths (including wildcards) - * to be copied to target. - * @param targetPath Destination path for the dist-copy. - */ - public DistCpOptions(List<Path> sourcePaths, Path targetPath) { - assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths"; - assert targetPath != null : "Invalid Target path"; + private DistCpOptions(Builder builder) { + this.sourceFileListing = builder.sourceFileListing; + this.sourcePaths = builder.sourcePaths; + this.targetPath = builder.targetPath; + + this.atomicCommit = builder.atomicCommit; + this.atomicWorkPath = builder.atomicWorkPath; + this.syncFolder = builder.syncFolder; + this.deleteMissing = builder.deleteMissing; + this.ignoreFailures = builder.ignoreFailures; + this.overwrite = builder.overwrite; + this.append = builder.append; + this.skipCRC = builder.skipCRC; + this.blocking = builder.blocking; + + this.useDiff = builder.useDiff; + this.useRdiff = builder.useRdiff; + this.fromSnapshot = builder.fromSnapshot; + this.toSnapshot = builder.toSnapshot; - this.sourcePaths = sourcePaths; - this.targetPath = targetPath; + this.filtersFile = builder.filtersFile; + this.logPath = builder.logPath; + this.copyStrategy = builder.copyStrategy; + + this.mapBandwidth = builder.mapBandwidth; + this.numListstatusThreads = builder.numListstatusThreads; + this.maxMaps = builder.maxMaps; + + this.preserveStatus = builder.preserveStatus; + + this.blocksPerChunk = builder.blocksPerChunk; } - /** - * Constructor, to initialize source/target paths. - * @param sourceFileListing File containing list of source paths - * @param targetPath Destination path for the dist-copy. - */ - public DistCpOptions(Path sourceFileListing, Path targetPath) { - assert sourceFileListing != null : "Invalid source paths"; - assert targetPath != null : "Invalid Target path"; + public Path getSourceFileListing() { + return sourceFileListing; + } - this.sourceFileListing = sourceFileListing; - this.targetPath = targetPath; + public List<Path> getSourcePaths() { + return sourcePaths == null ? + null : Collections.unmodifiableList(sourcePaths); } - /** - * Copy constructor. - * @param that DistCpOptions being copied from. 
- */ - public DistCpOptions(DistCpOptions that) { - if (this != that && that != null) { - this.atomicCommit = that.atomicCommit; - this.syncFolder = that.syncFolder; - this.deleteMissing = that.deleteMissing; - this.ignoreFailures = that.ignoreFailures; - this.overwrite = that.overwrite; - this.skipCRC = that.skipCRC; - this.blocking = that.blocking; - this.useDiff = that.useDiff; - this.useRdiff = that.useRdiff; - this.numListstatusThreads = that.numListstatusThreads; - this.maxMaps = that.maxMaps; - this.mapBandwidth = that.mapBandwidth; - this.copyStrategy = that.copyStrategy; - this.preserveStatus = that.preserveStatus; - this.preserveRawXattrs = that.preserveRawXattrs; - this.atomicWorkPath = that.getAtomicWorkPath(); - this.logPath = that.getLogPath(); - this.sourceFileListing = that.getSourceFileListing(); - this.sourcePaths = that.getSourcePaths(); - this.targetPath = that.getTargetPath(); - this.targetPathExists = that.getTargetPathExists(); - this.filtersFile = that.getFiltersFile(); - this.blocksPerChunk = that.blocksPerChunk; - } + public Path getTargetPath() { + return targetPath; } - /** - * Should the data be committed atomically? - * - * @return true if data should be committed automically. false otherwise - */ public boolean shouldAtomicCommit() { return atomicCommit; } - /** - * Set if data need to be committed automatically - * - * @param atomicCommit - boolean switch - */ - public void setAtomicCommit(boolean atomicCommit) { - this.atomicCommit = atomicCommit; + public Path getAtomicWorkPath() { + return atomicWorkPath; } - /** - * Should the data be sync'ed between source and target paths? - * - * @return true if data should be sync'ed up. false otherwise - */ public boolean shouldSyncFolder() { return syncFolder; } - /** - * Set if source and target folder contents be sync'ed up - * - * @param syncFolder - boolean switch - */ - public void setSyncFolder(boolean syncFolder) { - this.syncFolder = syncFolder; - } - - /** - * Should target files missing in source should be deleted? - * - * @return true if zoombie target files to be removed. false otherwise - */ public boolean shouldDeleteMissing() { return deleteMissing; } - /** - * Set if files only present in target should be deleted - * - * @param deleteMissing - boolean switch - */ - public void setDeleteMissing(boolean deleteMissing) { - this.deleteMissing = deleteMissing; - } - - /** - * Should failures be logged and ignored during copy? - * - * @return true if failures are to be logged and ignored. false otherwise - */ public boolean shouldIgnoreFailures() { return ignoreFailures; } - /** - * Set if failures during copy be ignored - * - * @param ignoreFailures - boolean switch - */ - public void setIgnoreFailures(boolean ignoreFailures) { - this.ignoreFailures = ignoreFailures; - } - - /** - * Should DistCp be running in blocking mode - * - * @return true if should run in blocking, false otherwise - */ - public boolean shouldBlock() { - return blocking; - } - - /** - * Set if Disctp should run blocking or non-blocking - * - * @param blocking - boolean switch - */ - public void setBlocking(boolean blocking) { - this.blocking = blocking; - } - - /** - * Should files be overwritten always? - * - * @return true if files in target that may exist before distcp, should always - * be overwritten. 
false otherwise - */ public boolean shouldOverwrite() { return overwrite; } - /** - * Set if files should always be overwritten on target - * - * @param overwrite - boolean switch - */ - public void setOverwrite(boolean overwrite) { - this.overwrite = overwrite; - } - - /** - * @return whether we can append new data to target files - */ public boolean shouldAppend() { return append; } - /** - * Set if we want to append new data to target files. This is valid only with - * update option and CRC is not skipped. - */ - public void setAppend(boolean append) { - this.append = append; + public boolean shouldSkipCRC() { + return skipCRC; + } + + public boolean shouldBlock() { + return blocking; } public boolean shouldUseDiff() { @@ -318,104 +271,34 @@ public class DistCpOptions { return this.toSnapshot; } - public void setUseDiff(String fromSS, String toSS) { - this.useDiff = true; - this.fromSnapshot = fromSS; - this.toSnapshot = toSS; - } - - public void setUseRdiff(String fromSS, String toSS) { - this.useRdiff = true; - this.fromSnapshot = fromSS; - this.toSnapshot = toSS; + public String getFiltersFile() { + return filtersFile; } - /** - * Should CRC/checksum check be skipped while checking files are identical - * - * @return true if checksum check should be skipped while checking files are - * identical. false otherwise - */ - public boolean shouldSkipCRC() { - return skipCRC; + public Path getLogPath() { + return logPath; } - /** - * Set if checksum comparison should be skipped while determining if - * source and destination files are identical - * - * @param skipCRC - boolean switch - */ - public void setSkipCRC(boolean skipCRC) { - this.skipCRC = skipCRC; + public String getCopyStrategy() { + return copyStrategy; } - /** Get the number of threads to use for listStatus - * - * @return Number of threads to do listStatus - */ public int getNumListstatusThreads() { return numListstatusThreads; } - /** Set the number of threads to use for listStatus. We allow max 40 - * threads. Setting numThreads to zero signify we should use the value - * from conf properties. - * - * @param numThreads - Number of threads - */ - public void setNumListstatusThreads(int numThreads) { - if (numThreads > maxNumListstatusThreads) { - this.numListstatusThreads = maxNumListstatusThreads; - } else if (numThreads > 0) { - this.numListstatusThreads = numThreads; - } else { - this.numListstatusThreads = 0; - } - } - - /** Get the max number of maps to use for this copy - * - * @return Max number of maps - */ public int getMaxMaps() { return maxMaps; } - /** - * Set the max number of maps to use for copy - * - * @param maxMaps - Number of maps - */ - public void setMaxMaps(int maxMaps) { - this.maxMaps = Math.max(maxMaps, 1); - } - - /** Get the map bandwidth in MB - * - * @return Bandwidth in MB - */ public float getMapBandwidth() { return mapBandwidth; } - /** - * Set per map bandwidth - * - * @param mapBandwidth - per map bandwidth - */ - public void setMapBandwidth(float mapBandwidth) { - assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)"; - this.mapBandwidth = mapBandwidth; - } - - /** - * Returns an iterator with the list of file attributes to preserve - * - * @return iterator of file attributes to preserve - */ - public Iterator<FileAttribute> preserveAttributes() { - return preserveStatus.iterator(); + public Set<FileAttribute> getPreserveAttributes() { + return (preserveStatus == null) + ? 
null + : Collections.unmodifiableSet(preserveStatus); } /** @@ -428,230 +311,10 @@ public class DistCpOptions { return preserveStatus.contains(attribute); } - /** - * Add file attributes that need to be preserved. This method may be - * called multiple times to add attributes. - * - * @param fileAttribute - Attribute to add, one at a time - */ - public void preserve(FileAttribute fileAttribute) { - for (FileAttribute attribute : preserveStatus) { - if (attribute.equals(fileAttribute)) { - return; - } - } - preserveStatus.add(fileAttribute); - } - - /** - * Return true if raw.* xattrs should be preserved. - * @return true if raw.* xattrs should be preserved. - */ - public boolean shouldPreserveRawXattrs() { - return preserveRawXattrs; - } - - /** - * Indicate that raw.* xattrs should be preserved - */ - public void preserveRawXattrs() { - preserveRawXattrs = true; - } - - /** Get work path for atomic commit. If null, the work - * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath) - * - * @return Atomic work path on the target cluster. Null if not set - */ - public Path getAtomicWorkPath() { - return atomicWorkPath; - } - - /** - * Set the work path for atomic commit - * - * @param atomicWorkPath - Path on the target cluster - */ - public void setAtomicWorkPath(Path atomicWorkPath) { - this.atomicWorkPath = atomicWorkPath; - } - - /** Get output directory for writing distcp logs. Otherwise logs - * are temporarily written to JobStagingDir/_logs and deleted - * upon job completion - * - * @return Log output path on the cluster where distcp job is run - */ - public Path getLogPath() { - return logPath; - } - - /** - * Set the log path where distcp output logs are stored - * Uses JobStagingDir/_logs by default - * - * @param logPath - Path where logs will be saved - */ - public void setLogPath(Path logPath) { - this.logPath = logPath; - } - - /** - * Get the copy strategy to use. Uses appropriate input format - * - * @return copy strategy to use - */ - public String getCopyStrategy() { - return copyStrategy; - } - - /** - * Set the copy strategy to use. Should map to a strategy implementation - * in distp-default.xml - * - * @param copyStrategy - copy Strategy to use - */ - public void setCopyStrategy(String copyStrategy) { - this.copyStrategy = copyStrategy; - } - - /** - * File path (hdfs:// or file://) that contains the list of actual - * files to copy - * - * @return - Source listing file path - */ - public Path getSourceFileListing() { - return sourceFileListing; - } - - /** - * Getter for sourcePaths. - * @return List of source-paths. - */ - public List<Path> getSourcePaths() { - return sourcePaths; - } - - /** - * Setter for sourcePaths. - * @param sourcePaths The new list of source-paths. - */ - public void setSourcePaths(List<Path> sourcePaths) { - assert sourcePaths != null && sourcePaths.size() != 0; - this.sourcePaths = sourcePaths; - } - - /** - * Getter for the targetPath. - * @return The target-path. - */ - public Path getTargetPath() { - return targetPath; - } - - /** - * Getter for the targetPathExists. - * @return The target-path. - */ - public boolean getTargetPathExists() { - return targetPathExists; - } - - /** - * Set targetPathExists. - * @param targetPathExists Whether the target path of distcp exists. - */ - public boolean setTargetPathExists(boolean targetPathExists) { - return this.targetPathExists = targetPathExists; - } - - /** - * File path that contains the list of patterns - * for paths to be filtered from the file copy. 
- * @return - Filter file path. - */ - public final String getFiltersFile() { - return filtersFile; - } - - /** - * Set filtersFile. - * @param filtersFilename The path to a list of patterns to exclude from copy. - */ - public final void setFiltersFile(String filtersFilename) { - this.filtersFile = filtersFilename; - } - - public final void setBlocksPerChunk(int csize) { - this.blocksPerChunk = csize; - } - - public final int getBlocksPerChunk() { + public int getBlocksPerChunk() { return blocksPerChunk; } - public final boolean splitLargeFile() { - return blocksPerChunk > 0; - } - - void validate() { - if ((useDiff || useRdiff) && deleteMissing) { - // -delete and -diff/-rdiff are mutually exclusive. For backward - // compatibility, we ignore the -delete option here, instead of throwing - // an IllegalArgumentException. See HDFS-10397 for more discussion. - OptionsParser.LOG.warn( - "-delete and -diff/-rdiff are mutually exclusive. " + - "The -delete option will be ignored."); - setDeleteMissing(false); - } - - if (syncFolder && atomicCommit) { - throw new IllegalArgumentException("Atomic commit can't be used with " + - "sync folder or overwrite options"); - } - - if (deleteMissing && !(overwrite || syncFolder)) { - throw new IllegalArgumentException("Delete missing is applicable " + - "only with update or overwrite options"); - } - - if (overwrite && syncFolder) { - throw new IllegalArgumentException("Overwrite and update options are " + - "mutually exclusive"); - } - - if (!syncFolder && skipCRC) { - throw new IllegalArgumentException("Skip CRC is valid only with update options"); - } - - if (!syncFolder && append) { - throw new IllegalArgumentException( - "Append is valid only with update options"); - } - if (skipCRC && append) { - throw new IllegalArgumentException( - "Append is disallowed when skipping CRC"); - } - if (!syncFolder && (useDiff || useRdiff)) { - throw new IllegalArgumentException( - "-diff/-rdiff is valid only with -update option"); - } - - if (useDiff || useRdiff) { - if (StringUtils.isBlank(fromSnapshot) || - StringUtils.isBlank(toSnapshot)) { - throw new IllegalArgumentException( - "Must provide both the starting and ending " + - "snapshot names for -diff/-rdiff"); - } - } - if (useDiff && useRdiff) { - throw new IllegalArgumentException( - "-diff and -rdiff are mutually exclusive"); - } - } - /** * Add options to configuration. These will be used in the Mapper/committer * @@ -715,20 +378,292 @@ public class DistCpOptions { ", mapBandwidth=" + mapBandwidth + ", copyStrategy='" + copyStrategy + '\'' + ", preserveStatus=" + preserveStatus + - ", preserveRawXattrs=" + preserveRawXattrs + ", atomicWorkPath=" + atomicWorkPath + ", logPath=" + logPath + ", sourceFileListing=" + sourceFileListing + ", sourcePaths=" + sourcePaths + ", targetPath=" + targetPath + - ", targetPathExists=" + targetPathExists + ", filtersFile='" + filtersFile + '\'' + ", blocksPerChunk=" + blocksPerChunk + '}'; } - @Override - protected DistCpOptions clone() throws CloneNotSupportedException { - return (DistCpOptions) super.clone(); + /** + * The builder of the {@link DistCpOptions}. + * + * This is designed to be the only public interface to create a + * {@link DistCpOptions} object for users. It follows a simple Builder design + * pattern. 
+ */ + public static class Builder { + private Path sourceFileListing; + private List<Path> sourcePaths; + private Path targetPath; + + private boolean atomicCommit = false; + private Path atomicWorkPath; + private boolean syncFolder = false; + private boolean deleteMissing = false; + private boolean ignoreFailures = false; + private boolean overwrite = false; + private boolean append = false; + private boolean skipCRC = false; + private boolean blocking = true; + + private boolean useDiff = false; + private boolean useRdiff = false; + private String fromSnapshot; + private String toSnapshot; + + private String filtersFile; + + private Path logPath; + private String copyStrategy = DistCpConstants.UNIFORMSIZE; + + private int numListstatusThreads = 0; // 0 indicates that flag is not set. + private int maxMaps = DistCpConstants.DEFAULT_MAPS; + private float mapBandwidth = 0; // 0 indicates we should use the default + + private EnumSet<FileAttribute> preserveStatus = + EnumSet.noneOf(FileAttribute.class); + + private int blocksPerChunk = 0; + + public Builder(List<Path> sourcePaths, Path targetPath) { + Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(), + "Source paths should not be null or empty!"); + Preconditions.checkArgument(targetPath != null, + "Target path should not be null!"); + this.sourcePaths = sourcePaths; + this.targetPath = targetPath; + } + + public Builder(Path sourceFileListing, Path targetPath) { + Preconditions.checkArgument(sourceFileListing != null, + "Source file listing should not be null!"); + Preconditions.checkArgument(targetPath != null, + "Target path should not be null!"); + + this.sourceFileListing = sourceFileListing; + this.targetPath = targetPath; + } + + /** + * This is the single entry point for constructing DistCpOptions objects. + * + * Before a new DistCpOptions object is returned, it will set the dependent + * options, validate the option combinations. After constructing, the + * DistCpOptions instance is immutable. + */ + public DistCpOptions build() { + setOptionsForSplitLargeFile(); + + validate(); + + return new DistCpOptions(this); + } + + /** + * Override options for split large files. + */ + private void setOptionsForSplitLargeFile() { + if (blocksPerChunk <= 0) { + return; + } + + LOG.info("Enabling preserving blocksize since " + + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed."); + preserve(FileAttribute.BLOCKSIZE); + + LOG.info("Set " + DistCpOptionSwitch.APPEND.getSwitch() + + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + + " is passed."); + this.append = false; + } + + private void validate() { + if ((useDiff || useRdiff) && deleteMissing) { + // -delete and -diff/-rdiff are mutually exclusive. + throw new IllegalArgumentException("-delete and -diff/-rdiff are " + + "mutually exclusive. 
The -delete option will be ignored."); + } + + if (!atomicCommit && atomicWorkPath != null) { + throw new IllegalArgumentException( + "-tmp work-path can only be specified along with -atomic"); + } + + if (syncFolder && atomicCommit) { + throw new IllegalArgumentException("Atomic commit can't be used with " + + "sync folder or overwrite options"); + } + + if (deleteMissing && !(overwrite || syncFolder)) { + throw new IllegalArgumentException("Delete missing is applicable " + + "only with update or overwrite options"); + } + + if (overwrite && syncFolder) { + throw new IllegalArgumentException("Overwrite and update options are " + + "mutually exclusive"); + } + + if (!syncFolder && skipCRC) { + throw new IllegalArgumentException( + "Skip CRC is valid only with update options"); + } + + if (!syncFolder && append) { + throw new IllegalArgumentException( + "Append is valid only with update options"); + } + if (skipCRC && append) { + throw new IllegalArgumentException( + "Append is disallowed when skipping CRC"); + } + if (!syncFolder && (useDiff || useRdiff)) { + throw new IllegalArgumentException( + "-diff/-rdiff is valid only with -update option"); + } + + if (useDiff || useRdiff) { + if (StringUtils.isBlank(fromSnapshot) || + StringUtils.isBlank(toSnapshot)) { + throw new IllegalArgumentException( + "Must provide both the starting and ending " + + "snapshot names for -diff/-rdiff"); + } + } + if (useDiff && useRdiff) { + throw new IllegalArgumentException( + "-diff and -rdiff are mutually exclusive"); + } + } + + @VisibleForTesting + Builder withSourcePaths(List<Path> newSourcePaths) { + this.sourcePaths = newSourcePaths; + return this; + } + + public Builder withAtomicCommit(boolean newAtomicCommit) { + this.atomicCommit = newAtomicCommit; + return this; + } + + public Builder withAtomicWorkPath(Path newAtomicWorkPath) { + this.atomicWorkPath = newAtomicWorkPath; + return this; + } + + public Builder withSyncFolder(boolean newSyncFolder) { + this.syncFolder = newSyncFolder; + return this; + } + + public Builder withDeleteMissing(boolean newDeleteMissing) { + this.deleteMissing = newDeleteMissing; + return this; + } + + public Builder withIgnoreFailures(boolean newIgnoreFailures) { + this.ignoreFailures = newIgnoreFailures; + return this; + } + + public Builder withOverwrite(boolean newOverwrite) { + this.overwrite = newOverwrite; + return this; + } + + public Builder withAppend(boolean newAppend) { + this.append = newAppend; + return this; + } + + public Builder withCRC(boolean newSkipCRC) { + this.skipCRC = newSkipCRC; + return this; + } + + public Builder withBlocking(boolean newBlocking) { + this.blocking = newBlocking; + return this; + } + + public Builder withUseDiff(String newFromSnapshot, String newToSnapshot) { + this.useDiff = true; + this.fromSnapshot = newFromSnapshot; + this.toSnapshot = newToSnapshot; + return this; + } + + public Builder withUseRdiff(String newFromSnapshot, String newToSnapshot) { + this.useRdiff = true; + this.fromSnapshot = newFromSnapshot; + this.toSnapshot = newToSnapshot; + return this; + } + + public Builder withFiltersFile(String newFiletersFile) { + this.filtersFile = newFiletersFile; + return this; + } + + public Builder withLogPath(Path newLogPath) { + this.logPath = newLogPath; + return this; + } + + public Builder withCopyStrategy(String newCopyStrategy) { + this.copyStrategy = newCopyStrategy; + return this; + } + + public Builder withMapBandwidth(float newMapBandwidth) { + Preconditions.checkArgument(newMapBandwidth > 0, + "Bandwidth " + 
newMapBandwidth + " is invalid (should be > 0)"); + this.mapBandwidth = newMapBandwidth; + return this; + } + + public Builder withNumListstatusThreads(int newNumListstatusThreads) { + if (newNumListstatusThreads > MAX_NUM_LISTSTATUS_THREADS) { + this.numListstatusThreads = MAX_NUM_LISTSTATUS_THREADS; + } else if (newNumListstatusThreads > 0) { + this.numListstatusThreads = newNumListstatusThreads; + } else { + this.numListstatusThreads = 0; + } + return this; + } + + public Builder maxMaps(int newMaxMaps) { + this.maxMaps = Math.max(newMaxMaps, 1); + return this; + } + + public Builder preserve(String attributes) { + if (attributes == null || attributes.isEmpty()) { + preserveStatus = EnumSet.allOf(FileAttribute.class); + } else { + for (int index = 0; index < attributes.length(); index++) { + preserveStatus.add(FileAttribute. + getAttribute(attributes.charAt(index))); + } + } + return this; + } + + public Builder preserve(FileAttribute attribute) { + preserveStatus.add(attribute); + return this; + } + + public Builder withBlocksPerChunk(int newBlocksPerChunk) { + this.blocksPerChunk = newBlocksPerChunk; + return this; + } } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java index bcae96a..a78320b 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java @@ -48,7 +48,7 @@ import java.util.HashSet; * source.s1 */ class DistCpSync { - private DistCpOptions inputOptions; + private DistCpContext context; private Configuration conf; // diffMap maps snapshot diff op type to a list of diff ops. // It's initially created based on the snapshot diff. Then the individual @@ -58,13 +58,13 @@ class DistCpSync { private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap; private DiffInfo[] renameDiffs; - DistCpSync(DistCpOptions options, Configuration conf) { - this.inputOptions = options; + DistCpSync(DistCpContext context, Configuration conf) { + this.context = context; this.conf = conf; } private boolean isRdiff() { - return inputOptions.shouldUseRdiff(); + return context.shouldUseRdiff(); } /** @@ -77,14 +77,14 @@ class DistCpSync { * default distcp if the third condition isn't met. 
*/ private boolean preSyncCheck() throws IOException { - List<Path> sourcePaths = inputOptions.getSourcePaths(); + List<Path> sourcePaths = context.getSourcePaths(); if (sourcePaths.size() != 1) { // we only support one source dir which must be a snapshottable directory throw new IllegalArgumentException(sourcePaths.size() + " source paths are provided"); } final Path sourceDir = sourcePaths.get(0); - final Path targetDir = inputOptions.getTargetPath(); + final Path targetDir = context.getTargetPath(); final FileSystem srcFs = sourceDir.getFileSystem(conf); final FileSystem tgtFs = targetDir.getFileSystem(conf); @@ -104,13 +104,15 @@ class DistCpSync { // make sure targetFS has no change between from and the current states if (!checkNoChange(targetFs, targetDir)) { // set the source path using the snapshot path - inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir, - inputOptions.getToSnapshot()))); + context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir, + context.getToSnapshot()))); return false; } - final String from = getSnapshotName(inputOptions.getFromSnapshot()); - final String to = getSnapshotName(inputOptions.getToSnapshot()); + final String from = getSnapshotName( + context.getFromSnapshot()); + final String to = getSnapshotName( + context.getToSnapshot()); try { final FileStatus fromSnapshotStat = @@ -152,9 +154,9 @@ class DistCpSync { return false; } - List<Path> sourcePaths = inputOptions.getSourcePaths(); + List<Path> sourcePaths = context.getSourcePaths(); final Path sourceDir = sourcePaths.get(0); - final Path targetDir = inputOptions.getTargetPath(); + final Path targetDir = context.getTargetPath(); final FileSystem tfs = targetDir.getFileSystem(conf); final DistributedFileSystem targetFs = (DistributedFileSystem) tfs; @@ -175,8 +177,8 @@ class DistCpSync { deleteTargetTmpDir(targetFs, tmpDir); // TODO: since we have tmp directory, we can support "undo" with failures // set the source path using the snapshot path - inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir, - inputOptions.getToSnapshot()))); + context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir, + context.getToSnapshot()))); } } @@ -187,13 +189,13 @@ class DistCpSync { */ private boolean getAllDiffs() throws IOException { Path ssDir = isRdiff()? 
- inputOptions.getTargetPath() : inputOptions.getSourcePaths().get(0); + context.getTargetPath() : context.getSourcePaths().get(0); try { DistributedFileSystem fs = (DistributedFileSystem) ssDir.getFileSystem(conf); - final String from = getSnapshotName(inputOptions.getFromSnapshot()); - final String to = getSnapshotName(inputOptions.getToSnapshot()); + final String from = getSnapshotName(context.getFromSnapshot()); + final String to = getSnapshotName(context.getToSnapshot()); SnapshotDiffReport report = fs.getSnapshotDiffReport(ssDir, from, to); this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class); @@ -273,19 +275,19 @@ class DistCpSync { */ private boolean checkNoChange(DistributedFileSystem fs, Path path) { try { - final String from = getSnapshotName(inputOptions.getFromSnapshot()); + final String from = getSnapshotName(context.getFromSnapshot()); SnapshotDiffReport targetDiff = fs.getSnapshotDiffReport(path, from, ""); if (!targetDiff.getDiffList().isEmpty()) { DistCp.LOG.warn("The target has been modified since snapshot " - + inputOptions.getFromSnapshot()); + + context.getFromSnapshot()); return false; } else { return true; } } catch (IOException e) { DistCp.LOG.warn("Failed to compute snapshot diff on " + path - + " at snapshot " + inputOptions.getFromSnapshot(), e); + + " at snapshot " + context.getFromSnapshot(), e); } return false; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java index 2bc343e..c356edd 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java @@ -52,7 +52,7 @@ public class FileBasedCopyListing extends CopyListing { /** {@inheritDoc} */ @Override - protected void validatePaths(DistCpOptions options) + protected void validatePaths(DistCpContext context) throws IOException, InvalidInputException { } @@ -60,14 +60,14 @@ public class FileBasedCopyListing extends CopyListing { * Implementation of CopyListing::buildListing(). * Iterates over all source paths mentioned in the input-file. * @param pathToListFile Path on HDFS where the listing file is written. - * @param options Input Options for DistCp (indicating source/target paths.) + * @param context Distcp context with associated input options. 
* @throws IOException */ @Override - public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException { - DistCpOptions newOption = new DistCpOptions(options); - newOption.setSourcePaths(fetchFileList(options.getSourceFileListing())); - globbedListing.buildListing(pathToListFile, newOption); + public void doBuildListing(Path pathToListFile, DistCpContext context) + throws IOException { + context.setSourcePaths(fetchFileList(context.getSourceFileListing())); + globbedListing.buildListing(pathToListFile, context); } private List<Path> fetchFileList(Path sourceListing) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java index 27330b7..63c6f43 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java @@ -51,7 +51,7 @@ public class GlobbedCopyListing extends CopyListing { /** {@inheritDoc} */ @Override - protected void validatePaths(DistCpOptions options) + protected void validatePaths(DistCpContext context) throws IOException, InvalidInputException { } @@ -60,19 +60,19 @@ public class GlobbedCopyListing extends CopyListing { * Creates the copy listing by "globbing" all source-paths. * @param pathToListingFile The location at which the copy-listing file * is to be created. - * @param options Input Options for DistCp (indicating source/target paths.) + * @param context The distcp context with associated input options. * @throws IOException */ @Override - public void doBuildListing(Path pathToListingFile, - DistCpOptions options) throws IOException { + public void doBuildListing(Path pathToListingFile, DistCpContext context) + throws IOException { List<Path> globbedPaths = new ArrayList<Path>(); - if (options.getSourcePaths().isEmpty()) { + if (context.getSourcePaths().isEmpty()) { throw new InvalidInputException("Nothing to process. 
Source paths::EMPTY"); } - for (Path p : options.getSourcePaths()) { + for (Path p : context.getSourcePaths()) { FileSystem fs = p.getFileSystem(getConf()); FileStatus[] inputs = fs.globStatus(p); @@ -85,9 +85,8 @@ public class GlobbedCopyListing extends CopyListing { } } - DistCpOptions optionsGlobbed = new DistCpOptions(options); - optionsGlobbed.setSourcePaths(globbedPaths); - simpleListing.buildListing(pathToListingFile, optionsGlobbed); + context.setSourcePaths(globbedPaths); + simpleListing.buildListing(pathToListingFile, context); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 8881264..21ff0f8 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -32,7 +32,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import com.google.common.base.Preconditions; @@ -95,253 +94,126 @@ public class OptionsParser { Arrays.toString(args), e); } - DistCpOptions option = parseSourceAndTargetPaths(command); - - option.setIgnoreFailures( - command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())); - - option.setAtomicCommit( - command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch())); - - option.setSyncFolder( - command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch())); - - option.setOverwrite( - command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch())); - - option.setAppend( - command.hasOption(DistCpOptionSwitch.APPEND.getSwitch())); - - option.setDeleteMissing( - command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())); - - option.setSkipCRC( - command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch())); - - if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch()) && - option.shouldAtomicCommit()) { - String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch()); - if (workPath != null && !workPath.isEmpty()) { - option.setAtomicWorkPath(new Path(workPath)); - } - } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) { - throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic"); - } - - if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) { - option.setLogPath(new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch()))); - } - - - if (command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) { - option.setBlocking(false); - } - - parseBandwidth(command, option); - - parseNumListStatusThreads(command, option); - - parseMaxMaps(command, option); - - if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) { - option.setCopyStrategy( - getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch())); - } - - parsePreserveStatus(command, option); + DistCpOptions.Builder builder = parseSourceAndTargetPaths(command); + builder + .withAtomicCommit( + command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch())) + .withSyncFolder( + 
command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch())) + .withDeleteMissing( + command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) + .withIgnoreFailures( + command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) + .withOverwrite( + command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch())) + .withAppend( + command.hasOption(DistCpOptionSwitch.APPEND.getSwitch())) + .withCRC( + command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch())) + .withBlocking( + !command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())); if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch()); checkSnapshotsArgs(snapshots); - option.setUseDiff(snapshots[0], snapshots[1]); + builder.withUseDiff(snapshots[0], snapshots[1]); } if (command.hasOption(DistCpOptionSwitch.RDIFF.getSwitch())) { String[] snapshots = getVals(command, DistCpOptionSwitch.RDIFF.getSwitch()); checkSnapshotsArgs(snapshots); - option.setUseRdiff(snapshots[0], snapshots[1]); + builder.withUseRdiff(snapshots[0], snapshots[1]); } - parseFileLimit(command); - - parseSizeLimit(command); - if (command.hasOption(DistCpOptionSwitch.FILTERS.getSwitch())) { - option.setFiltersFile(getVal(command, - DistCpOptionSwitch.FILTERS.getSwitch())); + builder.withFiltersFile( + getVal(command, DistCpOptionSwitch.FILTERS.getSwitch())); } - parseBlocksPerChunk(command, option); - - option.validate(); - - return option; - } - - - /** - * A helper method to parse chunk size in number of blocks. - * Used when breaking large file into chunks to copy in parallel. - * - * @param command command line arguments - */ - private static void parseBlocksPerChunk(CommandLine command, - DistCpOptions option) { - boolean hasOption = - command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()); - LOG.info("parseChunkSize: " + - DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption); - if (hasOption) { - String chunkSizeString = getVal(command, - DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim()); - try { - int csize = Integer.parseInt(chunkSizeString); - if (csize < 0) { - csize = 0; - } - LOG.info("Set distcp blocksPerChunk to " + csize); - option.setBlocksPerChunk(csize); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("blocksPerChunk is invalid: " - + chunkSizeString, e); - } + if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) { + builder.withLogPath( + new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch()))); } - } - /** - * parseSizeLimit is a helper method for parsing the deprecated - * argument SIZE_LIMIT. - * - * @param command command line arguments - */ - private static void parseSizeLimit(CommandLine command) { - if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) { - String sizeLimitString = getVal(command, - DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim()); - try { - Long.parseLong(sizeLimitString); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("Size-limit is invalid: " - + sizeLimitString, e); + if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) { + final String workPath = getVal(command, + DistCpOptionSwitch.WORK_PATH.getSwitch()); + if (workPath != null && !workPath.isEmpty()) { + builder.withAtomicWorkPath(new Path(workPath)); } - LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" + - " option. 
Ignoring."); } - } - /** - * parseFileLimit is a helper method for parsing the deprecated - * argument FILE_LIMIT. - * - * @param command command line arguments - */ - private static void parseFileLimit(CommandLine command) { - if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) { - String fileLimitString = getVal(command, - DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); + if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { try { - Integer.parseInt(fileLimitString); + final Float mapBandwidth = Float.parseFloat( + getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch())); + builder.withMapBandwidth(mapBandwidth); } catch (NumberFormatException e) { - throw new IllegalArgumentException("File-limit is invalid: " - + fileLimitString, e); + throw new IllegalArgumentException("Bandwidth specified is invalid: " + + getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e); } - LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" + - " option. Ignoring."); } - } - /** - * parsePreserveStatus is a helper method for parsing PRESERVE_STATUS. - * - * @param command command line arguments - * @param option parsed distcp options - */ - private static void parsePreserveStatus(CommandLine command, - DistCpOptions option) { - if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) { - String attributes = - getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch()); - if (attributes == null || attributes.isEmpty()) { - for (FileAttribute attribute : FileAttribute.values()) { - option.preserve(attribute); - } - } else { - for (int index = 0; index < attributes.length(); index++) { - option.preserve(FileAttribute. - getAttribute(attributes.charAt(index))); - } + if (command.hasOption( + DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) { + try { + final Integer numThreads = Integer.parseInt(getVal(command, + DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())); + builder.withNumListstatusThreads(numThreads); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Number of liststatus threads is invalid: " + getVal(command, + DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e); } } - } - /** - * parseMaxMaps is a helper method for parsing MAX_MAPS. - * - * @param command command line arguments - * @param option parsed distcp options - */ - private static void parseMaxMaps(CommandLine command, - DistCpOptions option) { if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) { try { - Integer maps = Integer.parseInt( - getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim()); - option.setMaxMaps(maps); + final Integer maps = Integer.parseInt( + getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch())); + builder.maxMaps(maps); } catch (NumberFormatException e) { throw new IllegalArgumentException("Number of maps is invalid: " + getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e); } } - } - /** - * parseNumListStatusThreads is a helper method for parsing - * NUM_LISTSTATUS_THREADS. 
- * - * @param command command line arguments - * @param option parsed distcp options - */ - private static void parseNumListStatusThreads(CommandLine command, - DistCpOptions option) { - if (command.hasOption( - DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) { - try { - Integer numThreads = Integer.parseInt(getVal(command, - DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim()); - option.setNumListstatusThreads(numThreads); - } catch (NumberFormatException e) { - throw new IllegalArgumentException( - "Number of liststatus threads is invalid: " + getVal(command, - DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e); - } + if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) { + builder.withCopyStrategy( + getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch())); } - } - /** - * parseBandwidth is a helper method for parsing BANDWIDTH. - * - * @param command command line arguments - * @param option parsed distcp options - */ - private static void parseBandwidth(CommandLine command, - DistCpOptions option) { - if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { + if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) { + builder.preserve( + getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())); + } + + if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) { + LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" + + " option. Ignoring."); + } + + if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) { + LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" + + " option. Ignoring."); + } + + if (command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch())) { + final String chunkSizeStr = getVal(command, + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim()); try { - Float mapBandwidth = Float.parseFloat( - getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim()); - if (mapBandwidth <= 0) { - throw new IllegalArgumentException("Bandwidth specified is not " + - "positive: " + mapBandwidth); - } - option.setMapBandwidth(mapBandwidth); + int csize = Integer.parseInt(chunkSizeStr); + csize = csize > 0 ? csize : 0; + LOG.info("Set distcp blocksPerChunk to " + csize); + builder.withBlocksPerChunk(csize); } catch (NumberFormatException e) { - throw new IllegalArgumentException("Bandwidth specified is invalid: " + - getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e); + throw new IllegalArgumentException("blocksPerChunk is invalid: " + + chunkSizeStr, e); } } + + return builder.build(); } /** @@ -351,9 +223,8 @@ public class OptionsParser { * @param command command line arguments * @return DistCpOptions */ - private static DistCpOptions parseSourceAndTargetPaths( + private static DistCpOptions.Builder parseSourceAndTargetPaths( CommandLine command) { - DistCpOptions option; Path targetPath; List<Path> sourcePaths = new ArrayList<Path>(); @@ -378,20 +249,22 @@ public class OptionsParser { throw new IllegalArgumentException("Both source file listing and " + "source paths present"); } - option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch. 
- SOURCE_FILE_LISTING.getSwitch())), targetPath); + return new DistCpOptions.Builder(new Path(getVal(command, + DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())), targetPath); } else { if (sourcePaths.isEmpty()) { throw new IllegalArgumentException("Neither source file listing nor " + "source paths present"); } - option = new DistCpOptions(sourcePaths, targetPath); + return new DistCpOptions.Builder(sourcePaths, targetPath); } - return option; } private static String getVal(CommandLine command, String swtch) { - String optionValue = command.getOptionValue(swtch); + if (swtch == null) { + return null; + } + String optionValue = command.getOptionValue(swtch.trim()); if (optionValue == null) { return null; } else { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
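
For readers adapting code on top of this change, here is a minimal sketch of the new builder-style construction. It uses only the Builder constructors and with*/preserve/maxMaps methods that appear in the diff above; the source and target paths, the bandwidth value, and the preserved-attribute letters are illustrative choices, not values taken from the patch.

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCpOptions;

    public class DistCpOptionsBuilderExample {
      public static void main(String[] args) {
        // Build an immutable DistCpOptions instead of calling setters on a
        // mutable object. build() applies the validation shown in the patch
        // (e.g. -delete is only legal with -update or -overwrite) and then
        // returns the finished, immutable options.
        DistCpOptions options = new DistCpOptions.Builder(
                Arrays.asList(new Path("hdfs://nn1/src")),  // source paths (illustrative)
                new Path("hdfs://nn2/dst"))                 // target path (illustrative)
            .withSyncFolder(true)      // -update
            .withDeleteMissing(true)   // -delete; valid here because -update is set
            .withCRC(false)            // note: withCRC(...) sets the *skip*-CRC flag
            .maxMaps(20)               // clamped to at least 1 by the builder
            .withMapBandwidth(10f)     // must be > 0, enforced by Preconditions
            .preserve("ugp")           // user, group, permission (these attribute
                                       // letters are an assumption about the
                                       // FileAttribute character codes)
            .build();

        System.out.println(options);
      }
    }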
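The other half of the change is that per-run mutable state, such as the resolved source paths, now lives in the new DistCpContext rather than in DistCpOptions; the copy listings and DistCpSync receive the context instead of the options. The sketch below illustrates that hand-off; the DistCpContext(DistCpOptions) constructor is an assumption inferred from the new call sites, not shown verbatim in these hunks.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCpContext;
    import org.apache.hadoop.tools.DistCpOptions;

    public class DistCpContextExample {
      public static void main(String[] args) {
        DistCpOptions options = new DistCpOptions.Builder(
                Arrays.asList(new Path("hdfs://nn1/src")),
                new Path("hdfs://nn2/dst"))
            .build();

        // Assumed constructor: wrap the immutable options in a context object.
        DistCpContext context = new DistCpContext(options);

        // What GlobbedCopyListing/FileBasedCopyListing now do: rewrite the
        // resolved source paths on the context, leaving the options untouched.
        List<Path> globbedPaths = Arrays.asList(
            new Path("hdfs://nn1/src/a"), new Path("hdfs://nn1/src/b"));
        context.setSourcePaths(globbedPaths);

        System.out.println(context.getSourcePaths());
      }
    }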
