I've been trying to figure out how to do a set difference in hadoop. I
would like to take two files and remove the values they have in common
between them. Let's say I have two bags, 'students' and 'employees'. I
want to find which students are just students, and which employees are
just employees. So, an example:
Students:
(Jane)
(John)
(Dave)
Employees:
(Dave)
(Sue)
(Anne)
If I were to join these, I would get the students who are also
employees, or: (Dave).
However, what I want is the distinct values:
Only_Student:
(Jane)
(John)
Only_Employee:
(Sue)
(Anne)
I was able to do this in pig, but I think I should be able to do it in
one MapReduce pass. (With hadoop 20.1) I read from two files, and
attached the file names as the values. (Students and Employees in this
example; my actual problem is with DNA — bacteria and viruses.) Then I
output from the reducer if I only get one value for a
given key. However, I've had some real trouble figuring out
MultipleOutputs and the multiple inputs. I've attached my code. I'm
getting this error, which is a total mystery to me:
09/12/02 22:33:52 INFO mapred.JobClient: Task Id :
attempt_200911301448_0019_m_000000_2, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected
org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
at
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:807)
at
org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:504)
at
org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:583)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Thanks,
Jim
package org.myorg;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
//import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.JobConf;
public class DNAUnique {
public static class DNAUniqueMapper
extends Mapper<Object, Text, Text, Text>
implements Configurable {
private Text word = new Text();
private Text location = new Text();
private Configuration conf;
private int kmerSize = 5;
public Configuration getConf() {
return conf;
}
public void setConf(Configuration inConf) {
conf = inConf;
}
public void configure(Configuration conf) {
System.out.println("in configure");
kmerSize = conf.getInt("kmerSize",6);
}
public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, InterruptedException {
configure(getConf());
FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();
if(fileName.contains("bact")) {
location.set("b");
} else {
location.set("v");
}
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line.toLowerCase());
word.set(itr.nextToken());
output.collect(word, location);
}
}
public static class DNAUniqueReducer
extends Reducer<Text,Text,Text,Text> implements Configurable {
private MultipleOutputs mos;
private Configuration conf;
public Configuration getConf() {
return conf;
}
public void setConf(Configuration inConf) {
conf = inConf;
}
public void configure(Configuration conf) {
JobConf jconf = (JobConf) conf;
mos = new MultipleOutputs(jconf);
}
private Text space = new Text(" "); //Just some crap
public void reduce(Text key, Iterable<Text> values,
OutputCollector output, Reporter reporter) throws IOException, InterruptedException {
configure(getConf());
int count = 0;
boolean isBact = false;
boolean isVirus = false;
for (Text val : values) {
String location = val.toString();
if(location.equals("b")) {
isBact = true;
} else if(location.equals("v")) {
isVirus = true;
}
++count;
}
if(count == 1) {
if(isBact) {
mos.getCollector("bacteria", reporter).collect<Text,Text>(key, space);
}
if(isVirus) {
mos.getCollector("virus", reporter).collect(key, space);
}
}
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf();
// String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//if (otherArgs.length != 2) {
// System.err.println("Usage: wordcount <in> <out>");
// System.exit(2);
//}
conf.setInt("kmerSize", 7);
// if(args.length > 2) {
// System.out.println("Setting kmer size " + args[2]);
// conf.setInt("kmerSize", Integer.parseInt(args[2]));
//}
conf.setInt("mapred.reduce.tasks", 1);
conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
MultipleOutputs.addNamedOutput(conf, "bacteria", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(conf, "virus", TextOutputFormat.class, Text.class, Text.class);
Job job = new Job(conf, "dna count");
// FileInputFormat.setMinInputSplitSize(job,1048576L); //100MB
//System.out.printf("Min: %d, Max %d", FileInputFormat.getMinSplitSize(job), FileInputFormat.getMaxSplitSize(job));
job.setJarByClass(DNAUnique.class);
job.setMapperClass(DNAUniqueMapper.class);
job.setCombinerClass(DNAUniqueReducer.class);
job.setReducerClass(DNAUniqueReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}