I have LongWritable, IncidentWritable key-value pair as output from one job, that I want to read as input in my second job, where IncidentWritable is custom Writable(see code below).
How do I read IncidentWritable in my custom Reader? I don't know how to convert byte[] to IncidentWritable. Code I use: public class IncidentWritable implements Writable { ... public void write(DataOutput out) throws IOException { out.writeInt(getId()); out.writeInt(getStatus()); } public void readFields(DataInput in) throws IOException { setId(in.readInt()); setStatus(in.readInt()); } ... } public class KeyLongWritableValueIncidentWritableInputFormat extends FileInputFormat<LongWritable, IncidentWritable> { protected boolean isSplitable(JobContext context, Path file) { return true; } public org.apache.hadoop.mapred.RecordReader<LongWritable, IncidentWritable> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { return new *KeyLongWritableValueIncidentWr**itableReader*((FileSplit) inputSplit, jobConf); } } public class KeyLongWritableValueIncidentWritableReader<LongWritable, IncidentWritable> extends RecordReader<LongWritable, IncidentWritable> { ... public synchronized boolean nextKeyValue() throws IOException { byte[] line = null; int lineLen = -1; if (lineRecordReader.nextKeyValue()) { innerValue = lineRecordReader.getCurrentValue(); line = innerValue.getBytes(); lineLen = innerValue.getLength(); } else { return false; } if (line == null) return false; if (key == null) { key = new LongWritable(); } if (value == null) { value = new IncidentWritable(); } int pos = findSeparator(line, 0, lineLen, this.separator); *setKeyValue(key, value, line, lineLen, pos);* return true; } public static void *setKeyValue*(LongWritable key, IncidentWritable value, byte[] line, int lineLen, int pos) { if (pos == -1) { Text tmp = new Text(); tmp.set(line, 0, lineLen); *key = new LongWritable(Long.parseLong(tmp.toString()));* value = new IncidentWritable(); } else { Text tmp = new Text(); tmp.set(line, 0, pos); key = new LongWritable(Long.parseLong(tmp.toString())); tmp.set(line, pos + 1, lineLen - pos - 1); *value = //no idea how to deserialize here* } } } Thanks, Valentina