HDFS-11786. Add support to make copyFromLocal multi-threaded. Contributed by Mukul Kumar Singh.
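This change adds a -t <thread count> option to -copyFromLocal: with -t 1 (the default) the command behaves like the existing put, while larger values copy files through a thread pool capped at twice the number of available processors. A rough usage sketch follows, assuming the shell is driven through ToolRunner the way FsShell normally is; the class name, paths and thread count below are only illustrative and are not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;
    import org.apache.hadoop.util.ToolRunner;

    public class CopyFromLocalWithThreads {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // -t 4 asks the new CopyFromLocal command for four copy threads;
        // with -t 1 (or no -t at all) it falls back to the old single-threaded put path.
        int exitCode = ToolRunner.run(conf, new FsShell(),
            new String[] {"-copyFromLocal", "-t", "4",
                          "/local/source/dir", "/user/example/dest"});
        System.exit(exitCode);
      }
    }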
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02b141ac Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02b141ac Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02b141ac Branch: refs/heads/HDFS-7240 Commit: 02b141ac6059323ec43e472ca36dc570fdca386f Parents: b778887 Author: Anu Engineer <aengin...@apache.org> Authored: Sun Jul 16 10:59:34 2017 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Sun Jul 16 10:59:34 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/fs/shell/CopyCommands.java | 112 +++++++++++- .../apache/hadoop/fs/shell/MoveCommands.java | 4 +- .../hadoop/fs/shell/TestCopyFromLocal.java | 173 +++++++++++++++++++ .../hadoop/fs/shell/TestCopyPreserveFlag.java | 19 ++ .../src/test/resources/testConf.xml | 44 ++++- 5 files changed, 346 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index e2fad75..7b3c53e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -26,7 +26,11 @@ import java.net.URISyntaxException; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; @@ -288,9 +292,113 @@ class CopyCommands { } public static class CopyFromLocal extends Put { + private ThreadPoolExecutor executor = null; + private int numThreads = 1; + + private static final int MAX_THREADS = + Runtime.getRuntime().availableProcessors() * 2; public static final String NAME = "copyFromLocal"; - public static final String USAGE = Put.USAGE; - public static final String DESCRIPTION = "Identical to the -put command."; + public static final String USAGE = + "[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>"; + public static final String DESCRIPTION = + "Copy files from the local file system " + + "into fs. Copying fails if the file already " + + "exists, unless the -f flag is given.\n" + + "Flags:\n" + + " -p : Preserves access and modification times, ownership and the" + + " mode.\n" + + " -f : Overwrites the destination if it already exists.\n" + + " -t <thread count> : Number of threads to be used, default is 1.\n" + + " -l : Allow DataNode to lazily persist the file to disk. Forces" + + " replication factor of 1. This flag will result in reduced" + + " durability. 
Use with care.\n" + + " -d : Skip creation of temporary file(<dst>._COPYING_).\n"; + + private void setNumberThreads(String numberThreadsString) { + if (numberThreadsString == null) { + numThreads = 1; + } else { + int parsedValue = Integer.parseInt(numberThreadsString); + if (parsedValue <= 1) { + numThreads = 1; + } else if (parsedValue > MAX_THREADS) { + numThreads = MAX_THREADS; + } else { + numThreads = parsedValue; + } + } + } + + @Override + protected void processOptions(LinkedList<String> args) throws IOException { + CommandFormat cf = + new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d"); + cf.addOptionWithValue("t"); + cf.parse(args); + setNumberThreads(cf.getOptValue("t")); + setOverwrite(cf.getOpt("f")); + setPreserve(cf.getOpt("p")); + setLazyPersist(cf.getOpt("l")); + setDirectWrite(cf.getOpt("d")); + getRemoteDestination(args); + // should have a -r option + setRecursive(true); + } + + private void copyFile(PathData src, PathData target) throws IOException { + if (isPathRecursable(src)) { + throw new PathIsDirectoryException(src.toString()); + } + super.copyFileToTarget(src, target); + } + + @Override + protected void copyFileToTarget(PathData src, PathData target) + throws IOException { + // if number of thread is 1, mimic put and avoid threading overhead + if (numThreads == 1) { + copyFile(src, target); + return; + } + + Runnable task = () -> { + try { + copyFile(src, target); + } catch (IOException e) { + displayError(e); + } + }; + executor.submit(task); + } + + @Override + protected void processArguments(LinkedList<PathData> args) + throws IOException { + executor = new ThreadPoolExecutor(numThreads, numThreads, 1, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); + super.processArguments(args); + + // issue the command and then wait for it to finish + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); + } catch (InterruptedException e) { + executor.shutdownNow(); + displayError(e); + Thread.currentThread().interrupt(); + } + } + + @VisibleForTesting + public int getNumThreads() { + return numThreads; + } + + @VisibleForTesting + public ThreadPoolExecutor getExecutor() { + return executor; + } } public static class CopyToLocal extends Get { http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java index d359282..5ef4277 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java @@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathExistsException; -import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal; +import org.apache.hadoop.fs.shell.CopyCommands.Put; /** Various commands for moving files */ @InterfaceAudience.Private @@ -41,7 +41,7 @@ class MoveCommands { /** * Move local files to a remote filesystem */ - public static class MoveFromLocal extends CopyFromLocal { + public static class 
MoveFromLocal extends Put { public static final String NAME = "moveFromLocal"; public static final String USAGE = "<localsrc> ... <dst>"; public static final String DESCRIPTION = http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java new file mode 100644 index 0000000..8d354b4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.shell; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.math.RandomUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.Assert; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; + +/** + * Test for copyFromLocal. 
+ */ +public class TestCopyFromLocal { + private static final String FROM_DIR_NAME = "fromDir"; + private static final String TO_DIR_NAME = "toDir"; + + private static FileSystem fs; + private static Path testDir; + private static Configuration conf; + + public static int initialize(Path dir) throws Exception { + fs.mkdirs(dir); + Path fromDirPath = new Path(dir, FROM_DIR_NAME); + fs.mkdirs(fromDirPath); + Path toDirPath = new Path(dir, TO_DIR_NAME); + fs.mkdirs(toDirPath); + + int numTotalFiles = 0; + int numDirs = RandomUtils.nextInt(5); + for (int dirCount = 0; dirCount < numDirs; ++dirCount) { + Path subDirPath = new Path(fromDirPath, "subdir" + dirCount); + fs.mkdirs(subDirPath); + int numFiles = RandomUtils.nextInt(10); + for (int fileCount = 0; fileCount < numFiles; ++fileCount) { + numTotalFiles++; + Path subFile = new Path(subDirPath, "file" + fileCount); + fs.createNewFile(subFile); + FSDataOutputStream output = fs.create(subFile, true); + for(int i = 0; i < 100; ++i) { + output.writeInt(i); + output.writeChar('\n'); + } + output.close(); + } + } + + return numTotalFiles; + } + + @BeforeClass + public static void init() throws Exception { + conf = new Configuration(false); + conf.set("fs.file.impl", LocalFileSystem.class.getName()); + fs = FileSystem.getLocal(conf); + testDir = new FileSystemTestHelper().getTestRootPath(fs); + // don't want scheme on the path, just an absolute path + testDir = new Path(fs.makeQualified(testDir).toUri().getPath()); + + FileSystem.setDefaultUri(conf, fs.getUri()); + fs.setWorkingDirectory(testDir); + } + + @AfterClass + public static void cleanup() throws Exception { + fs.delete(testDir, true); + fs.close(); + } + + private void run(CommandWithDestination cmd, String... args) { + cmd.setConf(conf); + assertEquals(0, cmd.run(args)); + } + + @Test(timeout = 10000) + public void testCopyFromLocal() throws Exception { + Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); + TestCopyFromLocal.initialize(dir); + run(new TestMultiThreadedCopy(1, 0), + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCopyFromLocalWithThreads() throws Exception { + Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); + int numFiles = TestCopyFromLocal.initialize(dir); + int maxThreads = Runtime.getRuntime().availableProcessors() * 2; + int randThreads = RandomUtils.nextInt(maxThreads); + int numActualThreads = randThreads == 0 ? 
1 : randThreads; + String numThreads = Integer.toString(numActualThreads); + run(new TestMultiThreadedCopy(numActualThreads, numFiles), "-t", numThreads, + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCopyFromLocalWithThreadWrong() throws Exception { + Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); + int numFiles = TestCopyFromLocal.initialize(dir); + int maxThreads = Runtime.getRuntime().availableProcessors() * 2; + String numThreads = Integer.toString(maxThreads * 2); + run(new TestMultiThreadedCopy(maxThreads, numFiles), "-t", numThreads, + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCopyFromLocalWithZeroThreads() throws Exception { + Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); + TestCopyFromLocal.initialize(dir); + run(new TestMultiThreadedCopy(1, 0), "-t", "0", + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + private class TestMultiThreadedCopy extends CopyFromLocal { + private int expectedThreads; + private int expectedCompletedTaskCount; + + TestMultiThreadedCopy(int expectedThreads, + int expectedCompletedTaskCount) { + this.expectedThreads = expectedThreads; + this.expectedCompletedTaskCount = expectedCompletedTaskCount; + } + + @Override + protected void processArguments(LinkedList<PathData> args) + throws IOException { + // Check if the correct number of threads are spawned + Assert.assertEquals(expectedThreads, getNumThreads()); + super.processArguments(args); + // Once the copy is complete, check following + // 1) number of completed tasks are same as expected + // 2) There are no active tasks in the executor + // 3) Executor has shutdown correctly + ThreadPoolExecutor executor = getExecutor(); + Assert.assertEquals(executor.getCompletedTaskCount(), + expectedCompletedTaskCount); + Assert.assertEquals(executor.getActiveCount(), 0); + Assert.assertTrue(executor.isTerminated()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java index 47dc601..8dd09e5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.shell.CopyCommands.Cp; import org.apache.hadoop.fs.shell.CopyCommands.Get; import org.apache.hadoop.fs.shell.CopyCommands.Put; +import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -120,6 +121,24 @@ public class TestCopyPreserveFlag { } @Test(timeout = 10000) + public void testCopyFromLocal() throws Exception { + run(new CopyFromLocal(), FROM.toString(), TO.toString()); + assertAttributesChanged(TO); + } + + @Test(timeout = 10000) + public void testCopyFromLocalWithThreads() throws Exception { + run(new CopyFromLocal(), "-t", "10", FROM.toString(), TO.toString()); + 
assertAttributesChanged(TO); + } + + @Test(timeout = 10000) + public void testCopyFromLocalWithThreadsPreserve() throws Exception { + run(new CopyFromLocal(), "-p", "-t", "10", FROM.toString(), TO.toString()); + assertAttributesPreserved(TO); + } + + @Test(timeout = 10000) public void testGetWithP() throws Exception { run(new Get(), "-p", FROM.toString(), TO.toString()); assertAttributesPreserved(TO); http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml index 342b17c..64677f8 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml @@ -547,11 +547,51 @@ <comparators> <comparator> <type>RegexpComparator</type> - <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :\s*</expected-output> + <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s*</expected-output> </comparator> <comparator> <type>RegexpComparator</type> - <expected-output>^\s*Identical to the -put command\.\s*</expected-output> + <expected-output>^\s*Copy files from the local file system into fs.( )*Copying fails if the file already( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*exists, unless the -f flag is given.( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*Flags:( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*-p Preserves access and modification times, ownership and the( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*mode.( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*-t <thread count> Number of threads to be used, default is 1.( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*durability. Use with care.( )*</expected-output> + </comparator> + <comparator> + <type>RegexpComparator</type> + <expected-output>^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )*</expected-output> </comparator> </comparators> </test>
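The executor behind the new option is a fixed pool of numThreads workers over a bounded 1024-slot queue with CallerRunsPolicy, and processArguments() finishes with shutdown() plus awaitTermination(). As a result, once the queue is full the shell's own thread copies a file itself instead of the task being rejected, and the command does not return until every queued copy has drained. A small standalone sketch of that saturation behaviour (not taken from the patch; the pool and queue sizes are shrunk here so the effect is easy to observe):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class CallerRunsPolicyDemo {
      public static void main(String[] args) throws InterruptedException {
        // Tiny pool and queue (the patch uses numThreads workers and a 1024-slot queue).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 20; i++) {
          final int task = i;
          // Once the queue is full, CallerRunsPolicy makes this execute() call run the
          // task on the submitting thread, throttling submission instead of failing it.
          pool.execute(() ->
              System.out.println("task " + task + " ran on " + Thread.currentThread().getName()));
        }
        // Mirror the patch: stop accepting new work, then wait for in-flight tasks to finish.
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
      }
    }

With the patch's 1024-slot queue the caller only falls back to copying inline once 1024 copies are already pending, so in practice the shell thread mostly just enqueues work while the pool does the copying.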