Author: harsh Date: Tue Oct 9 09:58:21 2012 New Revision: 1395936 URL: http://svn.apache.org/viewvc?rev=1395936&view=rev Log: MAPREDUCE-4574. Fix TotalOrderPartitioner to work with non-WritableComparable key types. Contributed by Harsh J. (harsh)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1395936&r1=1395935&r2=1395936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Oct 9 09:58:21 2012 @@ -138,6 +138,9 @@ Trunk (Unreleased) MAPREDUCE-4695. Fix LocalRunner on trunk after MAPREDUCE-3223 broke it (harsh) + MAPREDUCE-4574. Fix TotalOrderParitioner to work with + non-WritableComparable key types. 
(harsh) + Release 2.0.3-alpha - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=1395936&r1=1395935&r2=1395936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Tue Oct 9 09:58:21 2012 @@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.Partitio */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TotalOrderPartitioner<K extends WritableComparable<?>,V> +public class TotalOrderPartitioner<K ,V> extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V> implements Partitioner<K,V> { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java?rev=1395936&r1=1395935&r2=1395936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java 
(original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java Tue Oct 9 09:58:21 2012 @@ -47,7 +47,7 @@ import org.apache.hadoop.util.Reflection */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TotalOrderPartitioner<K extends WritableComparable<?>,V> +public class TotalOrderPartitioner<K,V> extends Partitioner<K,V> implements Configurable { private Node partitions; @@ -298,12 +298,13 @@ public class TotalOrderPartitioner<K ext @SuppressWarnings("unchecked") // map output key class private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass, Configuration conf) throws IOException { - SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf); + SequenceFile.Reader reader = new SequenceFile.Reader( + conf, + SequenceFile.Reader.file(p)); ArrayList<K> parts = new ArrayList<K>(); K key = ReflectionUtils.newInstance(keyClass, conf); - NullWritable value = NullWritable.get(); try { - while (reader.next(key, value)) { + while ((key = (K) reader.next(key)) != null) { parts.add(key); key = ReflectionUtils.newInstance(keyClass, conf); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java?rev=1395936&r1=1395935&r2=1395936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java (original) +++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java Tue Oct 9 09:58:21 2012 @@ -21,19 +21,25 @@ package org.apache.hadoop.mapreduce.lib. import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.io.serializer.JavaSerializationComparator; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.WritableSerialization; import org.apache.hadoop.mapreduce.MRJobConfig; public class TestTotalOrderPartitioner extends TestCase { @@ -51,6 +57,19 @@ public class TestTotalOrderPartitioner e new Text("yak"), // 9 }; + private static final String[] splitJavaStrings = new String[] { + // -inf // 0 + new String("aabbb"), // 1 + new String("babbb"), // 2 + new String("daddd"), // 3 + new String("dddee"), // 4 + new String("ddhee"), // 5 + new String("dingo"), // 6 + new String("hijjj"), // 7 + new String("n"), // 8 + new String("yak"), // 9 + }; + static class Check<T> { T data; int part; @@ -76,19 +95,41 @@ public class TestTotalOrderPartitioner e testStrings.add(new Check<Text>(new Text("hi"), 6)); }; - private static <T extends WritableComparable<?>> Path writePartitionFile( + private static final 
ArrayList<Check<String>> testJavaStrings = + new ArrayList<Check<String>>(); + static { + testJavaStrings.add(new Check<String>(new String("aaaaa"), 0)); + testJavaStrings.add(new Check<String>(new String("aaabb"), 0)); + testJavaStrings.add(new Check<String>(new String("aabbb"), 1)); + testJavaStrings.add(new Check<String>(new String("aaaaa"), 0)); + testJavaStrings.add(new Check<String>(new String("babbb"), 2)); + testJavaStrings.add(new Check<String>(new String("baabb"), 1)); + testJavaStrings.add(new Check<String>(new String("yai"), 8)); + testJavaStrings.add(new Check<String>(new String("yak"), 9)); + testJavaStrings.add(new Check<String>(new String("z"), 9)); + testJavaStrings.add(new Check<String>(new String("ddngo"), 5)); + testJavaStrings.add(new Check<String>(new String("hi"), 6)); + }; + + + private static <T> Path writePartitionFile( String testname, Configuration conf, T[] splits) throws IOException { final FileSystem fs = FileSystem.getLocal(conf); final Path testdir = new Path(System.getProperty("test.build.data", "/tmp") - ).makeQualified(fs); + ).makeQualified( + fs.getUri(), + fs.getWorkingDirectory()); Path p = new Path(testdir, testname + "/_partition.lst"); TotalOrderPartitioner.setPartitionFile(conf, p); conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1); SequenceFile.Writer w = null; try { - w = SequenceFile.createWriter(fs, conf, p, - splits[0].getClass(), NullWritable.class, - SequenceFile.CompressionType.NONE); + w = SequenceFile.createWriter( + conf, + SequenceFile.Writer.file(p), + SequenceFile.Writer.keyClass(splits[0].getClass()), + SequenceFile.Writer.valueClass(NullWritable.class), + SequenceFile.Writer.compression(CompressionType.NONE)); for (int i = 0; i < splits.length; ++i) { w.append(splits[i], NullWritable.get()); } @@ -99,6 +140,31 @@ public class TestTotalOrderPartitioner e return p; } + public void testTotalOrderWithCustomSerialization() throws Exception { + TotalOrderPartitioner<String, NullWritable> partitioner = + 
new TotalOrderPartitioner<String, NullWritable>(); + Configuration conf = new Configuration(); + conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + JavaSerialization.class.getName(), + WritableSerialization.class.getName()); + conf.setClass(MRJobConfig.KEY_COMPARATOR, + JavaSerializationComparator.class, + Comparator.class); + Path p = TestTotalOrderPartitioner.<String>writePartitionFile( + "totalordercustomserialization", conf, splitJavaStrings); + conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, String.class, Object.class); + try { + partitioner.setConf(conf); + NullWritable nw = NullWritable.get(); + for (Check<String> chk : testJavaStrings) { + assertEquals(chk.data.toString(), chk.part, + partitioner.getPartition(chk.data, nw, splitJavaStrings.length + 1)); + } + } finally { + p.getFileSystem(conf).delete(p, true); + } + } + public void testTotalOrderMemCmp() throws Exception { TotalOrderPartitioner<Text,NullWritable> partitioner = new TotalOrderPartitioner<Text,NullWritable>();