Re: Re: Re: File Splits in Hadoop
Hi,

After some debugging I think I found the problem. In parseARC(...) the length for the last gzip member in the split was set incorrectly, because of:

    if (totalRead > splitEnd) {
        break;
    }

Ideally it should have allowed reading into the next block instead of truncating the last member's length to the split boundary. So I modified next() to read the last member directly from the input stream instead of from a byte array populated using the recorded offset and length. Anyway, is this the correct way to read records spanning multiple splits? (Currently the processing takes quite some time at 100% CPU :(. Investigating what the cause could be.) So for me the conclusion seems to be: reads spanning multiple physical blocks are transparent.

(I guess this could have been handled in parseARC in a cleaner way.)

*** modified source ***

    public boolean next(Text key, BytesWritable value) throws IOException {
        long start = 0;
        long len = 0;
        try {
            LOG.info("NEXT !! ");
            int index = recordIndex++;
            if (index >= startLens.length) {
                LOG.info("BAD");
                return false;
            } else {
                LOG.info("GOOD");
            }
            start = startLens[index][0];
            len = startLens[index][1];
            byte[] zipbytes = new byte[(int) len];
            LOG.info("index: " + index + "\tstartLens.length: " + startLens.length
                    + "\tstart: " + start + "\tlen: " + len);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
            GZIPInputStream zin = null;
            in.seek(start);                                       // Change
            if (index == startLens.length - 1) {                  // Change
                // last member: read straight from the stream, past the split end
                zin = new GZIPInputStream(in);                    // Change
            } else {                                              // Change
                in.read(zipbytes);                                // Change
                zin = new GZIPInputStream(zipin);                 // Change
            }
            int gzipRead = -1;
            int totalGzipRead = 0;
            baos.reset();
            try {
                while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) {
                    baos.write(buffer, 0, gzipRead);
                    totalGzipRead += gzipRead;
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                LOG.info(ex.toString() + "\nBANG start: " + start + "\tlen: " + len);
                LOG.info(StringUtils.stringifyException(ex)); // was LOG.equals(...), a typo
            }
            byte[] pageBytes = baos.toByteArray();
            baos.close();
            if (index != startLens.length - 1) {                  // Change
                zin.close();                                      // Change
            }                                                     // Change
            zipin.close();
            Text keyText = (Text) key;
            keyText.set("" + index);
            BytesWritable valueBytes = (BytesWritable) value;
            valueBytes.set(pageBytes, 0, pageBytes.length);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            LOG.info(e.toString() + " start: " + start + "\tlen: " + len);
            LOG.info(StringUtils.stringifyException(e)); // was LOG.equals(...), a typo
            return false;
        }
    }

amitsingh wrote:
> [snip]
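For what it's worth, the fix of streaming the last gzip member straight from the file, without knowing its compressed length up front, can be sketched outside Hadoop with plain java.util.zip. All names here are mine, and a ByteArrayInputStream stands in for FSDataInputStream (skip() standing in for seek()):

```java
import java.io.*;
import java.util.zip.*;

public class LastMemberRead {
    // Build two concatenated gzip members; record where the second one starts.
    static byte[] twoMembers(byte[] a, byte[] b, long[] secondStart) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream g1 = new GZIPOutputStream(out);
        g1.write(a);
        g1.close();                       // finishes member 1 (BAOS.close() is a no-op)
        secondStart[0] = out.size();      // byte offset of member 2
        GZIPOutputStream g2 = new GZIPOutputStream(out);
        g2.write(b);
        g2.close();
        return out.toByteArray();
    }

    // Read the member that starts at 'start' directly from the stream,
    // with no precomputed length: just decompress until the member ends.
    static byte[] readFrom(byte[] file, long start) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(file);
        in.skip(start);                   // stand-in for FSDataInputStream.seek(start)
        GZIPInputStream zin = new GZIPInputStream(in);
        ByteArrayOutputStream page = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = zin.read(buf)) != -1) {
            page.write(buf, 0, n);
        }
        return page.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        long[] s = new long[1];
        byte[] f = twoMembers("first".getBytes(), "last record".getBytes(), s);
        System.out.println(new String(readFrom(f, s[0])));
    }
}
```

This is the same shape as the `index == startLens.length - 1` branch above: seek to the member start and let GZIPInputStream find the end of the member itself.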
Re: Re: File Splits in Hadoop
Thanks for the discussion, Taran. The problem still persists.

What should be done if I have a record which spans multiple PSplits (physical splits on HDFS)? What happens if we try to read beyond a PSplit? Is the next read transparently served from the next block of the same file (which might not be on the same machine), or is the next block on the local disk (which may not belong to the same file) read instead? If it's the former, I guess things should have worked fine (surprisingly they aren't; I must be goofing it up somewhere). If it's the latter, then I have no idea how to tackle this. (Any help would be highly appreciated.)

I tried running a simple program wherein I created a sample gzip file by serializing records:

    // serialize 50 Employee records into a single gzip stream
    FileOutputStream fos = new FileOutputStream("/home/amitsingh/OUTPUT/out.bin");
    GZIPOutputStream gz = new GZIPOutputStream(fos);
    ObjectOutputStream oos = new ObjectOutputStream(gz);
    for (int i = 0; i < 50; i++) {
        Employee sam = new Employee(i + "name", i, i + 5); // 3 fields: 2 int, 1 String
        oos.writeObject(sam);
    }
    oos.flush();
    oos.close();

Now if I just run a simple map-reduce on this binary file, it throws
java.io.EOFException: Unexpected end of ZLIB input stream

It creates 2 splits:

    Split 1: hdfs://localhost:54310/user/amitsingh/out1 start: 0 length: 1555001 hosts: sandpiper, bytesRemaining: 1555001
    Split 2: hdfs://localhost:54310/user/amitsingh/out1 start: 1555001 length: 1555001 hosts: sandpiper

For Map1 --> Split1, I get java.io.EOFException: Unexpected end of ZLIB input stream [for startLens[0] start: 0 len: 1556480]
For Map2 --> no valid gzip is found, as startLens is empty.

I am not sure why Map1's len is 1556480 and not 3110002 (the entire file), since there is only one gzip stream and it spans the whole file. Any guidance would be of great help.
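That EOFException is easy to reproduce outside Hadoop: it is what GZIPInputStream throws when the compressed stream is cut off early, which is exactly what happens when a reader stops at a split boundary in the middle of a single gzip stream. A minimal sketch with plain JDK classes (names are mine, not Hadoop's):

```java
import java.io.*;
import java.util.Arrays;
import java.util.zip.*;

public class TruncatedGzip {
    // Compress 'data' as one gzip member.
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(bos);
        gz.write(data);
        gz.close();
        return bos.toByteArray();
    }

    // Fully decompress 'bytes'; return the exception message, or null on success.
    static String decompress(byte[] bytes) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            byte[] buf = new byte[4096];
            while (in.read(buf) != -1) {
                // drain
            }
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] full = gzip(new byte[1 << 16]);               // one complete member
        byte[] cut = Arrays.copyOf(full, full.length / 2);   // stop at a "split boundary"
        System.out.println(decompress(full));                // null: decompresses fine
        System.out.println(decompress(cut));                 // an EOF error, e.g.
                                                             // "Unexpected end of ZLIB input stream"
    }
}
```

So whichever reader owns the start of the gzip stream has to keep reading past its split end until the stream actually finishes; stopping at the boundary guarantees this exception.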
** Source code **

    package org.apache.hadoop.mapred;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.zip.GZIPInputStream;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.hadoop.util.StringUtils;

    public class CustomGzipRecordReader implements RecordReader {

        public static final Log LOG = LogFactory.getLog(CustomGzipRecordReader.class);

        protected Configuration conf;
        protected long splitStart = 0;
        protected long pos = 0;
        protected long splitEnd = 0;
        protected long splitLen = 0;
        protected long fileLen = 0;
        protected FSDataInputStream in;
        protected int recordIndex = 0;
        protected long[][] startLens;
        protected byte[] buffer = new byte[4096];

        private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };

        // Scan the split and populate startLens, indicating at which offsets a
        // gzip member starts within this split.
        private void parseArcBytes() throws IOException {
            long totalRead = in.getPos();
            byte[] buffer = new byte[4096];
            List<Long> starts = new ArrayList<Long>();
            int read = -1;
            while ((read = in.read(buffer)) > 0) {
                for (int i = 0; i < (read - 1); i++) {
                    if ((buffer[i] == (byte) 0x1F) && (buffer[i + 1] == (byte) 0x8B)) {
                        long curStart = totalRead + i;
                        in.seek(curStart);
                        byte[] zipbytes = null;
                        try {
                            zipbytes = new byte[32];
                            in.read(zipbytes);
                            ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
                            GZIPInputStream zin = new GZIPInputStream(zipin);
                            zin.close();
                            zipin.close();
                            starts.add(curStart);
                            LOG.info("curStart: " + curStart);
                        } catch (Exception e) {
                            LOG.info("Ignoring position: " + curStart);
                            continue;
                        }
                    }
                }
                totalRead += read;
                in.seek(totalRead);
                if (totalRead > splitEnd) {
                    break;
                }
            }
            startLens
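The magic-byte scan in parseArcBytes can be exercised outside Hadoop over a plain byte array. This sketch (my own names, JDK only) validates each 0x1F 0x8B candidate the same way, by letting GZIPInputStream try to parse a gzip header at that offset:

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;

public class GzipScan {
    // Offsets where a plausible gzip member starts: magic bytes 0x1F 0x8B,
    // confirmed by letting GZIPInputStream parse the header at that offset.
    static List<Integer> memberStarts(byte[] data) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i < data.length - 1; i++) {
            if (data[i] == (byte) 0x1F && data[i + 1] == (byte) 0x8B) {
                try {
                    int end = Math.min(data.length, i + 32);
                    // Constructor parses the header; throws if it is not a valid one.
                    new GZIPInputStream(new ByteArrayInputStream(
                            Arrays.copyOfRange(data, i, end))).close();
                    starts.add(i);
                } catch (IOException e) {
                    // magic bytes happened to occur inside compressed data; skip
                }
            }
        }
        return starts;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < 3; i++) {          // three concatenated gzip members
            GZIPOutputStream g = new GZIPOutputStream(out);
            g.write(("record-" + i).getBytes());
            g.close();
        }
        System.out.println(memberStarts(out.toByteArray()));
    }
}
```

Note the header check is only a heuristic: 0x1F 0x8B followed by a plausible header byte can in principle occur inside compressed data, which is why the original code wraps the probe in a try/catch and ignores failures.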
Re: File Splits in Hadoop
On Wed, Dec 10, 2008 at 11:12 AM, amitsingh <[EMAIL PROTECTED]> wrote:

> Hi,
>
> I am stuck with some questions based on the following scenario.
>
> 1) Hadoop normally splits the input file and distributes the splits across
> slaves (referred to as PSplits from now on), in chunks of 64 MB.
> a) Is there any way to specify split criteria so that, for example, a huge
> 4 GB file is split into 40-odd PSplits respecting record boundaries?

You can set mapred.min.split.size in the JobConf. Setting its value greater than the block size forces a split to be larger than a block. However, this might result in splits containing data blocks that are not local.

> b) Is it even required that these physical splits (PSplits) obey record
> boundaries?
>
> 2) We can get the locations of these PSplits on HDFS as follows:
> BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
> // FileInputFormat line 273
> In FileInputFormat, for each blkLocations entry (PSplit), multiple logical
> splits (referred to as LSplits from now on) are created based on a heuristic
> for the number of mappers.
>
> Q) How is the following situation handled in TextInputFormat, which reads
> line by line:
> i) the input file is split as described in step 1 into more than 2 parts;
> ii) suppose there is a line of text which starts near the end of PSplit-i
> and ends in PSplit-i+1 (say PSplit2 and PSplit3);
> iii) which mapper gets this line spanning multiple PSplits (mapper_i or
> mapper_i+1)?
> iv) I went through the FileInputFormat code; LSplits are created only
> within a particular PSplit, not across PSplits. Why so?
>
> Q) In short, if one has to read arbitrary objects (not lines), how does one
> handle records which are partially in one PSplit and partially in another?

I am working on this as well and have not found an exact answer, but in my view mapper_i should handle the line/record which is partially in one split and partially in the other. mapper_i+1 should first seek the beginning of a new record (a line in this case) and start processing from there. Someone from the Hadoop core team please correct me if this is wrong and fill in the details.

Thanks,
Taran

> --Amit
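That convention can be illustrated without Hadoop. The sketch below (illustrative code, not the actual LineRecordReader) applies the two rules to byte ranges over an in-memory "file": a reader skips a partial first line unless it starts at offset 0, and finishes the line that crosses its end offset, so every line is processed by exactly one reader:

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

public class SplitLines {
    // Lines a mapper for the byte range [start, end) would process:
    // skip to the first line boundary after 'start' (unless start == 0),
    // then read whole lines until the line crossing 'end' is finished.
    static List<String> linesFor(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // skip the partial first line; the previous split's reader owns it
            while (pos < data.length && data[pos - 1] != '\n') pos++;
        }
        List<String> out = new ArrayList<>();
        while (pos < end && pos < data.length) {
            int eol = pos;
            while (eol < data.length && data[eol] != '\n') eol++;  // may run past 'end'
            out.add(new String(data, pos, eol - pos, StandardCharsets.UTF_8));
            pos = eol + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "aaa\nbbb\nccc".getBytes(StandardCharsets.UTF_8);
        System.out.println(linesFor(data, 0, 5));   // [aaa, bbb] -- "bbb" crosses the boundary
        System.out.println(linesFor(data, 5, 11));  // [ccc]      -- partial "bbb" skipped
    }
}
```

With the split boundary at byte 5 (inside "bbb"), the first reader finishes "bbb" by reading past its end, and the second reader skips ahead to "ccc": no line is lost and none is processed twice.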
File Splits in Hadoop
Hi,

I am stuck with some questions based on the following scenario.

1) Hadoop normally splits the input file and distributes the splits across slaves (referred to as PSplits from now on), in chunks of 64 MB.

a) Is there any way to specify split criteria so that, for example, a huge 4 GB file is split into 40-odd PSplits respecting record boundaries?

b) Is it even required that these physical splits (PSplits) obey record boundaries?

2) We can get the locations of these PSplits on HDFS as follows:

    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); // FileInputFormat line 273

In FileInputFormat, for each blkLocations entry (PSplit), multiple logical splits (referred to as LSplits from now on) are created based on a heuristic for the number of mappers.

Q) How is the following situation handled in TextInputFormat, which reads line by line:
i) the input file is split as described in step 1 into more than 2 parts;
ii) suppose there is a line of text which starts near the end of PSplit-i and ends in PSplit-i+1 (say PSplit2 and PSplit3);
iii) which mapper gets this line spanning multiple PSplits (mapper_i or mapper_i+1)?
iv) I went through the FileInputFormat code; LSplits are created only within a particular PSplit, not across PSplits. Why so?

Q) In short, if one has to read arbitrary objects (not lines), how does one handle records which are partially in one PSplit and partially in another?

--Amit
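On question 1a: the split size FileInputFormat computes is, to the best of my reading of that code, splitSize = max(minSize, min(goalSize, blockSize)), where goalSize is totalSize divided by the requested number of maps and minSize comes from mapred.min.split.size. A small sketch of that arithmetic (the method name mirrors FileInputFormat's, the numbers are illustrative):

```java
public class SplitSize {
    // splitSize = max(minSize, min(goalSize, blockSize))
    //   goalSize: total file size / requested number of map tasks
    //   minSize:  mapred.min.split.size
    //   raising minSize above the block size forces fewer, larger splits
    //   (at the cost of splits containing non-local blocks)
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    public static void main(String[] args) {
        long MB = 1024L * 1024;
        long goal = (4096 * MB) / 40;   // 4 GB file, ~40 maps requested
        // default minSize: splits are capped at the 64 MB block size
        System.out.println(computeSplitSize(goal, 1, 64 * MB) / MB);        // 64
        // minSize = 100 MB: splits grow past the block size, ~41 splits
        System.out.println(computeSplitSize(goal, 100 * MB, 64 * MB) / MB); // 100
    }
}
```

Note this only controls split sizes in bytes; it does nothing about record boundaries, which is exactly why the record-spanning question in Q) arises.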