Thank you for your help.

2011/11/4 Todd Lipcon <[email protected]>
> Hi Pedro,
>
> It sounds like you're on the right track, but I don't really have time
> to help much beyond pointing you in the right direction. Time to put
> on your debugging hat :) Maybe do some testing with a small job like
> "sleep -mt 1 -rt 1 -m 1 -r 1" - a sleep job with 1 mapper and 1
> reducer. If I recall correctly it generates a single map output
> record... otherwise you could do a "sort" of 1 line of text. Then you
> can easily add debug output to diagnose what your issue is.
>
> -Todd
>
> On Fri, Nov 4, 2011 at 11:10 AM, Pedro Costa <[email protected]> wrote:
> > I've looked at the MapOutputServlet class. But the problem is the
> > following:
> >
> > Map output can be compressed or uncompressed. For uncompressed map
> > output, the index mechanism of the MapOutputServlet works for me: the
> > map task generates a digest for each partition, and it matches the
> > digest produced by the reduce.
> >
> > Let me explain what I've changed in my own version of the MR code. A
> > map task (MT) produces a digest for each partition of data it
> > generates. So, if MT1 produces 2 partitions of uncompressed data, it
> > produces Hash1 and Hash2.
> >
> > Now, when a reduce task (RT) fetches the map output, it generates
> > another digest using the index mechanism of the MapOutputServlet and
> > compares it with the respective digest generated by the map task.
> >
> > As you can see from this explanation, the index mechanism is really
> > useful for uncompressed map output.
> >
> > But I've also tried to do the same with compressed map output, and it
> > doesn't work. That's why I'm now trying the IFile.Reader class.
> >
> > As you can see, I'm in a big dilemma and I don't know what to do.
> >
> > I will show you my code. These two methods try to generate digests on
> > the map side and on the reduce side. In the end they give different
> > results, and I don't know why.
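One thing worth guarding against in digest loops like the ones described above: InputStream.read() may return fewer bytes than requested (a DecompressorStream in particular often returns short reads), and md.update(buffer) then hashes the unfilled tail of the array, so the two sides can disagree. Below is a minimal, self-contained sketch of a loop that hashes only the bytes actually read, using plain java.io/java.security with no Hadoop classes; the names and the short-read wrapper are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;

public class DigestLoop {
    // Hash exactly 'length' bytes from 'in', feeding the digest only
    // the bytes each read() call actually returned.
    static String sha1Of(InputStream in, int length) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        byte[] buffer = new byte[60 * 1024];
        int remaining = length;
        while (remaining > 0) {
            int want = Math.min(remaining, buffer.length);
            int n = in.read(buffer, 0, want); // may return fewer bytes!
            if (n == -1) break;               // premature end of stream
            md.update(buffer, 0, n);          // hash only what was read
            remaining -= n;
        }
        StringBuilder sb = new StringBuilder();
        for (byte b : md.digest()) sb.append(String.format("%02x", b));
        return sb.toString();
    }

    // A stream that never returns more than 3 bytes per read(),
    // mimicking how a decompressing stream can return short reads.
    static class ShortReadStream extends InputStream {
        private final InputStream in;
        ShortReadStream(InputStream in) { this.in = in; }
        public int read() throws IOException { return in.read(); }
        public int read(byte[] b, int off, int len) throws IOException {
            return in.read(b, off, Math.min(len, 3));
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = "banana banana apple banana carrot".getBytes("UTF-8");
        String whole = sha1Of(new ByteArrayInputStream(data), data.length);
        String chunked = sha1Of(
                new ShortReadStream(new ByteArrayInputStream(data)),
                data.length);
        // Both digests agree even though one stream returned short reads.
        System.out.println(whole.equals(chunked));
    }
}
```

If `md.update(buffer, 0, n)` is replaced with `md.update(buffer)`, the two digests diverge as soon as any read comes back short.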
> > These two methods are my first attempt to generate digests from
> > compressed map output:
> >
> > [code]
> > // This method tries to generate a digest from the compressed map
> > // output on the map side.
> > public synchronized String generateHash(FileSystem fs, Path filename,
> >     Decompressor decompressor, int offset, int mapOutputLength) {
> >   LOG.debug("Opening file2: " + filename);
> >
> >   MessageDigest md = null;
> >   String digest = null;
> >   DecompressorStream decodec = null;
> >   FSDataInputStream input = null;
> >
> >   try {
> >     input = fs.open(filename);
> >     decodec = new DecompressorStream(input, decompressor);
> >     md = MessageDigest.getInstance("SHA-1");
> >
> >     byte[] buffer;
> >     int size;
> >     while (mapOutputLength > 0) {
> >       // If the bytes left are fewer than the default chunk size,
> >       // shrink the buffer so the message digest contains no trash.
> >       size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60 * 1024);
> >       System.out.println("mapOutputLength: " + mapOutputLength
> >           + " Size: " + size);
> >
> >       if (size == 0)
> >         break;
> >
> >       buffer = new byte[size];
> >       size = decodec.read(buffer, offset, size);
> >       System.out.println("read: " + size + "\ndata: " + new String(buffer));
> >       mapOutputLength -= size;
> >
> >       if (size > 0)
> >         md.update(buffer);
> >       else if (size == -1)
> >         break;
> >     }
> >     digest = hashIt(md);
> >   } catch (NoSuchAlgorithmException e) {
> >     e.printStackTrace();
> >   } catch (IOException e) {
> >     e.printStackTrace();
> >   } finally {
> >     if (input != null) {
> >       try {
> >         input.close();
> >       } catch (IOException e) {
> >         e.printStackTrace();
> >       }
> >     }
> >   }
> >
> >   return digest;
> > }
> > [/code]
> >
> > [code]
> > // This method tries to generate the digest from the compressed map
> > // output received by the reduce.
> > public synchronized String generateHash(byte[] data,
> >     Decompressor decompressor, int offset, int mapOutputLength) {
> >   MessageDigest md = null;
> >   String digest = null;
> >   DecompressorStream decodec = null;
> >   ByteArrayInputStream bis = null;
> >
> >   try {
> >     bis = new ByteArrayInputStream(data);
> >     decodec = new DecompressorStream(bis, decompressor);
> >     md = MessageDigest.getInstance("SHA-1");
> >
> >     int size;
> >     byte[] buffer;
> >     while (mapOutputLength > 0) {
> >       // If the bytes left are fewer than the default chunk size,
> >       // shrink the buffer so the message digest contains no trash.
> >       size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60 * 1024);
> >
> >       if (size == 0)
> >         break;
> >
> >       buffer = new byte[size];
> >       decodec.read(buffer, offset, size);
> >       md.update(buffer);
> >
> >       mapOutputLength -= size;
> >     }
> >
> >     digest = hashIt(md);
> >   } catch (NoSuchAlgorithmException e) {
> >     e.printStackTrace();
> >   } catch (IOException e) {
> >     e.printStackTrace();
> >   } finally {
> >     if (bis != null) {
> >       try {
> >         bis.close();
> >       } catch (IOException e) {
> >         e.printStackTrace();
> >       }
> >     }
> >   }
> >
> >   return digest;
> > }
> > [/code]
> >
> >
> > 2011/11/4 Todd Lipcon <[email protected]>
> >
> >> On Fri, Nov 4, 2011 at 10:04 AM, Pedro Costa <[email protected]> wrote:
> >> > 1 - I think that IFile.Reader can only read the whole map output
> >> > file. I want to read a partition of the map output. How can I do
> >> > that? How do I set the size of a partition in the I
> >>
> >> Look at the code for MapOutputServlet - it uses the index mechanism
> >> to find a particular partition.
> >>
> >> > 2 - I know that map output is composed of blocks. What is the size
> >> > of a block? Is it 64MB by default?
> >>
> >> Nope, it doesn't use blocks. That's HDFS you're thinking of.
> >>
> >> -Todd
> >>
> >> > 2011/11/4 Todd Lipcon <[email protected]>
> >> >
> >> >> Hi Pedro,
> >> >>
> >> >> The format is called IFile. Check out the source for more info on
> >> >> the format - it's fairly simple. The partition starts are recorded
> >> >> in a separate index file next to the output file.
> >> >>
> >> >> I don't think you'll find significant docs on this format since
> >> >> it's MR-internal - the code is your best resource.
> >> >>
> >> >> -Todd
> >> >>
> >> >> On Fri, Nov 4, 2011 at 8:37 AM, Pedro Costa <[email protected]> wrote:
> >> >> > Hi,
> >> >> >
> >> >> > I'm trying to understand the structure of the map output file.
> >> >> > Here's an example of a map output file that contains 2 partitions:
> >> >> >
> >> >> > [code]
> >> >> > <FF><FF><FF><FF>^@^@716banana banana apple banana carrot carrot
> >> >> > apple banana 0apple carrot carrot carrot banana carrot carrot
> >> >> > 5^N4carrot apple carrot apple apple carrot banana apple ^Mbanana
> >> >> > apple <FF><FF><DF>|<8E><B7>
> >> >> > [/code]
> >> >> >
> >> >> > 1 - I would like to understand the ASCII character parts. What
> >> >> > do they mean?
> >> >> >
> >> >> > 2 - What type of file is a map output? Is it a
> >> >> > SequenceFileOutputFormat, or a TextOutputFormat?
> >> >> >
> >> >> > 3 - I have a small program that runs independently of MR and
> >> >> > whose goal is to digest each partition and produce the
> >> >> > corresponding hash. How do I know where each partition starts?
> >> >> >
> >> >> > --
> >> >> > Thanks,
> >> >> > PSC
> >> >>
> >> >> --
> >> >> Todd Lipcon
> >> >> Software Engineer, Cloudera
> >> >
> >> > --
> >> > Thanks,
> >>
> >> --
> >> Todd Lipcon
> >> Software Engineer, Cloudera
> >
> > --
> > Thanks,
>
> --
> Todd Lipcon
> Software Engineer, Cloudera

--
Thanks,
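Todd's pointer about the separate index file can be made concrete. As an assumption to verify against your own source tree (see SpillRecord/IndexRecord in the MR code), the classic file.out.index layout is one fixed-size record per partition: three big-endian longs holding the partition's start offset, raw (decompressed) length, and compressed length. The sketch below builds a synthetic two-partition index in memory and looks up partition 1; all offsets and lengths are made-up sample values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexRecordDemo {
    static final int RECORD_BYTES = 3 * 8; // three longs per partition

    public static void main(String[] args) throws IOException {
        // Build a synthetic index for 2 partitions, each record being
        // (startOffset, rawLength, partLength).
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(0);  out.writeLong(120); out.writeLong(87); // partition 0
        out.writeLong(87); out.writeLong(140); out.writeLong(95); // partition 1
        out.flush();
        byte[] index = bos.toByteArray();

        // Look up partition 1 by seeking to its fixed-size record.
        int partition = 1;
        DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(index));
        in.skipBytes(partition * RECORD_BYTES);
        long startOffset = in.readLong();
        long rawLength = in.readLong();
        long partLength = in.readLong();

        // These values tell a reader where partition 1's bytes begin in
        // the map output file and how many bytes to fetch and digest.
        System.out.println(startOffset + " " + rawLength + " " + partLength);
    }
}
```

For an external digest tool like the one described in the original question, the start offset and compressed length are exactly the bounds to read before hashing a single partition.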
