Author: rding
Date: Fri Sep  3 23:29:00 2010
New Revision: 992507

URL: http://svn.apache.org/viewvc?rev=992507&view=rev
Log:
PIG-1548: Optimize scalar to consolidate the part file

Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    hadoop/pig/branches/branch-0.8/conf/pig-default.properties
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=992507&r1=992506&r2=992507&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Fri Sep  3 23:29:00 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1548: Optimize scalar to consolidate the part file (rding)
+
 PIG-1600: Docs update (chandec via olgan)
 
 PIG-1585: Add new properties to help and documentation(olgan)

Modified: hadoop/pig/branches/branch-0.8/conf/pig-default.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/conf/pig-default.properties?rev=992507&r1=992506&r2=992507&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/conf/pig-default.properties (original)
+++ hadoop/pig/branches/branch-0.8/conf/pig-default.properties Fri Sep  3 23:29:00 2010
@@ -30,5 +30,5 @@ pig.exec.reducers.max=999
 pig.temp.dir=/tmp/
 
 #Threshold for merging FRJoin fragment files
-pig.frjoin.merge.files.threshold=100
-pig.frjoin.merge.files.optimistic=false;
+pig.files.concatenation.threshold=100
+pig.optimistic.files.concatenation=false;

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=992507&r1=992506&r2=992507&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Sep  3 23:29:00 2010
@@ -189,11 +189,11 @@ public class MRCompiler extends PhyPlanV
    
     private static final Log LOG = LogFactory.getLog(MRCompiler.class);
     
-    public static final String FRJOIN_MERGE_FILES_THRESHOLD = "pig.frjoin.merge.files.threshold";
-    public static final String FRJOIN_MERGE_FILES_OPTIMISTIC = "pig.frjoin.merge.files.optimistic";
+    public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
+    public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
     
-    private int frJoinFileMergeThreshold = 100;
-    private boolean frJoinOptimisticFileMerge = false;
+    private int fileConcatenationThreshold = 100;
+    private boolean optimisticFileConcatenation = false;
     
     public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
         this(plan,null);
@@ -220,22 +220,61 @@ public class MRCompiler extends PhyPlanV
         messageCollector = new CompilationMessageCollector() ;
         phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
         
-        frJoinFileMergeThreshold = Integer.parseInt(pigContext.getProperties()
-                .getProperty(FRJOIN_MERGE_FILES_THRESHOLD, "100"));
-        frJoinOptimisticFileMerge = pigContext.getProperties().getProperty(
-                FRJOIN_MERGE_FILES_OPTIMISTIC, "false").equals("true");
-        LOG.info("FRJoin file merge threshold: " + frJoinFileMergeThreshold
-                + " optimistic? " + frJoinOptimisticFileMerge);
+        fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties()
+                .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
+        optimisticFileConcatenation = pigContext.getProperties().getProperty(
+                OPTIMISTIC_FILE_CONCATENATION, "false").equals("true");
+        LOG.info("File concatenation threshold: " + fileConcatenationThreshold
+                + " optimistic? " + optimisticFileConcatenation);
     }
     
-    public void connectScalars() throws PlanException {
+    public void connectScalars() throws PlanException, IOException {
         List<MapReduceOper> mrOpList = new ArrayList<MapReduceOper>();
         for(MapReduceOper mrOp: MRPlan) {
             mrOpList.add(mrOp);
         }
+        
+        Configuration conf = 
+            ConfigurationUtil.toConfiguration(pigContext.getProperties());
+        boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
+        
+        Map<FileSpec, MapReduceOper> seen = new HashMap<FileSpec, MapReduceOper>();
+        
         for(MapReduceOper mrOp: mrOpList) {
-            for(PhysicalOperator scalar: mrOp.scalars) {
-                MRPlan.connect(phyToMROpMap.get(scalar), mrOp);
+            for(PhysicalOperator scalar: mrOp.scalars) {                
+                MapReduceOper mro = phyToMROpMap.get(scalar);
+                List<PhysicalOperator> succs = plan.getSuccessors(scalar);
+                if (succs.size() == 1 && succs.get(0) instanceof POStore) {    
                               
+                    POStore sto = (POStore)plan.getSuccessors(scalar).get(0);  
+                    FileSpec oldSpec = sto.getSFile();
+                    MapReduceOper mro2 = seen.get(oldSpec);
+                    boolean hasSeen = false;
+                    if (mro2 != null) {
+                        hasSeen = true;
+                        mro = mro2;
+                    }
+                    if (!hasSeen
+                            && combinable
+                            && (mro.reducePlan.isEmpty() ? hasTooManyInputFiles(mro, conf)
+                                    : (mro.requestedParallelism >= fileConcatenationThreshold))) {
+                        PhysicalPlan pl = mro.reducePlan.isEmpty() ? mro.mapPlan : mro.reducePlan;
+                        FileSpec newSpec = getTempFileSpec();
+                        
+                        // replace oldSpec in mro with newSpec
+                        new FindStoreNameVisitor(pl, newSpec, oldSpec).visit();
+                        
+                        POStore newSto = getStore();
+                        newSto.setSFile(oldSpec);                        
+                        MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto);
+                        MRPlan.connect(catMROp, mrOp);   
+                        seen.put(oldSpec, catMROp);
+                    } else {
+                        MRPlan.connect(mro, mrOp);
+                        if (!hasSeen) seen.put(oldSpec, mro);
+                    }
+                } else {
+                    MRPlan.connect(mro, mrOp);
+                }
             }
         }
     }
@@ -1157,7 +1196,7 @@ public class MRCompiler extends PhyPlanV
                         MRPlan.connect(mro, curMROp);
                     }
                 } else if (mro.isMapDone() && !mro.isReduceDone()) {
-                    if (combinable && (mro.requestedParallelism >= frJoinFileMergeThreshold)) {
+                    if (combinable && (mro.requestedParallelism >= fileConcatenationThreshold)) {
                         POStore tmpSto = getStore();
                         FileSpec fSpec = getTempFileSpec();
                         tmpSto.setSFile(fSpec); 
@@ -1209,7 +1248,7 @@ public class MRCompiler extends PhyPlanV
         }
         
         if (mro instanceof NativeMapReduceOper) {
-            return frJoinOptimisticFileMerge ? false : true;
+            return optimisticFileConcatenation ? false : true;
         }
                
         PhysicalPlan mapPlan = mro.mapPlan;
@@ -1251,10 +1290,10 @@ public class MRCompiler extends PhyPlanV
                                 ret = hasTooManyInputFiles(pred, conf);
                                 break;
                             }
-                        } else if (!frJoinOptimisticFileMerge) {               
     
+                        } else if (!optimisticFileConcatenation) {             
       
                             // can't determine the number of input files. 
                             // Treat it as having too manyfiles
-                            numFiles = frJoinFileMergeThreshold;
+                            numFiles = fileConcatenationThreshold;
                             break;
                         }
                     }
