Don't use CombineFileInputFormat / CombineFileRecordReader. Just let Pig do its
thing: with pig.splitCombination enabled and pig.maxCombinedSplitSize set, Pig
combines small input files on its own.


On Wed, Sep 18, 2013 at 9:08 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I tried this
> http://pig.apache.org/docs/r0.8.1/cookbook.html#Combine+Small+Input+Files
>
> Test Job Details
> Input: 7 files * 51 MB each
>
> HDFS counters of the job (FileSystemCounters group):
>
>   Counter              Map          Reduce   Total
>   FILE_BYTES_READ      92           23       115
>   HDFS_BYTES_READ      360,235,092  0        360,235,092
>   FILE_BYTES_WRITTEN   116,558      116,349  232,907
>   HDFS_BYTES_WRITTEN   0            9        9
>
> From the job conf:
>   pig.maxCombinedSplitSize  205847916
>   pig.splitCombination      true
>   mapred.max.split.size     205847916  (tried with and without this setting)
>
> The map task logs these lines from the CombineFileSplit:
> Total Split Length:360233853
> Total Split Paths Length:7
>
> With 360 MB of input data and the above configuration there should have been
> two splits (360,233,853 / 205,847,916 ≈ 1.75). However, there was only one
> split, and all 7 files were read by a single map task.
>
>
> My loader's prepareToRead does not use the PigSplit:
>
>         @Override
>         public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
>             reader = (CombineFileRecordReader) arg0;
>         }
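>
> For debugging, a minimal sketch of a prepareToRead that also inspects the
> PigSplit may help. This assumes PigSplit exposes getNumPaths() in this Pig
> version and that LOGGER is the loader's existing logger:
>
>         @Override
>         public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
>             reader = (CombineFileRecordReader) arg0;
>             // If pig.splitCombination took effect, this PigSplit wraps more
>             // than one underlying InputSplit; the count shows whether Pig
>             // (rather than the custom InputFormat) combined anything.
>             LOGGER.info("Underlying splits in this PigSplit: " + arg1.getNumPaths());
>         }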
>
> Any suggestions?
>
>
> On Wed, Sep 18, 2013 at 8:49 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
> > I am facing an issue with a large number of small files (51 MB each). A
> > typical M/R job in my environment takes at least 2000 such files, which
> > results in 2000 map tasks. I therefore tried CombineFileInputFormat to
> > reduce the number of map tasks, and wrote a custom implementation of
> > CombineFileInputFormat along with a CombineFileRecordReader. Below is a
> > skeleton of the code.
> >
> >
> > public class CustomInputFormat
> >         extends CombineFileInputFormat<IntWritable, RecordHolder> {
> >
> >     @Override
> >     public RecordReader<IntWritable, RecordHolder> createRecordReader(
> >             final InputSplit split, final TaskAttemptContext taskContext)
> >             throws IOException {
> >         final CombineFileSplit fSplit = (CombineFileSplit) split;
> >         // One reader per file in the combined split; the framework passes
> >         // the file index (idx) into the reader's constructor.
> >         return new CombineFileRecordReader<IntWritable, RecordHolder>(
> >                 fSplit, taskContext, (Class) CustomRecordReader.class);
> >     }
> >
> >     public static class CustomRecordReader
> >             extends RecordReader<IntWritable, RecordHolder> {
> >
> >         private IntWritable key = null;
> >         private RecordHolder value = null;
> >         // Record iterator and position; populated in initialize(),
> >         // elided in this skeleton (as is the LOGGER declaration).
> >         private Iterator<Object> currentIterator;
> >         private int recordPosition = 0;
> >
> >         public CustomRecordReader(CombineFileSplit split,
> >                 TaskAttemptContext context, Integer idx) throws IOException {
> >             // Narrow the combined split down to the single file this
> >             // reader instance is responsible for.
> >             FileSplit fileSplit = new FileSplit(split.getPath(idx),
> >                     split.getOffset(idx), split.getLength(idx),
> >                     split.getLocations());
> >             Path path = fileSplit.getPath();
> >         }
> >
> >         @Override
> >         public void close() throws IOException {
> >         }
> >
> >         @Override
> >         public IntWritable getCurrentKey() throws IOException,
> >                 InterruptedException {
> >             return key;
> >         }
> >
> >         @Override
> >         public RecordHolder getCurrentValue() throws IOException,
> >                 InterruptedException {
> >             return value;
> >         }
> >
> >         @Override
> >         public float getProgress() throws IOException, InterruptedException {
> >             return 0;
> >         }
> >
> >         @Override
> >         public void initialize(InputSplit arg0, TaskAttemptContext arg1)
> >                 throws IOException, InterruptedException {
> >         }
> >
> >         @Override
> >         public boolean nextKeyValue() throws IOException,
> >                 InterruptedException {
> >             boolean dataPresent = false;
> >             if (currentIterator.hasNext()) {
> >                 Object nextRecord = currentIterator.next();
> >                 key = new IntWritable(recordPosition++);
> >                 value = new RecordHolder(nextRecord);
> >                 dataPresent = true;
> >             } else {
> >                 LOGGER.info("Reached end of file: " + dataPresent);
> >             }
> >             return dataPresent;
> >         }
> >     }
> > }
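> >
> > One thing worth checking: if CombineFileInputFormat never sees a max split
> > size, it packs as many blocks as it can into very few splits, which would
> > explain the single map task. A minimal sketch of a possible fix, assuming
> > the Hadoop version in use honors CombineFileInputFormat's protected
> > setMaxSplitSize(long) setter (the value matches the configuration below):
> >
> > public class CustomInputFormat
> >         extends CombineFileInputFormat<IntWritable, RecordHolder> {
> >
> >     public CustomInputFormat() {
> >         // Cap each combined split directly, instead of relying on
> >         // mapred.max.split.size being visible to getSplits() at the
> >         // time the splits are computed.
> >         setMaxSplitSize(205847916L);
> >     }
> >
> >     // createRecordReader(...) as in the skeleton above.
> > }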
> >
> > Within setLocation() of the loader I set the following configuration:
> > job.getConfiguration().set("mapred.max.split.size", "205847916");
> > job.getConfiguration().set("mapreduce.job.max.split.locations", "5");
> >
> > I used the above InputFormat in my custom Pig loader. I ran a Pig script
> > that loads data using the custom loader and dumps it.
> > Input size: 1256 files * 51 MB each.
> >
> > I was expecting at least 3 splits, which would result in 3 map tasks.
> > However, only one map task was started, and it was responsible for reading
> > all of the data via the record reader.
> >
> > Could anyone please show me what I am missing?
> >
> > I am using Pig 0.8 and not Pig 0.11 as mentioned earlier. I do not have
> > the liberty to upgrade the Pig version.
> >
> > I came across these conversations:
> >
> >
> > http://grokbase.com/t/pig/dev/107trhy9wd/jira-created-pig-1518-multi-file-input-format-for-loaders
> > http://www.mail-archive.com/user@pig.apache.org/msg03938.html
> >
> > They did not seem to solve my issue.
> >
> > Thanks for any suggestions.
> >
> > Regards,
> > Deepak
> >
> >
> >
> > --
> > Deepak
> >
> >
>
>
> --
> Deepak
>
