Don't use a custom CombineFileInputFormat / CombineFileRecordReader. Just let Pig do its thing.
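Pig 0.8 already knows how to combine small-file splits on its own; that is what pig.splitCombination and pig.maxCombinedSplitSize from the cookbook page you linked control. If I remember right, though, Pig only combines plain FileSplits, so splits coming out of a custom CombineFileInputFormat are left alone. A minimal sketch of the script side, assuming your data loads with plain PigStorage (the input path is a placeholder):

  set pig.splitCombination true;           -- already the default in 0.8
  set pig.maxCombinedSplitSize 205847916;  -- cap each combined split at ~196 MB
  raw = LOAD '/data/small-files' USING PigStorage();
  DUMP raw;

With 7 files of 51 MB each and that cap, this should come out to two map tasks.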
On Wed, Sep 18, 2013 at 9:08 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
> I tried this:
> http://pig.apache.org/docs/r0.8.1/cookbook.html#Combine+Small+Input+Files
>
> Test job details
> Input: 7 files * 51 MB each
>
> HDFS counters of the job (FileSystemCounters group):
>
>   Counter              Map          Reduce   Total
>   FILE_BYTES_READ      92           23       115
>   HDFS_BYTES_READ      360,235,092  0        360,235,092
>   FILE_BYTES_WRITTEN   116,558      116,349  232,907
>   HDFS_BYTES_WRITTEN   0            9        9
>
> From the job conf:
>   pig.maxCombinedSplitSize  205847916
>   pig.splitCombination      true
>   mapred.max.split.size     205847916  (tried with and without this setting)
>
> The map task logs these lines for the combined split:
>   Total Split Length: 360233853
>   Total Split Paths Length: 7
>
> With 360 MB of input data and the configuration above there should have
> been two splits (360,233,853 / 205,847,916 = 1.75, which rounds up to
> two). However, there was only one split, and all 7 files were read by a
> single map task.
>
> Note that prepareToRead() in my loader does not use the PigSplit:
>
> @Override
> public void prepareToRead(RecordReader arg0, PigSplit arg1)
>     throws IOException {
>   reader = (CombineFileRecordReader) arg0;
> }
>
> Any suggestions?
>
>
> On Wed, Sep 18, 2013 at 8:49 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
> > I am facing an issue with a large number of small files (51 MB each).
> > A typical M/R job in my working environment takes at least 2000 files,
> > which results in 2000 map tasks. Hence I thought of using
> > CombineFileInputFormat to reduce the problem, and wrote a custom
> > implementation of CombineFileInputFormat along with a
> > CombineFileRecordReader. Below is a skeleton of the code.
> >
> > import java.io.IOException;
> > import java.util.Iterator;
> > import java.util.logging.Logger;
> >
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.io.IntWritable;
> > import org.apache.hadoop.mapreduce.InputSplit;
> > import org.apache.hadoop.mapreduce.RecordReader;
> > import org.apache.hadoop.mapreduce.TaskAttemptContext;
> > import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
> > import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
> > import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
> > import org.apache.hadoop.mapreduce.lib.input.FileSplit;
> >
> > public class CustomInputFormat
> >     extends CombineFileInputFormat<IntWritable, RecordHolder> {
> >
> >   @Override
> >   public RecordReader<IntWritable, RecordHolder> createRecordReader(
> >       final InputSplit split, final TaskAttemptContext taskContext)
> >       throws IOException {
> >     final CombineFileSplit fSplit = (CombineFileSplit) split;
> >     // One CustomRecordReader is created per file in the combined split.
> >     return new CombineFileRecordReader<IntWritable, RecordHolder>(
> >         fSplit, taskContext, CustomRecordReader.class);
> >   }
> >
> >   public static class CustomRecordReader
> >       extends RecordReader<IntWritable, RecordHolder> {
> >
> >     private static final Logger LOGGER =
> >         Logger.getLogger(CustomRecordReader.class.getName());
> >
> >     private final Path path;
> >     private IntWritable key = null;
> >     private RecordHolder value = null;
> >     private Iterator<Object> currentIterator;  // records of the current file
> >     private int recordPosition = 0;
> >
> >     public CustomRecordReader(CombineFileSplit split,
> >         TaskAttemptContext context, Integer idx) throws IOException {
> >       // Carve the idx-th file out of the combined split.
> >       FileSplit fileSplit = new FileSplit(split.getPath(idx),
> >           split.getOffset(idx), split.getLength(idx), split.getLocations());
> >       path = fileSplit.getPath();
> >     }
> >
> >     @Override
> >     public void close() throws IOException {
> >     }
> >
> >     @Override
> >     public IntWritable getCurrentKey() throws IOException, InterruptedException {
> >       return key;
> >     }
> >
> >     @Override
> >     public RecordHolder getCurrentValue() throws IOException, InterruptedException {
> >       return value;
> >     }
> >
> >     @Override
> >     public float getProgress() throws IOException, InterruptedException {
> >       return 0;
> >     }
> >
> >     @Override
> >     public void initialize(InputSplit arg0, TaskAttemptContext arg1)
> >         throws IOException, InterruptedException {
> >       // Open the file at 'path' here and set up currentIterator.
> >     }
> >
> >     @Override
> >     public boolean nextKeyValue() throws IOException, InterruptedException {
> >       boolean dataPresent = false;
> >       if (currentIterator.hasNext()) {
> >         Object nextRecord = currentIterator.next();
> >         key = new IntWritable(recordPosition++);
> >         value = new RecordHolder(nextRecord);
> >         dataPresent = true;
> >       } else {
> >         LOGGER.info("Reached end of file: " + path);
> >       }
> >       return dataPresent;
> >     }
> >   }
> > }
> >
> > Within setLocation() of the loader I set the following configuration:
> >
> >   job.getConfiguration().set("mapred.max.split.size", "205847916");
> >   job.getConfiguration().set("mapreduce.job.max.split.locations", "5");
> >
> > I used the above input format in my custom Pig loader, and ran a Pig
> > script that loads data with the custom loader and dumps it.
> > Input size: 1256 files * 51 MB each.
> >
> > I was expecting at least 3 splits, which would result in 3 map tasks.
> > However, only one map task was started, and it read all of the data
> > through the record reader.
> >
> > Could anyone please show me what I am missing?
> >
> > I am using Pig 0.8, not Pig 0.11 as mentioned earlier. I do not have
> > the liberty to upgrade the Pig version.
> >
> > I came across these conversations:
> >
> > http://grokbase.com/t/pig/dev/107trhy9wd/jira-created-pig-1518-multi-file-input-format-for-loaders
> > http://www.mail-archive.com/user@pig.apache.org/msg03938.html
> >
> > They did not seem to solve my issue.
> >
> > Thanks for any suggestions.
> >
> > Regards,
> > Deepak
> >
> > --
> > Deepak
>
>
> --
> Deepak
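For reference, here is roughly what the loader shrinks to once the custom CombineFileInputFormat is dropped. This is only a sketch; the class name and the delegation to PigStorage are illustrative assumptions, not code from this thread:

  import java.io.IOException;

  import org.apache.hadoop.mapreduce.InputFormat;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.RecordReader;
  import org.apache.pig.LoadFunc;
  import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
  import org.apache.pig.builtin.PigStorage;
  import org.apache.pig.data.Tuple;

  // Illustrative loader that leaves split combination entirely to Pig.
  public class SimpleCombinableLoader extends LoadFunc {

      // Delegate to PigStorage so the InputFormat stays an ordinary
      // FileInputFormat producing FileSplits, which Pig can combine.
      private final PigStorage delegate = new PigStorage();

      @Override
      public void setLocation(String location, Job job) throws IOException {
          delegate.setLocation(location, job);
      }

      @Override
      public InputFormat getInputFormat() throws IOException {
          return delegate.getInputFormat();
      }

      @Override
      public void prepareToRead(RecordReader reader, PigSplit split)
              throws IOException {
          // Accept whatever reader Pig provides; no CombineFileRecordReader cast.
          delegate.prepareToRead(reader, split);
      }

      @Override
      public Tuple getNext() throws IOException {
          // Post-process each tuple here if the custom loader needs to.
          return delegate.getNext();
      }
  }

Because getInputFormat() hands back Pig's ordinary text input format, the resulting splits are plain FileSplits and pig.maxCombinedSplitSize can take effect.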