Author: pradeepkth Date: Thu Aug 5 19:24:13 2010 New Revision: 982739 URL: http://svn.apache.org/viewvc?rev=982739&view=rev Log: PIG-1534: Code discovering UDFs in the script has a bug in an order by case (pradeepkth)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=982739&r1=982738&r2=982739&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Aug 5 19:24:13 2010 @@ -114,6 +114,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1534: Code discovering UDFs in the script has a bug in a order by case +(pradeepkth) + PIG-1533: Compression codec should be a per-store property (rding) PIG-1527: No need to deserialize UDFContext on the client side (rding) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=982739&r1=982738&r2=982739&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Thu Aug 5 19:24:13 2010 @@ -63,6 +63,7 @@ public class SampleOptimizer extends MRO this.mPlan.remove(op); } + @Override public void visitMROp(MapReduceOper mr) throws VisitorException { // See if this is a sampling job. List<PhysicalOperator> pos = mr.mapPlan.getRoots(); @@ -168,6 +169,8 @@ public class SampleOptimizer extends MRO // First argument is FuncSpec of loader function to subsume, this we want to set for // ourselves. 
rslargs[0] = predFs.getFuncSpec().toString(); + // Add the loader's funcspec to the list of udf's associated with this mr operator + mr.UDFs.add(rslargs[0]); // Second argument is the number of samples per block, read this from the original. rslargs[1] = load.getLFile().getFuncSpec().getCtorArgs()[1]; FileSpec fs = new FileSpec(predFs.getFileName(),new FuncSpec(loadFunc, rslargs)); @@ -191,6 +194,8 @@ public class SampleOptimizer extends MRO newLoad.setSignature(predLoad.getSignature()); try { succ.mapPlan.replace(succLoad, newLoad); + // Add the loader's funcspec to the list of udf's associated with this mr operator + succ.UDFs.add(newLoad.getLFile().getFuncSpec().toString()); } catch (PlanException e) { throw new VisitorException(e); } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java?rev=982739&r1=982738&r2=982739&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Thu Aug 5 19:24:13 2010 @@ -223,4 +223,46 @@ public class TestSampleOptimizer { // After optimizer visits, number of MR jobs = 2 assertEquals(2,count); } + + @Test + public void testOrderByUDFSet() throws Exception { + LogicalPlanTester planTester = new LogicalPlanTester() ; + planTester.buildPlan("a = load 'input1' using BinStorage();"); + planTester.buildPlan("b = order a by $0;"); + LogicalPlan lp = planTester.buildPlan("store b into '/tmp';"); + + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + MROperPlan mrPlan = Util.buildMRPlan(pp, pc); + + int count = 1; + MapReduceOper mrOper = mrPlan.getRoots().get(0); + while(mrPlan.getSuccessors(mrOper) != null) { + mrOper = mrPlan.getSuccessors(mrOper).get(0); + ++count; + } + // Before optimizer visits, number of MR jobs = 3. 
+ assertEquals(3,count); + + SampleOptimizer so = new SampleOptimizer(mrPlan); + so.visit(); + + count = 1; + mrOper = mrPlan.getRoots().get(0); + // the first mrOper should be the sampling job - it's udf list should only + // contain BinStorage + assertTrue(mrOper.UDFs.size()==1); + assertTrue(mrOper.UDFs.contains("BinStorage")); + while(mrPlan.getSuccessors(mrOper) != null) { + mrOper = mrPlan.getSuccessors(mrOper).get(0); + // the second mr oper is the real order by job - it's udf list should + // contain BinStorage corresponding to the load and PigStorage + // corresponding to the store + assertTrue(mrOper.UDFs.size()==2); + assertTrue(mrOper.UDFs.contains("BinStorage")); + assertTrue(mrOper.UDFs.contains("org.apache.pig.builtin.PigStorage")); + ++count; + } + // After optimizer visits, number of MR jobs = 2 + assertEquals(2,count); + } }