Re: Re: Re: File Splits in Hadoop

2008-12-11 Thread amitsingh

Hi,

After some debugging, it seems I found the problem.

In parseARC(...) the length for the last gzip member in the split was set
incorrectly, because the scan breaks out as soon as it has read past the
end of the split:

   if (totalRead > splitEnd) {
     break;
   }

Ideally the last member should have been allowed to continue into the next
block, instead of having its length capped at the split boundary.

So I modified next() to read the last member directly from the input
stream, instead of from a byte array populated using the recorded offset
and length.



Anyway, is this the correct way to read records spanning multiple splits?
(Currently it is taking quite some time, with the CPU pegged at 100%; I am
still investigating the potential cause.)


So for me the conclusion seems to be that reads spanning multiple physical
blocks are transparent.
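
Something like the following sketch illustrates this (the path is a
placeholder; any file larger than one HDFS block will do): seek just before
a block boundary and keep reading, and the client fetches the next block of
the same file for you.

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FSDataInputStream;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;

   public class CrossBlockRead {
     public static void main(String[] args) throws Exception {
       Configuration conf = new Configuration();
       FileSystem fs = FileSystem.get(conf);
       Path p = new Path("/user/amitsingh/out1"); // placeholder, multi-block file
       long blockSize = fs.getFileStatus(p).getBlockSize();

       FSDataInputStream in = fs.open(p);
       byte[] buf = new byte[8192];
       in.seek(blockSize - 100);  // position just before the first block boundary
       in.readFully(buf);         // this read straddles two physical blocks
       System.out.println("read " + buf.length + " bytes across the block boundary");
       in.close();
     }
   }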


*** I guess this could have been handled more cleanly inside parseARC itself.
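
A rough, untested sketch of that cleaner alternative: after the magic-byte
scan has collected the member start offsets, derive each member's length
from the next member's start, and let the last member run to the end of the
file instead of stopping at splitEnd (HDFS serves the read past the split
boundary transparently). The starts list and fileLen field are the ones
used in parseArcBytes, quoted further down in this thread.

   // Sketch only: turn the scanned start offsets into (start, length) pairs.
   private static long[][] toStartLens(java.util.List<Long> starts, long fileLen) {
     long[][] startLens = new long[starts.size()][2];
     for (int i = 0; i < starts.size(); i++) {
       long memberStart = starts.get(i);
       // Run to the next member's start, or to the end of the file for the
       // last member, rather than capping at splitEnd.
       long memberEnd = (i + 1 < starts.size()) ? starts.get(i + 1) : fileLen;
       startLens[i][0] = memberStart;
       startLens[i][1] = memberEnd - memberStart;
     }
     return startLens;
   }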




**
Modified source code
**

public boolean next(Text key, BytesWritable value) throws IOException {
  long start = 0;
  long len = 0;
  try {
    int index = recordIndex++;
    if (index >= startLens.length) {
      return false;
    }

    start = startLens[index][0];
    len = startLens[index][1];
    byte[] zipbytes = new byte[(int) len];

    LOG.info("index: " + index + "\tstartLens.length: " + startLens.length
        + "\tstart: " + start + "\tlen: " + len);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
    GZIPInputStream zin = null;
    in.seek(start);

    if (index == startLens.length - 1) {
      // Change: the last gzip member of the split may extend past splitEnd,
      // so decompress it straight from the FSDataInputStream, which keeps
      // reading into the next HDFS block transparently.
      zin = new GZIPInputStream(in);
    } else {
      // Unchanged path: the member lies entirely within (start, len), so
      // copy it into memory and decompress from the byte array.
      in.read(zipbytes);
      zin = new GZIPInputStream(zipin);
    }

    int gzipRead = -1;
    int totalGzipRead = 0;
    baos.reset();
    try {
      while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) {
        baos.write(buffer, 0, gzipRead);
        totalGzipRead += gzipRead;
      }
    } catch (Exception ex) {
      LOG.info(ex.toString() + "\tstart: " + start + "\tlen: " + len);
      LOG.info(StringUtils.stringifyException(ex));
    }

    byte[] pageBytes = baos.toByteArray();
    baos.close();
    if (index != startLens.length - 1) {
      // Change: do not close zin for the last member, since that would also
      // close the underlying FSDataInputStream.
      zin.close();
    }
    zipin.close();

    key.set("" + index);
    value.set(pageBytes, 0, pageBytes.length);

    return true;
  } catch (Exception e) {
    LOG.info(e.toString() + "\tstart: " + start + "\tlen: " + len);
    LOG.info(StringUtils.stringifyException(e));
    return false;
  }
}



Re: Re: File Splits in Hadoop

2008-12-10 Thread amitsingh

Thanks for the discussion, Taran.

The problem still persists.
What should be done if I have a record which spans multiple PSplits
(physical splits on HDFS)?

What happens if we try to read beyond a PSplit? Is the next read
transparently served from the next block of the same file (which might not
be on the same machine), or is the next block on the local disk (which may
not belong to the same file) read instead?

If it is the former, I guess things should have worked fine (surprisingly
they aren't; I am goofing it up somewhere). If it is the latter, then I
have no idea how to tackle this. (Any help would be highly appreciated.)




**

I tried running a simple program in which I created a sample gzip file by
serializing records:

  // serialize 50 Employee objects into a single gzip stream
  FileOutputStream fos = new FileOutputStream("/home/amitsingh/OUTPUT/out.bin");
  GZIPOutputStream gz = new GZIPOutputStream(fos);
  ObjectOutputStream oos = new ObjectOutputStream(gz);

  for (int i = 0; i < 50; i++) {
    Employee sam = new Employee(i + "name", i, i + 5); // 3 fields: 2 int, 1 String
    oos.writeObject(sam);
  }
  oos.flush();
  oos.close();

Now if I just run a simple map-reduce over this binary file, it throws
java.io.EOFException: Unexpected end of ZLIB input stream.

It creates 2 splits:
Split 1: hdfs://localhost:54310/user/amitsingh/out1  start: 0        length: 1555001  hosts: sandpiper  bytesRemaining: 1555001
Split 2: hdfs://localhost:54310/user/amitsingh/out1  start: 1555001  length: 1555001  hosts: sandpiper


For Map 1 --> Split 1, I get java.io.EOFException: Unexpected end of ZLIB
input stream (for startLens[0], start: 0, len: 1556480).

For Map 2 --> no valid gzip member is found, so startLens is empty.

I am not sure why Map 1 ends up with len 1556480 rather than 3110002 (the
entire file), since there is only ONE gzip stream and it spans the whole file.

Any guidance would be of great help.







**
Source code
**

package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

public class CustomGzipRecordReader implements RecordReader<Text, BytesWritable> {

  public static final Log LOG = LogFactory.getLog(CustomGzipRecordReader.class);

  protected Configuration conf;
  protected long splitStart = 0;
  protected long pos = 0;
  protected long splitEnd = 0;
  protected long splitLen = 0;
  protected long fileLen = 0;
  protected FSDataInputStream in;
  protected int recordIndex = 0;
  protected long[][] startLens;
  protected byte[] buffer = new byte[4096];

  private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };

  // Scan the split and populate startLens, indicating at which offsets a
  // gzip member starts within this split.
  private void parseArcBytes() throws IOException {

    long totalRead = in.getPos();
    byte[] buffer = new byte[4096];
    List<Long> starts = new ArrayList<Long>();

    int read = -1;
    while ((read = in.read(buffer)) > 0) {

      for (int i = 0; i < (read - 1); i++) {

        // Candidate gzip member: the magic bytes 0x1F 0x8B.
        if ((buffer[i] == (byte) 0x1F) && (buffer[i + 1] == (byte) 0x8B)) {
          long curStart = totalRead + i;
          in.seek(curStart);
          byte[] zipbytes = null;
          try {
            // Verify the candidate by opening a GZIPInputStream over the
            // first 32 bytes; invalid headers are ignored.
            zipbytes = new byte[32];
            in.read(zipbytes);
            ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
            GZIPInputStream zin = new GZIPInputStream(zipin);
            zin.close();
            zipin.close();
            starts.add(curStart);
            LOG.info("curStart: " + curStart);
          } catch (Exception e) {
            LOG.info("Ignoring position: " + curStart);
            continue;
          }
        }
      }

      totalRead += read;
      in.seek(totalRead);
      if (totalRead > splitEnd) {
        break;
      }
    }

   startLens

Re: File Splits in Hadoop

2008-12-10 Thread Tarandeep Singh
On Wed, Dec 10, 2008 at 11:12 AM, amitsingh <[EMAIL PROTECTED]> wrote:

> Hi,
>
> I am stuck with some questions based on following scenario.
>
> 1) Hadoop normally splits the input file and distributes the splits across
> slaves (referred to as Psplits from now on), into chunks of 64 MB.
> a) Is there any way to specify split criteria so that, for example, a huge 4 GB
> file is split into 40-odd files (Psplits) respecting record boundaries?


You can set mapred.min.split.size in the JobConf. Setting its value greater
than the block size forces splits to be larger than a block. However, this
might result in splits whose data blocks are not all local.
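
For example (just a sketch using the old JobConf API; the class name is a
placeholder):

   import org.apache.hadoop.mapred.JobConf;

   public class SplitSizeExample {
     public static void main(String[] args) {
       JobConf conf = new JobConf(SplitSizeExample.class);
       // mapred.min.split.size is in bytes; with a 64 MB block size this
       // forces splits of at least two blocks.
       conf.setLong("mapred.min.split.size", 128L * 1024 * 1024);
     }
   }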


>
> b) Is it even required that these physical splits (Psplits) obey record
> boundaries?
>
> 2) We can get the locations of these Psplits on HDFS as follows:
> BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
> // FileInputFormat, line 273
> In FileInputFormat, for each blkLocation (Psplit) multiple logical
> splits (referred to as Lsplits from now on) are created based on a heuristic for
> the number of mappers.
>
> Q) How is the following situation handled in TextInputFormat, which reads line
> by line?
>   i) The input file is split as described in step 1 into more than 2 parts
>   ii) Suppose there is a line of text which starts near the end of Psplit-i and
> ends in Psplit-i+1 (say Psplit2 and Psplit3)
>   iii) Which mapper gets this line spanning multiple Psplits (mapper_i or
> mapper_i+1)?
>   iv) I went through the FileInputFormat code; Lsplits are created only within a
> particular Psplit, not across Psplits. Why so?
>
> Q) In short, if one has to read arbitrary objects (not lines), how does one
> handle records which are partially in one Psplit and partially in another?
>

I am working on this as well and have not found an exact answer, but in my
view mapper_i should handle the line/record which is partially in one split
and partially in the other. Mapper_i+1 should first seek to the beginning of
a new record (a line in this case) and start processing from there.
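
Roughly, the convention I have in mind looks like this (only a sketch, not
the actual LineRecordReader code; the helper methods are hypothetical and
format-specific):

   import java.io.IOException;
   import org.apache.hadoop.fs.FSDataInputStream;

   // A reader owns every record that STARTS inside its split and may read
   // past splitEnd to finish the last one; a reader whose split does not
   // begin at offset 0 first skips ahead to the next record boundary.
   public abstract class SpanningRecordReaderSketch {
     protected FSDataInputStream in;
     protected long splitStart;
     protected long splitEnd;

     public void initialize() throws IOException {
       in.seek(splitStart);
       if (splitStart != 0) {
         skipToNextRecordStart();   // e.g. scan for '\n' in text input
       }
     }

     public boolean nextRecord() throws IOException {
       if (in.getPos() > splitEnd) {
         return false;              // records starting here belong to the next split
       }
       readOneRecord();             // may run past splitEnd; that is fine
       return true;
     }

     // hypothetical, format-specific helpers
     protected abstract void skipToNextRecordStart() throws IOException;
     protected abstract void readOneRecord() throws IOException;
   }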

Someone from the Hadoop core team, please correct me if this is wrong and
fill in the details.

Thanks,
Taran


> --Amit
>
>
>
>


File Splits in Hadoop

2008-12-10 Thread amitsingh

Hi,

I am stuck with some questions based on the following scenario.

1) Hadoop normally splits the input file and distributes the splits
across slaves (referred to as Psplits from now on), into chunks of 64 MB.
a) Is there any way to specify split criteria so that, for example, a huge
4 GB file is split into 40-odd files (Psplits) respecting record boundaries?
b) Is it even required that these physical splits (Psplits) obey record
boundaries?


2) We can get the locations of these Psplits on HDFS as follows:
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
// FileInputFormat, line 273
In FileInputFormat, for each blkLocation (Psplit) multiple logical splits
(referred to as Lsplits from now on) are created, based on a heuristic for
the number of mappers.
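
A small sketch of that call (the path is a placeholder), just to show what
each BlockLocation reports:

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.BlockLocation;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;

   public class BlockLocationDump {
     public static void main(String[] args) throws Exception {
       Configuration conf = new Configuration();
       FileSystem fs = FileSystem.get(conf);
       FileStatus file = fs.getFileStatus(new Path("/user/amitsingh/out1"));
       BlockLocation[] blkLocations =
           fs.getFileBlockLocations(file, 0, file.getLen());
       for (BlockLocation loc : blkLocations) {
         // offset/length of the physical block and the datanodes holding it
         System.out.println("offset=" + loc.getOffset()
             + " length=" + loc.getLength()
             + " hosts=" + java.util.Arrays.toString(loc.getHosts()));
       }
     }
   }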


Q) How is the following situation handled in TextInputFormat, which reads
line by line?

   i) The input file is split as described in step 1 into more than 2 parts.
   ii) Suppose there is a line of text which starts near the end of
Psplit-i and ends in Psplit-i+1 (say Psplit2 and Psplit3).
   iii) Which mapper gets this line spanning multiple Psplits (mapper_i or
mapper_i+1)?
   iv) I went through the FileInputFormat code; Lsplits are created only
within a particular Psplit, not across Psplits. Why so?


Q) In short, if one has to read arbitrary objects (not lines), how does one
handle records which are partially in one Psplit and partially in another?


--Amit