MAPREDUCE-7086. Add config to allow FileInputFormat to ignore directories when recursive=false. Contributed by Sergey Shelukhin
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68c6ec71
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68c6ec71
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68c6ec71

Branch: refs/heads/HDDS-4
Commit: 68c6ec719da8e79ada31c8f3a82124f90b9a71fd
Parents: 24eeea8
Author: Jason Lowe <jl...@apache.org>
Authored: Tue May 1 16:19:53 2018 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue May 1 16:19:53 2018 -0500

----------------------------------------------------------------------
 .../apache/hadoop/mapred/FileInputFormat.java | 25 ++++++++++++++------
 .../mapreduce/lib/input/FileInputFormat.java | 8 +++++++
 .../hadoop/mapred/TestFileInputFormat.java | 17 ++++++++++++-
 .../lib/input/TestFileInputFormat.java | 12 ++++++++++
 4 files changed, 54 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index b0ec979..fe43991 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -78,10 +78,13 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
   public static final String NUM_INPUT_FILES = 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES; - + public static final String INPUT_DIR_RECURSIVE = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE; + public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS = + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS; + private static final double SPLIT_SLOP = 1.1; // 10% slop @@ -319,16 +322,24 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> { public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { StopWatch sw = new StopWatch().start(); - FileStatus[] files = listStatus(job); - + FileStatus[] stats = listStatus(job); + // Save the number of input files for metrics/loadgen - job.setLong(NUM_INPUT_FILES, files.length); + job.setLong(NUM_INPUT_FILES, stats.length); long totalSize = 0; // compute total size - for (FileStatus file: files) { // check we have valid files + boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false) + && job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false); + + List<FileStatus> files = new ArrayList<>(stats.length); + for (FileStatus file: stats) { // check we have valid files if (file.isDirectory()) { - throw new IOException("Not a file: "+ file.getPath()); + if (!ignoreDirs) { + throw new IOException("Not a file: "+ file.getPath()); + } + } else { + files.add(file); + totalSize += file.getLen(); } - totalSize += file.getLen(); } long goalSize = totalSize / (numSplits == 0 ? 
1 : numSplits); http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java index 9868e8e..e2d8e6f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -76,6 +76,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> { "mapreduce.input.fileinputformat.numinputfiles"; public static final String INPUT_DIR_RECURSIVE = "mapreduce.input.fileinputformat.input.dir.recursive"; + public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS = + "mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs"; public static final String LIST_STATUS_NUM_THREADS = "mapreduce.input.fileinputformat.list-status.num-threads"; public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1; @@ -392,7 +394,13 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> { // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); + + boolean ignoreDirs = !getInputDirRecursive(job) + && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false); for (FileStatus file: files) { + if (ignoreDirs && file.isDirectory()) { + continue; + } Path path = file.getPath(); long length = 
file.getLen(); if (length != 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java index d322011..879cd3d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java @@ -102,7 +102,22 @@ public class TestFileInputFormat { 1, mockFs.numListLocatedStatusCalls); FileSystem.closeAll(); } - + + @Test + public void testIgnoreDirs() throws Exception { + Configuration conf = getConfiguration(); + conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, "test:///a1"); + MockFileSystem mockFs = (MockFileSystem) new Path("test:///").getFileSystem(conf); + JobConf job = new JobConf(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + fileInputFormat.configure(job); + InputSplit[] splits = fileInputFormat.getSplits(job, 1); + Assert.assertEquals("Input splits are not correct", 1, splits.length); + FileSystem.closeAll(); + } + @Test public void testSplitLocationInfo() throws Exception { Configuration conf = getConfiguration(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
index 4c847fa..3897a9b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
@@ -124,6 +124,18 @@ public class TestFileInputFormat {
   }
 
   @Test
+  public void testNumInputFilesIgnoreDirs() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+    conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true);
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 1, splits.size());
+    verifySplits(Lists.newArrayList("test:/a1/file1"), splits);
+  }
+
+  @Test
   public void testListLocatedStatus() throws Exception {
     Configuration conf = getConfiguration();
     conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org