Re: Subdirectory question revisited
Here's how I solved the problem using a custom InputFormat... the key part is in listStatus(), where we traverse the directory tree. Since HDFS doesn't have links, this code is probably safe, but if you have a filesystem with cycles you will get trapped.

Ian

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ArrayDeque;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.LineRecordReader;

public class TrecWebInputFormat extends FileInputFormat {

  @Override
  public boolean isSplitable(FileSystem fs, Path filename) {
    return false;
  }

  @Override
  public RecordReader getRecordReader(InputSplit split, JobConf job,
                                      Reporter reporter) throws IOException {
    return new TrecWebRecordReader(job, (FileSplit)split);
  }

  // The following are incomprehensibly private in FileInputFormat...
  private static final PathFilter hiddenFileFilter = new PathFilter(){
    public boolean accept(Path p){
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  @Override
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    // Work stack of entries that still have to be examined
    ArrayDeque<FileStatus> stats = new ArrayDeque<FileStatus>(dirs.length);

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    // Set up traversal from input paths, which may be globs
    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          stats.add(globStat);
        }
      }
    }

    // Depth-first traversal: directories are expanded, files are collected
    while (!stats.isEmpty()) {
      FileStatus stat = stats.pop();
      if (stat.isDir()) {
        FileSystem fs = stat.getPath().getFileSystem(job);
        for (FileStatus sub: fs.listStatus(stat.getPath(), inputFilter)) {
          stats.push(sub);
        }
      } else {
        result.add(stat);
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result.toArray(new FileStatus[result.size()]);
  }

  public static class TrecWebRecordReader implements RecordReader {
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long end;
    private long pos;
    private Path file;
    private LineRecordReader.LineRea
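
On the caveat about cycles: a minimal sketch of the same stack-based traversal with a visited set, so that a directory reachable by more than one route is expanded only once. It uses java.nio.file on a local filesystem as a stand-in for the Hadoop FileSystem API, and the class and method names (CycleSafeWalk, listFiles) are hypothetical, not part of the code above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: local-filesystem analogue of the listStatus() traversal.
final class CycleSafeWalk {

    /** Returns every regular file under root, expanding each real directory at most once. */
    static List<Path> listFiles(Path root) throws IOException {
        List<Path> files = new ArrayList<>();
        Set<Path> visitedDirs = new HashSet<>();
        Deque<Path> pending = new ArrayDeque<>();
        pending.push(root);

        while (!pending.isEmpty()) {
            Path current = pending.pop();
            if (Files.isDirectory(current)) {
                // toRealPath() resolves symbolic links, so two routes to the
                // same directory map to the same key in the visited set.
                if (!visitedDirs.add(current.toRealPath())) {
                    continue; // already expanded: this is where a cycle is cut
                }
                try (DirectoryStream<Path> children = Files.newDirectoryStream(current)) {
                    for (Path child : children) {
                        String name = child.getFileName().toString();
                        // Same rule as hiddenFileFilter: skip "_" and "." entries.
                        if (!name.startsWith("_") && !name.startsWith(".")) {
                            pending.push(child);
                        }
                    }
                }
            } else if (Files.isRegularFile(current)) {
                files.add(current);
            }
        }
        return files;
    }
}
```

The cost is one set entry per directory. On a filesystem without links the check never fires, so it only matters when the same InputFormat is pointed at something other than HDFS.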
Re: Subdirectory question revisited
OK, thanks for the pointer. If I wind up rolling our own code to handle this I'll make sure to contribute it.

DR

Aaron Kimball wrote:
> There is no technical limit that prevents Hadoop from operating in this
> fashion; it's simply the case that the included InputFormat implementations
> do not do so. This behavior has been set in this fashion for a long time,
> so it's unlikely that it will change soon, as that might break existing
> applications.
>
> But you can write your own subclass of TextInputFormat or
> SequenceFileInputFormat that overrides the getSplits() method to
> recursively descend through directories and search for files.
>
> - Aaron
>
> On Tue, Jun 2, 2009 at 1:22 PM, David Rosenstrauch wrote:
>> As per a previous list question (
>> http://mail-archives.apache.org/mod_mbox/hadoop-core-user/200804.mbox/%3ce75c02ef0804011433x144813e6x2450da7883de3...@mail.gmail.com%3e)
>> it looks as though it's not possible for hadoop to traverse input
>> directories recursively in order to discover input files.
Re: Subdirectory question revisited
Hey Aaron,

I had a similar problem. I have log files arranged in the following fashion:

/logs//.log

I want to analyze a range of dates for all hosts. What I did was write into my driver class a subroutine that descends through the HDFS file system starting at /logs and builds a list of input files, then fed the list of files to the framework.

Example code below.

Brian

// base, df, startDate and endDate are defined elsewhere in the driver (not shown)
FileSystem fs = FileSystem.get(conf);
Pattern fileNamePattern =
    Pattern.compile(".*datanode-(.*)\\.log\\.([0-9]+-[0-9]+-[0-9]+)");
for (FileStatus status : fs.listStatus(base)) {
  Path pathname = status.getPath();
  for (FileStatus logfile : fs.listStatus(pathname)) {
    Path logFilePath = logfile.getPath();
    Matcher m = fileNamePattern.matcher(logFilePath.getName());
    if (m.matches()) {
      String dateString = m.group(2);
      Date logDate = df.parse(dateString);
      // Keep files with startDate <= logDate < endDate
      if ((logDate.equals(startDate) || logDate.after(startDate))
          && logDate.before(endDate)) {
        FileInputFormat.addInputPath(conf, logFilePath);
      } else {
        //System.out.println("Ignoring file: " + logFilePath.getName());
        //System.out.println("Start Date: " + startDate + ", End Date: " + endDate + ", Log date: " + logDate);
      }
    } else {
      System.out.println("Ignoring file: " + logFilePath.getName());
    }
  }
}

On Jun 2, 2009, at 6:22 PM, Aaron Kimball wrote:
> There is no technical limit that prevents Hadoop from operating in this
> fashion; it's simply the case that the included InputFormat implementations
> do not do so. This behavior has been set in this fashion for a long time,
> so it's unlikely that it will change soon, as that might break existing
> applications.
>
> But you can write your own subclass of TextInputFormat or
> SequenceFileInputFormat that overrides the getSplits() method to
> recursively descend through directories and search for files.
>
> - Aaron
>
> On Tue, Jun 2, 2009 at 1:22 PM, David Rosenstrauch wrote:
>> As per a previous list question (
>> http://mail-archives.apache.org/mod_mbox/hadoop-core-user/200804.mbox/%3ce75c02ef0804011433x144813e6x2450da7883de3...@mail.gmail.com%3e)
>> it looks as though it's not possible for hadoop to traverse input
>> directories recursively in order to discover input files.
>>
>> Just wondering a) if there's any particular reason why this functionality
>> doesn't exist, and b) if not, if there's any workaround/hack to make it
>> possible.
>>
>> Like the OP, I was thinking it would be helpful to partition my input data
>> by year, month, and day. I figured this would enable me to run jobs against
>> specific date ranges of input data, and thereby speed up the execution of
>> my jobs since they wouldn't have to process every single record.
>>
>> Any way to make this happen? (Or am I totally going about this the wrong
>> way for what I'm trying to achieve?)
>>
>> TIA,
>>
>> DR
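
The date test in Brian's driver can be pulled out into a small function that is easy to check on its own. A minimal sketch using only the JDK: the class name LogDateFilter, the method inRange, and the assumed file name shape (something like "...datanode-HOST.log.yyyy-MM-dd") are illustrative assumptions, and java.time.LocalDate is swapped in for the Date/DateFormat pair of the original.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: decides whether a log file name falls inside a date range.
final class LogDateFilter {

    // Assumed name shape: "<prefix>datanode-<host>.log.<yyyy-MM-dd>"
    private static final Pattern NAME =
        Pattern.compile(".*datanode-(.*)\\.log\\.([0-9]{4}-[0-9]{2}-[0-9]{2})");

    /** True when the date in fileName satisfies start <= date < endExclusive. */
    static boolean inRange(String fileName, LocalDate start, LocalDate endExclusive) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            return false; // not a datanode log, or no date suffix
        }
        LocalDate logDate;
        try {
            logDate = LocalDate.parse(m.group(2)); // ISO yyyy-MM-dd
        } catch (DateTimeParseException e) {
            return false; // digits in the right shape but not a real date
        }
        return !logDate.isBefore(start) && logDate.isBefore(endExclusive);
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2009, 6, 1);
        LocalDate end = LocalDate.of(2009, 6, 3);
        System.out.println(inRange("hadoop-hdfs-datanode-node1.log.2009-06-02", start, end));
    }
}
```

In a driver like the one above, each file name that passes inRange would be the one handed to FileInputFormat.addInputPath.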
Re: Subdirectory question revisited
There is no technical limit that prevents Hadoop from operating in this fashion; it's simply the case that the included InputFormat implementations do not do so. This behavior has been set in this fashion for a long time, so it's unlikely that it will change soon, as that might break existing applications.

But you can write your own subclass of TextInputFormat or SequenceFileInputFormat that overrides the getSplits() method to recursively descend through directories and search for files.

- Aaron

On Tue, Jun 2, 2009 at 1:22 PM, David Rosenstrauch wrote:
> As per a previous list question (
> http://mail-archives.apache.org/mod_mbox/hadoop-core-user/200804.mbox/%3ce75c02ef0804011433x144813e6x2450da7883de3...@mail.gmail.com%3e)
> it looks as though it's not possible for hadoop to traverse input
> directories recursively in order to discover input files.
>
> Just wondering a) if there's any particular reason why this functionality
> doesn't exist, and b) if not, if there's any workaround/hack to make it
> possible.
>
> Like the OP, I was thinking it would be helpful to partition my input data
> by year, month, and day. I figured this would enable me to run jobs against
> specific date ranges of input data, and thereby speed up the execution of my
> jobs since they wouldn't have to process every single record.
>
> Any way to make this happen? (Or am I totally going about this the wrong
> way for what I'm trying to achieve?)
>
> TIA,
>
> DR
Subdirectory question revisited
As per a previous list question (http://mail-archives.apache.org/mod_mbox/hadoop-core-user/200804.mbox/%3ce75c02ef0804011433x144813e6x2450da7883de3...@mail.gmail.com%3e) it looks as though it's not possible for hadoop to traverse input directories recursively in order to discover input files.

Just wondering a) if there's any particular reason why this functionality doesn't exist, and b) if not, if there's any workaround/hack to make it possible.

Like the OP, I was thinking it would be helpful to partition my input data by year, month, and day. I figured this would enable me to run jobs against specific date ranges of input data, and thereby speed up the execution of my jobs since they wouldn't have to process every single record.

Any way to make this happen? (Or am I totally going about this the wrong way for what I'm trying to achieve?)

TIA,

DR
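
One way the year/month/day partitioning could work without any recursive descent is to compute the leaf directories for the wanted date range in the driver and pass them all as input paths. A minimal sketch under stated assumptions: the layout base/yyyy/MM/dd and the names DatePartitionPaths, pathsFor and commaSeparated are hypothetical. The old-API FileInputFormat has setInputPaths(JobConf, String), which takes a comma-separated list, so the joined string is the form such a driver might hand over.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: expands a date range into one directory path per day.
final class DatePartitionPaths {

    /** Returns base/yyyy/MM/dd for every day with start <= day < endExclusive. */
    static List<String> pathsFor(String base, LocalDate start, LocalDate endExclusive) {
        List<String> paths = new ArrayList<>();
        for (LocalDate day = start; day.isBefore(endExclusive); day = day.plusDays(1)) {
            paths.add(String.format("%s/%04d/%02d/%02d",
                base, day.getYear(), day.getMonthValue(), day.getDayOfMonth()));
        }
        return paths;
    }

    /** Joins the per-day paths with commas, the form a comma-separated input path setting expects. */
    static String commaSeparated(String base, LocalDate start, LocalDate endExclusive) {
        return String.join(",", pathsFor(base, start, endExclusive));
    }

    public static void main(String[] args) {
        System.out.println(commaSeparated("/data",
            LocalDate.of(2009, 5, 30), LocalDate.of(2009, 6, 2)));
    }
}
```

Days with no directory would still need to be filtered out (for example with an existence check in the driver) before the job is submitted, since a missing input path is reported as an error.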