Author: thejas Date: Wed Sep 29 15:46:05 2010 New Revision: 1002692 URL: http://svn.apache.org/viewvc?rev=1002692&view=rev Log: PIG-1649: FRJoin fails to compute number of input files for replicated input
Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1002692&r1=1002691&r2=1002692&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.8/CHANGES.txt Wed Sep 29 15:46:05 2010 @@ -198,6 +198,8 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1649: FRJoin fails to compute number of input files for replicated input (thejas) + PIG-1637: Combiner not use because optimizor inserts a foreach between group and algebric function (daijy) Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1002692&r1=1002691&r2=1002692&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 29 15:46:05 2010 @@ -47,6 +47,7 @@ import org.apache.hadoop.util.RunJar; import org.apache.pig.ComparisonFunc; import org.apache.pig.ExecType; import 
org.apache.pig.FuncSpec; +import org.apache.pig.LoadFunc; import org.apache.pig.PigException; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; @@ -88,6 +89,7 @@ import org.apache.pig.impl.util.JarManag import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.impl.util.UriUtil; import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.ScriptState; @@ -709,13 +711,23 @@ public class JobControlCompiler{ long size = 0; FileSystem fs = FileSystem.get(conf); for (String input : inputs){ - Path path = new Path(input); - String schema = path.toUri().getScheme(); - if (schema==null || schema.equalsIgnoreCase("hdfs") || schema.equalsIgnoreCase("file")){ - FileStatus[] status=fs.globStatus(new Path(input)); + //Using custom uri parsing because 'new Path(location).toUri()' fails + // for some valid uri's (eg jdbc style), and 'new URI(location)' fails + // for valid hdfs paths that contain curly braces + if(!UriUtil.isHDFSFileOrLocal(input)){ + //skip if it is not hdfs or local file + continue; + } + //the input file location might be a list of comma separated files, + // separate them out + for(String location : LoadFunc.getPathStrings(input)){ + if(!
UriUtil.isHDFSFileOrLocal(location)){ + continue; + } + FileStatus[] status=fs.globStatus(new Path(location)); if (status != null){ for (FileStatus s : status){ - size += getPathLength(fs, s); + size += getPathLength(fs, s); } } } Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1002692&r1=1002691&r2=1002692&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Sep 29 15:46:05 2010 @@ -19,8 +19,6 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -34,7 +32,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; @@ -80,7 +77,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; @@ -98,7 +94,6 @@ import org.apache.pig.impl.builtin.Poiss import org.apache.pig.impl.builtin.RandomSampleLoader; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; -import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.plan.CompilationMessageCollector; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -112,7 +107,8 @@ import org.apache.pig.impl.util.Compiler import org.apache.pig.impl.util.MultiMap; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Pair; +import org.apache.pig.impl.util.UriUtil; import org.apache.pig.impl.util.Utils; /** * The compiler that compiles a given physical plan @@ -1279,48 +1275,54 @@ public class MRCompiler extends PhyPlanV try { for (PhysicalOperator root : roots) { POLoad ld = (POLoad) root; - String location = ld.getLFile().getFileName(); - URI uri = new URI(location); - if (uri.getScheme() == null - || uri.getScheme().equalsIgnoreCase("hdfs")) { - Path p = new Path(location); - FileSystem fs = p.getFileSystem(conf); - if (fs.exists(p)) { - LoadFunc loader = (LoadFunc) PigContext - .instantiateFuncFromSpec(ld.getLFile() - .getFuncSpec()); - Job job = new Job(conf); - loader.setLocation(location, job); - InputFormat inf = loader.getInputFormat(); - List<InputSplit> splits = inf.getSplits(new JobContext( - job.getConfiguration(), job.getJobID())); - List<List<InputSplit>> results = MapRedUtil - .getCombinePigSplits(splits, fs - .getDefaultBlockSize(), conf); - numFiles += results.size(); - } else { - List<MapReduceOper> preds = MRPlan.getPredecessors(mro); - if (preds != null &&
preds.size() == 1) { - MapReduceOper pred = preds.get(0); - if (!pred.reducePlan.isEmpty()) { - numFiles += pred.requestedParallelism; - } else { // map-only job - ret = hasTooManyInputFiles(pred, conf); + String fileName = ld.getLFile().getFileName(); + + if(UriUtil.isHDFSFile(fileName)){ + // Only if the input is an hdfs file, this optimization is + // useful (to reduce load on namenode) + + //separate out locations separated by comma + String [] locations = LoadFunc.getPathStrings(fileName); + for(String location : locations){ + if(!UriUtil.isHDFSFile(location)) + continue; + Path path = new Path(location); + FileSystem fs = path.getFileSystem(conf); + if (fs.exists(path)) { + LoadFunc loader = (LoadFunc) PigContext + .instantiateFuncFromSpec(ld.getLFile() + .getFuncSpec()); + Job job = new Job(conf); + loader.setLocation(location, job); + InputFormat inf = loader.getInputFormat(); + List<InputSplit> splits = inf.getSplits(new JobContext( + job.getConfiguration(), job.getJobID())); + List<List<InputSplit>> results = MapRedUtil + .getCombinePigSplits(splits, fs + .getDefaultBlockSize(), conf); + numFiles += results.size(); + } else { + List<MapReduceOper> preds = MRPlan.getPredecessors(mro); + if (preds != null && preds.size() == 1) { + MapReduceOper pred = preds.get(0); + if (!pred.reducePlan.isEmpty()) { + numFiles += pred.requestedParallelism; + } else { // map-only job + ret = hasTooManyInputFiles(pred, conf); + break; + } + } else if (!optimisticFileConcatenation) { + // can't determine the number of input files. + // Treat it as having too manyfiles + numFiles = fileConcatenationThreshold; break; } - } else if (!optimisticFileConcatenation) { - // can't determine the number of input files. 
- // Treat it as having too manyfiles - numFiles = fileConcatenationThreshold; - break; } } } } } catch (IOException e) { LOG.warn("failed to get number of input files", e); - } catch (URISyntaxException e) { - LOG.warn("failed to get number of input files", e); } catch (InterruptedException e) { LOG.warn("failed to get number of input files", e); } Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java?rev=1002692&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java (added) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java Wed Sep 29 15:46:05 2010 @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.util; + +/** + * Prefix-based checks on load/store location strings. Plain string matching + * is used because 'new Path(location).toUri()' and 'new URI(location)' reject + * some valid locations (eg jdbc style uris, hdfs paths with curly braces). + */ +public class UriUtil { + private UriUtil() { + // static utility class; not meant to be instantiated + } + + /** Returns true if uri starts with "/" or "hdfs:"; false if uri is null. */ + public static boolean isHDFSFile(String uri){ + return uri != null && (uri.startsWith("/") || uri.startsWith("hdfs:")); + } + + /** + * Returns true if isHDFSFile(uri) holds or uri starts with "file:"; + * false if uri is null. + */ + public static boolean isHDFSFileOrLocal(String uri){ + return isHDFSFile(uri) || (uri != null && uri.startsWith("file:")); + } +} Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java?rev=1002692&r1=1002691&r2=1002692&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java Wed Sep 29 15:46:05 2010 @@ -19,6 +19,7 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; +import java.io.File; import java.util.Iterator; import org.apache.hadoop.fs.FileSystem; @@ -47,8 +48,14 @@ public class TestFRJoin2 { private static final int FILE_MERGE_THRESHOLD = 5; private static final int MIN_FILE_MERGE_THRESHOLD = 1; + //contents of input dir joined by comma + private static String concatINPUT_DIR = null; + private File logFile; + + @BeforeClass public static void setUpBeforeClass() throws Exception { + StringBuilder strBuilder = new StringBuilder(); FileSystem fs = cluster.getFileSystem(); fs.mkdirs(new Path(INPUT_DIR)); int LOOP_SIZE = 2; @@ -59,9 +66,14 @@ public class TestFRJoin2 { input[n*LOOP_SIZE + j] = i + "\t" + (j + n); } } - Util.createInputFile(cluster, INPUT_DIR + "/part-0000" + i, input); + String newFile = INPUT_DIR + "/part-0000" + i; + Util.createInputFile(cluster, newFile, input); + strBuilder.append(newFile); +
strBuilder.append(","); } - + strBuilder.deleteCharAt(strBuilder.length() - 1); + concatINPUT_DIR = strBuilder.toString(); + String[] input2 = new String[2*(LOOP_SIZE/2)]; int k = 0; for (int i=1; i<=LOOP_SIZE/2; i++) { @@ -132,11 +144,12 @@ public class TestFRJoin2 { // a Map-only job @Test public void testConcatenateJobForScalar2() throws Exception { + logFile = Util.resetLog(MRCompiler.class, logFile); PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster .getProperties()); pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); - pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "' as (x:int,y:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "/{part-00*}" +"' as (x:int,y:int);"); pigServer.registerQuery("C = filter B by (x == 3) AND (y == 2);"); DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); @@ -157,6 +170,10 @@ public class TestFRJoin2 { JobStats js = (JobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0); assertEquals(1, js.getNumberMaps()); assertEquals(0, js.getNumberReduces()); + Util.checkLogFileMessage(logFile, + new String[] {"number of input files: 0", "failed to get number of input files"}, + false + ); } { pigServer.getPigContext().getProperties().setProperty( @@ -224,11 +241,12 @@ public class TestFRJoin2 { @Test public void testConcatenateJobForFRJoin() throws Exception { + logFile = Util.resetLog(MRCompiler.class, logFile); PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster .getProperties()); pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); - pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "' as (x:int,y:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "/{part-00*}" + "' as (x:int,y:int);"); DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); { @@ -243,6 +261,11 @@ public class TestFRJoin2 { } assertEquals(3, 
PigStats.get().getJobGraph().size()); + Util.checkLogFileMessage(logFile, + new String[] {"number of input files: 0", "failed to get number of input files"}, + false + ); + } { pigServer.getPigContext().getProperties().setProperty( @@ -259,7 +282,7 @@ public class TestFRJoin2 { } assertEquals(dbfrj.size(), dbshj.size()); - assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); } @Test @@ -309,7 +332,7 @@ public class TestFRJoin2 { public void testUnknownNumMaps() throws Exception { PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); - pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);"); + pigServer.registerQuery("A = LOAD '" + concatINPUT_DIR + "' as (x:int,y:int);"); pigServer.registerQuery("B = Filter A by x < 50;"); DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); { Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java?rev=1002692&r1=1002691&r2=1002692&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java Wed Sep 29 15:46:05 2010 @@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; @@ -51,8 +52,11 @@ import org.apache.hadoop.fs.FSDataOutput import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.FileAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.SimpleLayout; 
import org.apache.pig.ExecType; -import org.apache.pig.PigException; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; @@ -75,8 +79,6 @@ import org.apache.pig.impl.logicalLayer. import org.apache.pig.impl.logicalLayer.parser.ParseException; import org.apache.pig.impl.logicalLayer.parser.QueryParser; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.impl.util.LogUtils; import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor; import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter; import org.apache.pig.newplan.logical.optimizer.SchemaResetter; @@ -738,4 +740,64 @@ public class Util { reader.close(); return tuples; } + + /** + * Delete the existing logFile for the class and set the logging to a + * use a new log file and set log level to DEBUG + * @param clazz class for which the log file is being set + * @param logFile current log file + * @return new log file + * @throws Exception + */ + public static File resetLog(Class<?> clazz, File logFile) throws Exception { + if (logFile != null) + logFile.delete(); + Logger logger = Logger.getLogger(clazz); + logger.removeAllAppenders(); + logger.setLevel(Level.DEBUG); + SimpleLayout layout = new SimpleLayout(); + File newLogFile = File.createTempFile("log", ""); + FileAppender appender = new FileAppender(layout, newLogFile.toString(), + false, false, 0); + logger.addAppender(appender); + return newLogFile; + } + + /** + * Check if logFile (does not/)contains the given list of messages. 
+ * @param logFile + * @param messages + * @param expected if true, the messages are expected in the logFile, + * otherwise messages should not be there in the log + */ + public static void checkLogFileMessage(File logFile, String[] messages, boolean expected) { + BufferedReader reader = null; + + try { + reader = new BufferedReader(new FileReader(logFile)); + String logMessage = ""; + String line; + while ((line = reader.readLine()) != null) { + logMessage = logMessage + line + "\n"; + } + for (int i = 0; i < messages.length; i++) { + boolean present = logMessage.contains(messages[i]); + if (expected) { + if(!present){ + fail("The message " + messages[i] + " is not present in" + + "log file contents: " + logMessage); + } + }else{ + if(present){ + fail("The message " + messages[i] + " is present in" + + "log file contents: " + logMessage); + } + } + } + return ; + } + catch (IOException e) { + fail("caught exception while checking log message :" + e); + } + } }