Author: jlowe
Date: Fri Aug 16 16:09:17 2013
New Revision: 1514767
URL: http://svn.apache.org/r1514767
Log:
svn merge -c 1507385 FIXES: MAPREDUCE-1981. Improve getSplits performance by
using listLocatedStatus. Contributed by Hairong Kuang and Jason Lowe
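
A minimal sketch of the pattern the patch adopts (the wrapper method and
its names are illustrative; the FileSystem calls are the ones used in the
diff below):

    static void listWithLocations(FileSystem fs, Path dir)
        throws IOException {
      RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
      while (iter.hasNext()) {
        LocatedFileStatus stat = iter.next();
        // Block locations arrive with each listing entry, so getSplits
        // can avoid a separate getFileBlockLocations() RPC per file.
        BlockLocation[] locs = stat.getBlockLocations();
      }
    }
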
Added:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
- copied unchanged from r1507385,
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1514767&r1=1514766&r2=1514767&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
Fri Aug 16 16:09:17 2013
@@ -16,6 +16,9 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5462. In map-side sort, swap entire meta entries instead of
indexes for better cache performance. (Sandy Ryza)
+ MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
+ (Hairong Kuang and Jason Lowe via jlowe)
+
BUG FIXES
MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1514767&r1=1514766&r2=1514767&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
(original)
+++
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
Fri Aug 16 16:09:17 2013
@@ -36,8 +36,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -169,13 +171,17 @@ public abstract class FileInputFormat<K,
protected void addInputPathRecursively(List<FileStatus> result,
FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
- for(FileStatus stat: fs.listStatus(path, inputFilter)) {
- if (stat.isDirectory()) {
- addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat);
+ }
}
- }
+ }
}
/** List input directories.
@@ -221,14 +227,19 @@ public abstract class FileInputFormat<K,
} else {
for (FileStatus globStat: matches) {
if (globStat.isDirectory()) {
- for(FileStatus stat: fs.listStatus(globStat.getPath(),
- inputFilter)) {
- if (recursive && stat.isDirectory()) {
-              addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
+ RemoteIterator<LocatedFileStatus> iter =
+ fs.listLocatedStatus(globStat.getPath());
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (recursive && stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(),
+ inputFilter);
+ } else {
+ result.add(stat);
+ }
}
- }
+ }
} else {
result.add(globStat);
}
@@ -254,7 +265,6 @@ public abstract class FileInputFormat<K,
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
- @SuppressWarnings("deprecation")
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
FileStatus[] files = listStatus(job);
@@ -278,31 +288,38 @@ public abstract class FileInputFormat<K,
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job);
long length = file.getLen();
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- if ((length != 0) && isSplitable(fs, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
- long bytesRemaining = length;
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- String[] splitHosts = getSplitHosts(blkLocations,
- length-bytesRemaining, splitSize, clusterMap);
- splits.add(makeSplit(path, length-bytesRemaining, splitSize,
- splitHosts));
- bytesRemaining -= splitSize;
+ if (length != 0) {
+ FileSystem fs = path.getFileSystem(job);
+ BlockLocation[] blkLocations;
+ if (file instanceof LocatedFileStatus) {
+ blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+ } else {
+ blkLocations = fs.getFileBlockLocations(file, 0, length);
+ }
+ if (isSplitable(fs, path)) {
+ long blockSize = file.getBlockSize();
+ long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+ long bytesRemaining = length;
+ while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+ String[] splitHosts = getSplitHosts(blkLocations,
+ length-bytesRemaining, splitSize, clusterMap);
+ splits.add(makeSplit(path, length-bytesRemaining, splitSize,
+ splitHosts));
+ bytesRemaining -= splitSize;
+ }
+
+ if (bytesRemaining != 0) {
+ String[] splitHosts = getSplitHosts(blkLocations, length
+ - bytesRemaining, bytesRemaining, clusterMap);
+ splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
+ splitHosts));
+ }
+ } else {
+          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
+ splits.add(makeSplit(path, 0, length, splitHosts));
}
-
- if (bytesRemaining != 0) {
- String[] splitHosts = getSplitHosts(blkLocations, length
- - bytesRemaining, bytesRemaining, clusterMap);
- splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
- splitHosts));
- }
- } else if (length != 0) {
- String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
- splits.add(makeSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
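
Note the fallback kept in the new getSplits code above: listStatus() is
protected and can be overridden to return plain FileStatus objects, so
cached block locations are trusted only when actually present. The
recurring check, condensed (file, fs, and length are in scope as in the
hunk above):

    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
      // Locations were cached when the file was listed.
      blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
      // One extra RPC for statuses without cached locations.
      blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
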
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1514767&r1=1514766&r2=1514767&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
(original)
+++
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
Fri Aug 16 16:09:17 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.lib.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.HashSet;
@@ -38,7 +37,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -215,46 +214,33 @@ public abstract class CombineFileInputFo
}
// all the files in input set
- Path[] paths = FileUtil.stat2Paths(
- listStatus(job).toArray(new FileStatus[0]));
+ List<FileStatus> stats = listStatus(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
- if (paths.length == 0) {
+ if (stats.size() == 0) {
return splits;
}
- // Convert them to Paths first. This is a costly operation and
- // we should do it first, otherwise we will incur doing it multiple
- // times, one time each for each pool in the next loop.
- List<Path> newpaths = new LinkedList<Path>();
- for (int i = 0; i < paths.length; i++) {
- FileSystem fs = paths[i].getFileSystem(conf);
- Path p = fs.makeQualified(paths[i]);
- newpaths.add(p);
- }
-
// In one single iteration, process all the paths in a single pool.
// Processing one pool at a time ensures that a split contains paths
// from a single pool only.
for (MultiPathFilter onepool : pools) {
- ArrayList<Path> myPaths = new ArrayList<Path>();
+ ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
// pick one input path. If it matches all the filters in a pool,
// add it to the output set
- for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
- Path p = iter.next();
- if (onepool.accept(p)) {
+ for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
+ FileStatus p = iter.next();
+ if (onepool.accept(p.getPath())) {
myPaths.add(p); // add it to my output set
iter.remove();
}
}
// create splits for all files in this pool.
- getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
- maxSize, minSizeNode, minSizeRack, splits);
+ getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
}
// create splits for all files that are not in any pool.
- getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
- maxSize, minSizeNode, minSizeRack, splits);
+ getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
rackToNodes.clear();
@@ -264,7 +250,7 @@ public abstract class CombineFileInputFo
/**
* Return all the splits in the specified set of paths
*/
- private void getMoreSplits(JobContext job, Path[] paths,
+ private void getMoreSplits(JobContext job, List<FileStatus> stats,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits)
throws IOException {
@@ -285,15 +271,16 @@ public abstract class CombineFileInputFo
HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
new HashMap<String, Set<OneBlockInfo>>();
- files = new OneFileInfo[paths.length];
- if (paths.length == 0) {
+ files = new OneFileInfo[stats.size()];
+ if (stats.size() == 0) {
return;
}
// populate all the blocks for all files
long totLength = 0;
- for (int i = 0; i < paths.length; i++) {
- files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+ int i = 0;
+ for (FileStatus stat : stats) {
+ files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
rackToBlocks, blockToNodes, nodeToBlocks,
rackToNodes, maxSize);
      totLength += files[i].getLength();
+      i++;
    }
@@ -569,7 +556,7 @@ public abstract class CombineFileInputFo
private long fileSize; // size of the file
private OneBlockInfo[] blocks; // all blocks in this file
- OneFileInfo(Path path, Configuration conf,
+ OneFileInfo(FileStatus stat, Configuration conf,
boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
@@ -580,10 +567,13 @@ public abstract class CombineFileInputFo
this.fileSize = 0;
// get block locations from file system
- FileSystem fs = path.getFileSystem(conf);
- FileStatus stat = fs.getFileStatus(path);
- BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
- stat.getLen());
+ BlockLocation[] locations;
+ if (stat instanceof LocatedFileStatus) {
+ locations = ((LocatedFileStatus) stat).getBlockLocations();
+ } else {
+ FileSystem fs = stat.getPath().getFileSystem(conf);
+ locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
+ }
// create a list of all block and their locations
if (locations == null) {
blocks = new OneBlockInfo[0];
@@ -598,8 +588,8 @@ public abstract class CombineFileInputFo
// full file length
blocks = new OneBlockInfo[1];
fileSize = stat.getLen();
- blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
- .getHosts(), locations[0].getTopologyPaths());
+ blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
+ locations[0].getHosts(), locations[0].getTopologyPaths());
} else {
ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
locations.length);
@@ -625,9 +615,9 @@ public abstract class CombineFileInputFo
myLength = Math.min(maxSize, left);
}
}
- OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
- myLength, locations[i].getHosts(), locations[i]
- .getTopologyPaths());
+ OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
+ myOffset, myLength, locations[i].getHosts(),
+ locations[i].getTopologyPaths());
left -= myLength;
myOffset += myLength;
@@ -739,6 +729,9 @@ public abstract class CombineFileInputFo
protected BlockLocation[] getFileBlockLocations(
FileSystem fs, FileStatus stat) throws IOException {
+ if (stat instanceof LocatedFileStatus) {
+ return ((LocatedFileStatus) stat).getBlockLocations();
+ }
return fs.getFileBlockLocations(stat, 0, stat.getLen());
}
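
The CombineFileInputFormat change compounds the savings: OneFileInfo
previously received only a Path and re-fetched metadata its caller
already held. A condensed before/after contrast (paraphrased from the
hunks above; stat and fs as in OneFileInfo):

    // Before: OneFileInfo(Path, ...) issued two RPCs per file:
    //   FileStatus stat = fs.getFileStatus(path);
    //   locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
    // After: OneFileInfo(FileStatus, ...) issues none when handed a
    // LocatedFileStatus, whose locations were cached at listing time:
    BlockLocation[] locations =
        ((LocatedFileStatus) stat).getBlockLocations();
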
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1514767&r1=1514766&r2=1514767&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
(original)
+++
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
Fri Aug 16 16:09:17 2013
@@ -29,9 +29,11 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -259,14 +261,19 @@ public abstract class FileInputFormat<K,
} else {
for (FileStatus globStat: matches) {
if (globStat.isDirectory()) {
- for(FileStatus stat: fs.listStatus(globStat.getPath(),
- inputFilter)) {
- if (recursive && stat.isDirectory()) {
-              addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
+ RemoteIterator<LocatedFileStatus> iter =
+ fs.listLocatedStatus(globStat.getPath());
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (recursive && stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(),
+ inputFilter);
+ } else {
+ result.add(stat);
+ }
}
- }
+ }
} else {
result.add(globStat);
}
@@ -296,13 +303,17 @@ public abstract class FileInputFormat<K,
protected void addInputPathRecursively(List<FileStatus> result,
FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
- for(FileStatus stat: fs.listStatus(path, inputFilter)) {
- if (stat.isDirectory()) {
- addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat);
+ }
}
- }
+ }
}
@@ -331,8 +342,13 @@ public abstract class FileInputFormat<K,
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
- FileSystem fs = path.getFileSystem(job.getConfiguration());
-        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+ BlockLocation[] blkLocations;
+ if (file instanceof LocatedFileStatus) {
+ blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+ } else {
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
+ blkLocations = fs.getFileBlockLocations(file, 0, length);
+ }
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1514767&r1=1514766&r2=1514767&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
(original)
+++
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
Fri Aug 16 16:09:17 2013
@@ -24,11 +24,14 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;
@@ -77,6 +80,23 @@ public class TestFileInputFormat {
.toString());
}
+ @Test
+ public void testListLocatedStatus() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.setBoolean("fs.test.impl.disable.cache", false);
+ conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
+ MockFileSystem mockFs =
+ (MockFileSystem) new Path("test:///").getFileSystem(conf);
+ Assert.assertEquals("listLocatedStatus already called",
+ 0, mockFs.numListLocatedStatusCalls);
+ Job job = Job.getInstance(conf);
+ FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+ List<InputSplit> splits = fileInputFormat.getSplits(job);
+ Assert.assertEquals("Input splits are not correct", 2, splits.size());
+    Assert.assertEquals("listLocatedStatus calls",
+ 1, mockFs.numListLocatedStatusCalls);
+ }
+
private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set("fs.test.impl.disable.cache", "true");
@@ -86,13 +106,14 @@ public class TestFileInputFormat {
}
static class MockFileSystem extends RawLocalFileSystem {
+ int numListLocatedStatusCalls = 0;
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
if (f.toString().equals("test:/a1")) {
return new FileStatus[] {
- new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+ new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1"))
};
} else if (f.toString().equals("test:/a1/a2")) {
return new FileStatus[] {
@@ -116,5 +137,20 @@ public class TestFileInputFormat {
throws FileNotFoundException, IOException {
return this.listStatus(f);
}
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+ throws IOException {
+ return new BlockLocation[] {
+ new BlockLocation(new String[] { "localhost:50010" },
+ new String[] { "localhost" }, 0, len) };
+ }
+
+ @Override
+ protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+ PathFilter filter) throws FileNotFoundException, IOException {
+ ++numListLocatedStatusCalls;
+ return super.listLocatedStatus(f, filter);
+ }
}
}
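
A note on the test setup: getConfiguration() disables the FileSystem
cache, but testListLocatedStatus() re-enables it so the MockFileSystem
obtained in the test and the one used inside getSplits() are the same
instance, which is what makes numListLocatedStatusCalls observable. A
sketch of the cache property relied on (assuming, as the test implies,
that the "test" scheme is mapped to MockFileSystem via fs.test.impl):

    // With caching enabled, lookups for the same scheme and authority
    // return the same FileSystem instance.
    FileSystem a = new Path("test:///").getFileSystem(conf);
    FileSystem b = new Path("test:///a1/a2").getFileSystem(conf);
    assert a == b;
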