Author: olga Date: Wed Sep 10 11:55:53 2008 New Revision: 693927 URL: http://svn.apache.org/viewvc?rev=693927&view=rev Log: PIG-402, PIG-415: order related bugs
Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Wed Sep 10 11:55:53 2008 @@ -183,3 +183,9 @@ PIG-398: Expressions not allowed inside foreach (sms via olgan) PIG-418: divide by 0 problem + + PIG-402: order by with user comparator (shravanmn via olgan) + + PIG-415: problem with comparators (shravanmn via olgan) + + Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 10 11:55:53 2008 @@ -287,7 +287,7 @@ op.setParentPlan(plans[i]); } } - + POPackage pack = null; if(mro.reducePlan.isEmpty()){ //MapOnly Job jobConf.setMapperClass(PigMapOnly.Map.class); @@ -310,7 +310,7 @@ jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan)); jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack)); } - POPackage pack = (POPackage)mro.reducePlan.getRoots().get(0); + pack = (POPackage)mro.reducePlan.getRoots().get(0); mro.reducePlan.remove(pack); jobConf.setMapperClass(PigMapReduce.Map.class); jobConf.setReducerClass(PigMapReduce.Reduce.class); @@ -345,6 +345,10 @@ String compFuncSpec = mro.UDFs.get(0); Class comparator = PigContext.resolveClassName(compFuncSpec); if(ComparisonFunc.class.isAssignableFrom(comparator)) { + jobConf.setMapperClass(PigMapReduce.MapWithComparator.class); + pack.setKeyType(DataType.TUPLE); + jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack)); + jobConf.setOutputKeyClass(TupleFactory.getInstance().tupleClass()); jobConf.setOutputKeyComparatorClass(comparator); } } else { Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Wed Sep 10 11:55:53 2008 @@ -66,7 +66,7 @@ // If either are null, handle differently. if (b1[s1] == NullableBytesWritable.NOTNULL && b2[s2] == NullableBytesWritable.NOTNULL) { - rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1); } else { // For sorting purposes two nulls are equal. if (b1[s1] == NullableBytesWritable.NULL && Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Wed Sep 10 11:55:53 2008 @@ -65,7 +65,7 @@ // If either are null, handle differently. if (b1[s1] == NullableDoubleWritable.NOTNULL && b2[s2] == NullableDoubleWritable.NOTNULL) { - rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1); } else { // For sorting purposes two nulls are equal. if (b1[s1] == NullableDoubleWritable.NULL && Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Wed Sep 10 11:55:53 2008 @@ -66,7 +66,7 @@ // If either are null, handle differently. if (b1[s1] == NullableFloatWritable.NOTNULL && b2[s2] == NullableFloatWritable.NOTNULL) { - rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1); } else { // For sorting purposes two nulls are equal. if (b1[s1] == NullableFloatWritable.NULL && Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Wed Sep 10 11:55:53 2008 @@ -66,7 +66,7 @@ // If either are null, handle differently. if (b1[s1] == NullableIntWritable.NOTNULL && b2[s2] == NullableIntWritable.NOTNULL) { - rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1); } else { // For sorting purposes two nulls are equal. if (b1[s1] == NullableIntWritable.NULL && Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Wed Sep 10 11:55:53 2008 @@ -66,7 +66,7 @@ // If either are null, handle differently. if (b1[s1] == NullableLongWritable.NOTNULL && b2[s2] == NullableLongWritable.NOTNULL) { - rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1); } else { // For sorting purposes two nulls are equal. if (b1[s1] == NullableLongWritable.NULL && Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Sep 10 11:55:53 2008 @@ -16,6 +16,7 @@ import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.TargetedTuple; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; @@ -27,12 +28,13 @@ public abstract class PigMapBase extends MapReduceBase{ private final Log log = LogFactory.getLog(getClass()); - + protected byte keyType; //Map Plan protected PhysicalPlan mp; + protected TupleFactory tf = TupleFactory.getInstance(); OutputCollector<WritableComparable, Writable> outputCollector; Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Sep 10 11:55:53 2008 @@ -83,6 +83,20 @@ oc.collect(wcKey, it); } } + + public static class MapWithComparator extends PigMapBase implements + Mapper<Text, TargetedTuple, WritableComparable, Writable> { + + @Override + public void collect(OutputCollector<WritableComparable, Writable> oc, + Tuple tuple) throws ExecException, IOException { + Object key = tuple.get(0); + Tuple keyTuple = tf.newTuple(1); + keyTuple.set(0, key); + IndexedTuple it = (IndexedTuple) tuple.get(1); + oc.collect(keyTuple, it); + } + } public static class Reduce extends MapReduceBase implements Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Wed Sep 10 11:55:53 2008 @@ -66,7 +66,7 @@ // If either are null, handle differently. if (b1[s1] == NullableText.NOTNULL && b2[s2] == NullableText.NOTNULL) { - rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1); } else { // For sorting purposes two nulls are equal. if (b1[s1] == NullableText.NULL && Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=693927&r1=693926&r2=693927&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Wed Sep 10 11:55:53 2008 @@ -34,6 +34,7 @@ import org.junit.Before; import org.junit.Test; +import org.apache.pig.ComparisonFunc; import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; import org.apache.pig.PigServer; @@ -61,6 +62,7 @@ public void setUp() throws Exception{ FileLocalizer.setR(new Random()); pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); +// pigServer = new PigServer(ExecType.LOCAL); } static public class MyBagFunction extends EvalFunc<DataBag>{ @@ -306,16 +308,29 @@ @Test public void testSort() throws Exception{ - testSortDistinct(false); + testSortDistinct(false, false); + } + + @Test + public void testSortWithUDF() throws Exception{ + testSortDistinct(false, true); } @Test public void testDistinct() throws Exception{ - testSortDistinct(true); + testSortDistinct(true, false); } + + public static class TupComp extends ComparisonFunc { - private void testSortDistinct(boolean eliminateDuplicates) throws Exception{ + @Override + public int compare(Tuple t1, Tuple t2) { + return t1.compareTo(t2); + } + } + + private void testSortDistinct(boolean eliminateDuplicates, boolean useUDF) throws Exception{ int LOOP_SIZE = 1024*16; File tmpFile = File.createTempFile("test", "txt"); PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); @@ -330,7 +345,10 @@ if (eliminateDuplicates){ pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;"); }else{ - pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;"); + if(!useUDF) + pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;"); + else + pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";"); } pigServer.store("B", tmpOutputFile); @@ -355,7 +373,7 @@ } - public void testNestedPlan() throws Exception{ + /*public void testNestedPlan() throws Exception{ int LOOP_COUNT = 10; File tmpFile = File.createTempFile("test", "txt"); PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); @@ -464,7 +482,7 @@ ++numIdentity; } assertEquals(5, numIdentity); - } + }*/ }