I've been trying to figure out how to do a set difference in hadoop. I would like to take 2 file, and remove the values they have in common between them. Let's say I have two bags, 'students' and 'employees'. I want to find which students are just students, and which employees are just employees. So, an example:

Students:
(Jane)
(John)
(Dave)

Employees:
(Dave)
(Sue)
(Anne)

If I were to join these, I would get the students who are also employees, or: (Dave).

However, what I want is the distinct values:

Only_Student:
(Jane)
(John)

Only_Employee:
(Sue)
(Anne)


I was able to do this in pig, but I think I should be able to do it in one MapReduce pass. (With hadoop 20.1) I read from two files, and attached the file names as the values. (Students and Employees, in this case. My actually problem is on DNA, bacteria and viruses in this case.) Then I output from the reducer if I only get one value for a given key. However, I've had some real trouble figuring out MultipleOutput and the multiple inputs. I've attached my code. I'm getting this error, which is a total mystery to me: 09/12/02 22:33:52 INFO mapred.JobClient: Task Id : attempt_200911301448_0019_m_000000_2, Status : FAILED java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:807) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:504) at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
       at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
       at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
       at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:583)
       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
       at org.apache.hadoop.mapred.Child.main(Child.java:170)


Thanks,
Jim
package org.myorg;


import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
//import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.JobConf;

public class DNAUnique {
 

    public static class DNAUniqueMapper 
	extends Mapper<Object, Text, Text, Text>
		implements Configurable  {

	private Text word = new Text();
	private Text location = new Text();
	
	private Configuration conf;
	private int kmerSize = 5;

	public Configuration getConf() {
	    return conf;
	}

	public void setConf(Configuration inConf) {
	    conf = inConf;
	}

	public void configure(Configuration conf) {
	    System.out.println("in configure");
	    kmerSize = conf.getInt("kmerSize",6);
	}

	public void map(Object key, Text value,  OutputCollector<Text, Text> output, Reporter reporter) throws IOException, InterruptedException {
	    configure(getConf());

	    FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
	    String fileName = fileSplit.getPath().getName();
	    if(fileName.contains("bact")) {
		location.set("b");
	    } else {
		location.set("v");
	    }

	    String line = value.toString();
	    StringTokenizer itr = new StringTokenizer(line.toLowerCase());
	    
	    word.set(itr.nextToken());
	    output.collect(word, location);
	    
	}
    }
 	
    public static class DNAUniqueReducer 
	extends Reducer<Text,Text,Text,Text> implements Configurable {
	private MultipleOutputs mos;
	private Configuration conf;

	public Configuration getConf() {
	    return conf;
	}

	public void setConf(Configuration inConf) {
	    conf = inConf;
	}
	public void configure(Configuration conf) {   
	    JobConf jconf = (JobConf) conf;
	    mos = new MultipleOutputs(jconf);
	}

	private Text space = new Text(" "); //Just some crap

	public void reduce(Text key, Iterable<Text> values,
			   OutputCollector output, Reporter reporter) throws IOException, InterruptedException {
	    configure(getConf());
	    int count = 0;
	    boolean isBact = false;
	    boolean isVirus = false;
	    for (Text val : values) {
		String location = val.toString();
		if(location.equals("b")) {
		    isBact = true;
		} else if(location.equals("v")) {
		    isVirus = true;
		}
		++count;
	    }

	    if(count == 1) {
		if(isBact) {
		    mos.getCollector("bacteria", reporter).collect<Text,Text>(key, space);
		}
		if(isVirus) {
		    mos.getCollector("virus", reporter).collect(key, space);
		}
	    }
	}
    }

    public static void main(String[] args) throws Exception {
	JobConf conf = new JobConf();
	//    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	//if (otherArgs.length != 2) {
	// System.err.println("Usage: wordcount <in> <out>");
	// System.exit(2);
	//}
	conf.setInt("kmerSize", 7);
	//	if(args.length > 2) {
	//    System.out.println("Setting kmer size " + args[2]);
	//   conf.setInt("kmerSize", Integer.parseInt(args[2]));
	//}
	conf.setInt("mapred.reduce.tasks", 1);
	conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
	MultipleOutputs.addNamedOutput(conf, "bacteria", TextOutputFormat.class, Text.class, Text.class);
	MultipleOutputs.addNamedOutput(conf, "virus", TextOutputFormat.class, Text.class, Text.class);
	Job job = new Job(conf, "dna count");
	//    FileInputFormat.setMinInputSplitSize(job,1048576L); //100MB
	//System.out.printf("Min: %d, Max %d", FileInputFormat.getMinSplitSize(job), FileInputFormat.getMaxSplitSize(job));
	job.setJarByClass(DNAUnique.class);
	job.setMapperClass(DNAUniqueMapper.class);
	job.setCombinerClass(DNAUniqueReducer.class);
	job.setReducerClass(DNAUniqueReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);
	
	
	FileInputFormat.addInputPath(job, new Path(args[0]));
	FileInputFormat.addInputPath(job, new Path(args[1]));
	FileOutputFormat.setOutputPath(job, new Path(args[2]));


	System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
}

Reply via email to