@@ -1268,8 +1307,8 @@ public class MRCompiler extends PhyPlanV
             LOG.warn("failed to get number of input files", e); 
         }
                 
-        LOG.info("number of input files to FR Join: " + numFiles);
-        return ret ? true : (numFiles >= frJoinFileMergeThreshold);
+        LOG.info("number of input files: " + numFiles);
+        return ret ? true : (numFiles >= fileConcatenationThreshold);
     }
     
     /*
@@ -1282,10 +1321,10 @@ public class MRCompiler extends PhyPlanV
         mro.mapPlan.addAsLeaf(str);
         mro.setMapDone(true);
         
-        LOG.info("Insert a concatenate job for FR join");
+        LOG.info("Insert a file-concatenation job");
                 
         return mro;
-    }
+    }    
     
     /** Leftmost relation is referred as base relation (this is the one fed 
into mappers.) 
      *  First, close all MROpers except for first one (referred as baseMROPer)
@@ -2929,4 +2968,24 @@ public class MRCompiler extends PhyPlanV
         }
     }
 
+    private static class FindStoreNameVisitor extends PhyPlanVisitor {
+
+        FileSpec newSpec;
+        FileSpec oldSpec;
+
+        FindStoreNameVisitor (PhysicalPlan plan, FileSpec newSpec, FileSpec oldSpec) {
+            super(plan,
+                new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.newSpec = newSpec;
+            this.oldSpec = oldSpec;
+        }
+
+        @Override
+        public void visitStore(POStore sto) throws VisitorException {
+            FileSpec spec = sto.getSFile();
+            if (oldSpec.equals(spec)) {
+                sto.setSFile(newSpec);
+            }           
+        }
+    }
 }

Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java?rev=992507&r1=992506&r2=992507&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java Fri Sep  3 23:29:00 2010
@@ -45,6 +45,7 @@ public class TestFRJoin2 {
     private static final String INPUT_FILE = "input";
     
     private static final int FILE_MERGE_THRESHOLD = 5;
+    private static final int MIN_FILE_MERGE_THRESHOLD = 1;
     
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
@@ -55,7 +56,7 @@ public class TestFRJoin2 {
             String[] input = new String[2*LOOP_SIZE];
             for (int n=0; n<LOOP_SIZE; n++) {
                 for (int j=0; j<LOOP_SIZE;j++) {
-                    input[n*LOOP_SIZE + j] = i + "\t" + j;
+                    input[n*LOOP_SIZE + j] = i + "\t" + (j + n);
                 }
             }
             Util.createInputFile(cluster, INPUT_DIR + "/part-0000" + i, input);
@@ -77,8 +78,150 @@ public class TestFRJoin2 {
         cluster.shutDown();
     }
 
+    // test simple scalar alias with file concatenation following 
+    // a MapReduce job
     @Test
-    public void testConcatenateJob() throws Exception {
+    public void testConcatenateJobForScalar() throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+                .getProperties());
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = group A all parallel 5;");
+        pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.y) as max;");
+        
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, 
String.valueOf(FILE_MERGE_THRESHOLD));
+            
+            pigServer.registerQuery("D= foreach A generate x / C.count, C.max 
- y;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+            
+            JobGraph jGraph = PigStats.get().getJobGraph();
+            assertEquals(3, jGraph.size());
+            // find added map-only concatenate job 
+            JobStats js = 
(JobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
+            assertEquals(1, js.getNumberMaps());   
+            assertEquals(0, js.getNumberReduces()); 
+        }
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    "pig.noSplitCombination", "true");
+            
+            pigServer.registerQuery("D= foreach A generate x / C.count, C.max 
- y;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+            
+            assertEquals(2, PigStats.get().getJobGraph().size());
+        }
+        
+        assertEquals(dbfrj.size(), dbshj.size());
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+    }
+    
+    // test simple scalar alias with file concatenation following 
+    // a Map-only job
+    @Test
+    public void testConcatenateJobForScalar2() throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+                .getProperties());
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("C = filter B by (x == 3) AND (y == 2);");
+        
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, 
String.valueOf(MIN_FILE_MERGE_THRESHOLD));
+            
+            pigServer.registerQuery("D = foreach A generate x / C.x, y + 
C.y;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+            
+            JobGraph jGraph = PigStats.get().getJobGraph();
+            assertEquals(3, jGraph.size());
+            // find added map-only concatenate job 
+            JobStats js = 
(JobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
+            assertEquals(1, js.getNumberMaps());   
+            assertEquals(0, js.getNumberReduces()); 
+        }
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    "pig.noSplitCombination", "true");
+            
+            pigServer.registerQuery("D = foreach A generate x / C.x, y + 
C.y;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+            
+            assertEquals(2, PigStats.get().getJobGraph().size());
+        }
+        
+        assertEquals(dbfrj.size(), dbshj.size());
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+    }
+    
+    // test scalar alias with file concatenation following 
+    // a multi-query job
+    @Test
+    public void testConcatenateJobForScalar3() throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+                .getProperties());
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("C = group A all parallel 5;");
+        pigServer.registerQuery("D = foreach C generate COUNT(A) as count;");
+        pigServer.registerQuery("E = foreach C generate MAX(A.x) as max;");
+        
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, 
String.valueOf(MIN_FILE_MERGE_THRESHOLD));
+            
+            pigServer.registerQuery("F = foreach B generate x / D.count, y + 
E.max;");
+            Iterator<Tuple> iter = pigServer.openIterator("F");
+            
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+            
+            JobGraph jGraph = PigStats.get().getJobGraph();
+            assertEquals(4, jGraph.size());
+        }
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    "pig.noSplitCombination", "true");
+            
+            pigServer.registerQuery("F = foreach B generate x / D.count, y + 
E.max;");
+            Iterator<Tuple> iter = pigServer.openIterator("F");
+            
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+            
+            assertEquals(2, PigStats.get().getJobGraph().size());
+        }
+        
+        assertEquals(dbfrj.size(), dbshj.size());
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+    }
+    
+    @Test
+    public void testConcatenateJobForFRJoin() throws Exception {
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
                 .getProperties());
         
@@ -88,7 +231,7 @@ public class TestFRJoin2 {
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
-                    MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, 
String.valueOf(FILE_MERGE_THRESHOLD));
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, 
String.valueOf(MIN_FILE_MERGE_THRESHOLD));            
             
             pigServer.registerQuery("C = join A by y, B by y using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -97,9 +240,7 @@ public class TestFRJoin2 {
                 dbfrj.add(iter.next());
             }
             
-            // In this case, multi-file-combiner is used so there is no need to add
-            // a concatenate job
-            assertEquals(2, PigStats.get().getJobGraph().size());
+            assertEquals(3, PigStats.get().getJobGraph().size());
         }
         {
             pigServer.getPigContext().getProperties().setProperty(
@@ -130,7 +271,7 @@ public class TestFRJoin2 {
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
-                    MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, 
String.valueOf(FILE_MERGE_THRESHOLD));
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, 
String.valueOf(FILE_MERGE_THRESHOLD));
             pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("D");
             
@@ -171,17 +312,16 @@ public class TestFRJoin2 {
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
-                    MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, 
String.valueOf(FILE_MERGE_THRESHOLD));
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, 
String.valueOf(MIN_FILE_MERGE_THRESHOLD));  
             pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
             
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
-            // In this case, multi-file-combiner is used in grandparent job
-            // so there is no need to add a concatenate job
+ 
             JobGraph jGraph = PigStats.get().getJobGraph();
-            assertEquals(2, jGraph.size());
+            assertEquals(3, jGraph.size());
         }
         {
             pigServer.getPigContext().getProperties().setProperty(
@@ -208,17 +348,16 @@ public class TestFRJoin2 {
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
-                    MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, 
String.valueOf(FILE_MERGE_THRESHOLD));
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, 
String.valueOf(MIN_FILE_MERGE_THRESHOLD));  
             pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("D");
             
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
-            // In this case, multi-file-combiner is used in grandparent job
-            // so there is no need to add a concatenate job
+
             JobGraph jGraph = PigStats.get().getJobGraph();
-            assertEquals(3, jGraph.size());
+            assertEquals(5, jGraph.size());
         }
         {
             pigServer.getPigContext().getProperties().setProperty(


Reply via email to