Hello,

As a follow-up to this question <http://mail-archives.apache.org/mod_mbox/hadoop-mapreduce-user/201408.mbox/%3CCAOcM5s9g9WzdfcJFxLvy0K7R3ezmvfuQfVoXcv=txajhcp3...@mail.gmail.com%3E>, I ended up changing the global sort to use two chained MR jobs. Thanks to the MapReduce Design Patterns book for the inspiration: https://github.com/adamjshook/mapreducepatterns/blob/master/MRDP/src/main/java/mrdp/ch4/TotalOrderSorting.java
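For context on how the second job routes records: TotalOrderPartitioner reads the sorted split points from the partition file and sends each key to the reducer whose range contains it, so reducer i gets the keys k with splitPoints[i-1] <= k < splitPoints[i]. Below is a minimal, dependency-free sketch of that lookup. It assumes plain `String` keys in place of Hadoop's `Text` (for ASCII data the two orderings agree); the class and method names are hypothetical, and Hadoop's own implementation (which may use a trie for `Text` keys) differs in detail.

```java
import java.util.Arrays;

class TotalOrderSketch {
    // Returns the reducer index for a key: the number of split points <= key.
    // Assumes splitPoints is sorted and strictly increasing.
    static int findPartition(String[] splitPoints, String key) {
        // binarySearch returns the index if found, else -(insertionPoint) - 1
        int pos = Arrays.binarySearch(splitPoints, key) + 1;
        return (pos < 0) ? -pos : pos;
    }

    public static void main(String[] args) {
        String[] splits = {"g", "p"}; // 2 split points -> 3 reducers
        for (String k : new String[] {"apple", "g", "kiwi", "p", "zebra"}) {
            System.out.println(k + " -> reducer " + findPartition(splits, k));
        }
    }
}
```

Running `main` prints `apple -> reducer 0`, `g -> reducer 1`, `kiwi -> reducer 1`, `p -> reducer 2` and `zebra -> reducer 2`: a key equal to a split point lands in the higher partition.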
The code is as follows.

*The mapper:*

```java
public static class Mapp extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("MAPPER 1");
        // Split the line
        List<Text> fields = Library.splitLine(value, ' ');
        if (fields.size() != 9) return;
        System.out.println(fields.get(3));
        context.write(fields.get(3), value);
    }
}
```

*The reducer:*

```java
public static class Reduce extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("REDUCER 1");
        for (Text val : values) {
            context.write(key, val);
        }
    }
}
```

*And my driver code:*

```java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] margs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (margs.length != 3) {
        System.err.println("Usage: TotalOrderSorting <user data> <out> parallel");
        System.exit(1);
    }
    System.out.println("in :" + margs[0] + " out :" + margs[1] + " parallel: " + margs[2]);
    Path inputPath = new Path(margs[0] + "/page_views");
    String outputdir = margs[1] + "/L9out";
    Path outputOrder = new Path(outputdir);
    Path partitionFile = new Path(outputdir + "_partitions.lst");
    Path outputStage = new Path(outputdir + "_staging");

    Job job = new Job(conf, "L9 MR - TotalOrderSortingStage");
    job.setJarByClass(L9.class);
    // Use the mapper implementation with zero reduce tasks
    job.setMapperClass(Mapp.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    TextInputFormat.setInputPaths(job, inputPath);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setOutputPath(job, outputStage);

    // Submit the job and get completion code.
    int code = job.waitForCompletion(true) ? 0 : 1;

    if (code == 0) {
        Job orderJob = new Job(conf, "TotalOrderSortingStage");
        orderJob.setJarByClass(L9.class);
        // Here, use the identity mapper to output the key/value pairs in
        // the SequenceFile
        orderJob.setMapperClass(Mapper.class);
        orderJob.setReducerClass(Reduce.class);
        // Set the number of reduce tasks to an appropriate number for the
        // amount of data being sorted
        //orderJob.setNumReduceTasks(3);
        orderJob.setNumReduceTasks(400);
        // Use Hadoop's TotalOrderPartitioner class
        orderJob.setPartitionerClass(TotalOrderPartitioner.class);
        // Set the partition file
        TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(), partitionFile);
        orderJob.setOutputKeyClass(Text.class);
        orderJob.setOutputValueClass(Text.class);
        // Set the input to the previous job's output
        orderJob.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
        // Set the output path to the command line parameter
        TextOutputFormat.setOutputPath(orderJob, outputOrder);
        // Set the separator to an empty string
        orderJob.getConfiguration().set("mapred.textoutputformat.separator", "");
        // Use the InputSampler to go through the output of the previous
        // job, sample it, and create the partition file
        //InputSampler.writePartitionFile(orderJob, new InputSampler.RandomSampler<Text, Text>(0.5, 3, 2));
        //InputSampler.writePartitionFile(orderJob, new InputSampler.RandomSampler<Text, Text>(.001, 399, 444));
        InputSampler.writePartitionFile(orderJob, new InputSampler.RandomSampler<Text, Text>(.001, 10000));
        // Submit the job
        code = orderJob.waitForCompletion(true) ? 0 : 2;
    }

    // Cleanup the partition file and the staging directory
    //FileSystem.get(new Configuration()).delete(partitionFile, false);
    //FileSystem.get(new Configuration()).delete(outputStage, true);
    //FileSystem.get(new Configuration()).delete(outputOrder, true);
    System.exit(code);
}
} // closes class L9
```

For some reason, the second job fails with the following error:

```
[exec] 14/09/01 20:08:48 INFO mapred.JobClient: Task Id : attempt_201408311908_0046_m_000029_1, Status : FAILED
[exec] java.lang.IllegalArgumentException: Can't read partitions file
[exec]     at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:116)
[exec]     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:62)
[exec]     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
[exec]     at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:676)
[exec]     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
[exec]     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
[exec]     at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
[exec]     at java.security.AccessController.doPrivileged(Native Method)
[exec]     at javax.security.auth.Subject.doAs(Subject.java:415)
[exec]     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
[exec]     at org.apache.hadoop.mapred.Child.main(Child.java:249)
[exec] Caused by: java.io.IOException: Split points are out of order
[exec]     at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:96)
[exec]     ... 10 more
```

As I am using the InputSampler's RandomSampler out of the box, I am not sure how the split points ended up out of order. The result is that the first job passes fine, but the map phase of the second job fails with the above exception.

Thanks for the help!
Keren

--
Keren Ouaknine
www.kereno.com
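One possible explanation, offered as a hedged sketch rather than a confirmed diagnosis: TotalOrderPartitioner rejects the partition file unless each split point is strictly greater than the previous one, and it reports equal neighbors with the same "Split points are out of order" message. With 400 reduce tasks the file must hold 399 split points, so it needs 399 distinct keys. If the sort keys (field 3 of each page_views line) are heavily skewed, or have fewer distinct values than that, evenly spaced picks from the sorted sample can repeat a key. The model below is a simplification, not Hadoop's InputSampler code (which may handle some duplicates itself, depending on the version); the class, methods, and sample data are hypothetical, and `String` stands in for `Text`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitPointCheck {
    // Simplified model (not Hadoop's exact code): sort the sample and pick
    // numPartitions - 1 evenly spaced keys as split points, with no de-duplication.
    static List<String> pickSplitPoints(String[] sample, int numPartitions) {
        String[] sorted = sample.clone();
        Arrays.sort(sorted);
        float step = sorted.length / (float) numPartitions;
        List<String> splits = new ArrayList<>();
        for (int i = 1; i < numPartitions; i++) {
            int k = Math.min(Math.round(step * i), sorted.length - 1);
            splits.add(sorted[k]);
        }
        return splits;
    }

    // Mirrors the condition behind "Split points are out of order":
    // returns the first index whose successor is not strictly greater, or -1.
    static int firstBadIndex(List<String> splits) {
        for (int i = 0; i + 1 < splits.size(); i++) {
            if (splits.get(i).compareTo(splits.get(i + 1)) >= 0) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Hypothetical skewed sample: only 3 distinct keys for 5 partitions.
        String[] sample = {"a", "a", "a", "a", "a", "a", "b", "b", "c", "c"};
        List<String> splits = pickSplitPoints(sample, 5);
        System.out.println("split points: " + splits);
        int bad = firstBadIndex(splits);
        System.out.println(bad < 0 ? "ok" : "duplicate/out-of-order at index " + bad);
    }
}
```

Here the sorted sample has only three distinct keys, so the split points come out as `[a, a, b, c]` and the check flags index 0. If the real data behaves similarly, counting the distinct keys in the staging output, or lowering `orderJob.setNumReduceTasks(400)` below that count, would be a quick way to test the hypothesis.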