Author: vinodkv
Date: Thu Mar 20 02:46:23 2014
New Revision: 1579515

URL: http://svn.apache.org/r1579515
Log:
MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block 
location calls in parallel. Contributed by Siddharth Seth.
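
The change is controlled by a new configuration property,
mapreduce.input.fileinputformat.list-status.num-threads (default 1, which
preserves the previous single-threaded behaviour). A minimal sketch of how a
job could opt in through the old mapred API follows; the input path and the
thread count of 4 are illustrative assumptions, not part of this commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class ParallelListingExample {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Hypothetical input directory; substitute a real path.
        FileInputFormat.setInputPaths(job, new Path("/data/input"));
        // List files and fetch block locations with 4 threads instead of 1.
        job.setInt("mapreduce.input.fileinputformat.list-status.num-threads", 4);
        TextInputFormat inputFormat = new TextInputFormat();
        inputFormat.configure(job); // TextInputFormat must be configured before use
        InputSplit[] splits = inputFormat.getSplits(job, 1);
        System.out.println("Computed " + splits.length + " splits");
      }
    }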

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1579515&r1=1579514&r2=1579515&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Mar 20 02:46:23 2014
@@ -199,6 +199,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-4052. Improved MapReduce clients to use NodeManagers' ability to
     handle cross platform application submissions. (Jian He via vinodkv)
 
+    MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block
+    location calls in parallel. (Siddharth Seth via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1579515&r1=1579514&r2=1579515&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Mar 20 02:46:23 2014
@@ -47,6 +47,9 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+
 /** 
  * A base class for file-based {@link InputFormat}.
  * 
@@ -203,10 +206,7 @@ public abstract class FileInputFormat<K,
     
    // Whether we need to recursively look into the directory structure
     boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
-    
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    List<IOException> errors = new ArrayList<IOException>();
-    
+
     // creates a MultiPathFilter with the hiddenFileFilter and the
     // user provided one (if any).
     List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -217,6 +217,41 @@ public abstract class FileInputFormat<K,
     }
     PathFilter inputFilter = new MultiPathFilter(filters);
 
+    FileStatus[] result;
+    int numThreads = job
+        .getInt(
+            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
+            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
+    
+    Stopwatch sw = new Stopwatch().start();
+    if (numThreads == 1) {
+      List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
+      result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
+    } else {
+      Iterable<FileStatus> locatedFiles = null;
+      try {
+        
+        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
+            job, dirs, recursive, inputFilter, false);
+        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while getting file statuses");
+      }
+      result = Iterables.toArray(locatedFiles, FileStatus.class);
+    }
+
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
+    }
+    LOG.info("Total input paths to process : " + result.length);
+    return result;
+  }
+  
+  private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
+      PathFilter inputFilter, boolean recursive) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
     for (Path p: dirs) {
       FileSystem fs = p.getFileSystem(job); 
       FileStatus[] matches = fs.globStatus(p, inputFilter);
@@ -246,12 +281,10 @@ public abstract class FileInputFormat<K,
         }
       }
     }
-
     if (!errors.isEmpty()) {
       throw new InvalidInputException(errors);
     }
-    LOG.info("Total input paths to process : " + result.size()); 
-    return result.toArray(new FileStatus[result.size()]);
+    return result;
   }
 
   /**
@@ -267,6 +300,7 @@ public abstract class FileInputFormat<K,
    * they're too big.*/ 
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
+    Stopwatch sw = new Stopwatch().start();
     FileStatus[] files = listStatus(job);
     
     // Save the number of input files for metrics/loadgen
@@ -325,7 +359,11 @@ public abstract class FileInputFormat<K,
         splits.add(makeSplit(path, 0, length, new String[0]));
       }
     }
-    LOG.debug("Total # of splits: " + splits.size());
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+          + ", TimeTaken: " + sw.elapsedMillis());
+    }
     return splits.toArray(new FileSplit[splits.size()]);
   }
 

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java?rev=1579515&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java Thu Mar 20 02:46:23 2014
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Utility class to fetch block locations for specified Input paths using a
+ * configured number of threads.
+ */
+@Private
+public class LocatedFileStatusFetcher {
+
+  private final Path[] inputDirs;
+  private final PathFilter inputFilter;
+  private final Configuration conf;
+  private final boolean recursive;
+  private final boolean newApi;
+  
+  private final ExecutorService rawExec;
+  private final ListeningExecutorService exec;
+  private final BlockingQueue<List<FileStatus>> resultQueue;
+  private final List<IOException> invalidInputErrors = new LinkedList<IOException>();
+
+  private final ProcessInitialInputPathCallback processInitialInputPathCallback =
+      new ProcessInitialInputPathCallback();
+  private final ProcessInputDirCallback processInputDirCallback = 
+      new ProcessInputDirCallback();
+
+  private final AtomicInteger runningTasks = new AtomicInteger(0);
+
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition condition = lock.newCondition();
+
+  private volatile Throwable unknownError;
+
+  /**
+   * @param conf configuration for the job
+   * @param dirs the initial list of paths
+   * @param recursive whether to traverse the paths recursively
+   * @param inputFilter inputFilter to apply to the resulting paths
+   * @param newApi whether the new mapreduce API (true) or the old mapred API (false) is in use
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public LocatedFileStatusFetcher(Configuration conf, Path[] dirs,
+      boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException,
+      IOException {
+    int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
+        FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
+    rawExec = Executors.newFixedThreadPool(
+        numThreads,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("GetFileInfo #%d").build());
+    exec = MoreExecutors.listeningDecorator(rawExec);
+    resultQueue = new LinkedBlockingQueue<List<FileStatus>>();
+    this.conf = conf;
+    this.inputDirs = dirs;
+    this.recursive = recursive;
+    this.inputFilter = inputFilter;
+    this.newApi = newApi;
+  }
+
+  /**
+   * Start executing and return FileStatuses based on the parameters specified
+   * @return fetched file statuses
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public Iterable<FileStatus> getFileStatuses() throws InterruptedException,
+      IOException {
+    // Increment to make sure a race between the first thread completing and the
+    // rest being scheduled does not lead to premature termination.
+    runningTasks.incrementAndGet();
+    for (Path p : inputDirs) {
+      runningTasks.incrementAndGet();
+      ListenableFuture<ProcessInitialInputPathCallable.Result> future = exec
+          .submit(new ProcessInitialInputPathCallable(p, conf, inputFilter));
+      Futures.addCallback(future, processInitialInputPathCallback);
+    }
+
+    runningTasks.decrementAndGet();
+
+    lock.lock();
+    try {
+      while (runningTasks.get() != 0 && unknownError == null) {
+        condition.await();
+      }
+    } finally {
+      lock.unlock();
+    }
+    this.exec.shutdownNow();
+    if (this.unknownError != null) {
+      if (this.unknownError instanceof Error) {
+        throw (Error) this.unknownError;
+      } else if (this.unknownError instanceof RuntimeException) {
+        throw (RuntimeException) this.unknownError;
+      } else if (this.unknownError instanceof IOException) {
+        throw (IOException) this.unknownError;
+      } else if (this.unknownError instanceof InterruptedException) {
+        throw (InterruptedException) this.unknownError;
+      } else {
+        throw new IOException(this.unknownError);
+      }
+    }
+    if (this.invalidInputErrors.size() != 0) {
+      if (this.newApi) {
+        throw new org.apache.hadoop.mapreduce.lib.input.InvalidInputException(
+            invalidInputErrors);
+      } else {
+        throw new InvalidInputException(invalidInputErrors);
+      }
+    }
+    return Iterables.concat(resultQueue);
+  }
+
+  /**
+   * Collect misconfigured input errors. Errors while actually reading file info
+   * are reported immediately.
+   */
+  private void registerInvalidInputError(List<IOException> errors) {
+    synchronized (this) {
+      this.invalidInputErrors.addAll(errors);
+    }
+  }
+
+  /**
+   * Register fatal errors - for example an IOException while accessing a file or a
+   * full execution queue.
+   */
+  private void registerError(Throwable t) {
+    lock.lock();
+    try {
+      if (unknownError == null) {
+        unknownError = t;
+        condition.signal();
+      }
+
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void decrementRunningAndCheckCompletion() {
+    lock.lock();
+    try {
+      if (runningTasks.decrementAndGet() == 0) {
+        condition.signal();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+  
+  /**
+   * Retrieves block locations for the given {@link FileStatus}, and adds
+   * additional paths to the process queue if required.
+   */
+  private static class ProcessInputDirCallable implements
+      Callable<ProcessInputDirCallable.Result> {
+
+    private final FileSystem fs;
+    private final FileStatus fileStatus;
+    private final boolean recursive;
+    private final PathFilter inputFilter;
+
+    ProcessInputDirCallable(FileSystem fs, FileStatus fileStatus,
+        boolean recursive, PathFilter inputFilter) {
+      this.fs = fs;
+      this.fileStatus = fileStatus;
+      this.recursive = recursive;
+      this.inputFilter = inputFilter;
+    }
+
+    @Override
+    public Result call() throws Exception {
+      Result result = new Result();
+      result.fs = fs;
+
+      if (fileStatus.isDirectory()) {
+        RemoteIterator<LocatedFileStatus> iter = fs
+            .listLocatedStatus(fileStatus.getPath());
+        while (iter.hasNext()) {
+          LocatedFileStatus stat = iter.next();
+          if (inputFilter.accept(stat.getPath())) {
+            if (recursive && stat.isDirectory()) {
+              result.dirsNeedingRecursiveCalls.add(stat);
+            } else {
+              result.locatedFileStatuses.add(stat);
+            }
+          }
+        }
+      } else {
+        result.locatedFileStatuses.add(fileStatus);
+      }
+      return result;
+    }
+
+    private static class Result {
+      private List<FileStatus> locatedFileStatuses = new LinkedList<FileStatus>();
+      private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<FileStatus>();
+      private FileSystem fs;
+    }
+  }
+
+  /**
+   * The callback handler to handle results generated by
+   * {@link ProcessInputDirCallable}. This populates the final result set.
+   * 
+   */
+  private class ProcessInputDirCallback implements
+      FutureCallback<ProcessInputDirCallable.Result> {
+
+    @Override
+    public void onSuccess(ProcessInputDirCallable.Result result) {
+      try {
+        if (result.locatedFileStatuses.size() != 0) {
+          resultQueue.add(result.locatedFileStatuses);
+        }
+        if (result.dirsNeedingRecursiveCalls.size() != 0) {
+          for (FileStatus fileStatus : result.dirsNeedingRecursiveCalls) {
+            runningTasks.incrementAndGet();
+            ListenableFuture<ProcessInputDirCallable.Result> future = exec
+                .submit(new ProcessInputDirCallable(result.fs, fileStatus,
+                    recursive, inputFilter));
+            Futures.addCallback(future, processInputDirCallback);
+          }
+        }
+        decrementRunningAndCheckCompletion();
+      } catch (Throwable t) { // Error within the callback itself.
+        registerError(t);
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // Any generated exception leads to immediate termination.
+      registerError(t);
+    }
+  }
+
+  
+  /**
+   * Processes an initial Input Path pattern through the globber and PathFilter
+   * to generate a list of files which need further processing.
+   */
+  private static class ProcessInitialInputPathCallable implements
+      Callable<ProcessInitialInputPathCallable.Result> {
+
+    private final Path path;
+    private final Configuration conf;
+    private final PathFilter inputFilter;
+
+    public ProcessInitialInputPathCallable(Path path, Configuration conf,
+        PathFilter pathFilter) {
+      this.path = path;
+      this.conf = conf;
+      this.inputFilter = pathFilter;
+    }
+
+    @Override
+    public Result call() throws Exception {
+      Result result = new Result();
+      FileSystem fs = path.getFileSystem(conf);
+      result.fs = fs;
+      FileStatus[] matches = fs.globStatus(path, inputFilter);
+      if (matches == null) {
+        result.addError(new IOException("Input path does not exist: " + path));
+      } else if (matches.length == 0) {
+        result.addError(new IOException("Input Pattern " + path
+            + " matches 0 files"));
+      } else {
+        result.matchedFileStatuses = matches;
+      }
+      return result;
+    }
+
+    private static class Result {
+      private List<IOException> errors;
+      private FileStatus[] matchedFileStatuses;
+      private FileSystem fs;
+
+      void addError(IOException ioe) {
+        if (errors == null) {
+          errors = new LinkedList<IOException>();
+        }
+        errors.add(ioe);
+      }
+    }
+  }
+
+  /**
+   * The callback handler to handle results generated by
+   * {@link ProcessInitialInputPathCallable}
+   * 
+   */
+  private class ProcessInitialInputPathCallback implements
+      FutureCallback<ProcessInitialInputPathCallable.Result> {
+
+    @Override
+    public void onSuccess(ProcessInitialInputPathCallable.Result result) {
+      try {
+        if (result.errors != null) {
+          registerInvalidInputError(result.errors);
+        }
+        if (result.matchedFileStatuses != null) {
+          for (FileStatus matched : result.matchedFileStatuses) {
+            runningTasks.incrementAndGet();
+            ListenableFuture<ProcessInputDirCallable.Result> future = exec
+                .submit(new ProcessInputDirCallable(result.fs, matched,
+                    recursive, inputFilter));
+            Futures.addCallback(future, processInputDirCallback);
+          }
+        }
+        decrementRunningAndCheckCompletion();
+      } catch (Throwable t) { // Exception within the callback
+        registerError(t);
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // Any generated exception leads to immediate termination.
+      registerError(t);
+    }
+  }
+}
\ No newline at end of file
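
LocatedFileStatusFetcher coordinates shutdown without a CountDownLatch, since
the number of tasks is not known up front: runningTasks is incremented once as
a guard before any path is submitted, once per submitted task, and decremented
under the lock with a signal when the count reaches zero. The following
standalone sketch shows the same termination protocol with the Hadoop types
stripped away; all names in it are illustrative, not from this commit:

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class CompletionProtocolSketch {
      private final AtomicInteger runningTasks = new AtomicInteger(0);
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition condition = lock.newCondition();
      private final ExecutorService exec = Executors.newFixedThreadPool(4);

      public void runAll(List<Runnable> tasks) throws InterruptedException {
        // Guard increment: keeps the count non-zero while submission is still
        // in progress, so an early completion cannot signal termination.
        runningTasks.incrementAndGet();
        for (final Runnable task : tasks) {
          runningTasks.incrementAndGet();
          exec.submit(new Runnable() {
            @Override
            public void run() {
              try {
                task.run();
              } finally {
                decrementAndMaybeSignal();
              }
            }
          });
        }
        decrementAndMaybeSignal(); // drop the guard increment
        lock.lock();
        try {
          while (runningTasks.get() != 0) {
            condition.await();
          }
        } finally {
          lock.unlock();
        }
        exec.shutdownNow();
      }

      private void decrementAndMaybeSignal() {
        lock.lock();
        try {
          if (runningTasks.decrementAndGet() == 0) {
            condition.signal();
          }
        } finally {
          lock.unlock();
        }
      }
    }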

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1579515&r1=1579514&r2=1579515&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Thu Mar 20 02:46:23 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -43,6 +44,9 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
 /** 
  * A base class for file-based {@link InputFormat}s.
  * 
@@ -68,6 +72,9 @@ public abstract class FileInputFormat<K,
     "mapreduce.input.fileinputformat.numinputfiles";
   public static final String INPUT_DIR_RECURSIVE =
     "mapreduce.input.fileinputformat.input.dir.recursive";
+  public static final String LIST_STATUS_NUM_THREADS =
+      "mapreduce.input.fileinputformat.list-status.num-threads";
+  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
 
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
@@ -225,7 +232,6 @@ public abstract class FileInputFormat<K,
    */
   protected List<FileStatus> listStatus(JobContext job
                                         ) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
     Path[] dirs = getInputPaths(job);
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
@@ -237,9 +243,7 @@ public abstract class FileInputFormat<K,
 
    // Whether we need to recursively look into the directory structure
     boolean recursive = getInputDirRecursive(job);
-    
-    List<IOException> errors = new ArrayList<IOException>();
-    
+
     // creates a MultiPathFilter with the hiddenFileFilter and the
     // user provided one (if any).
     List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -250,6 +254,37 @@ public abstract class FileInputFormat<K,
     }
     PathFilter inputFilter = new MultiPathFilter(filters);
     
+    List<FileStatus> result = null;
+
+    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
+        DEFAULT_LIST_STATUS_NUM_THREADS);
+    Stopwatch sw = new Stopwatch().start();
+    if (numThreads == 1) {
+      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
+    } else {
+      Iterable<FileStatus> locatedFiles = null;
+      try {
+        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
+            job.getConfiguration(), dirs, recursive, inputFilter, true);
+        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while getting file statuses");
+      }
+      result = Lists.newArrayList(locatedFiles);
+    }
+    
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
+    }
+    LOG.info("Total input paths to process : " + result.size()); 
+    return result;
+  }
+
+  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
+      PathFilter inputFilter, boolean recursive) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
     for (int i=0; i < dirs.length; ++i) {
       Path p = dirs[i];
       FileSystem fs = p.getFileSystem(job.getConfiguration()); 
@@ -284,7 +319,6 @@ public abstract class FileInputFormat<K,
     if (!errors.isEmpty()) {
       throw new InvalidInputException(errors);
     }
-    LOG.info("Total input paths to process : " + result.size()); 
     return result;
   }
   
@@ -332,6 +366,7 @@ public abstract class FileInputFormat<K,
    * @throws IOException
    */
   public List<InputSplit> getSplits(JobContext job) throws IOException {
+    Stopwatch sw = new Stopwatch().start();
     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     long maxSize = getMaxSplitSize(job);
 
@@ -376,7 +411,11 @@ public abstract class FileInputFormat<K,
     }
     // Save the number of input files for metrics/loadgen
     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
-    LOG.debug("Total # of splits: " + splits.size());
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+          + ", TimeTaken: " + sw.elapsedMillis());
+    }
     return splits;
   }
 

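
For the new API the same knob is read from the Job's Configuration via the
LIST_STATUS_NUM_THREADS constant added above. A short sketch, with the input
path and thread count again being assumptions for illustration:

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class NewApiListingExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 8 listing threads; any value > 1 routes through LocatedFileStatusFetcher.
        conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, 8);
        Job job = Job.getInstance(conf);
        FileInputFormat.setInputPaths(job, new Path("/data/input")); // hypothetical path
        List<InputSplit> splits = new TextInputFormat().getSplits(job);
        System.out.println("Computed " + splits.size() + " splits");
      }
    }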
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1579515&r1=1579514&r2=1579515&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Mar 20 02:46:23 2014
@@ -430,6 +430,16 @@
   take priority over this setting.</description>
 </property>
 
+<property>
+  <name>mapreduce.input.fileinputformat.list-status.num-threads</name>
+  <value>1</value>
+  <description>The number of threads to use to list and fetch block locations
+  for the specified input paths. Note: multiple threads should not be used
+  if a custom, non-thread-safe path filter is used.
+  </description>
+</property>
+
+
 
 <property>
   <name>mapreduce.client.submit.file.replication</name>

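
The thread-safety caveat in the description above deserves emphasis: with more
than one listing thread, a user-supplied PathFilter is invoked concurrently
from the GetFileInfo worker threads. The hypothetical filter below (invented
for illustration, not part of this commit) shows the pitfall:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Keeps per-instance state in a plain int field, which is only safe with
    // the default single listing thread.
    public class CountingFilter implements PathFilter {
      private int matches = 0; // unsynchronized mutable state

      @Override
      public boolean accept(Path path) {
        boolean accepted = !path.getName().startsWith("_");
        if (accepted) {
          matches++; // updates can be lost when accept() runs in parallel
        }
        return accepted;
      }
    }

A filter like this would need its counter made thread-safe, or the job left at
list-status.num-threads=1, before parallel listing is enabled.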
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1579515&r1=1579514&r2=1579515&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Thu Mar 20 02:46:23 2014
@@ -19,7 +19,12 @@ package org.apache.hadoop.mapred;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,15 +34,58 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
-public class TestFileInputFormat {
+import com.google.common.collect.Lists;
 
+@RunWith(value = Parameterized.class)
+public class TestFileInputFormat {
+  
+  private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
+  
+  private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
+  private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF");
+  
+  private static FileSystem localFs;
+  
+  private int numThreads;
+  
+  public TestFileInputFormat(int numThreads) {
+    this.numThreads = numThreads;
+    LOG.info("Running with numThreads: " + numThreads);
+  }
+  
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { 1 }, { 5 }};
+    return Arrays.asList(data);
+  }
+  
+  @Before
+  public void setup() throws IOException {
+    LOG.info("Using Test Dir: " + TEST_ROOT_DIR);
+    localFs = FileSystem.getLocal(new Configuration());
+    localFs.delete(TEST_ROOT_DIR, true);
+    localFs.mkdirs(TEST_ROOT_DIR);
+  }
+  
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(TEST_ROOT_DIR, true);
+  }
+  
   @Test
   public void testListLocatedStatus() throws Exception {
     Configuration conf = getConfiguration();
     conf.setBoolean("fs.test.impl.disable.cache", false);
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
     conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
         "test:///a1/a2");
     MockFileSystem mockFs =
@@ -51,6 +99,82 @@ public class TestFileInputFormat {
     Assert.assertEquals("Input splits are not correct", 2, splits.length);
     Assert.assertEquals("listLocatedStatuss calls",
         1, mockFs.numListLocatedStatusCalls);
+    FileSystem.closeAll();
+  }
+  
+  @Test
+  public void testListStatusSimple() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .configureTestSimple(conf, localFs);
+
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat fif = new TextInputFormat();
+    fif.configure(jobConf);
+    FileStatus[] statuses = fif.listStatus(jobConf);
+
+    org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+            localFs);
+  }
+
+  @Test
+  public void testListStatusNestedRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .configureTestNestedRecursive(conf, localFs);
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat fif = new TextInputFormat();
+    fif.configure(jobConf);
+    FileStatus[] statuses = fif.listStatus(jobConf);
+
+    org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+            localFs);
+  }
+
+  @Test
+  public void testListStatusNestedNonRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .configureTestNestedNonRecursive(conf, localFs);
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat fif = new TextInputFormat();
+    fif.configure(jobConf);
+    FileStatus[] statuses = fif.listStatus(jobConf);
+
+    org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
+            localFs);
+  }
+
+  @Test
+  public void testListStatusErrorOnNonExistantDir() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
+        .configureTestErrorOnNonExistantDir(conf, localFs);
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat fif = new TextInputFormat();
+    fif.configure(jobConf);
+    try {
+      fif.listStatus(jobConf);
+      Assert.fail("Expecting an IOException for a missing Input path");
+    } catch (IOException e) {
+      Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2");
+      expectedExceptionPath = localFs.makeQualified(expectedExceptionPath);
+      Assert.assertTrue(e instanceof InvalidInputException);
+      Assert.assertEquals(
+          "Input path does not exist: " + expectedExceptionPath.toString(),
+          e.getMessage());
+    }
   }
 
   private Configuration getConfiguration() {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1579515&r1=1579514&r2=1579515&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Thu Mar 20 02:46:23 2014
@@ -19,10 +19,17 @@ package org.apache.hadoop.mapreduce.lib.
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
 
 import junit.framework.Assert;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -34,55 +41,90 @@ import org.apache.hadoop.fs.RawLocalFile
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
+@RunWith(value = Parameterized.class)
 public class TestFileInputFormat {
+  
+  private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
+  
+  private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
+  private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF");
+  
+  private static FileSystem localFs;
+  
+  private int numThreads;
+  
+  public TestFileInputFormat(int numThreads) {
+    this.numThreads = numThreads;
+    LOG.info("Running with numThreads: " + numThreads);
+  }
+  
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { 1 }, { 5 }};
+    return Arrays.asList(data);
+  }
+  
+  @Before
+  public void setup() throws IOException {
+    LOG.info("Using Test Dir: " + TEST_ROOT_DIR);
+    localFs = FileSystem.getLocal(new Configuration());
+    localFs.delete(TEST_ROOT_DIR, true);
+    localFs.mkdirs(TEST_ROOT_DIR);
+  }
+  
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(TEST_ROOT_DIR, true);
+  }
 
   @Test
   public void testNumInputFilesRecursively() throws Exception {
     Configuration conf = getConfiguration();
     conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
     Job job = Job.getInstance(conf);
     FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
     List<InputSplit> splits = fileInputFormat.getSplits(job);
     Assert.assertEquals("Input splits are not correct", 3, splits.size());
-    Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
-        .getPath().toString());
-    Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
-        .getPath().toString());
-    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
-        .toString());
-    
+    verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
+        "test:/a1/file1"), splits);
+
     // Using the deprecated configuration
     conf = getConfiguration();
     conf.set("mapred.input.dir.recursive", "true");
     job = Job.getInstance(conf);
     splits = fileInputFormat.getSplits(job);
-    Assert.assertEquals("Input splits are not correct", 3, splits.size());
-    Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
-        .getPath().toString());
-    Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
-        .getPath().toString());
-    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
-        .toString());
+    verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
+        "test:/a1/file1"), splits);
   }
 
   @Test
   public void testNumInputFilesWithoutRecursively() throws Exception {
     Configuration conf = getConfiguration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
     Job job = Job.getInstance(conf);
     FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
     List<InputSplit> splits = fileInputFormat.getSplits(job);
     Assert.assertEquals("Input splits are not correct", 2, splits.size());
-    Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath()
-        .toString());
-    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath()
-        .toString());
+    verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits);
   }
 
   @Test
   public void testListLocatedStatus() throws Exception {
     Configuration conf = getConfiguration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
     conf.setBoolean("fs.test.impl.disable.cache", false);
     conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
     MockFileSystem mockFs =
@@ -95,8 +137,226 @@ public class TestFileInputFormat {
     Assert.assertEquals("Input splits are not correct", 2, splits.size());
     Assert.assertEquals("listLocatedStatuss calls",
         1, mockFs.numListLocatedStatusCalls);
+    FileSystem.closeAll();
+  }
+
+  @Test
+  public void testListStatusSimple() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = configureTestSimple(conf, localFs);
+    
+    Job job  = Job.getInstance(conf);
+    FileInputFormat<?, ?> fif = new TextInputFormat();
+    List<FileStatus> statuses = fif.listStatus(job);
+
+    verifyFileStatuses(expectedPaths, statuses, localFs);
+  }
+
+  @Test
+  public void testListStatusNestedRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = configureTestNestedRecursive(conf, localFs);
+    Job job  = Job.getInstance(conf);
+    FileInputFormat<?, ?> fif = new TextInputFormat();
+    List<FileStatus> statuses = fif.listStatus(job);
+
+    verifyFileStatuses(expectedPaths, statuses, localFs);
+  }
+
+
+  @Test
+  public void testListStatusNestedNonRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    List<Path> expectedPaths = configureTestNestedNonRecursive(conf, localFs);
+    Job job  = Job.getInstance(conf);
+    FileInputFormat<?, ?> fif = new TextInputFormat();
+    List<FileStatus> statuses = fif.listStatus(job);
+
+    verifyFileStatuses(expectedPaths, statuses, localFs);
+  }
+
+  @Test
+  public void testListStatusErrorOnNonExistantDir() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+
+    configureTestErrorOnNonExistantDir(conf, localFs);
+    Job job  = Job.getInstance(conf);
+    FileInputFormat<?, ?> fif = new TextInputFormat();
+    try {
+      fif.listStatus(job);
+      Assert.fail("Expecting an IOException for a missing Input path");
+    } catch (IOException e) {
+      Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2");
+      expectedExceptionPath = localFs.makeQualified(expectedExceptionPath);
+      Assert.assertTrue(e instanceof InvalidInputException);
+      Assert.assertEquals(
+          "Input path does not exist: " + expectedExceptionPath.toString(),
+          e.getMessage());
+    }
+  }
+
+  public static List<Path> configureTestSimple(Configuration conf, FileSystem localFs)
+      throws IOException {
+    Path base1 = new Path(TEST_ROOT_DIR, "input1");
+    Path base2 = new Path(TEST_ROOT_DIR, "input2");
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        localFs.makeQualified(base1) + "," + localFs.makeQualified(base2));
+    localFs.mkdirs(base1);
+    localFs.mkdirs(base2);
+
+    Path in1File1 = new Path(base1, "file1");
+    Path in1File2 = new Path(base1, "file2");
+    localFs.createNewFile(in1File1);
+    localFs.createNewFile(in1File2);
+
+    Path in2File1 = new Path(base2, "file1");
+    Path in2File2 = new Path(base2, "file2");
+    localFs.createNewFile(in2File1);
+    localFs.createNewFile(in2File2);
+    List<Path> expectedPaths = Lists.newArrayList(in1File1, in1File2, in2File1,
+        in2File2);
+    return expectedPaths;
+  }
+
+  public static List<Path> configureTestNestedRecursive(Configuration conf,
+      FileSystem localFs) throws IOException {
+    Path base1 = new Path(TEST_ROOT_DIR, "input1");
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        localFs.makeQualified(base1).toString());
+    conf.setBoolean(
+        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+        true);
+    localFs.mkdirs(base1);
+
+    Path inDir1 = new Path(base1, "dir1");
+    Path inDir2 = new Path(base1, "dir2");
+    Path inFile1 = new Path(base1, "file1");
+
+    Path dir1File1 = new Path(inDir1, "file1");
+    Path dir1File2 = new Path(inDir1, "file2");
+
+    Path dir2File1 = new Path(inDir2, "file1");
+    Path dir2File2 = new Path(inDir2, "file2");
+
+    localFs.mkdirs(inDir1);
+    localFs.mkdirs(inDir2);
+
+    localFs.createNewFile(inFile1);
+    localFs.createNewFile(dir1File1);
+    localFs.createNewFile(dir1File2);
+    localFs.createNewFile(dir2File1);
+    localFs.createNewFile(dir2File2);
+
+    List<Path> expectedPaths = Lists.newArrayList(inFile1, dir1File1,
+        dir1File2, dir2File1, dir2File2);
+    return expectedPaths;
+  }
+
+  public static List<Path> configureTestNestedNonRecursive(Configuration conf,
+      FileSystem localFs) throws IOException {
+    Path base1 = new Path(TEST_ROOT_DIR, "input1");
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        localFs.makeQualified(base1).toString());
+    conf.setBoolean(
+        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+        false);
+    localFs.mkdirs(base1);
+
+    Path inDir1 = new Path(base1, "dir1");
+    Path inDir2 = new Path(base1, "dir2");
+    Path inFile1 = new Path(base1, "file1");
+
+    Path dir1File1 = new Path(inDir1, "file1");
+    Path dir1File2 = new Path(inDir1, "file2");
+
+    Path dir2File1 = new Path(inDir2, "file1");
+    Path dir2File2 = new Path(inDir2, "file2");
+
+    localFs.mkdirs(inDir1);
+    localFs.mkdirs(inDir2);
+
+    localFs.createNewFile(inFile1);
+    localFs.createNewFile(dir1File1);
+    localFs.createNewFile(dir1File2);
+    localFs.createNewFile(dir2File1);
+    localFs.createNewFile(dir2File2);
+
+    List<Path> expectedPaths = Lists.newArrayList(inFile1, inDir1, inDir2);
+    return expectedPaths;
   }
 
+  public static List<Path> configureTestErrorOnNonExistantDir(Configuration conf,
+      FileSystem localFs) throws IOException {
+    Path base1 = new Path(TEST_ROOT_DIR, "input1");
+    Path base2 = new Path(TEST_ROOT_DIR, "input2");
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        localFs.makeQualified(base1) + "," + localFs.makeQualified(base2));
+    conf.setBoolean(
+        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
+        true);
+    localFs.mkdirs(base1);
+
+    Path inFile1 = new Path(base1, "file1");
+    Path inFile2 = new Path(base1, "file2");
+
+    localFs.createNewFile(inFile1);
+    localFs.createNewFile(inFile2);
+
+    List<Path> expectedPaths = Lists.newArrayList();
+    return expectedPaths;
+  }
+
+  public static void verifyFileStatuses(List<Path> expectedPaths,
+      List<FileStatus> fetchedStatuses, final FileSystem localFs) {
+    Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size());
+
+    Iterable<Path> fqExpectedPaths = Iterables.transform(expectedPaths,
+        new Function<Path, Path>() {
+          @Override
+          public Path apply(Path input) {
+            return localFs.makeQualified(input);
+          }
+        });
+
+    Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths);
+    for (FileStatus fileStatus : fetchedStatuses) {
+      if (!expectedPathSet.remove(localFs.makeQualified(fileStatus.getPath()))) {
+        Assert.fail("Found extra fetched status: " + fileStatus.getPath());
+      }
+    }
+    Assert.assertEquals(
+        "Not all expectedPaths matched: " + expectedPathSet.toString(), 0,
+        expectedPathSet.size());
+  }
+
+
+  private void verifySplits(List<String> expected, List<InputSplit> splits) {
+    Iterable<String> pathsFromSplits = Iterables.transform(splits,
+        new Function<InputSplit, String>() {
+          @Override
+          public String apply(@Nullable InputSplit input) {
+            return ((FileSplit) input).getPath().toString();
+          }
+        });
+
+    Set<String> expectedSet = Sets.newHashSet(expected);
+    for (String splitPathString : pathsFromSplits) {
+      if (!expectedSet.remove(splitPathString)) {
+        Assert.fail("Found extra split: " + splitPathString);
+      }
+    }
+    Assert.assertEquals(
+        "Not all expectedPaths matched: " + expectedSet.toString(), 0,
+        expectedSet.size());
+  }
+  
   private Configuration getConfiguration() {
     Configuration conf = new Configuration();
     conf.set("fs.test.impl.disable.cache", "true");

