Hello all,
I am having a problem writing my own RecordReader. The basic setup I have is
a large byte array that needs to be diced up into key/value pairs such that
the key is the byte offset into the array and the value is a (much smaller)
byte array itself. Here is the code I currently have written to do just
this:


// This is the constructor for my new RecordReader
public TrainingRecordReader(Configuration job, FileSplit split) throws IOException
{
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        in = new TrainingReader(fileIn, job);
        this.pos = start;
}

// This returns the key/value pair I was talking about
public synchronized boolean next(LongWritable key, BytesWritable value) throws IOException
{
        if (pos >= end)
                return false;

        key.set(pos);           // key is the byte position
        int newSize = in.readVector(value);
        if (newSize > 0)
        {
                pos += newSize;
                return true;
        }
        return false;
}

// This extracts the smaller byte array from the large input file
public int readVector(BytesWritable value) throws IOException
{
        int numBytes = in.read(buffer);
        if (numBytes <= 0)
                return 0;       // guard against EOF, where read() returns -1
        value.set(buffer, 0, numBytes);
        return numBytes;
}
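
In case it matters, this reader is wired in through a custom InputFormat
roughly like the following (trimmed down to the relevant part;
TrainingInputFormat is just my name for it):

public class TrainingInputFormat extends FileInputFormat<LongWritable, BytesWritable>
{
        @Override
        public RecordReader<LongWritable, BytesWritable> getRecordReader(
                        InputSplit split, JobConf job, Reporter reporter) throws IOException
        {
                reporter.setStatus(split.toString());
                // hand each split to my custom reader
                return new TrainingRecordReader(job, (FileSplit) split);
        }
}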

So all of this worked just fine when I set conf.set("mapred.job.tracker",
"local"), but now that I am attempting to test in a pseudo-distributed
setting (still one node, but without the above config param), I do not get
what I want. Instead of unique key/value pairs, I get repeated key/value
pairs based on the number of map tasks I have set. For example, if my large
file contains 49 entries, I would want a unique key/value pair for each of
those, but with numMapTasks set to 7, I get 7 unique pairs that repeat every
7 key/value pairs.
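
For reference, these are roughly the relevant lines from my driver
(trimmed; TrainingDriver and the input path are placeholders for my actual
names):

JobConf conf = new JobConf(TrainingDriver.class);
// conf.set("mapred.job.tracker", "local");   // everything works when this is set
conf.setNumMapTasks(7);                       // with this, I see the repetition above
conf.setInputFormat(TrainingInputFormat.class);
FileInputFormat.setInputPaths(conf, new Path("training/input"));
JobClient.runJob(conf);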

So it seems that each MapTask, which ultimately calls my
TrainingRecordReader.next() method from above, is somehow pointing at the
same data. I know that in the LineRecordReader source there is a small
routine that skips the first line of the data if you aren't at the beginning
of the file. Is that related? Why isn't split.getStart() returning the
absolute offset of the start of the split? So many questions I don't know
the answer to, haha.
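
For reference, this is roughly the bit of the LineRecordReader constructor I
mean (paraphrased from memory from the Hadoop source, so details may be
off):

boolean skipFirstLine = false;
if (codec != null)
{
        in = new LineReader(codec.createInputStream(fileIn), job);
        end = Long.MAX_VALUE;
}
else
{
        if (start != 0)
        {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);   // it seeks the raw stream to the split start
        }
        in = new LineReader(fileIn, job);
}
if (skipFirstLine)
{
        // skip the partial first line and re-establish "start"
        start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;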

I would appreciate anyone's help in resolving this issue. Thanks very much!

Cheers,
Sean M. Arietta