[ 
https://issues.apache.org/jira/browse/HADOOP-4065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12629362#action_12629362
 ] 

jsensarma edited comment on HADOOP-4065 at 9/8/08 6:47 PM:
-------------------------------------------------------------------

@Pete - i am still trying to understand the proposed interface. in my case - 
the data is not splitable (so maybe there's a different base class - 
UnsplittableFileInputFormat). The factory approach for getting a record reader 
sounds interesting - but on second thoughts - since the getRR() already takes 
in a Configuration object - we don't need a new interface for this i think. 
(Meaning that we could write a ConfigurableRecordReader that could look at the 
config and instantiate the right record reader and then redirect all RR api 
calls to the contained record reader.)

the main motivation i had for filing this bug was to extract out the common 
parts for dealing with compressed, non-splitable binary data into a base class 
(and most of this functionality is in the record reader) and make it easy to 
handle new kinds of binary data with minimal code. Binary files don't have keys 
and values - they just have rows of data. So one part is to supply some default 
key (like record number). The remaining part is to get the class of the row and 
the deserialization method. Appropriately - it would be nice to have a 
deserializerFactory that takes in a Configuration object (which is i think one 
of the key missing parts of hadoop-1986). that way - the deserializer can be 
configured by application - instead of hadoop maintaining a mapping from class 
-> Deserializer.

so my proposal would be something like this (admittedly - i am not being very 
ambitious here - just want to cover the issue mentioned in this jira):

{code}
/**
  * forced to make this class since maprunner tries to use same object for all 
next() calls.
  * This way we can swap out the actual 'row' object on each call to next()
  */
public class RowContainer {
   public Object row;
}

/**
 * Application can return right deserializer based on configuration
 */
public interface RowSource {
   public Deserializer<?> getDeserializer(Configuration conf) throws 
IOException;
   public Class<?> getClass();
}

/**
 * Reads a non-splitable binary flat file.
 */
public class RowSourceFileInputFormat extends FileInputFormat<LongWritable, 
RowContainer> {
  protected boolean isSplitable(FileSystem fs, Path file) { return false; }
  public RecordReader<LongWritable, RowContainer> getRecordReader(InputSplit 
genericSplit, JobConf job,
                                                                      Reporter 
reporter)
    throws IOException {
    reporter.setStatus(genericSplit.toString());
    return new RowSourceRecordReader(job, (FileSplit) genericSplit);
  }
}

/**
 * Reads one row at a time. The key is the row number and the actual row is 
returned inside the RowContainer
 */
