Author: rding Date: Mon Aug 30 22:19:50 2010 New Revision: 991008 URL: http://svn.apache.org/viewvc?rev=991008&view=rev Log: PIG-1458: aggregate files for replicated join
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/conf/pig-default.properties hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=991008&r1=991007&r2=991008&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 22:19:50 2010 @@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu IMPROVEMENTS +PIG-1458: aggregate files for replicated join (rding) + PIG-1205: Enhance HBaseStorage-- Make it support loading row key and implement StoreFunc (zjffdu and dvryaboy) PIG-1568: Optimization rule FilterAboveForeach is too restrictive and doesn't Modified: hadoop/pig/trunk/conf/pig-default.properties URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig-default.properties?rev=991008&r1=991007&r2=991008&view=diff ============================================================================== --- hadoop/pig/trunk/conf/pig-default.properties (original) +++ hadoop/pig/trunk/conf/pig-default.properties Mon Aug 30 22:19:50 2010 @@ -28,3 +28,7 @@ pig.exec.reducers.max=999 #Temporary location to store the intermediate data. 
pig.temp.dir=/tmp/ + +#Threshold for merging FRJoin fragment files +pig.frjoin.merge.files.threshold=100 +pig.frjoin.merge.files.optimistic=false Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=991008&r1=991007&r2=991008&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Aug 30 22:19:50 2010 @@ -19,6 +19,8 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -29,7 +31,16 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.pig.CollectableLoadFunc; import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; @@ -39,6 +50,7 @@ import org.apache.pig.OrderedLoadFunc; import org.apache.pig.PigException; import org.apache.pig.PigWarning; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; @@ -75,6 +87,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.builtin.DefaultIndexableLoader; @@ -171,9 +184,17 @@ public class MRCompiler extends PhyPlanV private CompilationMessageCollector messageCollector = null; private Map<PhysicalOperator,MapReduceOper> phyToMROpMap; - + public static final String USER_COMPARATOR_MARKER = "user.comparator.func:"; + private static final Log LOG = LogFactory.getLog(MRCompiler.class); + + public static final String FRJOIN_MERGE_FILES_THRESHOLD = "pig.frjoin.merge.files.threshold"; + public static final String FRJOIN_MERGE_FILES_OPTIMISTIC = "pig.frjoin.merge.files.optimistic"; + + private int frJoinFileMergeThreshold = 100; + private boolean frJoinOptimisticFileMerge = false; + public MRCompiler(PhysicalPlan plan) throws MRCompilerException { this(plan,null); } @@ -198,6 +219,13 @@ public class MRCompiler extends PhyPlanV scope = roots.get(0).getOperatorKey().getScope(); messageCollector = new CompilationMessageCollector() ; phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>(); + + frJoinFileMergeThreshold = Integer.parseInt(pigContext.getProperties() + .getProperty(FRJOIN_MERGE_FILES_THRESHOLD, "100")); + frJoinOptimisticFileMerge = pigContext.getProperties().getProperty( + FRJOIN_MERGE_FILES_OPTIMISTIC, "false").equals("true"); + LOG.info("FRJoin file merge threshold: " + frJoinFileMergeThreshold + 
+ " optimistic? " + frJoinOptimisticFileMerge); } public void connectScalars() throws PlanException { @@ -1083,7 +1111,7 @@ public class MRCompiler extends PhyPlanV throw new MRCompilerException(msg, errCode, PigException.BUG, e); } } - + /** * This is an operator which will have multiple inputs(= to number of join inputs) * But it prunes off all inputs but the fragment input and creates separate MR jobs @@ -1109,18 +1137,44 @@ public class MRCompiler extends PhyPlanV continue; POStore str = getStore(); str.setSFile(replFiles[i]); - if (!mro.isMapDone()) { - mro.mapPlan.addAsLeaf(str); - mro.setMapDoneSingle(true); + + Configuration conf = + ConfigurationUtil.toConfiguration(pigContext.getProperties()); + boolean combinable = !conf.getBoolean("pig.noSplitCombination", false); + + if (!mro.isMapDone()) { + if (combinable && hasTooManyInputFiles(mro, conf)) { + POStore tmpSto = getStore(); + FileSpec fSpec = getTempFileSpec(); + tmpSto.setSFile(fSpec); + mro.mapPlan.addAsLeaf(tmpSto); + mro.setMapDoneSingle(true); + MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str); + MRPlan.connect(catMROp, curMROp); + } else { + mro.mapPlan.addAsLeaf(str); + mro.setMapDoneSingle(true); + MRPlan.connect(mro, curMROp); + } } else if (mro.isMapDone() && !mro.isReduceDone()) { - mro.reducePlan.addAsLeaf(str); - mro.setReduceDone(true); + if (combinable && (mro.requestedParallelism >= frJoinFileMergeThreshold)) { + POStore tmpSto = getStore(); + FileSpec fSpec = getTempFileSpec(); + tmpSto.setSFile(fSpec); + mro.reducePlan.addAsLeaf(tmpSto); + mro.setReduceDone(true); + MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str); + MRPlan.connect(catMROp, curMROp); + } else { + mro.reducePlan.addAsLeaf(str); + mro.setReduceDone(true); + MRPlan.connect(mro, curMROp); + } } else { int errCode = 2022; String msg = "Both map and reduce phases have been done. 
This is unexpected while compiling."; throw new PlanException(msg, errCode, PigException.BUG); - } - MRPlan.connect(compiledInputs[i], curMROp); + } } if (!curMROp.isMapDone()) { @@ -1147,7 +1201,92 @@ public class MRCompiler extends PhyPlanV throw new MRCompilerException(msg, errCode, PigException.BUG, e); } } - + + @SuppressWarnings("unchecked") + private boolean hasTooManyInputFiles(MapReduceOper mro, Configuration conf) { + if (pigContext == null || pigContext.getExecType() == ExecType.LOCAL) { + return false; + } + + if (mro instanceof NativeMapReduceOper) { + return frJoinOptimisticFileMerge ? false : true; + } + + PhysicalPlan mapPlan = mro.mapPlan; + + List<PhysicalOperator> roots = mapPlan.getRoots(); + if (roots == null || roots.size() == 0) return false; + + int numFiles = 0; + boolean ret = false; + try { + for (PhysicalOperator root : roots) { + POLoad ld = (POLoad) root; + String location = ld.getLFile().getFileName(); + URI uri = new URI(location); + if (uri.getScheme() == null + || uri.getScheme().equalsIgnoreCase("hdfs")) { + Path p = new Path(location); + FileSystem fs = p.getFileSystem(conf); + if (fs.exists(p)) { + LoadFunc loader = (LoadFunc) PigContext + .instantiateFuncFromSpec(ld.getLFile() + .getFuncSpec()); + Job job = new Job(conf); + loader.setLocation(location, job); + InputFormat inf = loader.getInputFormat(); + List<InputSplit> splits = inf.getSplits(new JobContext( + job.getConfiguration(), job.getJobID())); + List<List<InputSplit>> results = MapRedUtil + .getCombinePigSplits(splits, fs + .getDefaultBlockSize(), conf); + numFiles += results.size(); + } else { + List<MapReduceOper> preds = MRPlan.getPredecessors(mro); + if (preds != null && preds.size() == 1) { + MapReduceOper pred = preds.get(0); + if (!pred.reducePlan.isEmpty()) { + numFiles += pred.requestedParallelism; + } else { // map-only job + ret = hasTooManyInputFiles(pred, conf); + break; + } + } else if (!frJoinOptimisticFileMerge) { + // can't determine the number of 
input files. + // Treat it as having too many files + numFiles = frJoinFileMergeThreshold; + break; + } + } + } + } catch (IOException e) { + LOG.warn("failed to get number of input files", e); + } catch (URISyntaxException e) { + LOG.warn("failed to get number of input files", e); + } catch (InterruptedException e) { + LOG.warn("failed to get number of input files", e); + } + + LOG.info("number of input files to FR Join: " + numFiles); + return ret ? true : (numFiles >= frJoinFileMergeThreshold); + } + + /* + * Use Multi File Combiner to concatenate small input files + */ + private MapReduceOper getConcatenateJob(FileSpec fSpec, MapReduceOper old, POStore str) + throws PlanException, ExecException { + + MapReduceOper mro = startNew(fSpec, old); + mro.mapPlan.addAsLeaf(str); + mro.setMapDone(true); + + LOG.info("Insert a concatenate job for FR join"); + + return mro; + } + /** Leftmost relation is referred as base relation (this is the one fed into mappers.) + * First, close all MROpers except for first one (referred as baseMROPer) + * Then, create a MROper which will do indexing job (idxMROper) Added: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=991008&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Mon Aug 30 22:19:50 2010 @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import static org.junit.Assert.assertEquals; + +import java.util.Iterator; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.test.utils.TestHelper; +import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.PigStats.JobGraph; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestFRJoin2 { + + private static MiniCluster cluster = MiniCluster.buildCluster(); + + private static final String INPUT_DIR = "frjoin"; + private static final String INPUT_FILE = "input"; + + private static final int FILE_MERGE_THRESHOLD = 5; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + FileSystem fs = cluster.getFileSystem(); + fs.mkdirs(new Path(INPUT_DIR)); + int LOOP_SIZE = 2; + for (int i=0; i<FILE_MERGE_THRESHOLD; i++) { + String[] input = new String[2*LOOP_SIZE]; + for (int n=0; n<LOOP_SIZE; n++) { + for (int j=0; j<LOOP_SIZE;j++) { + input[n*LOOP_SIZE + j] = i + "\t" + j; + } + } + Util.createInputFile(cluster, INPUT_DIR + "/part-0000" + i, input); + } + + String[] input2 = new String[2*(LOOP_SIZE/2)]; + int k = 0; + for (int i=1; i<=LOOP_SIZE/2; i++) { + String si = i + ""; 
+ for (int j=0; j<=LOOP_SIZE/2; j++) { + input2[k++] = si + "\t" + j; + } + } + Util.createInputFile(cluster, INPUT_FILE, input2); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + cluster.shutDown(); + } + + @Test + public void testConcatenateJob() throws Exception { + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster + .getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "' as (x:int,y:int);"); + + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.getPigContext().getProperties().setProperty( + MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + + pigServer.registerQuery("C = join A by y, B by y using 'repl';"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + + // In this case, multi-file-combiner is used so there is no need to add + // a concatenate job + assertEquals(2, PigStats.get().getJobGraph().size()); + } + { + pigServer.getPigContext().getProperties().setProperty( + "pig.noSplitCombination", "true"); + + pigServer.registerQuery("C = join A by y, B by y using 'repl';"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + + assertEquals(2, PigStats.get().getJobGraph().size()); + } + + assertEquals(dbfrj.size(), dbshj.size()); + assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + } + + @Test + public void testTooManyReducers() throws Exception { + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster + .getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("B = group A by x parallel " + FILE_MERGE_THRESHOLD + ";"); + pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + DataBag 
dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.getPigContext().getProperties().setProperty( + MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + dbfrj.add(t); + } + + JobGraph jGraph = PigStats.get().getJobGraph(); + assertEquals(3, jGraph.size()); + // find added map-only concatenate job + JobStats js = (JobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0); + assertEquals(1, js.getNumberMaps()); + assertEquals(0, js.getNumberReduces()); + } + { + pigServer.getPigContext().getProperties().setProperty( + "pig.noSplitCombination", "true"); + pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + dbshj.add(t); + } + assertEquals(2, PigStats.get().getJobGraph().size()); + } + assertEquals(dbfrj.size(), dbshj.size()); + assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + } + + @Test + public void testUnknownNumMaps() throws Exception { + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);"); + pigServer.registerQuery("B = Filter A by x < 50;"); + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.getPigContext().getProperties().setProperty( + MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + // In this case, multi-file-combiner is used in grandparent job + // so there is no 
need to add a concatenate job + JobGraph jGraph = PigStats.get().getJobGraph(); + assertEquals(2, jGraph.size()); + } + { + pigServer.getPigContext().getProperties().setProperty( + "pig.noSplitCombination", "true"); + pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + assertEquals(2, PigStats.get().getJobGraph().size()); + } + assertEquals(dbfrj.size(), dbshj.size()); + assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + } + + @Test + public void testUnknownNumMaps2() throws Exception { + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("C = join A by x, B by x using 'repl';"); + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.getPigContext().getProperties().setProperty( + MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD)); + pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + // In this case, multi-file-combiner is used in grandparent job + // so there is no need to add a concatenate job + JobGraph jGraph = PigStats.get().getJobGraph(); + assertEquals(3, jGraph.size()); + } + { + pigServer.getPigContext().getProperties().setProperty( + "pig.noSplitCombination", "true"); + pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + assertEquals(3, PigStats.get().getJobGraph().size()); + } + assertEquals(dbfrj.size(), dbshj.size()); + assertEquals(true, 
TestHelper.compareBags(dbfrj, dbshj)); + } + +}