Hi,

I wrote a custom InputFormat for parsing the Enron email corpus. It is in the 
attached file named EmailInputFormat.

I have attached the code as text files, along with a sample input mail as a 
text document.

EmailClass implements Writable, including all the methods that interface 
requires, and also contains an initiate method that initializes the values in 
the class.

The initiate method is written in EmailClass.java.

It is called by the nextKeyValue method, which is written in 
EmailRecordReader.txt.
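
To make the contract that EmailClass has to satisfy concrete, here is a 
minimal standalone sketch (an illustration only, not the attached code): 
readFields has to read back exactly what write produced, in the same order. 
The class MiniEmail and its two fields are hypothetical, and the sketch uses 
only java.io so that it runs without Hadoop. In the real class the two methods 
would come from org.apache.hadoop.io.Writable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-field stand-in for EmailClass (assumption for illustration).
class MiniEmail {
    String messageID = "";
    List<String> toEmailID = new ArrayList<String>();

    // Same signature as Writable#write: fixed field order, list prefixed by its size.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(messageID);
        out.writeInt(toEmailID.size());
        for (String to : toEmailID) {
            out.writeUTF(to);
        }
    }

    // Same signature as Writable#readFields: mirrors write() field by field.
    public void readFields(DataInput in) throws IOException {
        messageID = in.readUTF();
        int n = in.readInt();
        toEmailID = new ArrayList<String>(n);
        for (int k = 0; k < n; k++) {
            toEmailID.add(in.readUTF());
        }
    }

    // Serializes src to a byte array and deserializes it into a fresh object.
    static MiniEmail roundTrip(MiniEmail src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            MiniEmail copy = new MiniEmail();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException ex) {
            // In-memory streams are not expected to fail.
            throw new IllegalStateException(ex);
        }
    }
}
```

Writing the list size before its elements lets the reader know how many 
entries follow, so an empty list does not need a special case.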

------------------------------------
Questions:
1. Is it feasible to build large custom objects within nextKeyValue() when 
running in Hadoop?
2. An MR program that does the simple task of emitting the message-id and the 
From field email-id from the Enron corpus of 6 lakh (600,000) emails, merged 
into one file (174 MB), takes around 50 minutes on a pseudo-distributed 
(single-node) cluster. This is very slow. Please help me with this too.
3. Can making the value field in EmailRecordReader static help in this 
situation?
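
For question 2, where only the message-id and the From address are emitted, 
one option that may be worth measuring is to scan just the header lines 
instead of building the full object. The sketch below assumes one header per 
line in "Name: value" form. HeaderScan and its methods are hypothetical names 
and not part of the attached code. It cuts at the first colon only, so values 
that contain colons (such as dates) are kept whole.

```java
import java.util.StringTokenizer;

// Hypothetical helper (assumption for illustration): pulls single headers
// out of one raw e-mail record without building a full EmailClass.
class HeaderScan {

    // Returns the value of the first line that starts with "<name>:",
    // or null if no such line exists.
    static String header(String record, String name) {
        String prefix = name + ":";
        StringTokenizer lines = new StringTokenizer(record, "\n");
        while (lines.hasMoreTokens()) {
            String line = lines.nextToken();
            if (line.startsWith(prefix)) {
                // Keep everything after the first colon.
                return line.substring(prefix.length()).trim();
            }
        }
        return null;
    }

    // Returns {message-id, from}; either element may be null.
    static String[] idAndSender(String record) {
        return new String[] { header(record, "Message-ID"),
                header(record, "From") };
    }
}
```

Because the match is anchored at the start of the line, an "X-From:" line is 
not mistaken for the "From:" header.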


Thanks in advance,
Varad.


------------------------------------
Varad Meru| Software Engineer
varad_m...@persistent.co.in
Persistent Systems and Solution Ltd. | Partners in Innovation | 
www.persistentsys.com

package EnronAssignment.src.enronassignment3.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Writable;

/**
 * @author Varad Meru
 * @organisation Persistent Systems and Solutions Ltd.
 * @see EnronCorpus
 * @category Email Analysis using Hadoop
 * @performs This class specifies our e-mail object, giving a standard format
 *           for use in future extensions of our programs and with other kinds
 *           of data.
 * 
 */
public class EmailClass implements Writable {

        private String originalSeparator = "Original Message";
        private String forwardedSeparator = "Forwarded by";
        private String startTag = "<message-id>";
        private String endTag = "</message-id>";
        private String noInput = "NO_INPUT_DATA";

        public String messageID = noInput;
        public String date = noInput;
        public String fromEmailID = noInput;
        public List<String> toEmailID;
        public String subject = noInput;
        public List<String> ccEmailID;
        public double MIMEVersion = 1.0;
        public String content_Type = noInput;
        public String content_Transfer_Encoding = noInput;
        public List<String> bccEmailID;
        public String xfromEmailID = noInput;
        public List<String> xtoEmailID;
        public List<String> xccEmailID;
        public List<String> xbccEmailID;
        public String xFolder = noInput;
        public String xOrigin = noInput;
        public String xFileName = noInput;
        public String messageContent = "";
        public List<String> attachedFiles;
        public String forwardedAttachement = noInput;
        public String originalMessage = noInput;

        /**
         * Default Constructor
         */
        public EmailClass() {

        }

        // A constructor with Fields
        /**
         * @param messageID
         * @param date
         * @param fromEmailID
         * @param toEmailID
         * @param subject
         * @param ccEmailID
         * @param contentType
         * @param contentTransferEncoding
         * @param bccEmailID
         * @param xfromEmailID
         * @param xtoEmailID
         * @param xccEmailID
         * @param xbccEmailID
         * @param xFolder
         * @param xOrgin
         * @param xFileName
         * @param messageContent
         * @param attachedFiles
         */
        public EmailClass(String messageID, String date, String fromEmailID,
                        List<String> toEmailID, String subject,
                        List<String> ccEmailID,
                        String contentType, String contentTransferEncoding,
                        List<String> bccEmailID, String xfromEmailID,
                        List<String> xtoEmailID, List<String> xccEmailID,
                        List<String> xbccEmailID, String xFolder, String xOrgin,
                        String xFileName, String messageContent,
                        List<String> attachedFiles) {
                this.messageID = messageID;
                this.date = date;
                this.fromEmailID = fromEmailID;
                this.toEmailID = toEmailID;
                this.subject = subject;
                this.ccEmailID = ccEmailID;
                content_Type = contentType;
                content_Transfer_Encoding = contentTransferEncoding;
                this.bccEmailID = bccEmailID;
                this.xfromEmailID = xfromEmailID;
                this.xtoEmailID = xtoEmailID;
                this.xccEmailID = xccEmailID;
                this.xbccEmailID = xbccEmailID;
                this.xFolder = xFolder;
                this.xOrigin = xOrgin;
                this.xFileName = xFileName;
                this.messageContent = messageContent;
                this.attachedFiles = attachedFiles;
        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
         */
        @Override
        public void readFields(DataInput in) throws IOException {
                // readFields() must consume exactly what write() produced, field
                // by field and in the same order. Parsing of the raw e-mail text
                // is done in initiate(), not here.
                messageID = in.readUTF();
                date = in.readUTF();
                fromEmailID = in.readUTF();
                toEmailID = readList(in);
                subject = in.readUTF();
                ccEmailID = readList(in);
                MIMEVersion = in.readDouble();
                content_Type = in.readUTF();
                content_Transfer_Encoding = in.readUTF();
                bccEmailID = readList(in);
                xfromEmailID = in.readUTF();
                xtoEmailID = readList(in);
                xccEmailID = readList(in);
                xbccEmailID = readList(in);
                xFolder = in.readUTF();
                xOrigin = in.readUTF();
                xFileName = in.readUTF();
                messageContent = in.readUTF();
                attachedFiles = readList(in);
                forwardedAttachement = in.readUTF();
                originalMessage = in.readUTF();
        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
         */
        @Override
        public void write(DataOutput out) throws IOException {
                // Every field is always written, so the layout is fixed and
                // readFields() can mirror it. Note that writeUTF() accepts at
                // most 65535 encoded bytes per string.
                out.writeUTF(messageID);
                out.writeUTF(date);
                out.writeUTF(fromEmailID);
                writeList(out, toEmailID);
                out.writeUTF(subject);
                writeList(out, ccEmailID);
                out.writeDouble(MIMEVersion);
                out.writeUTF(content_Type);
                out.writeUTF(content_Transfer_Encoding);
                writeList(out, bccEmailID);
                out.writeUTF(xfromEmailID);
                writeList(out, xtoEmailID);
                writeList(out, xccEmailID);
                writeList(out, xbccEmailID);
                out.writeUTF(xFolder);
                out.writeUTF(xOrigin);
                out.writeUTF(xFileName);
                out.writeUTF(messageContent);
                writeList(out, attachedFiles);
                out.writeUTF(forwardedAttachement);
                out.writeUTF(originalMessage);
        }

        /**
         * Writes the element count followed by each element. A null list (a
         * header that is not present in every mail) is written as an empty list.
         * 
         * @param out
         * @param list
         * @throws IOException
         */
        private void writeList(DataOutput out, List<String> list)
                        throws IOException {
                int size = (list == null) ? 0 : list.size();
                out.writeInt(size);
                for (int j = 0; j < size; j++) {
                        out.writeUTF(list.get(j));
                }
        }

        /**
         * Reads a list written by writeList().
         * 
         * @param in
         * @return the list, never null
         * @throws IOException
         */
        private List<String> readList(DataInput in) throws IOException {
                int size = in.readInt();
                List<String> list = new ArrayList<String>(size);
                for (int j = 0; j < size; j++) {
                        list.add(in.readUTF());
                }
                return list;
        }

        /**
         * Parses one raw e-mail record into the fields of this object.
         * 
         * @param inputData
         *            buffer holding the record
         * @param i
         *            offset of the first byte of the record in inputData
         * @param length
         *            number of bytes in the record
         * @throws IOException
         */
        public void initiate(byte[] inputData, int i, int length)
                        throws IOException {

                // Decode only the bytes of this record; a reused buffer can hold
                // stale data beyond i + length.
                String oneEmail = new String(inputData, i, length, "UTF-8");

                // The same EmailClass instance may be reused for every record, so
                // clear whatever the previous record left behind.
                reset();

                // One token per line; the first token is the start tag.
                StringTokenizer stringTokenizer = new StringTokenizer(oneEmail,
                                "\n");
                if (stringTokenizer.hasMoreTokens()) {
                        stringTokenizer.nextToken();
                }

                StringBuilder content = new StringBuilder();
                StringBuilder forwarded = new StringBuilder();
                StringBuilder original = new StringBuilder();
                // Builder that currently receives the non-header lines
                StringBuilder body = content;
                // List header that may continue on the following lines
                List<String> openList = null;
                String openDelims = ",";

                while (stringTokenizer.hasMoreTokens()) {
                        String line = stringTokenizer.nextToken();
                        if (line.trim().equals(endTag)) {
                                break;
                        }
                        if (body != content) {
                                // Everything after a separator line belongs to the
                                // forwarded or original mail.
                                body.append(line).append('\n');
                                continue;
                        }
                        if (line.contains(forwardedSeparator)) {
                                body = forwarded;
                                continue;
                        }
                        if (line.contains(originalSeparator)) {
                                body = original;
                                continue;
                        }

                        // Split at the first colon only, so that values such as
                        // dates keep their own colons.
                        int colon = line.indexOf(':');
                        String name = (colon < 0) ? "" : line.substring(0, colon)
                                        .trim();
                        String val = (colon < 0) ? "" : line.substring(colon + 1)
                                        .trim();

                        List<String> list = null;
                        String delims = ",";
                        boolean header = true;
                        if (name.equals("Message-ID")) {
                                this.messageID = val;
                        } else if (name.equals("Date")) {
                                this.date = val;
                        } else if (name.equals("From")) {
                                this.fromEmailID = val;
                        } else if (name.equals("To")) {
                                list = this.toEmailID = new ArrayList<String>();
                        } else if (name.equals("Subject")) {
                                this.subject = val;
                        } else if (name.equals("Cc")) {
                                list = this.ccEmailID = new ArrayList<String>();
                        } else if (name.equals("Mime-Version")) {
                                // Not parsed; MIMEVersion keeps its default value.
                        } else if (name.equals("Content-Type")) {
                                this.content_Type = val;
                        } else if (name.equals("Content-Transfer-Encoding")) {
                                this.content_Transfer_Encoding = val;
                        } else if (name.equals("Bcc")) {
                                list = this.bccEmailID = new ArrayList<String>();
                        } else if (name.equals("X-From")) {
                                this.xfromEmailID = val;
                        } else if (name.equals("X-To")) {
                                list = this.xtoEmailID = new ArrayList<String>();
                                delims = ">,";
                        } else if (name.equals("X-cc")) {
                                list = this.xccEmailID = new ArrayList<String>();
                        } else if (name.equals("X-bcc")) {
                                list = this.xbccEmailID = new ArrayList<String>();
                        } else if (name.equals("X-Folder")) {
                                this.xFolder = val;
                        } else if (name.equals("X-Origin")) {
                                this.xOrigin = val;
                        } else if (name.equals("X-FileName")) {
                                this.xFileName = val;
                        } else {
                                header = false;
                        }

                        if (header) {
                                // A single-valued header closes any open list.
                                openList = list;
                                openDelims = delims;
                                if (list != null) {
                                        addTokens(list, val, delims);
                                }
                        } else if (openList != null) {
                                // Continuation line of a multi-line address list
                                addTokens(openList, line, openDelims);
                        } else {
                                // No header matched, so it is message content.
                                content.append(line).append('\n');
                        }
                }

                this.messageContent = content.toString();
                if (forwarded.length() > 0) {
                        this.forwardedAttachement = forwarded.toString();
                }
                if (original.length() > 0) {
                        this.originalMessage = original.toString();
                }
        }

        /**
         * Splits text on the given delimiter characters and appends the trimmed,
         * non-empty pieces to the list.
         */
        private static void addTokens(List<String> list, String text,
                        String delims) {
                StringTokenizer tokens = new StringTokenizer(text, delims);
                while (tokens.hasMoreTokens()) {
                        String token = tokens.nextToken().trim();
                        if (token.length() > 0) {
                                list.add(token);
                        }
                }
        }

        /**
         * Restores every field to its initial value.
         */
        private void reset() {
                messageID = date = fromEmailID = subject = noInput;
                content_Type = content_Transfer_Encoding = noInput;
                xfromEmailID = xFolder = xOrigin = xFileName = noInput;
                forwardedAttachement = originalMessage = noInput;
                messageContent = "";
                toEmailID = ccEmailID = bccEmailID = null;
                xtoEmailID = xccEmailID = xbccEmailID = attachedFiles = null;
        }
}
package EnronAssignment.src.enronassignment3.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class EmailInputFormat extends FileInputFormat<LongWritable, EmailClass> {
        @Override
        public RecordReader<LongWritable, EmailClass> createRecordReader(
                        InputSplit split, TaskAttemptContext context)
                        throws IOException, InterruptedException {
                context.setStatus(split.toString());
                // Returns the EmailRecordReader that the Hadoop framework uses to
                // read the records of this split.
                return new EmailRecordReader();
        }
}
package EnronAssignment.src.enronassignment3.input;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class EmailRecordReader extends RecordReader<LongWritable, EmailClass> {

        private byte[] startTag;
        private byte[] endTag;
        public static final String stTag = "<message-id>";
        public static final String enTag = "</message-id>";

        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        static private EmailClass value = new EmailClass();

        @Override
        public void close() throws IOException {
                fsin.close();
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,
                        InterruptedException {
                return key;
        }

        @Override
        public EmailClass getCurrentValue() throws IOException,
                        InterruptedException {
                return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
                return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void initialize(InputSplit inputSplit,
                        TaskAttemptContext taskAttemptContext)
                        throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit) inputSplit;
                startTag = stTag.getBytes("utf-8");
                endTag = enTag.getBytes("utf-8");
                start = fileSplit.getStart();
                end = start + fileSplit.getLength();

                Path file = fileSplit.getPath();

                FileSystem fs = file.getFileSystem(taskAttemptContext
                                .getConfiguration());
                fsin = fs.open(fileSplit.getPath());
                fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
                if (fsin.getPos() < end) {
                        if (readUntilMatch(startTag, false)) {
                                try {
                                        buffer.write(startTag);
                                        if (readUntilMatch(endTag, true)) {
                                                key.set(fsin.getPos());
                                                System.out.println("Call to nextKeyValue");
                                                value.initiate(buffer.getData(), 0, buffer.getLength());
                                                return true;
                                        }
                                } finally {
                                        buffer.reset();
                                        
                                }
                        }
                }
                return false;
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                        throws IOException {
                int i = 0;
                while (true) {
                        int b = fsin.read();
                        // end of file:
                        if (b == -1)
                                return false;
                        // save to buffer:
                        if (withinBlock)
                                buffer.write(b);

                        // check if we're matching:
                        if (b == match[i]) {
                                i++;
                                if (i >= match.length)
                                        return true;
                        } else
                                i = 0;
                        // see if we've passed the stop point:
                        if (!withinBlock && i == 0 && fsin.getPos() >= end)
                                return false;
                }
        }

}
package EnronAssignment.src.enronassignment3.reduce;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmailReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text _key, Iterable<Text> _values, Context output)
                        throws IOException, InterruptedException {
                for (Text text : _values) {
                        output.write(_key, text);
                }
        }
}
package EnronAssignment.src.enronassignment3.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import EnronAssignment.src.enronassignment3.input.EmailClass;

public class EmailMapper extends Mapper<LongWritable, EmailClass, Text, Text> {

        @Override
        protected void map(LongWritable key, EmailClass value, Context context)
                        throws IOException, InterruptedException {
                context.write(new Text(value.messageID), new Text(value.fromEmailID));
                // System.out.println(value.messageID + " " + value.fromEmailID);
        }
}
package EnronAssignment.src.enronassignment3.driver;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import EnronAssignment.src.enronassignment3.input.EmailInputFormat;
import EnronAssignment.src.enronassignment3.mapper.EmailMapper;
import EnronAssignment.src.enronassignment3.reduce.EmailReducer;

public class DriverClassForEmailObj {

        // //////////////
        // MAIN METHOD //
        // //////////////

        /**
         * @param args
         * @throws InterruptedException
         * @throws ClassNotFoundException
         * @throws IOException
         */
        public static void main(String[] args) throws InterruptedException,
                        ClassNotFoundException, IOException {
                /*
                 * Setting the Configurations for the job
                 */
                Configuration configuration = new Configuration();
                String[] otherArgs = null;

                try {
                        otherArgs = new GenericOptionsParser(configuration, args)
                                        .getRemainingArgs();
                        if (otherArgs.length != 2) {
                                System.err.println("Usage: lineitem <in1> <out>");
                                System.exit(2);
                        }

                        // Just a user message
                        System.out.println("Starting Job: enron processing");
                        // configuring the threshold at which the reducers should start
                        // collecting the intermediate <K,V> pairs
                        configuration.set("mapred.reduce.slowstart.completed.maps", "0.10");
                        // initialising the job
                        Job job = new Job(configuration, "enron");

                        /*
                         * Setting all the classes required by the job
                         */

                        job.setJarByClass(DriverClassForEmailObj.class);
                        
                        // Setting the input format for the MR job
                        job.setInputFormatClass(EmailInputFormat.class);
                        
                        //job.setReducerClass(EmailReducer.class);
                        job.setMapperClass(EmailMapper.class);

                        job.setOutputValueClass(Text.class);
                        job.setOutputKeyClass(Text.class);

                        // setting the number of reduce tasks
                        job.setNumReduceTasks(1);

                        /*
                         * Setting the input path and the output directory used by
                         * the job. The paths come from otherArgs, i.e. what remains
                         * after GenericOptionsParser has consumed the generic options.
                         */
                        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

                        // Submitting the job and waiting for it to complete by
                        // polling the master node
                        System.exit(job.waitForCompletion(true) ? 0 : 1);

                } catch (IOException e) {
                        System.err.println("error in File IO");
                        e.printStackTrace();
                } catch (Exception e) {
                        System.err.println("error in Code");
                        e.printStackTrace();
                }
        }
}
