Author: pradeepkth
Date: Thu Aug  5 19:24:13 2010
New Revision: 982739

URL: http://svn.apache.org/viewvc?rev=982739&view=rev
Log:
PIG-1534: Code discovering UDFs in the script has a bug in a order by case 
(pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=982739&r1=982738&r2=982739&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug  5 19:24:13 2010
@@ -114,6 +114,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1534: Code discovering UDFs in the script has a bug in a order by case
+(pradeepkth)
+
 PIG-1533: Compression codec should be a per-store property (rding)
 
 PIG-1527: No need to deserialize UDFContext on the client side (rding)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=982739&r1=982738&r2=982739&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Thu Aug  5 19:24:13 2010
@@ -63,6 +63,7 @@ public class SampleOptimizer extends MRO
             this.mPlan.remove(op);
     }
 
+    @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
         // See if this is a sampling job.
         List<PhysicalOperator> pos = mr.mapPlan.getRoots();
@@ -168,6 +169,8 @@ public class SampleOptimizer extends MRO
         // First argument is FuncSpec of loader function to subsume, this we want to set for
         // ourselves.
         rslargs[0] = predFs.getFuncSpec().toString();
+        // Add the loader's funcspec to the list of udf's associated with this mr operator
+        mr.UDFs.add(rslargs[0]);
         // Second argument is the number of samples per block, read this from the original.
         rslargs[1] = load.getLFile().getFuncSpec().getCtorArgs()[1];
         FileSpec fs = new FileSpec(predFs.getFileName(),new FuncSpec(loadFunc, rslargs));
@@ -191,6 +194,8 @@ public class SampleOptimizer extends MRO
         newLoad.setSignature(predLoad.getSignature());
         try {
             succ.mapPlan.replace(succLoad, newLoad);
+            // Add the loader's funcspec to the list of udf's associated with this mr operator
+            succ.UDFs.add(newLoad.getLFile().getFuncSpec().toString());
         } catch (PlanException e) {
             throw new VisitorException(e);
         }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java?rev=982739&r1=982738&r2=982739&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Thu Aug  5 19:24:13 2010
@@ -223,4 +223,46 @@ public class TestSampleOptimizer {
         // After optimizer visits, number of MR jobs = 2
         assertEquals(2,count);
     }
+    
+    @Test
+    public void testOrderByUDFSet() throws Exception {
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("a = load 'input1' using BinStorage();");
+        planTester.buildPlan("b = order a by $0;");
+        LogicalPlan lp = planTester.buildPlan("store b into '/tmp';");
+        
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        int count = 1;
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        while(mrPlan.getSuccessors(mrOper) != null) {
+            mrOper = mrPlan.getSuccessors(mrOper).get(0);
+            ++count;
+        }        
+        // Before optimizer visits, number of MR jobs = 3.
+        assertEquals(3,count);
+
+        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        so.visit();
+
+        count = 1;
+        mrOper = mrPlan.getRoots().get(0);
+        // the first mrOper should be the sampling job - it's udf list should only
+        // contain BinStorage
+        assertTrue(mrOper.UDFs.size()==1);
+        assertTrue(mrOper.UDFs.contains("BinStorage"));
+        while(mrPlan.getSuccessors(mrOper) != null) {
+            mrOper = mrPlan.getSuccessors(mrOper).get(0);
+            // the second mr oper is the real order by job - it's udf list should
+            // contain BinStorage corresponding to the load and PigStorage
+            // corresponding to the store
+            assertTrue(mrOper.UDFs.size()==2);
+            assertTrue(mrOper.UDFs.contains("BinStorage"));
+            assertTrue(mrOper.UDFs.contains("org.apache.pig.builtin.PigStorage"));
+            ++count;
+        }        
+        // After optimizer visits, number of MR jobs = 2
+        assertEquals(2,count);
+    }
 }


Reply via email to