Author: rding Date: Fri Sep 3 23:29:00 2010 New Revision: 992507 URL: http://svn.apache.org/viewvc?rev=992507&view=rev Log: PIG-1548: Optimize scalar to consolidate the part file
Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt hadoop/pig/branches/branch-0.8/conf/pig-default.properties hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=992507&r1=992506&r2=992507&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.8/CHANGES.txt Fri Sep 3 23:29:00 2010 @@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu IMPROVEMENTS +PIG-1548: Optimize scalar to consolidate the part file (rding) + PIG-1600: Docs update (chandec via olgan) PIG-1585: Add new properties to help and documentation(olgan) Modified: hadoop/pig/branches/branch-0.8/conf/pig-default.properties URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/conf/pig-default.properties?rev=992507&r1=992506&r2=992507&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/conf/pig-default.properties (original) +++ hadoop/pig/branches/branch-0.8/conf/pig-default.properties Fri Sep 3 23:29:00 2010 @@ -30,5 +30,5 @@ pig.exec.reducers.max=999 pig.temp.dir=/tmp/ #Threshold for merging FRJoin fragment files -pig.frjoin.merge.files.threshold=100 -pig.frjoin.merge.files.optimistic=false; +pig.files.concatenation.threshold=100 +pig.optimistic.files.concatenation=false; Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=992507&r1=992506&r2=992507&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Sep 3 23:29:00 2010 @@ -189,11 +189,11 @@ public class MRCompiler extends PhyPlanV private static final Log LOG = LogFactory.getLog(MRCompiler.class); - public static final String FRJOIN_MERGE_FILES_THRESHOLD = "pig.frjoin.merge.files.threshold"; - public static final String FRJOIN_MERGE_FILES_OPTIMISTIC = "pig.frjoin.merge.files.optimistic"; + public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold"; + public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation"; - private int frJoinFileMergeThreshold = 100; - private boolean frJoinOptimisticFileMerge = false; + private int fileConcatenationThreshold = 100; + private boolean optimisticFileConcatenation = false; public MRCompiler(PhysicalPlan plan) throws MRCompilerException { this(plan,null); @@ -220,22 +220,61 @@ public class MRCompiler extends PhyPlanV messageCollector = new CompilationMessageCollector() ; phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>(); - frJoinFileMergeThreshold = Integer.parseInt(pigContext.getProperties() - .getProperty(FRJOIN_MERGE_FILES_THRESHOLD, "100")); - frJoinOptimisticFileMerge = pigContext.getProperties().getProperty( - FRJOIN_MERGE_FILES_OPTIMISTIC, "false").equals("true"); - LOG.info("FRJoin file merge threshold: " + frJoinFileMergeThreshold - + " optimistic? " + frJoinOptimisticFileMerge); + fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties() + .getProperty(FILE_CONCATENATION_THRESHOLD, "100")); + optimisticFileConcatenation = pigContext.getProperties().getProperty( + OPTIMISTIC_FILE_CONCATENATION, "false").equals("true"); + LOG.info("File concatenation threshold: " + fileConcatenationThreshold + + " optimistic? " + optimisticFileConcatenation); } - public void connectScalars() throws PlanException { + public void connectScalars() throws PlanException, IOException { List<MapReduceOper> mrOpList = new ArrayList<MapReduceOper>(); for(MapReduceOper mrOp: MRPlan) { mrOpList.add(mrOp); } + + Configuration conf = + ConfigurationUtil.toConfiguration(pigContext.getProperties()); + boolean combinable = !conf.getBoolean("pig.noSplitCombination", false); + + Map<FileSpec, MapReduceOper> seen = new HashMap<FileSpec, MapReduceOper>(); + for(MapReduceOper mrOp: mrOpList) { - for(PhysicalOperator scalar: mrOp.scalars) { - MRPlan.connect(phyToMROpMap.get(scalar), mrOp); + for(PhysicalOperator scalar: mrOp.scalars) { + MapReduceOper mro = phyToMROpMap.get(scalar); + List<PhysicalOperator> succs = plan.getSuccessors(scalar); + if (succs.size() == 1 && succs.get(0) instanceof POStore) { + POStore sto = (POStore)plan.getSuccessors(scalar).get(0); + FileSpec oldSpec = sto.getSFile(); + MapReduceOper mro2 = seen.get(oldSpec); + boolean hasSeen = false; + if (mro2 != null) { + hasSeen = true; + mro = mro2; + } + if (!hasSeen + && combinable + && (mro.reducePlan.isEmpty() ? hasTooManyInputFiles(mro, conf) + : (mro.requestedParallelism >= fileConcatenationThreshold))) { + PhysicalPlan pl = mro.reducePlan.isEmpty() ? mro.mapPlan : mro.reducePlan; + FileSpec newSpec = getTempFileSpec(); + + // replace oldSpec in mro with newSpec + new FindStoreNameVisitor(pl, newSpec, oldSpec).visit(); + + POStore newSto = getStore(); + newSto.setSFile(oldSpec); + MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto); + MRPlan.connect(catMROp, mrOp); + seen.put(oldSpec, catMROp); + } else { + MRPlan.connect(mro, mrOp); + if (!hasSeen) seen.put(oldSpec, mro); + } + } else { + MRPlan.connect(mro, mrOp); + } } } } @@ -1157,7 +1196,7 @@ public class MRCompiler extends PhyPlanV MRPlan.connect(mro, curMROp); } } else if (mro.isMapDone() && !mro.isReduceDone()) { - if (combinable && (mro.requestedParallelism >= frJoinFileMergeThreshold)) { + if (combinable && (mro.requestedParallelism >= fileConcatenationThreshold)) { POStore tmpSto = getStore(); FileSpec fSpec = getTempFileSpec(); tmpSto.setSFile(fSpec); @@ -1209,7 +1248,7 @@ public class MRCompiler extends PhyPlanV } if (mro instanceof NativeMapReduceOper) { - return frJoinOptimisticFileMerge ? false : true; + return optimisticFileConcatenation ? false : true; } PhysicalPlan mapPlan = mro.mapPlan; @@ -1251,10 +1290,10 @@ public class MRCompiler extends PhyPlanV ret = hasTooManyInputFiles(pred, conf); break; } - } else if (!frJoinOptimisticFileMerge) { + } else if (!optimisticFileConcatenation) { // can't determine the number of input files. // Treat it as having too manyfiles - numFiles = frJoinFileMergeThreshold; + numFiles = fileConcatenationThreshold; break; } } @@ -1268,8 +1307,8 @@ public class MRCompiler extends PhyPlanV LOG.warn("failed to get number of input files", e); } - LOG.info("number of input files to FR Join: " + numFiles); - return ret ? true : (numFiles >= frJoinFileMergeThreshold); + LOG.info("number of input files: " + numFiles); + return ret ? true : (numFiles >= fileConcatenationThreshold); } /* @@ -1282,10 +1321,10 @@ public class MRCompiler extends PhyPlanV mro.mapPlan.addAsLeaf(str); mro.setMapDone(true); - LOG.info("Insert a concatenate job for FR join"); + LOG.info("Insert a file-concatenation job"); return mro; - } + } /** Leftmost relation is referred as base relation (this is the one fed into mappers.) * First, close all MROpers except for first one (referred as baseMROPer) @@ -2929,4 +2968,24 @@ public class MRCompiler extends PhyPlanV } } + private static class FindStoreNameVisitor extends PhyPlanVisitor { + + FileSpec newSpec; + FileSpec oldSpec; + + FindStoreNameVisitor (PhysicalPlan plan, FileSpec newSpec, FileSpec oldSpec) { + super(plan, + new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); + this.newSpec = newSpec; + this.oldSpec = oldSpec; + } + + @Override + public void visitStore(POStore sto) throws VisitorException { + FileSpec spec = sto.getSFile(); + if (oldSpec.equals(spec)) { + sto.setSFile(newSpec); + } + } + } } Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java?rev=992507&r1=992506&r2=992507&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java Fri Sep 3 23:29:00 2010 @@ -45,6 +45,7 @@ public class TestFRJoin2 { private static final String INPUT_FILE = "input"; private static final int FILE_MERGE_THRESHOLD = 5; + private static final int MIN_FILE_MERGE_THRESHOLD = 1; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -55,7 +56,7 @@ public class TestFRJoin2 { String[] input = new String[2*LOOP_SIZE]; for (int n=0; n<LOOP_SIZE; n++) { for (int j=0; j<LOOP_SIZE;j++) { - input[n*LOOP_SIZE + j] = i + "\t" + j; + input[n*LOOP_SIZE + j] = i + "\t" + (j + n); } } Util.createInputFile(cluster, INPUT_DIR + "/part-0000" + i, input); @@ -77,8 +78,150 @@ public class TestFRJoin2 { cluster.shutDown(); } + // test simple scalar alias with file concatenation following + // a MapReduce job @Test - public void testConcatenateJob() throws Exception { + public void testConcatenateJobForScalar() throws Exception { + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster + .getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("B = group A all parallel 5;"); + pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.y) as max;"); + + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.getPigContext().getProperties().setProperty( + MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + + pigServer.registerQuery("D= foreach A generate x / C.count, C.max - y;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + + JobGraph jGraph = PigStats.get().getJobGraph(); + assertEquals(3, jGraph.size()); + // find added map-only concatenate job + JobStats js = (JobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0); + assertEquals(1, js.getNumberMaps()); + assertEquals(0, js.getNumberReduces()); + } + { + pigServer.getPigContext().getProperties().setProperty( + "pig.noSplitCombination", "true"); + + pigServer.registerQuery("D= foreach A generate x / C.count, C.max - y;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + + assertEquals(2, PigStats.get().getJobGraph().size()); + } + + assertEquals(dbfrj.size(), dbshj.size()); + assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + } + + // test simple scalar alias with file concatenation following + // a Map-only job + @Test + public void testConcatenateJobForScalar2() throws Exception { + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster + .getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "' as (x:int,y:int);"); + pigServer.registerQuery("C = filter B by (x == 3) AND (y == 2);"); + + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.getPigContext().getProperties().setProperty( + MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD)); + + pigServer.registerQuery("D = foreach A generate x / C.x, y + C.y;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + + JobGraph jGraph = PigStats.get().getJobGraph(); + assertEquals(3, jGraph.size()); + // find added map-only concatenate job + JobStats js = (JobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0); + assertEquals(1, js.getNumberMaps()); + assertEquals(0, js.getNumberReduces()); + } + { + pigServer.getPigContext().getProperties().setProperty( + "pig.noSplitCombination", "true"); + + pigServer.registerQuery("D = foreach A generate x / C.x, y + C.y;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + + assertEquals(2, PigStats.get().getJobGraph().size()); + } + + assertEquals(dbfrj.size(), dbshj.size()); + assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + } + + // test scalar alias with file concatenation following + // a multi-query job + @Test + public void testConcatenateJobForScalar3() throws Exception { + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster + .getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("C = group A all parallel 5;"); + pigServer.registerQuery("D = foreach C generate COUNT(A) as count;"); + pigServer.registerQuery("E = foreach C generate MAX(A.x) as max;"); + + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.getPigContext().getProperties().setProperty( + MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD)); + + pigServer.registerQuery("F = foreach B generate x / D.count, y + E.max;"); + Iterator<Tuple> iter = pigServer.openIterator("F"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + + JobGraph jGraph = PigStats.get().getJobGraph(); + assertEquals(4, jGraph.size()); + } + { + pigServer.getPigContext().getProperties().setProperty( + "pig.noSplitCombination", "true"); + + pigServer.registerQuery("F = foreach B generate x / D.count, y + E.max;"); + Iterator<Tuple> iter = pigServer.openIterator("F"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + + assertEquals(2, PigStats.get().getJobGraph().size()); + } + + assertEquals(dbfrj.size(), dbshj.size()); + assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + } + + @Test + public void testConcatenateJobForFRJoin() throws Exception { PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster .getProperties()); @@ -88,7 +231,7 @@ public class TestFRJoin2 { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); { pigServer.getPigContext().getProperties().setProperty( - MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD)); pigServer.registerQuery("C = join A by y, B by y using 'repl';"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -97,9 +240,7 @@ public class TestFRJoin2 { dbfrj.add(iter.next()); } - // In this case, multi-file-combiner is used so there is no need to add - // a concatenate job - assertEquals(2, PigStats.get().getJobGraph().size()); + assertEquals(3, PigStats.get().getJobGraph().size()); } { pigServer.getPigContext().getProperties().setProperty( @@ -130,7 +271,7 @@ public class TestFRJoin2 { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); { pigServer.getPigContext().getProperties().setProperty( - MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -171,17 +312,16 @@ public class TestFRJoin2 { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); { pigServer.getPigContext().getProperties().setProperty( - MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD)); pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { dbfrj.add(iter.next()); } - // In this case, multi-file-combiner is used in grandparent job - // so there is no need to add a concatenate job + JobGraph jGraph = PigStats.get().getJobGraph(); - assertEquals(2, jGraph.size()); + assertEquals(3, jGraph.size()); } { pigServer.getPigContext().getProperties().setProperty( @@ -208,17 +348,16 @@ public class TestFRJoin2 { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); { pigServer.getPigContext().getProperties().setProperty( - MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD)); pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';"); Iterator<Tuple> iter = pigServer.openIterator("D"); while(iter.hasNext()) { dbfrj.add(iter.next()); } - // In this case, multi-file-combiner is used in grandparent job - // so there is no need to add a concatenate job + JobGraph jGraph = PigStats.get().getJobGraph(); - assertEquals(3, jGraph.size()); + assertEquals(5, jGraph.size()); } { pigServer.getPigContext().getProperties().setProperty(