Hello,

We're trying to do a reduce-side join by applying two different
mappers (TransformationSessionMapper and TransformationActionMapper)
to two different input files and joining them using
TransformationReducer. The complete Classify.java source is included
at the end of this mail.
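
For convenience, this is how the job is wired up in run() (copied from
the full listing below):

    MultipleInputs.addInputPath(job, new Path("/tmp/hadoop/sessions-20100205155800.txt"), TextInputFormat.class, TransformationSessionMapper.class);
    MultipleInputs.addInputPath(job, new Path("/tmp/hadoop/actions-20100205155800.txt"), TextInputFormat.class, TransformationActionMapper.class);
    job.setReducerClass(TransformationReducer.class);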

When we run it, we get the error below. JoinKey is our own
WritableComparable implementation, used for secondary sorting. Somehow
TransformationActionMapper seems to be handed a JoinKey where it
expects a LongWritable key (it reads plain text via TextInputFormat).
Is Hadoop actually applying the mappers in sequence, feeding one
mapper's output into the other?
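
One thing we were not sure about: run() never declares the map output
key/value classes. If the job then falls back to some default
(LongWritable?) for the map output key, that would at least match the
message below, and we assume the fix would look roughly like this
(untested sketch; we also don't know what to pass for the value class,
since the two mappers emit different value types):

    job.setMapOutputKeyClass(JoinKey.class);
    // job.setMapOutputValueClass(???);  // Instance vs. IntWritable

But maybe we are misreading the error entirely.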

$ hadoop jar /tmp/classify.jar Classify
10/02/11 16:40:16 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
10/02/11 16:40:16 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
10/02/11 16:40:16 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
10/02/11 16:40:16 INFO input.FileInputFormat: Total input paths to process : 1
10/02/11 16:40:16 INFO input.FileInputFormat: Total input paths to process : 1
10/02/11 16:40:16 INFO mapred.JobClient: Running job: job_local_0001
10/02/11 16:40:16 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
10/02/11 16:40:16 INFO input.FileInputFormat: Total input paths to process : 1
10/02/11 16:40:16 INFO input.FileInputFormat: Total input paths to process : 1
10/02/11 16:40:16 INFO mapred.MapTask: io.sort.mb = 100
10/02/11 16:40:16 INFO mapred.MapTask: data buffer = 79691776/99614720
10/02/11 16:40:16 INFO mapred.MapTask: record buffer = 262144/327680
10/02/11 16:40:16 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, recieved Classify$JoinKey
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:807)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:504)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at Classify$TransformationActionMapper.map(Classify.java:161)
        at Classify$TransformationActionMapper.map(Classify.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapreduce.lib.input.DelegatingMapper.run(DelegatingMapper.java:51)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:583)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:176)
10/02/11 16:40:17 INFO mapred.JobClient:  map 0% reduce 0%
10/02/11 16:40:17 INFO mapred.JobClient: Job complete: job_local_0001
10/02/11 16:40:17 INFO mapred.JobClient: Counters: 0

Classify.java:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.URI;
import java.util.Iterator;
import java.util.regex.Pattern;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import weka.classifiers.Classifier;
import weka.core.Instance;


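// Two-step pipeline: a reduce-side join of session and action records
// into Weka Instances, followed by classification of those instances
// with a pre-trained Weka classifier.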
public class Classify extends Configured implements Tool {
	private static final String cachedClassifierFilePath = "/tmp/classifier";
	private static final String arffHeaderFilePath = "/tmp/header.arff";
	
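	// Composite join key: the natural key (session id) plus an ordering
	// flag, so that within one session the action records (ordering 1)
	// sort before the session record (ordering 2).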
	public static class JoinKey implements WritableComparable<JoinKey> {
		
		private Text key;
		private IntWritable ordering;
		
		// Hadoop instantiates Writable keys reflectively when deserializing,
		// so a no-argument constructor is needed alongside the others.
		public JoinKey() {
			this(new Text(), new IntWritable());
		}

		public JoinKey(Text key, IntWritable ordering) {
			this.key = key;
			this.ordering = ordering;
		}
		
		public JoinKey(String key, Integer ordering) {
			this(new Text(key), new IntWritable(ordering));
		}
		
		public Text key() {
			return key;
		}
		
		public IntWritable ordering() {
			return ordering;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			key.readFields(in);
			ordering.readFields(in);
		}

		@Override
		public void write(DataOutput out) throws IOException {
			key.write(out);
			ordering.write(out);
		}
		
		@Override
		public int hashCode() {
			return key.hashCode();
		}
		
		@Override
		public boolean equals(Object other) {
			if (other instanceof JoinKey) {
				JoinKey otherKey = (JoinKey) other;
				return key.equals(otherKey.key) && ordering.equals(otherKey.ordering);
			} else {
				return false;
			}
		}

		@Override
		public int compareTo(JoinKey other) {
			int cmp = key.compareTo(other.key);
			
			if (cmp != 0)
				return cmp;
			else
				return ordering.compareTo(other.ordering);
		}
		
	}
	
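	// Reads the sessions file: parses a tab-separated session record,
	// copies a few numeric fields into a Weka Instance and emits it
	// under JoinKey(session_id, 2).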
	public static class TransformationSessionMapper extends Mapper<LongWritable, Text, JoinKey, Instance> {

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			int begin = 19, numAttributes = 5;
			
			String line = value.toString();
			String fields[] = line.split("\t");
			String session_id = fields[0];
			
			Instance instance = new Instance(numAttributes);
			
			for (int i = 0; i < numAttributes - 2; i++) {
				System.err.println(fields[begin + i]);
				instance.setValue(i, Double.parseDouble(fields[begin + i]));
			}
			
			context.write(new JoinKey(session_id, 2), instance);
		}

	}
	
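	// Reads the actions file: emits 1 if the action type matches
	// TYPE_REGEX ("click_thr"), otherwise 0, keyed by JoinKey(session_id, 1).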
	public static class TransformationActionMapper extends Mapper<LongWritable, Text, JoinKey, IntWritable> {
		
		private static final String TYPE_REGEX = "^click_?thr$";

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String fields[] = line.split("\t");
			String session_id = fields[4];
			
			if (Pattern.matches(TYPE_REGEX, fields[5]))
				context.write(new JoinKey(session_id, 1), new IntWritable(1));
			else
				context.write(new JoinKey(session_id, 1), new IntWritable(0));
		}
		
	}
	
	
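	// Performs the join: sums the 0/1 flags coming from
	// TransformationActionMapper and stores the sum in attribute 4 of
	// the session's Instance before emitting (session id, Instance).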
	public static class TransformationReducer extends Reducer<JoinKey, Object, Text, Instance> {
		
		private int sum = 0;

		@Override
		protected void reduce(JoinKey key, Iterable<Object> values, Context context)
				throws IOException, InterruptedException {
			
			Iterator<Object> iter = values.iterator();
			
			Object o;
			while (iter.hasNext()) {
				o = iter.next();
				
				if (o instanceof IntWritable)
					sum += ((IntWritable) o).get();
				else {
					Instance instance = (Instance) o;
					instance.setValue(4, sum);
					
					context.write(key.key(), instance);
				}
			}
		}
		
	}
	
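	// Classifies each joined Instance with a Weka Classifier that is
	// deserialized from the DistributedCache in setup().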
	public static class ClassificationMapper extends Mapper<Text, Instance, Text, DoubleWritable> {
		
		private Classifier classifier = null;

		@Override
		protected void map(Text sessionId, Instance instance, Context context)
				throws IOException, InterruptedException {
			
			try {
				double result = classifier.classifyInstance(instance);
				context.write(sessionId, new DoubleWritable(result));
				
				// Weka's classifyInstance() declares a plain Exception, hence
				// the catch-and-rethrow gymnastics below.
			} catch (IOException e) {
				throw e;
			} catch (InterruptedException e) {
				throw e;
			} catch (Exception e) {
				throw new IOException(e);
			}
		}

		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			Configuration conf = context.getConfiguration();
			URI uris[] = DistributedCache.getCacheFiles(conf);
			
			for (URI uri : uris) {
				if (cachedClassifierFilePath.equals(uri.getPath())) {
					Path classifierFile = new Path(uri.getPath());
					FileSystem fs = classifierFile.getFileSystem(conf);
					FSDataInputStream inputStream = fs.open(classifierFile);
					
					ObjectInputStream objectStream = new ObjectInputStream(inputStream);
					
					Object o;
					
					try {
						o = objectStream.readObject();
					} catch (ClassNotFoundException e) {
						throw new IOException(e);
					}
					
					if (o instanceof Classifier) {
						classifier = (Classifier) o;
					} else {
						throw new IOException("Expected an instance of Classifier, got object of type " + o.getClass());
					}
				}
			}
		
			if (classifier == null)
				throw new IOException("Could not retrieve classifier from local cache");
		}
		
	}
	
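	// Emits the first prediction seen for each session id.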
	public static class ClassificationReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

		@Override
		protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
				throws IOException, InterruptedException {
			
			Iterator<DoubleWritable> iter = values.iterator();
			
			if (iter.hasNext())
				context.write(key, iter.next());
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Configuration(), new Classify(), args);
		System.exit(exitCode);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		
		Job job = new Job(conf, "Classifier");
		job.setJarByClass(Classify.class);
		
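		// one mapper per input file; both feed TransformationReducer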
		MultipleInputs.addInputPath(job, new Path("/tmp/hadoop/sessions-20100205155800.txt"), TextInputFormat.class, TransformationSessionMapper.class);
		MultipleInputs.addInputPath(job, new Path("/tmp/hadoop/actions-20100205155800.txt"), TextInputFormat.class, TransformationActionMapper.class);
		FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/output"));
		
		job.setReducerClass(TransformationReducer.class);
		
		return job.waitForCompletion(true) ? 0 : 1;

	}
}
