I have  LongWritable, IncidentWritable key-value pair as output from one
job, that I want to read as input in my second job, where IncidentWritable
is custom Writable(see code below).

How do I read IncidentWritable in my custom Reader? I don't know how to
convert byte[] to IncidentWritable.

Code I use:

public class IncidentWritable implements Writable
{
    ...

    public void write(DataOutput out) throws IOException
    {
        out.writeInt(getId());
        out.writeInt(getStatus());

    }

    public void readFields(DataInput in) throws IOException
    {
        setId(in.readInt());
        setStatus(in.readInt());

    }

    ...

}

public class KeyLongWritableValueIncidentWritableInputFormat    extends
FileInputFormat<LongWritable, IncidentWritable>
{
    protected boolean isSplitable(JobContext context, Path file) {
        return true;
    }
    public org.apache.hadoop.mapred.RecordReader<LongWritable,
IncidentWritable> getRecordReader(org.apache.hadoop.mapred.InputSplit
inputSplit, JobConf jobConf, Reporter reporter) throws IOException
    {
        return new *KeyLongWritableValueIncidentWr**itableReader*((FileSplit)
inputSplit, jobConf);
    }
}

public class KeyLongWritableValueIncidentWritableReader<LongWritable,
IncidentWritable> extends RecordReader<LongWritable, IncidentWritable> {
    ...
    public synchronized boolean nextKeyValue()
       throws IOException {
       byte[] line = null;
       int lineLen = -1;
       if (lineRecordReader.nextKeyValue()) {
         innerValue = lineRecordReader.getCurrentValue();
         line = innerValue.getBytes();
         lineLen = innerValue.getLength();
       } else {
         return false;
       }
       if (line == null)
         return false;
       if (key == null) {
         key = new LongWritable();
       }
       if (value == null) {
         value = new IncidentWritable();
       }
       int pos = findSeparator(line, 0, lineLen, this.separator);
       *setKeyValue(key, value, line, lineLen, pos);*
       return true;
     }

     public static void *setKeyValue*(LongWritable key, IncidentWritable
value, byte[] line,
         int lineLen, int pos) {
         if (pos == -1) {
            Text tmp = new Text();
            tmp.set(line, 0, lineLen);
            *key = new LongWritable(Long.parseLong(tmp.toString()));*
            value = new IncidentWritable();
         } else {

            Text tmp = new Text();
            tmp.set(line, 0, pos);
            key = new LongWritable(Long.parseLong(tmp.toString()));
            tmp.set(line, pos + 1, lineLen - pos - 1);
            *value  =  //no idea how to deserialize here*
         }
     }


}









Thanks,
Valentina

Reply via email to