Author: vinodkv Date: Thu Mar 20 02:46:23 2014 New Revision: 1579515 URL: http://svn.apache.org/r1579515 Log: MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block location calls in parallel. Contributed by Siddharth Seth.
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1579515&r1=1579514&r2=1579515&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Mar 20 02:46:23 2014 @@ -199,6 +199,9 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-4052. Improved MapReduce clients to use NodeManagers' ability to handle cross platform application submissions. (Jian He via vinodkv) + MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block + location calls in parallel. (Siddharth Seth via vinodkv) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1579515&r1=1579514&r2=1579515&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Mar 20 02:46:23 2014 @@ -47,6 +47,9 @@ import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Iterables; + /** * A base class for file-based {@link InputFormat}. * @@ -203,10 +206,7 @@ public abstract class FileInputFormat<K, // Whether we need to recursive look into the directory structure boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false); - - List<FileStatus> result = new ArrayList<FileStatus>(); - List<IOException> errors = new ArrayList<IOException>(); - + // creates a MultiPathFilter with the hiddenFileFilter and the // user provided one (if any). List<PathFilter> filters = new ArrayList<PathFilter>(); @@ -217,6 +217,41 @@ public abstract class FileInputFormat<K, } PathFilter inputFilter = new MultiPathFilter(filters); + FileStatus[] result; + int numThreads = job + .getInt( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS, + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS); + + Stopwatch sw = new Stopwatch().start(); + if (numThreads == 1) { + List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive); + result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]); + } else { + Iterable<FileStatus> locatedFiles = null; + try { + + LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher( + job, dirs, recursive, inputFilter, false); + locatedFiles = locatedFileStatusFetcher.getFileStatuses(); + } catch (InterruptedException e) { + throw new IOException("Interrupted while getting file statuses"); + } + result = Iterables.toArray(locatedFiles, FileStatus.class); + } + + sw.stop(); + if (LOG.isDebugEnabled()) { + LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis()); + } + LOG.info("Total input paths to process : " + result.length); + return result; + } + + private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs, + PathFilter inputFilter, boolean recursive) throws IOException { + List<FileStatus> result = new ArrayList<FileStatus>(); + List<IOException> errors = new ArrayList<IOException>(); for (Path p: dirs) { FileSystem fs = p.getFileSystem(job); FileStatus[] matches = fs.globStatus(p, inputFilter); @@ -246,12 +281,10 @@ public abstract class FileInputFormat<K, } } } - if (!errors.isEmpty()) { throw new InvalidInputException(errors); } - LOG.info("Total input paths to process : " + result.size()); - return result.toArray(new FileStatus[result.size()]); + return result; } /** @@ -267,6 +300,7 @@ public abstract class FileInputFormat<K, * they're too big.*/ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + Stopwatch sw = new Stopwatch().start(); FileStatus[] files = listStatus(job); // Save the number of input files for metrics/loadgen @@ -325,7 +359,11 @@ public abstract class FileInputFormat<K, splits.add(makeSplit(path, 0, length, new String[0])); } } - LOG.debug("Total # of splits: " + splits.size()); + sw.stop(); + if (LOG.isDebugEnabled()) { + LOG.debug("Total # of splits generated by getSplits: " + splits.size() + + ", TimeTaken: " + sw.elapsedMillis()); + } return splits.toArray(new FileSplit[splits.size()]); } Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java?rev=1579515&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java Thu Mar 20 02:46:23 2014 @@ -0,0 +1,371 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Utility class to fetch block locations for specified Input paths using a + * configured number of threads. + */ +@Private +public class LocatedFileStatusFetcher { + + private final Path[] inputDirs; + private final PathFilter inputFilter; + private final Configuration conf; + private final boolean recursive; + private final boolean newApi; + + private final ExecutorService rawExec; + private final ListeningExecutorService exec; + private final BlockingQueue<List<FileStatus>> resultQueue; + private final List<IOException> invalidInputErrors = new LinkedList<IOException>(); + + private final ProcessInitialInputPathCallback processInitialInputPathCallback = + new ProcessInitialInputPathCallback(); + private final ProcessInputDirCallback processInputDirCallback = + new ProcessInputDirCallback(); + + private final AtomicInteger runningTasks = new AtomicInteger(0); + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + + private volatile Throwable unknownError; + + /** + * @param conf configuration for the job + * @param dirs the initial list of paths + * @param recursive whether to traverse the patchs recursively + * @param inputFilter inputFilter to apply to the resulting paths + * @param newApi whether using the mapred or mapreduce API + * @throws InterruptedException + * @throws IOException + */ + public LocatedFileStatusFetcher(Configuration conf, Path[] dirs, + boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException, + IOException { + int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS, + FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS); + rawExec = Executors.newFixedThreadPool( + numThreads, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("GetFileInfo #%d").build()); + exec = MoreExecutors.listeningDecorator(rawExec); + resultQueue = new LinkedBlockingQueue<List<FileStatus>>(); + this.conf = conf; + this.inputDirs = dirs; + this.recursive = recursive; + this.inputFilter = inputFilter; + this.newApi = newApi; + } + + /** + * Start executing and return FileStatuses based on the parameters specified + * @return fetched file statuses + * @throws InterruptedException + * @throws IOException + */ + public Iterable<FileStatus> getFileStatuses() throws InterruptedException, + IOException { + // Increment to make sure a race between the first thread completing and the + // rest being scheduled does not lead to a termination. + runningTasks.incrementAndGet(); + for (Path p : inputDirs) { + runningTasks.incrementAndGet(); + ListenableFuture<ProcessInitialInputPathCallable.Result> future = exec + .submit(new ProcessInitialInputPathCallable(p, conf, inputFilter)); + Futures.addCallback(future, processInitialInputPathCallback); + } + + runningTasks.decrementAndGet(); + + lock.lock(); + try { + while (runningTasks.get() != 0 && unknownError == null) { + condition.await(); + } + } finally { + lock.unlock(); + } + this.exec.shutdownNow(); + if (this.unknownError != null) { + if (this.unknownError instanceof Error) { + throw (Error) this.unknownError; + } else if (this.unknownError instanceof RuntimeException) { + throw (RuntimeException) this.unknownError; + } else if (this.unknownError instanceof IOException) { + throw (IOException) this.unknownError; + } else if (this.unknownError instanceof InterruptedException) { + throw (InterruptedException) this.unknownError; + } else { + throw new IOException(this.unknownError); + } + } + if (this.invalidInputErrors.size() != 0) { + if (this.newApi) { + throw new org.apache.hadoop.mapreduce.lib.input.InvalidInputException( + invalidInputErrors); + } else { + throw new InvalidInputException(invalidInputErrors); + } + } + return Iterables.concat(resultQueue); + } + + /** + * Collect misconfigured Input errors. Errors while actually reading file info + * are reported immediately + */ + private void registerInvalidInputError(List<IOException> errors) { + synchronized (this) { + this.invalidInputErrors.addAll(errors); + } + } + + /** + * Register fatal errors - example an IOException while accessing a file or a + * full exection queue + */ + private void registerError(Throwable t) { + lock.lock(); + try { + if (unknownError != null) { + unknownError = t; + condition.signal(); + } + + } finally { + lock.unlock(); + } + } + + private void decrementRunningAndCheckCompletion() { + lock.lock(); + try { + if (runningTasks.decrementAndGet() == 0) { + condition.signal(); + } + } finally { + lock.unlock(); + } + } + + /** + * Retrieves block locations for the given @link {@link FileStatus}, and adds + * additional paths to the process queue if required. + */ + private static class ProcessInputDirCallable implements + Callable<ProcessInputDirCallable.Result> { + + private final FileSystem fs; + private final FileStatus fileStatus; + private final boolean recursive; + private final PathFilter inputFilter; + + ProcessInputDirCallable(FileSystem fs, FileStatus fileStatus, + boolean recursive, PathFilter inputFilter) { + this.fs = fs; + this.fileStatus = fileStatus; + this.recursive = recursive; + this.inputFilter = inputFilter; + } + + @Override + public Result call() throws Exception { + Result result = new Result(); + result.fs = fs; + + if (fileStatus.isDirectory()) { + RemoteIterator<LocatedFileStatus> iter = fs + .listLocatedStatus(fileStatus.getPath()); + while (iter.hasNext()) { + LocatedFileStatus stat = iter.next(); + if (inputFilter.accept(stat.getPath())) { + if (recursive && stat.isDirectory()) { + result.dirsNeedingRecursiveCalls.add(stat); + } else { + result.locatedFileStatuses.add(stat); + } + } + } + } else { + result.locatedFileStatuses.add(fileStatus); + } + return result; + } + + private static class Result { + private List<FileStatus> locatedFileStatuses = new LinkedList<FileStatus>(); + private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<FileStatus>(); + private FileSystem fs; + } + } + + /** + * The callback handler to handle results generated by + * {@link ProcessInputDirCallable}. This populates the final result set. + * + */ + private class ProcessInputDirCallback implements + FutureCallback<ProcessInputDirCallable.Result> { + + @Override + public void onSuccess(ProcessInputDirCallable.Result result) { + try { + if (result.locatedFileStatuses.size() != 0) { + resultQueue.add(result.locatedFileStatuses); + } + if (result.dirsNeedingRecursiveCalls.size() != 0) { + for (FileStatus fileStatus : result.dirsNeedingRecursiveCalls) { + runningTasks.incrementAndGet(); + ListenableFuture<ProcessInputDirCallable.Result> future = exec + .submit(new ProcessInputDirCallable(result.fs, fileStatus, + recursive, inputFilter)); + Futures.addCallback(future, processInputDirCallback); + } + } + decrementRunningAndCheckCompletion(); + } catch (Throwable t) { // Error within the callback itself. + registerError(t); + } + } + + @Override + public void onFailure(Throwable t) { + // Any generated exceptions. Leads to immediate termination. + registerError(t); + } + } + + + /** + * Processes an initial Input Path pattern through the globber and PathFilter + * to generate a list of files which need further processing. + */ + private static class ProcessInitialInputPathCallable implements + Callable<ProcessInitialInputPathCallable.Result> { + + private final Path path; + private final Configuration conf; + private final PathFilter inputFilter; + + public ProcessInitialInputPathCallable(Path path, Configuration conf, + PathFilter pathFilter) { + this.path = path; + this.conf = conf; + this.inputFilter = pathFilter; + } + + @Override + public Result call() throws Exception { + Result result = new Result(); + FileSystem fs = path.getFileSystem(conf); + result.fs = fs; + FileStatus[] matches = fs.globStatus(path, inputFilter); + if (matches == null) { + result.addError(new IOException("Input path does not exist: " + path)); + } else if (matches.length == 0) { + result.addError(new IOException("Input Pattern " + path + + " matches 0 files")); + } else { + result.matchedFileStatuses = matches; + } + return result; + } + + private static class Result { + private List<IOException> errors; + private FileStatus[] matchedFileStatuses; + private FileSystem fs; + + void addError(IOException ioe) { + if (errors == null) { + errors = new LinkedList<IOException>(); + } + errors.add(ioe); + } + } + } + + /** + * The callback handler to handle results generated by + * {@link ProcessInitialInputPathCallable} + * + */ + private class ProcessInitialInputPathCallback implements + FutureCallback<ProcessInitialInputPathCallable.Result> { + + @Override + public void onSuccess(ProcessInitialInputPathCallable.Result result) { + try { + if (result.errors != null) { + registerInvalidInputError(result.errors); + } + if (result.matchedFileStatuses != null) { + for (FileStatus matched : result.matchedFileStatuses) { + runningTasks.incrementAndGet(); + ListenableFuture<ProcessInputDirCallable.Result> future = exec + .submit(new ProcessInputDirCallable(result.fs, matched, + recursive, inputFilter)); + Futures.addCallback(future, processInputDirCallback); + } + } + decrementRunningAndCheckCompletion(); + } catch (Throwable t) { // Exception within the callback + registerError(t); + } + } + + @Override + public void onFailure(Throwable t) { + // Any generated exceptions. Leads to immediate termination. + registerError(t); + } + } +} \ No newline at end of file Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1579515&r1=1579514&r2=1579515&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Thu Mar 20 02:46:23 2014 @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.LocatedFileStatusFetcher; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -43,6 +44,9 @@ import org.apache.hadoop.mapreduce.secur import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; + /** * A base class for file-based {@link InputFormat}s. * @@ -68,6 +72,9 @@ public abstract class FileInputFormat<K, "mapreduce.input.fileinputformat.numinputfiles"; public static final String INPUT_DIR_RECURSIVE = "mapreduce.input.fileinputformat.input.dir.recursive"; + public static final String LIST_STATUS_NUM_THREADS = + "mapreduce.input.fileinputformat.list-status.num-threads"; + public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1; private static final Log LOG = LogFactory.getLog(FileInputFormat.class); @@ -225,7 +232,6 @@ public abstract class FileInputFormat<K, */ protected List<FileStatus> listStatus(JobContext job ) throws IOException { - List<FileStatus> result = new ArrayList<FileStatus>(); Path[] dirs = getInputPaths(job); if (dirs.length == 0) { throw new IOException("No input paths specified in job"); @@ -237,9 +243,7 @@ public abstract class FileInputFormat<K, // Whether we need to recursive look into the directory structure boolean recursive = getInputDirRecursive(job); - - List<IOException> errors = new ArrayList<IOException>(); - + // creates a MultiPathFilter with the hiddenFileFilter and the // user provided one (if any). List<PathFilter> filters = new ArrayList<PathFilter>(); @@ -250,6 +254,37 @@ public abstract class FileInputFormat<K, } PathFilter inputFilter = new MultiPathFilter(filters); + List<FileStatus> result = null; + + int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS, + DEFAULT_LIST_STATUS_NUM_THREADS); + Stopwatch sw = new Stopwatch().start(); + if (numThreads == 1) { + result = singleThreadedListStatus(job, dirs, inputFilter, recursive); + } else { + Iterable<FileStatus> locatedFiles = null; + try { + LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher( + job.getConfiguration(), dirs, recursive, inputFilter, true); + locatedFiles = locatedFileStatusFetcher.getFileStatuses(); + } catch (InterruptedException e) { + throw new IOException("Interrupted while getting file statuses"); + } + result = Lists.newArrayList(locatedFiles); + } + + sw.stop(); + if (LOG.isDebugEnabled()) { + LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis()); + } + LOG.info("Total input paths to process : " + result.size()); + return result; + } + + private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs, + PathFilter inputFilter, boolean recursive) throws IOException { + List<FileStatus> result = new ArrayList<FileStatus>(); + List<IOException> errors = new ArrayList<IOException>(); for (int i=0; i < dirs.length; ++i) { Path p = dirs[i]; FileSystem fs = p.getFileSystem(job.getConfiguration()); @@ -284,7 +319,6 @@ public abstract class FileInputFormat<K, if (!errors.isEmpty()) { throw new InvalidInputException(errors); } - LOG.info("Total input paths to process : " + result.size()); return result; } @@ -332,6 +366,7 @@ public abstract class FileInputFormat<K, * @throws IOException */ public List<InputSplit> getSplits(JobContext job) throws IOException { + Stopwatch sw = new Stopwatch().start(); long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); @@ -376,7 +411,11 @@ public abstract class FileInputFormat<K, } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); - LOG.debug("Total # of splits: " + splits.size()); + sw.stop(); + if (LOG.isDebugEnabled()) { + LOG.debug("Total # of splits generated by getSplits: " + splits.size() + + ", TimeTaken: " + sw.elapsedMillis()); + } return splits; } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1579515&r1=1579514&r2=1579515&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Mar 20 02:46:23 2014 @@ -430,6 +430,16 @@ take priority over this setting.</description> </property> +<property> + <name>mapreduce.input.fileinputformat.list-status.num-threads</name> + <value>1</value> + <description>The number of threads to use to list and fetch block locations + for the specified input paths. Note: multiple threads should not be used + if a custom non thread-safe path filter is used. + </description> +</property> + + <property> <name>mapreduce.client.submit.file.replication</name> Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1579515&r1=1579514&r2=1579515&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Thu Mar 20 02:46:23 2014 @@ -19,7 +19,12 @@ package org.apache.hadoop.mapred; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -29,15 +34,58 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; -public class TestFileInputFormat { +import com.google.common.collect.Lists; +@RunWith(value = Parameterized.class) +public class TestFileInputFormat { + + private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class); + + private static String testTmpDir = System.getProperty("test.build.data", "/tmp"); + private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF"); + + private static FileSystem localFs; + + private int numThreads; + + public TestFileInputFormat(int numThreads) { + this.numThreads = numThreads; + LOG.info("Running with numThreads: " + numThreads); + } + + @Parameters + public static Collection<Object[]> data() { + Object[][] data = new Object[][] { { 1 }, { 5 }}; + return Arrays.asList(data); + } + + @Before + public void setup() throws IOException { + LOG.info("Using Test Dir: " + TEST_ROOT_DIR); + localFs = FileSystem.getLocal(new Configuration()); + localFs.delete(TEST_ROOT_DIR, true); + localFs.mkdirs(TEST_ROOT_DIR); + } + + @After + public void cleanup() throws IOException { + localFs.delete(TEST_ROOT_DIR, true); + } + @Test public void testListLocatedStatus() throws Exception { Configuration conf = getConfiguration(); conf.setBoolean("fs.test.impl.disable.cache", false); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, "test:///a1/a2"); MockFileSystem mockFs = @@ -51,6 +99,82 @@ public class TestFileInputFormat { Assert.assertEquals("Input splits are not correct", 2, splits.length); Assert.assertEquals("listLocatedStatuss calls", 1, mockFs.numListLocatedStatusCalls); + FileSystem.closeAll(); + } + + @Test + public void testListStatusSimple() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .configureTestSimple(conf, localFs); + + JobConf jobConf = new JobConf(conf); + TextInputFormat fif = new TextInputFormat(); + fif.configure(jobConf); + FileStatus[] statuses = fif.listStatus(jobConf); + + org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses), + localFs); + } + + @Test + public void testListStatusNestedRecursive() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .configureTestNestedRecursive(conf, localFs); + JobConf jobConf = new JobConf(conf); + TextInputFormat fif = new TextInputFormat(); + fif.configure(jobConf); + FileStatus[] statuses = fif.listStatus(jobConf); + + org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses), + localFs); + } + + @Test + public void testListStatusNestedNonRecursive() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .configureTestNestedNonRecursive(conf, localFs); + JobConf jobConf = new JobConf(conf); + TextInputFormat fif = new TextInputFormat(); + fif.configure(jobConf); + FileStatus[] statuses = fif.listStatus(jobConf); + + org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses), + localFs); + } + + @Test + public void testListStatusErrorOnNonExistantDir() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .configureTestErrorOnNonExistantDir(conf, localFs); + JobConf jobConf = new JobConf(conf); + TextInputFormat fif = new TextInputFormat(); + fif.configure(jobConf); + try { + fif.listStatus(jobConf); + Assert.fail("Expecting an IOException for a missing Input path"); + } catch (IOException e) { + Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2"); + expectedExceptionPath = localFs.makeQualified(expectedExceptionPath); + Assert.assertTrue(e instanceof InvalidInputException); + Assert.assertEquals( + "Input path does not exist: " + expectedExceptionPath.toString(), + e.getMessage()); + } } private Configuration getConfiguration() { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1579515&r1=1579514&r2=1579515&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Thu Mar 20 02:46:23 2014 @@ -19,10 +19,17 @@ package org.apache.hadoop.mapreduce.lib. import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Set; + +import javax.annotation.Nullable; import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -34,55 +41,90 @@ import org.apache.hadoop.fs.RawLocalFile import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +@RunWith(value = Parameterized.class) public class TestFileInputFormat { + + private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class); + + private static String testTmpDir = System.getProperty("test.build.data", "/tmp"); + private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF"); + + private static FileSystem localFs; + + private int numThreads; + + public TestFileInputFormat(int numThreads) { + this.numThreads = numThreads; + LOG.info("Running with numThreads: " + numThreads); + } + + @Parameters + public static Collection<Object[]> data() { + Object[][] data = new Object[][] { { 1 }, { 5 }}; + return Arrays.asList(data); + } + + @Before + public void setup() throws IOException { + LOG.info("Using Test Dir: " + TEST_ROOT_DIR); + localFs = FileSystem.getLocal(new Configuration()); + localFs.delete(TEST_ROOT_DIR, true); + localFs.mkdirs(TEST_ROOT_DIR); + } + + @After + public void cleanup() throws IOException { + localFs.delete(TEST_ROOT_DIR, true); + } @Test public void testNumInputFilesRecursively() throws Exception { Configuration conf = getConfiguration(); conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true"); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); Job job = Job.getInstance(conf); FileInputFormat<?, ?> fileInputFormat = new TextInputFormat(); List<InputSplit> splits = fileInputFormat.getSplits(job); Assert.assertEquals("Input splits are not correct", 3, splits.size()); - Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0)) - .getPath().toString()); - Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1)) - .getPath().toString()); - Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath() - .toString()); - + verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3", + "test:/a1/file1"), splits); + // Using the deprecated configuration conf = getConfiguration(); conf.set("mapred.input.dir.recursive", "true"); job = Job.getInstance(conf); splits = fileInputFormat.getSplits(job); - Assert.assertEquals("Input splits are not correct", 3, splits.size()); - Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0)) - .getPath().toString()); - Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1)) - .getPath().toString()); - Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath() - .toString()); + verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3", + "test:/a1/file1"), splits); } @Test public void testNumInputFilesWithoutRecursively() throws Exception { Configuration conf = getConfiguration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); Job job = Job.getInstance(conf); FileInputFormat<?, ?> fileInputFormat = new TextInputFormat(); List<InputSplit> splits = fileInputFormat.getSplits(job); Assert.assertEquals("Input splits are not correct", 2, splits.size()); - Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath() - .toString()); - Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath() - .toString()); + verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits); } @Test public void testListLocatedStatus() throws Exception { Configuration conf = getConfiguration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); conf.setBoolean("fs.test.impl.disable.cache", false); conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2"); MockFileSystem mockFs = @@ -95,8 +137,226 @@ public class TestFileInputFormat { Assert.assertEquals("Input splits are not correct", 2, splits.size()); Assert.assertEquals("listLocatedStatuss calls", 1, mockFs.numListLocatedStatusCalls); + FileSystem.closeAll(); + } + + @Test + public void testListStatusSimple() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = configureTestSimple(conf, localFs); + + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fif = new TextInputFormat(); + List<FileStatus> statuses = fif.listStatus(job); + + verifyFileStatuses(expectedPaths, statuses, localFs); + } + + @Test + public void testListStatusNestedRecursive() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = configureTestNestedRecursive(conf, localFs); + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fif = new TextInputFormat(); + List<FileStatus> statuses = fif.listStatus(job); + + verifyFileStatuses(expectedPaths, statuses, localFs); + } + + + @Test + public void testListStatusNestedNonRecursive() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = configureTestNestedNonRecursive(conf, localFs); + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fif = new TextInputFormat(); + List<FileStatus> statuses = fif.listStatus(job); + + verifyFileStatuses(expectedPaths, statuses, localFs); + } + + @Test + public void testListStatusErrorOnNonExistantDir() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + configureTestErrorOnNonExistantDir(conf, localFs); + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fif = new TextInputFormat(); + try { + fif.listStatus(job); + Assert.fail("Expecting an IOException for a missing Input path"); + } catch (IOException e) { + Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2"); + expectedExceptionPath = localFs.makeQualified(expectedExceptionPath); + Assert.assertTrue(e instanceof InvalidInputException); + Assert.assertEquals( + "Input path does not exist: " + expectedExceptionPath.toString(), + e.getMessage()); + } + } + + public static List<Path> configureTestSimple(Configuration conf, FileSystem localFs) + throws IOException { + Path base1 = new Path(TEST_ROOT_DIR, "input1"); + Path base2 = new Path(TEST_ROOT_DIR, "input2"); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + localFs.makeQualified(base1) + "," + localFs.makeQualified(base2)); + localFs.mkdirs(base1); + localFs.mkdirs(base2); + + Path in1File1 = new Path(base1, "file1"); + Path in1File2 = new Path(base1, "file2"); + localFs.createNewFile(in1File1); + localFs.createNewFile(in1File2); + + Path in2File1 = new Path(base2, "file1"); + Path in2File2 = new Path(base2, "file2"); + localFs.createNewFile(in2File1); + localFs.createNewFile(in2File2); + List<Path> expectedPaths = Lists.newArrayList(in1File1, in1File2, in2File1, + in2File2); + return expectedPaths; + } + + public static List<Path> configureTestNestedRecursive(Configuration conf, + FileSystem localFs) throws IOException { + Path base1 = new Path(TEST_ROOT_DIR, "input1"); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + localFs.makeQualified(base1).toString()); + conf.setBoolean( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, + true); + localFs.mkdirs(base1); + + Path inDir1 = new Path(base1, "dir1"); + Path inDir2 = new Path(base1, "dir2"); + Path inFile1 = new Path(base1, "file1"); + + Path dir1File1 = new Path(inDir1, "file1"); + Path dir1File2 = new Path(inDir1, "file2"); + + Path dir2File1 = new Path(inDir2, "file1"); + Path dir2File2 = new Path(inDir2, "file2"); + + localFs.mkdirs(inDir1); + localFs.mkdirs(inDir2); + + localFs.createNewFile(inFile1); + localFs.createNewFile(dir1File1); + localFs.createNewFile(dir1File2); + localFs.createNewFile(dir2File1); + localFs.createNewFile(dir2File2); + + List<Path> expectedPaths = Lists.newArrayList(inFile1, dir1File1, + dir1File2, dir2File1, dir2File2); + return expectedPaths; + } + + public static List<Path> configureTestNestedNonRecursive(Configuration conf, + FileSystem localFs) throws IOException { + Path base1 = new Path(TEST_ROOT_DIR, "input1"); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + localFs.makeQualified(base1).toString()); + conf.setBoolean( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, + false); + localFs.mkdirs(base1); + + Path inDir1 = new Path(base1, "dir1"); + Path inDir2 = new Path(base1, "dir2"); + Path inFile1 = new Path(base1, "file1"); + + Path dir1File1 = new Path(inDir1, "file1"); + Path dir1File2 = new Path(inDir1, "file2"); + + Path dir2File1 = new Path(inDir2, "file1"); + Path dir2File2 = new Path(inDir2, "file2"); + + localFs.mkdirs(inDir1); + localFs.mkdirs(inDir2); + + localFs.createNewFile(inFile1); + localFs.createNewFile(dir1File1); + localFs.createNewFile(dir1File2); + localFs.createNewFile(dir2File1); + localFs.createNewFile(dir2File2); + + List<Path> expectedPaths = Lists.newArrayList(inFile1, inDir1, inDir2); + return expectedPaths; } + public static List<Path> configureTestErrorOnNonExistantDir(Configuration conf, + FileSystem localFs) throws IOException { + Path base1 = new Path(TEST_ROOT_DIR, "input1"); + Path base2 = new Path(TEST_ROOT_DIR, "input2"); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + localFs.makeQualified(base1) + "," + localFs.makeQualified(base2)); + conf.setBoolean( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, + true); + localFs.mkdirs(base1); + + Path inFile1 = new Path(base1, "file1"); + Path inFile2 = new Path(base1, "file2"); + + localFs.createNewFile(inFile1); + localFs.createNewFile(inFile2); + + List<Path> expectedPaths = Lists.newArrayList(); + return expectedPaths; + } + + public static void verifyFileStatuses(List<Path> expectedPaths, + List<FileStatus> fetchedStatuses, final FileSystem localFs) { + Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size()); + + Iterable<Path> fqExpectedPaths = Iterables.transform(expectedPaths, + new Function<Path, Path>() { + @Override + public Path apply(Path input) { + return localFs.makeQualified(input); + } + }); + + Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths); + for (FileStatus fileStatus : fetchedStatuses) { + if (!expectedPathSet.remove(localFs.makeQualified(fileStatus.getPath()))) { + Assert.fail("Found extra fetched status: " + fileStatus.getPath()); + } + } + Assert.assertEquals( + "Not all expectedPaths matched: " + expectedPathSet.toString(), 0, + expectedPathSet.size()); + } + + + private void verifySplits(List<String> expected, List<InputSplit> splits) { + Iterable<String> pathsFromSplits = Iterables.transform(splits, + new Function<InputSplit, String>() { + @Override + public String apply(@Nullable InputSplit input) { + return ((FileSplit) input).getPath().toString(); + } + }); + + Set<String> expectedSet = Sets.newHashSet(expected); + for (String splitPathString : pathsFromSplits) { + if (!expectedSet.remove(splitPathString)) { + Assert.fail("Found extra split: " + splitPathString); + } + } + Assert.assertEquals( + "Not all expectedPaths matched: " + expectedSet.toString(), 0, + expectedSet.size()); + } + private Configuration getConfiguration() { Configuration conf = new Configuration(); conf.set("fs.test.impl.disable.cache", "true");