Author: yanz Date: Mon May 17 22:16:52 2010 New Revision: 945405 URL: http://svn.apache.org/viewvc?rev=945405&view=rev Log: PIG-1421 Name node calls made by each mapper (xuefuz via yanz)
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=945405&r1=945404&r2=945405&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Mon May 17 22:16:52 2010 @@ -82,6 +82,8 @@ Trunk (unreleased changes) BUG FIXES + PIG-1421 Name node calls made by each mapper (xuefuz via yanz) + PIG-1342 Avoid making unnecessary name node calls for writes in Zebra (chaow via yanz) PIG-1356 TableLoader makes unnecessary calls to build a Job instance that create a new JobClient in the hadoop 0.20.9 (yanz) Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=945405&r1=945404&r2=945405&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Mon May 17 22:16:52 2010 @@ -261,7 +261,6 @@ public class BasicTable { } partition.setSource(cgTuples); inferredMapping = true; - buildStatus(); } else { // the projection is not changed, so we do not need to recalculate the @@ -320,8 +319,6 @@ public class BasicTable { else cgTuples[nx] = null; } - if (schemaFile.isSorted()) - buildStatus(); closed = false; } catch (Exception e) { @@ -756,6 +753,12 @@ public class BasicTable { return 
schemaFile.getDeletedCGs(); } + public static String getDeletedCGs(Path path, Configuration conf) + throws IOException { + SchemaFile schF = new SchemaFile(path, new String[0], conf); + return schF.getDeletedCGs(); + } + private void buildStatus() throws IOException { status = new BasicTableStatus(); if (firstValidCG >= 0) { Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=945405&r1=945404&r2=945405&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Mon May 17 22:16:52 2010 @@ -190,9 +190,43 @@ public class TableInputFormat extends In } setInputExpr(conf, expr); } + + setDeletedCGsInConf( conf, paths ); } /** + * Temporary fix for name node call in each mapper. It sets two flags in the conf so that mapper can skip + * the work that's done here at frontend. + * + * It needs to check if the flag is already set, because setInputPaths() is also called on the backend + * thru the pig code path (thru TableLoader.setLocation()). 
+ * + * @param conf + * @param paths + */ + private static void setDeletedCGsInConf(Configuration conf, Path[] paths) { + if( !conf.get( INPUT_FE, "false" ).equals( "true" ) ) { + try { + StringBuilder sb = new StringBuilder(); + boolean first = true; + for( Path p : paths ) { + if (first) + first = false; + else + sb.append( DELETED_CG_SEPARATOR_PER_UNION ); + sb.append( BasicTable.Reader.getDeletedCGs( p, conf ) ); + } + + conf.set(INPUT_FE, "true"); + conf.set(INPUT_DELETED_CGS, sb.toString()); + } catch(Exception ex) { + throw new RuntimeException( "Failed to find deleted column groups" + ex.toString() ); + } + } + } + + + /** * Set the input expression in the Configuration object. * * @param conf @@ -954,28 +988,17 @@ public class TableInputFormat extends In new ArrayList<BasicTableStatus>(nLeaves); try { - StringBuilder sb = new StringBuilder(); boolean sorted = expr.sortedSplitRequired(); - boolean first = true; for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) { LeafTableInfo leaf = it.next(); BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf ); reader.setProjection(leaf.getProjection()); BasicTableStatus s = reader.getStatus(); - status.add(s); + status.add(s); readers.add(reader); - if (first) - first = false; - else { - sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION); - } - sb.append(reader.getDeletedCGs()); } - conf.set(INPUT_FE, "true"); - conf.set(INPUT_DELETED_CGS, sb.toString()); - if( readers.isEmpty() ) { // I think we should throw exception here. 
return new ArrayList<InputSplit>(); Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=945405&r1=945404&r2=945405&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Mon May 17 22:16:52 2010 @@ -72,6 +72,15 @@ public class TableLoader extends LoadFun private static final String UDFCONTEXT_PROJ_STRING = "zebra.UDFContext.projectionString"; private static final String UDFCONTEXT_GLOBAL_SORTING = "zebra.UDFContext.globalSorting"; + private static final String UDFCONTEXT_PATHS_STRING = "zebra.UDFContext.pathsString"; + private static final String UDFCONTEXT_INPUT_DELETED_CGS = "zebra.UDFContext.deletedcgs"; + private static final String INPUT_SORT = "mapreduce.lib.table.input.sort"; + private static final String INPUT_SPLIT_MODE = "mapreduce.lib.table.input.split_mode"; + + private static final String INPUT_FE = "mapreduce.lib.table.input.fe"; + private static final String INPUT_DELETED_CGS = "mapreduce.lib.table.input.deleted_cgs"; + private static final String GLOBALLY_SORTED = "globally_sorted"; + private static final String LOCALLY_SORTED = "locally_sorted"; private String projectionString; @@ -190,8 +199,36 @@ public class TableLoader extends LoadFun sortInfo = TableInputFormat.getSortInfo( job ); } } - - + + /** + * This is a light version of setSortOrder, which is called in setLocation(), which is also called at backend. + * We need to do this to avoid name node call in mappers. Original setSortOrder() is still called (in + * getSchema()), so it's okay to skip the checks that are performed when setSortOrder() is called. 
+ * + * This will go away once we have a better solution. + * + * @param job + * @throws IOException + */ + private void setSortOrderLight(Job job) throws IOException { + Properties properties = UDFContext.getUDFContext().getUDFProperties( + this.getClass(), new String[]{ udfContextSignature } ); + boolean requireGlobalOrder = "true".equals(properties.getProperty( UDFCONTEXT_GLOBAL_SORTING)); + if (requireGlobalOrder && !sorted) + throw new IOException("Global sorting can be only asked on table loaded as sorted"); + if( sorted ) { + SplitMode splitMode = + requireGlobalOrder ? SplitMode.GLOBALLY_SORTED : SplitMode.LOCALLY_SORTED; + + Configuration conf = job.getConfiguration(); + conf.setBoolean(INPUT_SORT, true); + if (splitMode == SplitMode.GLOBALLY_SORTED) + conf.set(INPUT_SPLIT_MODE, GLOBALLY_SORTED); + else if (splitMode == SplitMode.LOCALLY_SORTED) + conf.set(INPUT_SPLIT_MODE, LOCALLY_SORTED); + } + } + /** * This method sets projection. * @@ -289,16 +326,50 @@ public class TableLoader extends LoadFun throw new IOException( "Invalid object type passed to TableLoader" ); } + /** + * This method is called by pig on both frontend and backend. + */ @Override public void setLocation(String location, Job job) throws IOException { - Path[] paths = getPathsFromLocation( location, job ); - TableInputFormat.setInputPaths( job, paths ); + Properties properties = UDFContext.getUDFContext().getUDFProperties( + this.getClass(), new String[]{ udfContextSignature } ); + // Retrieve paths from UDFContext to avoid name node call in mapper. + String pathString = properties.getProperty( UDFCONTEXT_PATHS_STRING ); + Path[]paths = deserializePaths( pathString ); + + // Retrieve deleted column group information to avoid name node call in mapper. 
+ String deletedCGs = properties.getProperty( UDFCONTEXT_INPUT_DELETED_CGS ); + job.getConfiguration().set(INPUT_FE, "true"); + job.getConfiguration().set(INPUT_DELETED_CGS, deletedCGs ); + + TableInputFormat.setInputPaths( job, paths ); + // The following obviously goes beyond of set location, but this is the only place that we // can do and it's suggested by Pig team. - setSortOrder( job ); + setSortOrderLight( job ); setProjection( job ); } + + private static String serializePaths(Path[] paths) { + StringBuilder sb = new StringBuilder(); + for( int i = 0; i < paths.length; i++ ) { + sb.append( paths[i].toString() ); + if( i < paths.length -1 ) { + sb.append( ";" ); + } + } + return sb.toString(); + } + + private static Path[] deserializePaths(String val) { + String[] paths = val.split( ";" ); + Path[] result = new Path[paths.length]; + for( int i = 0; i < paths.length; i++ ) { + result[i] = new Path( paths[i] ); + } + return result; + } @SuppressWarnings("unchecked") @Override @@ -314,9 +385,19 @@ public class TableLoader extends LoadFun @Override public ResourceSchema getSchema(String location, Job job) throws IOException { - Path[] paths = getPathsFromLocation( location, job); + Properties properties = UDFContext.getUDFContext().getUDFProperties( + this.getClass(), new String[]{ udfContextSignature } ); + + // Save the paths in UDFContext so that it can be retrieved in setLocation(). + Path[] paths = getPathsFromLocation( location, job ); + properties.setProperty( UDFCONTEXT_PATHS_STRING, serializePaths( paths ) ); + TableInputFormat.setInputPaths( job, paths ); + // Save the deleted column group information in UDFContext so that it can be used in setLocation(). + // Property INPUT_DELETED_CGS is set in TableInputFormat.setInputPaths(). We just retrieve it here. 
+ properties.setProperty( UDFCONTEXT_INPUT_DELETED_CGS, job.getConfiguration().get(INPUT_DELETED_CGS) ); + Schema tableSchema = null; if( paths.length == 1 ) { tableSchema = BasicTable.Reader.getSchema( paths[0], job.getConfiguration() );