Up to 0.18.x, files are not added to the client-side classpath. Use 0.19
and run the following command to use a custom input format:
bin/hadoop jar contrib/streaming/hadoop-0.19.0-streaming.jar -mapper
mapper.pl -reducer org.apache.hadoop.mapred.lib.IdentityReducer -input
test.data -output test-output -file <pathToMapper.pl> -inputformat MyFormatter
-libjars <jar-containing-custom-input-format>
Thanks
Amareshwari
t-alleyne wrote:
Hello,
I'm trying to run a mapreduce job on a data file in which the keys and
values are on alternating rows. E.g.
key1
value1
key2
...
I've written my own InputFormat by extending FileInputFormat (the code for
this class is below). The problem is that when I run hadoop streaming with
the command
bin/hadoop jar contrib/streaming/hadoop-0.18.3-streaming.jar -mapper
mapper.pl -reducer org.apache.hadoop.mapred.lib.IdentityReducer -input
test.data -output test-output -file <pathToMapper.pl> -inputformat
MyFormatter
I get the error
-inputformat : class not found : MyFormatter
java.lang.RuntimeException: -inputformat : class not found : MyFormatter
at org.apache.hadoop.streaming.StreamJob.fail(StreamJob.java:550)
...
I have tried putting the .java, .class, and .jar files of MyFormatter in the
job jar using the -file parameter. I have also tried putting them on HDFS
using -copyFromLocal, but I still get the same error. Can anyone give me
some hints as to what the problem might be? Also, I hacked together my
formatter based on the hadoop examples, so does it seem like it should
properly process the input files I described above?
Trevis
<imports omitted>
public final class MyFormatter extends
        org.apache.hadoop.mapred.FileInputFormat<Text, Text>
{
    @Override
    public RecordReader<Text, Text> getRecordReader( InputSplit split,
            JobConf job, Reporter reporter ) throws IOException
    {
        return new MyRecordReader( job, (FileSplit) split );
    }

    // Reads two lines per record: the first is the key, the second the value.
    static class MyRecordReader implements RecordReader<Text, Text>
    {
        private LineRecordReader _in = null;
        // Receives the byte offset that LineRecordReader returns as its key; unused.
        private LongWritable _junk = null;

        public MyRecordReader( JobConf job, FileSplit split ) throws
                IOException
        {
            _junk = new LongWritable();
            _in = new LineRecordReader( job, split );
        }

        @Override
        public void close() throws IOException
        {
            _in.close();
        }

        @Override
        public Text createKey()
        {
            return new Text();
        }

        @Override
        public Text createValue()
        {
            return new Text();
        }

        @Override
        public long getPos() throws IOException
        {
            return _in.getPos();
        }

        @Override
        public float getProgress() throws IOException
        {
            return _in.getProgress();
        }

        @Override
        public boolean next( Text key, Text value ) throws IOException
        {
            // A record is complete only if both the key line and the value line exist.
            if ( _in.next( _junk, key ) )
            {
                if ( _in.next( _junk, value ) )
                {
                    return true;
                }
            }
            key.clear();
            value.clear();
            return false;
        }
    }
}
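
The pairing logic in next() above can be checked outside Hadoop. What follows
is only a minimal sketch, assuming plain java.io readers in place of
LineRecordReader. The class name PairReaderSketch and the method readPairs are
hypothetical names for illustration, not part of Hadoop or of the code above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; it does not use any Hadoop classes.
public class PairReaderSketch
{
    // Reads lines two at a time: the first is the key, the second the value.
    public static List<String[]> readPairs( BufferedReader in ) throws IOException
    {
        List<String[]> pairs = new ArrayList<String[]>();
        String key;
        while ( ( key = in.readLine() ) != null )
        {
            String value = in.readLine();
            if ( value == null )
            {
                // A trailing key with no value is dropped, mirroring the
                // "return false" branch of next().
                break;
            }
            pairs.add( new String[] { key, value } );
        }
        return pairs;
    }

    public static void main( String[] args ) throws IOException
    {
        String data = "key1\nvalue1\nkey2\nvalue2\nkey3\n";
        BufferedReader in = new BufferedReader( new StringReader( data ) );
        for ( String[] pair : readPairs( in ) )
        {
            System.out.println( pair[0] + "\t" + pair[1] );
        }
    }
}
```

On the five-line input in main, this prints the key1/value1 and key2/value2
pairs and drops the unpaired key3. The sketch reads a single stream from its
start, so it does not cover input splits: in a real job a split may begin on a
value line, which would shift the pairing unless each file is kept in one split.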