public class RowSourceRecordReader implements RecordReader<LongWritable, 
RowContainer> {
    private long rnum;
    private final DataInput in;
    private final DecompressorStream dcin;
    private final FSDataInputStream fsin;
    private final long end;
    private final Deserializer deserializer;

    public RowSourceRecordReader(Configuration job,
                                FileSplit split) throws IOException {

      final Path file = split.getPath();
      CompressionCodecFactory compressionCodecs = new 
CompressionCodecFactory(job);
      final CompressionCodec codec = compressionCodecs.getCodec(file);
      FileSystem fs = file.getFileSystem(job);
      fsin = fs.open(split.getPath());

      if(codec != null) {
        dcin = (DecompressorStream)codec.createInputStream(fsin);
        in = new DataInputStream(dcin);
      } else {
        dcin = null;
        in = fsin;
      }
      rnum = 0;
      end = split.getLength();

      
deserializer=(ReflectionUtils.newInstance(job.getClass("mapred.input.rowsource",
 null, RowSource.class)).getDeserializer(job);
      deserializer.open(in);
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public RowContainer createValue() {
       return new RowContainer();
    }

    public synchronized boolean next(LongWritable key, RowContainer value) 
throws IOException {
      if(dcin != null) {
        if (dcin.available() == 0) {
          return false;
        }
      } else {
        if(fsin.getPos() >= end) {
          return false;
        }
      }
      key.set(rnum++);
      Object row = deserializer.deserialize(value.row);
      value.row = row;
      return true;
    }

    public synchronized float getProgress() throws IOException {
      // this assumes no splitting                                              
                                                 
      if (end == 0) {
        return 0.0f;
      } else {
        // gives progress over uncompressed stream                              
                                                 
        return Math.min(1.0f, fsin.getPos()/(float)(end));


    public synchronized long getPos() throws IOException {
      // position over uncompressed stream. not sure what                       
                                                 
      // effect this has on stats about job                                     
                                                 
      return fsin.getPos();
    }

    public synchronized void close() throws IOException {
       // assuming that this closes the underlying streams
       deserializer.close();
    }
}
{code}

      was (Author: jsensarma):
    @Pete - i am still trying to understand the proposed interface. in my case 
- the data is not splitable (so maybe there's a different base class - 
UnsplittableFileInputFormat). The factory approach for getting a record reader 
sounds interesting - but on second thoughts - since the getRR() already takes 
in a Configuration object - we don't need a new interface for this i think. 
(Meaning that we could write a ConfigurableRecordReader that could look at the 
config and instantiate the right record reader and then redirect all RR api 
calls to the contained record reader.)

the main motivation i had for filing this bug was to extract out the common 
parts for dealing with compressed, non-splitable binary data into a base class 
(and most of this functionality is in the record reader) and make it easy to 
handle new kinds of binary data with minimal code. Binary files don't have keys 
and values - they just have rows of data. So one part is to supply some default 
key (like record number). The remaining part is to get the class of the row and 
the deserialization method. Appropriately - it would be nice to have a 
deserializerFactory that takes in a Configuration object (which is i think one 
of the key missing parts of hadoop-1986). that way - the deserializer can be 
configured by application - instead of hadoop maintaining a mapping from class 
-> Deserializer.

so my proposal would be something like this (admittedly - i am not being very 
ambitious here - just want to cover the issue mentioned in this jira):

/**
  * forced to make this class since maprunner tries to use same object for all 
next() calls.
  * This way we can swap out the actual 'row' object on each call to next()
  */
public class RowContainer {
   public Object row;
}

/**
 * Application can return right deserializer based on configuration
 */
public interface RowSource {
   public Deserializer<?> getDeserializer(Configuration conf) throws 
IOException;
   public Class<?> getClass();
}

/**
 * Reads a non-splitable binary flat file.
 */
public class RowSourceFileInputFormat extends FileInputFormat<LongWritable, 
RowContainer> {
  protected boolean isSplitable(FileSystem fs, Path file) { return false; }
  public RecordReader<LongWritable, RowContainer> getRecordReader(InputSplit 
genericSplit, JobConf job,
                                                                      Reporter 
reporter)
    throws IOException {
    reporter.setStatus(genericSplit.toString());
    return new RowSourceRecordReader(job, (FileSplit) genericSplit);
  }
}

/**
 * Reads one row at a time. The key is the row number and the actual row is 
returned inside the RowContainer
 */
public class RowSourceRecordReader implements RecordReader<LongWritable, 
RowContainer> {
    private long rnum;
    private final DataInput in;
    private final DecompressorStream dcin;
    private final FSDataInputStream fsin;
    private final long end;
    private final Deserializer deserializer;

    public RowSourceRecordReader(Configuration job,
                                FileSplit split) throws IOException {

      final Path file = split.getPath();
      CompressionCodecFactory compressionCodecs = new 
CompressionCodecFactory(job);
      final CompressionCodec codec = compressionCodecs.getCodec(file);
      FileSystem fs = file.getFileSystem(job);
      fsin = fs.open(split.getPath());

      if(codec != null) {
        dcin = (DecompressorStream)codec.createInputStream(fsin);
        in = new DataInputStream(dcin);
      } else {
        dcin = null;
        in = fsin;
      }
      rnum = 0;
      end = split.getLength();

      
deserializer=(ReflectionUtils.newInstance(job.getClass("mapred.input.rowsource",
 null, RowSource.class)).getDeserializer(job);
      deserializer.open(in);
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public RowContainer createValue() {
       return new RowContainer();
    }

    public synchronized boolean next(LongWritable key, RowContainer value) 
throws IOException {
      if(dcin != null) {
        if (dcin.available() == 0) {
          return false;
        }
      } else {
        if(fsin.getPos() >= end) {
          return false;
        }
      }
      key.set(rnum++);
      Object row = deserializer.deserialize(value.row);
      value.row = row;
      return true;
    }

    public synchronized float getProgress() throws IOException {
      // this assumes no splitting                                              
                                                 
      if (end == 0) {
        return 0.0f;
      } else {
        // gives progress over uncompressed stream                              
                                                 
        return Math.min(1.0f, fsin.getPos()/(float)(end));


    public synchronized long getPos() throws IOException {
      // position over uncompressed stream. not sure what                       
                                                 
      // effect this has on stats about job                                     
                                                 
      return fsin.getPos();
    }

    public synchronized void close() throws IOException {
       // assuming that this closes the underlying streams
       deserializer.close();
    }
}

  
> support for reading binary data from flat files
> -----------------------------------------------
>
>                 Key: HADOOP-4065
>                 URL: https://issues.apache.org/jira/browse/HADOOP-4065
>             Project: Hadoop Core
>          Issue Type: Bug
>          Components: mapred
>            Reporter: Joydeep Sen Sarma
>         Attachments: HADOOP-4065.0.txt, ThriftFlatFile.java
>
>
> like textinputformat - looking for a concrete implementation to read binary 
> records from a flat file (that may be compressed).
> it's assumed that hadoop can't split such a file. so the inputformat can set 
> splittable to false.
> tricky aspects are:
> - how to know what class the file contains (has to be in a configuration 
> somewhere).
> - how to determine EOF (would be nice if hadoop can determine EOF and not 
> have the deserializer throw an exception  (which is hard to distinguish from 
> a exception due to corruptions?)). this is easy for non-compressed streams - 
> for compressed streams - DecompressorStream has a useful looking 
> getAvailable() call - except the class is marked package private.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to