I thought that the InputStream buffer (called 'in' in this case) would maintain the stream position based on how many bytes I had read via in.read(). Maybe that is not the case...
Would it then be proper to call in.seek(pos)? I believe I tried this at one point and got an error. I will try again to be sure, though. Thanks for your reply!

Cheers,
Sean

Jorgen Johnson wrote:
>
> Hi Sean,
>
> Perhaps I'm missing something, but it doesn't appear to me that you're
> actually seeking to the filesplit start position in your constructor...
>
> This would explain why all the mappers are getting the same records.
>
> -jorgenj
>
> On Mon, Jun 30, 2008 at 9:22 AM, Sean Arietta <[EMAIL PROTECTED]> wrote:
>>
>> Hello all,
>> I am having a problem writing my own RecordReader. The basic setup I have
>> is a large byte array that needs to be diced up into key/value pairs such
>> that the key is the index into the array and the value is a (much smaller)
>> byte array itself. Here is the code that I currently have written to do
>> just this:
>>
>> // This method is just the constructor for my new RecordReader
>> public TrainingRecordReader(Configuration job, FileSplit split) throws IOException
>> {
>>     start = split.getStart();
>>     end = start + split.getLength();
>>     final Path file = split.getPath();
>>     compressionCodecs = new CompressionCodecFactory(job);
>>     final CompressionCodec codec = compressionCodecs.getCodec(file);
>>
>>     // open the file and seek to the start of the split
>>     FileSystem fs = file.getFileSystem(job);
>>     FSDataInputStream fileIn = fs.open(split.getPath());
>>     in = new TrainingReader(fileIn, job);
>>     this.pos = start;
>> }
>>
>> // This returns the key/value pair I was talking about
>> public synchronized boolean next(LongWritable key, BytesWritable value) throws IOException
>> {
>>     if (pos >= end)
>>         return false;
>>
>>     key.set(pos); // key is position
>>     int newSize = in.readVector(value);
>>     if (newSize > 0)
>>     {
>>         pos += newSize;
>>         return true;
>>     }
>>     return false;
>> }
>>
>> // This extracts the smaller byte array from the large input file
>> public int readVector(BytesWritable value) throws IOException
>> {
>>     int numBytes = in.read(buffer);
>>     value.set(buffer, 0, numBytes);
>>     return numBytes;
>> }
>>
>> All of this worked just fine when I set conf.set("mapred.job.tracker",
>> "local"), but now that I am testing in a pseudo-distributed setting
>> (still one node, but without the above config param), I do not get what
>> I want. Instead of unique key/value pairs, I get repeated key/value
>> pairs based on the number of map tasks I have set. Say my large file
>> contained 49 entries: I would want a unique key/value pair for each of
>> those, but if I set numMapTasks to 7, I get 7 unique pairs that repeat
>> every 7 key/value pairs.
>>
>> So it seems that each MapTask, which ultimately calls my
>> TrainingReader.next() method from above, is somehow pointing to the same
>> FileSplit. I know that LineRecordReader in the source has a small
>> routine that skips the first line of the data if you aren't at the
>> beginning. Is that related? Why isn't split.getStart() returning the
>> absolute pointer to the start of the split? So many questions I don't
>> know the answer to, haha.
>>
>> I would appreciate anyone's help in resolving this issue. Thanks very much!
>>
>> Cheers,
>> Sean M. Arietta
>> --
>> View this message in context:
>> http://www.nabble.com/RecordReader-Functionality-tp18199187p18199187.html
>> Sent from the Hadoop core-user mailing list archive at Nabble.com.
>>
>
> --
> "Liberties are not given, they are taken."
> - Aldous Huxley

--
View this message in context:
http://www.nabble.com/RecordReader-Functionality-tp18199187p18200404.html
Sent from the Hadoop core-user mailing list archive at Nabble.com.
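The behavior jorgenj points at can be illustrated without Hadoop at all: a freshly opened stream always starts at byte 0, no matter what offset the split metadata carries, so every reader sees the same leading records unless it seeks to its split's start first (analogous to calling fileIn.seek(start) on the FSDataInputStream in the constructor above). A minimal JDK-only sketch of the idea; the Split record, file contents, and 4-byte record size here are hypothetical stand-ins, not Hadoop's actual FileSplit:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

public class SplitSeekDemo {
    // Hypothetical stand-in for a FileSplit: a (start, length) byte range of the file.
    record Split(long start, long length) {}

    // Read the first 4-byte "record" of a split WITHOUT seeking:
    // every caller gets the bytes at offset 0, regardless of split.start().
    static byte[] firstRecordNoSeek(Path file, Split split) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            byte[] buf = new byte[4];
            raf.readFully(buf); // position is still 0; split.start() was never used
            return buf;
        }
    }

    // Read the first record WITH an explicit seek to the split start --
    // the step the RecordReader constructor needs before reading.
    static byte[] firstRecordWithSeek(Path file, Split split) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(split.start()); // jump to this split's region of the file
            byte[] buf = new byte[4];
            raf.readFully(buf);
            return buf;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("splits", ".bin");
        Files.write(file, "AAAABBBBCCCC".getBytes()); // three 4-byte "records"

        Split second = new Split(4, 4); // the split that should yield "BBBB"
        System.out.println(new String(firstRecordNoSeek(file, second)));   // AAAA - wrong record
        System.out.println(new String(firstRecordWithSeek(file, second))); // BBBB - correct
        Files.delete(file);
    }
}
```

With seven map tasks over one file, seven readers each start at byte 0 and emit the same leading records, which matches the repeating-every-7 pattern described above; seeking once per reader in the constructor gives each split its own region.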