Hi, I wrote a custom InputFormat for parsing the Enron e-mail corpus; it is attached in the file named EmailInputFormat.
I have attached the code in a text file, with a sample input mail also attached as a text document. The EmailClass implements Writable and all of its required methods, and also contains an initiate method that initializes the values in that class. This initiate method is written in EmailClass.java. It is called by the nextKeyValue method, which is written in EmailRecordReader.txt. ------------------------------------ Questions: 1. Is it feasible to build large custom objects within nextKeyValue() when running in Hadoop? 2. An MR program that does the simple task of emitting the message-id and the From e-mail id from the Enron corpus of 6 lakh (600,000) e-mails merged into one file (174 MB) takes around 50 minutes on a pseudo-distributed (single-node) cluster. This is very, very slow. Please help me with this aspect too. 3. Can making the value field in EmailRecordReader static help in this situation? Thanks in advance, Varad. ------------------------------------ Varad Meru| Software Engineer varad_m...@persistent.co.in Persistent Systems and Solution Ltd. | Partners in Innovation | www.persistentsys.com DISCLAIMER ========== This e-mail may contain privileged and confidential information which is the property of Persistent Systems Ltd. It is intended only for the use of the individual or entity to which it is addressed. If you are not the intended recipient, you are not authorized to read, retain, copy, print, distribute or use this message. If you have received this communication in error, please notify the sender and delete all copies of this message. Persistent Systems Ltd. does not accept any liability for virus infected mails.
package EnronAssignment.src.enronassignment3.input; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.StringTokenizer; import org.apache.hadoop.io.Writable; /** * @author Varad Meru * @organisation Persistent Systems and Solutions Ltd. * @see EnronCorpus * @category Email Analysis using Hadoop * @performs This class specifies our e-mail object. Thus becoming a standard * format which will be used in our programs future extensions and * with other kind of data that we get with our * */ public class EmailClass implements Writable { private String originalSeparator = "Original Message"; private String forwardedSeparator = "Forwarded by"; private String startTag = "<message-id>"; private String endTag = "</message-id>"; private String noInput = "NO_INPUT_DATA"; public String messageID = noInput; public String date = noInput; public String fromEmailID = noInput; public List<String> toEmailID; public String subject = noInput; public List<String> ccEmailID; public double MIMEVersion = 1.0; public String content_Type = noInput; public String content_Transfer_Encoding = noInput; public List<String> bccEmailID; public String xfromEmailID = noInput; public List<String> xtoEmailID; public List<String> xccEmailID; public List<String> xbccEmailID; public String xFolder = noInput; public String xOrigin = noInput; public String xFileName = noInput; public String messageContent = ""; public List<String> attachedFiles; public String forwardedAttachement = noInput; public String originalMessage = noInput; /** * Default Constructor */ public EmailClass() { } // A constructor with Fields /** * @param messageID * @param date * @param fromEmailID * @param toEmailID * @param subject * @param ccEmailID * @param contentType * @param contentTransferEncoding * @param bccEmailID * @param xfromEmailID * @param xtoEmailID * @param xccEmailID * @param xbccEmailID * @param xFolder * @param xOrgin * 
@param xFileName * @param messageContent * @param attachedFiles */ public EmailClass(String messageID, String date, String fromEmailID, List<String> toEmailID, String subject, List<String> ccEmailID, String contentType, String contentTransferEncoding, List<String> bccEmailID, String xfromEmailID, List<String> xtoEmailID, List<String> xccEmailID, List<String> xbccEmailID, String xFolder, String xOrgin, String xFileName, String messageContent, List<String> attachedFiles) { this.messageID = messageID; this.date = date; this.fromEmailID = fromEmailID; this.toEmailID = toEmailID; this.subject = subject; this.ccEmailID = ccEmailID; content_Type = contentType; content_Transfer_Encoding = contentTransferEncoding; this.bccEmailID = bccEmailID; this.xfromEmailID = xfromEmailID; this.xtoEmailID = xtoEmailID; this.xccEmailID = xccEmailID; this.xbccEmailID = xbccEmailID; this.xFolder = xFolder; this.xOrigin = xOrgin; this.xFileName = xFileName; this.messageContent = messageContent; this.attachedFiles = attachedFiles; } /* * (non-Javadoc) * * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) */ @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub String tempString; String[] splits = null; if ((tempString = in.readLine()) == startTag) { // Message ID tempString = in.readLine(); splits = tempString.split(":"); this.messageID = splits[1]; // Date of email tempString = in.readLine(); splits = tempString.split(":"); this.date = splits[1]; // From email id tempString = in.readLine(); splits = tempString.split(":"); this.fromEmailID = splits[1]; // for getting the "To" list { tempString = in.readLine(); splits = tempString.split(":"); String toList = splits[1]; while (!(tempString = in.readLine()).contains("Subject:")) { toList += tempString; // got a whole line of To's in one line } StringTokenizer stringTokenizer = new StringTokenizer(toList, ","); this.toEmailID = new ArrayList<String>(); while 
(stringTokenizer.hasMoreTokens()) { this.toEmailID.add(stringTokenizer.nextToken()); // for inserting all the emails in the "To" string into the } } // For getting the subject splits = tempString.split(":"); this.subject = splits[1]; // for getting the Cc list if ((tempString = in.readLine()).contains("Cc:")) { splits = tempString.split(":"); String ccList = splits[1]; while (!(tempString = in.readLine()).contains("Mime-Version:")) { ccList += tempString; // got a whole line of To's in one line } StringTokenizer stringTokenizer = new StringTokenizer(ccList, ","); this.ccEmailID = new ArrayList<String>(); while (stringTokenizer.hasMoreTokens()) { this.ccEmailID.add(stringTokenizer.nextToken()); // for inserting all the emails in the "To" string into the } } // Content-Type of the Email tempString = in.readLine(); // skipped MIME-Version splits = tempString.split(":"); this.content_Type = splits[1]; // Content-Transfer-Encoding of the Email tempString = in.readLine(); splits = tempString.split(":"); this.content_Transfer_Encoding = splits[1]; // get the Bcc list if ((tempString = in.readLine()).contains("Bcc:")) { splits = tempString.split(":"); String bccList = splits[1]; while (!(tempString = in.readLine()).contains("X-From:")) { bccList += tempString; // got a whole line of To's in one line } StringTokenizer stringTokenizer = new StringTokenizer(bccList, ","); this.bccEmailID = new ArrayList<String>(); while (stringTokenizer.hasMoreTokens()) { this.bccEmailID.add(stringTokenizer.nextToken()); } } // for getting the xfrom splits = tempString.split(":"); this.xfromEmailID = splits[1]; // for getting xto { tempString = in.readLine(); splits = tempString.split(":"); String xtoList = splits[1]; while (!(tempString = in.readLine()).contains("X-cc")) { xtoList += tempString; // got a whole line of To's in one line } StringTokenizer stringTokenizer = new StringTokenizer(xtoList, ","); this.xtoEmailID = new ArrayList<String>(); while (stringTokenizer.hasMoreTokens()) { 
this.xtoEmailID.add(stringTokenizer.nextToken()); } } { splits = tempString.split(":"); String xccList = splits[1]; while (!(tempString = in.readLine()).contains("X-bcc")) { xccList += tempString; // got a whole line of To's in one line } StringTokenizer stringTokenizer = new StringTokenizer(xccList, ","); this.xccEmailID = new ArrayList<String>(); while (stringTokenizer.hasMoreTokens()) { this.xccEmailID.add(stringTokenizer.nextToken()); } } { splits = tempString.split(":"); String xbccList = splits[1]; while (!(tempString = in.readLine()).contains("X-Folder")) { xbccList += tempString; // got a whole line of To's in one line } StringTokenizer stringTokenizer = new StringTokenizer(xbccList, ","); this.xbccEmailID = new ArrayList<String>(); while (stringTokenizer.hasMoreTokens()) { this.xbccEmailID.add(stringTokenizer.nextToken()); } } // for getting the xFolder splits = tempString.split(":"); this.xFolder = splits[1]; // xOrigin tempString = in.readLine(); splits = tempString.split(":"); this.xOrigin = splits[1]; // xFileName tempString = in.readLine(); splits = tempString.split(":"); this.xFileName = splits[1]; // get MessageContent tempString = in.readLine(); String tempMessageContent = null; while (tempString != originalSeparator || tempString != forwardedSeparator || tempString != endTag) { tempMessageContent += tempString.trim(); tempString = in.readLine(); } this.messageContent = tempMessageContent; /* * insert into the original attachment file - in the case of reply * to an original email */ if (tempString.contains(originalSeparator)) { String tempRepliedMail = null; while (!((tempString = in.readLine()) != endTag)) { tempRepliedMail += tempString; // got a whole line of To's in one line } this.originalMessage = tempRepliedMail; } if (tempString.contains(forwardedSeparator)) { String tempForwardedMail = null; while (!((tempString = in.readLine()) != endTag)) { tempForwardedMail += tempString; // got a whole line of To's in one line } 
this.forwardedAttachement = tempForwardedMail; } if (tempString.contains(endTag)) { // Do Nothing ... } } } /* * (non-Javadoc) * * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) */ @Override public void write(DataOutput out) throws IOException { out.writeUTF(messageID); out.writeUTF(date); out.writeUTF(fromEmailID); out.writeUTF(writeList(toEmailID)); out.writeUTF(subject); // Not in all the mails if (ccEmailID != null) out.writeUTF(writeList(ccEmailID)); out.writeDouble(MIMEVersion); out.writeUTF(content_Type); out.writeUTF(content_Transfer_Encoding); // Not in all the mails if (bccEmailID != null) out.writeUTF(writeList(bccEmailID)); out.writeUTF(xfromEmailID); out.writeUTF(writeList(xtoEmailID)); out.writeUTF(writeList(xccEmailID)); out.writeUTF(writeList(xbccEmailID)); out.writeUTF(xFolder); out.writeUTF(xOrigin); out.writeUTF(xFileName); out.writeUTF(messageContent); out.writeUTF(writeList(attachedFiles)); out.writeUTF(forwardedAttachement); out.writeUTF(originalMessage); } /** * @param inputStringList * @return tempString */ private String writeList(List<String> inputStringList) { String tempString = ""; for (String string : inputStringList) { tempString += string + ","; } tempString = tempString.substring(0, tempString.length() - 2); return tempString; } /** * @param inputData * @param i * @param length * @throws IOException */ public void initiate(byte[] inputData, int i, int length) throws IOException { String oneEmail = new String(inputData); // One whole mail put into one string containing \n as its linebreaker StringTokenizer stringTokenizer = new StringTokenizer(oneEmail, "\n"); // tokenizer is used to fetch the mail contents and store into our // object String tempString; String[] splits; String[] splits1; // It'll give us separate lines /* * *************************************************** * Only the message id is being read ... 
try to build the loop inside * the if rather than the other way round * *************************************************** */ tempString = stringTokenizer.nextToken(); // tempString holds the start tag now inside it ... while (stringTokenizer.hasMoreTokens()) { // skipped the start tag for the first time and then read on tempString = stringTokenizer.nextToken(); // get line after line in the mail between the <message-id> and // the </message-id> and parse through it to make sense of the // mail. The following parser parser through each tag and stores // in the member variables of the class // Finding if the line contains a Message field if (tempString.contains("Message-ID:")) { splits = tempString.split(":"); this.messageID = splits[1]; } if (tempString.contains("Date:")) { splits = tempString.split(":"); this.date = splits[1]; } if (tempString.contains("From:") && !tempString.contains("-From")) { splits = tempString.split(":"); this.fromEmailID = splits[1]; } if (tempString.contains("To:") && !tempString.contains("-To:")) { splits = tempString.split(":"); String toList = splits[1]; whileLoop: while (true) { if ((tempString.contains("Subject:")) || (tempString.contains("Mime-Version:")) || (tempString.contains("Cc:")) || (tempString.contains("File:")) && !tempString.contains("-cc:")) { break whileLoop; } tempString = stringTokenizer.nextToken(); toList += tempString; } StringTokenizer stringTokenizer1 = new StringTokenizer(toList, ","); this.toEmailID = new ArrayList<String>(); while (stringTokenizer1.hasMoreTokens()) { this.toEmailID.add(stringTokenizer1.nextToken()); } } if (tempString.contains("Subject:")) { splits1 = tempString.split(":"); this.subject = splits1[1]; } if (tempString.contains("Cc:") && !tempString.contains("-cc:")) { splits1 = tempString.split(":"); String ccList = splits1[1]; whileLoop: while (true) { if (tempString.contains("Mime-Version:") || tempString.contains("Subject:")) { break whileLoop; } tempString = stringTokenizer.nextToken(); 
ccList += tempString; } String[] splitsString = ccList.split(","); this.ccEmailID = new ArrayList<String>(); for (int j = 0; j < splitsString.length; j++) { this.ccEmailID.add(splitsString[j]); } for (int j = 0; j < splitsString.length; j++) { } } if (tempString.contains("Content-Type:")) { splits = tempString.split(":"); this.content_Type = splits[1]; } if (tempString.contains("Content-Transfer-Encoding:")) { splits = tempString.split(":"); this.content_Transfer_Encoding = splits[1]; } if (tempString.contains("Bcc:") && !tempString.contains("-bcc:")) { splits1 = tempString.split(":"); String bccList = splits1[1]; whileLoop: while (true) { if (tempString.contains("X-From:")) { break whileLoop; } tempString = stringTokenizer.nextToken(); bccList += tempString; } StringTokenizer stringTokenizer3 = new StringTokenizer(bccList, ","); this.bccEmailID = new ArrayList<String>(); while (stringTokenizer3.hasMoreTokens()) { this.bccEmailID.add(stringTokenizer3.nextToken()); } } if (tempString.contains("X-From:")) { splits1 = tempString.split(":"); this.xfromEmailID = splits1[1]; } if (tempString.contains("X-To:")) { splits1 = tempString.split(":"); String xToList = splits1[1]; whileLoop: while (true) { if (tempString.contains("X-cc:")) { break whileLoop; } tempString = stringTokenizer.nextToken(); xToList += tempString; } StringTokenizer stringTokenizer3 = new StringTokenizer(xToList, ">,"); this.xtoEmailID = new ArrayList<String>(); while (stringTokenizer3.hasMoreTokens()) { this.xtoEmailID.add(stringTokenizer3.nextToken()); } } if (tempString.contains("X-cc:")) { splits1 = tempString.split(":"); String xCcList = splits1[1]; whileLoop: while (true) { if (tempString.contains("X-bcc:")) { break whileLoop; } tempString = stringTokenizer.nextToken(); xCcList += tempString; } StringTokenizer stringTokenizer3 = new StringTokenizer(xCcList, ","); this.xccEmailID = new ArrayList<String>(); while (stringTokenizer3.hasMoreTokens()) { this.xccEmailID.add(stringTokenizer3.nextToken()); 
} } if (tempString.contains("X-bcc:")) { splits1 = tempString.split(":"); String xBccList = splits1[1]; whileLoop: while (true) { if (tempString.contains("X-Folder:")) { break whileLoop; } tempString = stringTokenizer.nextToken(); xBccList += tempString; } StringTokenizer stringTokenizer3 = new StringTokenizer( xBccList, ","); this.xbccEmailID = new ArrayList<String>(); while (stringTokenizer3.hasMoreTokens()) { this.xbccEmailID.add(stringTokenizer3.nextToken()); } } if (tempString.contains("X-Folder:")) { splits1 = tempString.split(":"); this.xFolder = splits1[1]; } if (tempString.contains("X-Origin:")) { splits = tempString.split(":"); this.xOrigin = splits[1]; } if (tempString.contains("X-FileName:")) { splits = tempString.split(":"); this.xFileName = splits[1]; } if (tempString.contains(forwardedSeparator)) { while (stringTokenizer.hasMoreTokens()) { tempString = stringTokenizer.nextToken(); this.forwardedAttachement += tempString; } } if (tempString.contains(originalSeparator)) { while (stringTokenizer.hasMoreTokens()) { tempString = stringTokenizer.nextToken(); this.originalMessage += tempString; } } // if we dont get any hit .. then add it as message content this.messageContent += tempString; } /* * insert into the original attachment file - in the case of reply to an * original email */ if (tempString.contains(originalSeparator)) { String tempRepliedMail = null; while (!((tempString = stringTokenizer.nextToken()) != endTag)) { tempRepliedMail += tempString; // got a whole line of To's in one line } this.originalMessage = tempRepliedMail; } if (tempString.contains(forwardedSeparator)) { String tempForwardedMail = null; while (!((tempString = stringTokenizer.nextToken()) != endTag)) { tempForwardedMail += tempString; // got a whole line of To's in one line } this.forwardedAttachement = tempForwardedMail; } } }
package EnronAssignment.src.enronassignment3.input; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class EmailInputFormat extends FileInputFormat<LongWritable, EmailClass> { @Override public RecordReader<LongWritable, EmailClass> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { context.setStatus(split.toString()); return new EmailRecordReader(); // It returns the object of the EmailRecordReader when called by the // hadoop JVM for reading each record. } }
package EnronAssignment.src.enronassignment3.input; import java.io.IOException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class EmailRecordReader extends RecordReader<LongWritable, EmailClass> { private byte[] startTag; private byte[] endTag; public static final String stTag = "<message-id>"; public static final String enTag = "</message-id>"; private long start; private long end; private FSDataInputStream fsin; private DataOutputBuffer buffer = new DataOutputBuffer(); private LongWritable key = new LongWritable(); static private EmailClass value = new EmailClass(); @Override public void close() throws IOException { fsin.close(); } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return key; } @Override public EmailClass getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return (fsin.getPos() - start) / (float) (end - start); } @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit) inputSplit; startTag = stTag.getBytes("utf-8"); endTag = enTag.getBytes("utf-8"); start = fileSplit.getStart(); end = start + fileSplit.getLength(); Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(taskAttemptContext .getConfiguration()); fsin = fs.open(fileSplit.getPath()); fsin.seek(start); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (fsin.getPos() < end) { 
if (readUntilMatch(startTag, false)) { try { buffer.write(startTag); if (readUntilMatch(endTag, true)) { key.set(fsin.getPos()); System.out.println("Call to nextKeyValue"); value.initiate(buffer.getData(), 0, buffer.getLength()); return true; } } finally { buffer.reset(); } } } return false; } private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { int i = 0; while (true) { int b = fsin.read(); // end of file: if (b == -1) return false; // save to buffer: if (withinBlock) buffer.write(b); // check if we're matching: if (b == match[i]) { i++; if (i >= match.length) return true; } else i = 0; // see if we've passed the stop point: if (!withinBlock && i == 0 && fsin.getPos() >= end) return false; } } }
package EnronAssignment.src.enronassignment3.reduce; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class EmailReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce( Text _key, java.lang.Iterable<Text> _values, org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text>.Context output) throws IOException, InterruptedException { for (Text text : _values) { output.write(_key, text); } } }
package EnronAssignment.src.enronassignment3.mapper; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import EnronAssignment.src.enronassignment3.input.EmailClass; public class EmailMapper extends Mapper<LongWritable, EmailClass, Text, Text> { @Override protected void map( LongWritable key, EmailClass value, org.apache.hadoop.mapreduce.Mapper<LongWritable, EmailClass, Text, Text>.Context context) throws IOException, InterruptedException { context.write(new Text(value.messageID), new Text(value.fromEmailID)); // System.out.println(value.messageID + " " + value.fromEmailID); } }
package EnronAssignment.src.enronassignment3.driver; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import EnronAssignment.src.enronassignment3.input.EmailInputFormat; import EnronAssignment.src.enronassignment3.mapper.EmailMapper; import EnronAssignment.src.enronassignment3.reduce.EmailReducer; public class DriverClassForEmailObj { // ////////////// // MAIN METHOD // // ////////////// /** * @param args * @throws InterruptedException * @throws ClassNotFoundException * @throws IOException */ public static void main(String[] args) throws InterruptedException, ClassNotFoundException, IOException { /* * Setting the Configurations for the job */ Configuration configuration = new Configuration(); String[] otherArgs = null; try { otherArgs = new GenericOptionsParser(configuration, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: lineitem <in1> <out>"); System.exit(2); } // Just a user message System.out.println("Starting Job: enron processing"); // configuring the threshold when reducer should start collecting // the intermediate <K,V> pairs configuration.set("mapred.reduce.slowstart.completed.maps", "0.10"); // initialising the job Job job = new Job(configuration, "enron"); /* * Setting all the classes required by the job */ job.setJarByClass(DriverClassForEmailObj.class); // Setting the input format for the MR job job.setInputFormatClass(EmailInputFormat.class); //job.setReducerClass(EmailReducer.class); job.setMapperClass(EmailMapper.class); job.setOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); // setting the number of reduce tasks job.setNumReduceTasks(1); /* * Setting the input and output paths used for the 
input directories * and the output directory */ FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // Submitting the job and waiting for the job to get completed by // pinging the master-node System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (IOException e) { System.err.println("error in File IO"); e.printStackTrace(); } catch (Exception e) { System.err.println("error in Code"); e.printStackTrace(); } } }