Author: daijy Date: Sat Aug 28 08:11:57 2010 New Revision: 990323 URL: http://svn.apache.org/viewvc?rev=990323&view=rev Log: PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-8.patch)
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java?rev=990323&r1=990322&r2=990323&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java Sat Aug 28 08:11:57 2010 @@ -32,15 +32,20 @@ import org.apache.pig.newplan.Operator; import org.apache.pig.newplan.OperatorPlan; import org.apache.pig.newplan.OperatorSubPlan; import org.apache.pig.newplan.ReverseDependencyOrderWalker; +import org.apache.pig.newplan.logical.expression.LogicalExpression; import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor; import org.apache.pig.newplan.logical.expression.MapLookupExpression; +import org.apache.pig.newplan.logical.expression.UserFuncExpression; import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor; +import org.apache.pig.newplan.logical.relational.LOCogroup; import org.apache.pig.newplan.logical.relational.LOFilter; import org.apache.pig.newplan.logical.relational.LOGenerate; import org.apache.pig.newplan.logical.relational.LOJoin; import org.apache.pig.newplan.logical.relational.LOLoad; import org.apache.pig.newplan.logical.relational.LOSort; +import org.apache.pig.newplan.logical.relational.LOStore; +import org.apache.pig.newplan.logical.relational.LOUnion; import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; import org.apache.pig.newplan.logical.relational.LogicalSchema; import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema; @@ -106,14 +111,11 @@ public class MapKeysPruneHelper { MapMarker marker = new MapMarker(currentPlan); marker.visit(); - // Get all Uids from Sinks - List<Operator> sinks = currentPlan.getSinks(); - Set<Long> sinkMapUids = new HashSet<Long>(); - for( Operator sink : sinks ) { - LogicalSchema schema = ((LogicalRelationalOperator)sink).getSchema(); - sinkMapUids.addAll( getMapUids( schema ) ); - } - + // If the uid is the input uid of LOStore, LOCogroup, LOUnion, UserFunc, that means + // the entire map may be used. For simplicity, we do not prune any map key in this case + Set<Long> fullMapUids = new HashSet<Long>(); + FullMapCollector collector = new FullMapCollector(currentPlan, fullMapUids); + collector.visit(); // If we have found specific keys which are needed then we return true; // Else if we dont have any specific keys we return false @@ -123,12 +125,12 @@ public class MapKeysPruneHelper { (Map<Integer, Set<String>>) ((LogicalRelationalOperator)source).getAnnotation(REQUIRED_MAPKEYS); // Now for all full maps found in sinks we cannot prune them at source - if( ! sinkMapUids.isEmpty() && annotationValue != null && + if( ! fullMapUids.isEmpty() && annotationValue != null && !annotationValue.isEmpty() ) { Integer[] annotationKeyArray = annotationValue.keySet().toArray( new Integer[0] ); LogicalSchema sourceSchema = ((LogicalRelationalOperator)source).getSchema(); for( Integer col : annotationKeyArray ) { - if( sinkMapUids.contains(sourceSchema.getField(col).uid)) { + if( fullMapUids.contains(sourceSchema.getField(col).uid)) { annotationValue.remove( col ); } } @@ -172,13 +174,11 @@ public class MapKeysPruneHelper { * @param schema Schema having fields * @return */ - private Set<Long> getMapUids(LogicalSchema schema ) { + private static Set<Long> getMapUids(LogicalSchema schema ) { Set<Long> uids = new HashSet<Long>(); if( schema != null ) { for( LogicalFieldSchema field : schema.getFields() ) { - if( field.type == DataType.MAP ) { - uids.add( field.uid ); - } + uids.add( field.uid ); } } return uids; @@ -188,7 +188,6 @@ public class MapKeysPruneHelper { return subplan; } - /** * This class collects all the information required to create * the list of keys required for a map @@ -298,4 +297,72 @@ public class MapKeysPruneHelper { } } } + + static public class FullMapCollector extends AllExpressionVisitor { + Set<Long> fullMapUids = new HashSet<Long>(); + + protected FullMapCollector(OperatorPlan plan, Set<Long> fullMapUids) throws FrontendException { + super(plan, new DependencyOrderWalker(plan)); + this.fullMapUids = fullMapUids; + } + + @Override + public void visit(LOStore store) throws FrontendException { + super.visit(store); + Set<Long> uids = getMapUids(store.getSchema()); + fullMapUids.addAll(uids); + } + + @SuppressWarnings("unchecked") + @Override + public void visit(LOUnion union) throws FrontendException { + super.visit(union); + List<Operator> preds = plan.getPredecessors(union); + if (preds!=null) { + for (Operator pred : preds) { + LogicalSchema schema = ((LogicalRelationalOperator)pred).getSchema(); + Set<Long> uids = getMapUids(schema); + fullMapUids.addAll(uids); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public void visit(LOCogroup cogroup) throws FrontendException { + super.visit(cogroup); + List<Operator> preds = plan.getPredecessors(cogroup); + if (preds!=null) { + for (Operator pred : preds) { + LogicalSchema schema = ((LogicalRelationalOperator)pred).getSchema(); + Set<Long> uids = getMapUids(schema); + fullMapUids.addAll(uids); + } + } + } + + @Override + protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) + throws FrontendException { + return new FullMapExpCollector(expr, fullMapUids); + } + + static class FullMapExpCollector extends LogicalExpressionVisitor { + Set<Long> fullMapUids = new HashSet<Long>(); + protected FullMapExpCollector(OperatorPlan plan, Set<Long> fullMapUids) + throws FrontendException { + super(plan, new DependencyOrderWalker(plan)); + this.fullMapUids = fullMapUids; + } + + @Override + public void visit(UserFuncExpression userFunc) throws FrontendException { + List<Operator> succs = userFunc.getPlan().getSuccessors(userFunc); + if (succs==null) return; + LogicalExpression succ = (LogicalExpression)succs.get(0); + if (succ.getFieldSchema()!=null && succ.getFieldSchema().type==DataType.MAP) + fullMapUids.add(succ.getFieldSchema().uid); + } + } + } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=990323&r1=990322&r2=990323&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Sat Aug 28 08:11:57 2010 @@ -1193,7 +1193,7 @@ public class TestPruneColumn extends Tes assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0", "Map key required for A: $1->[key2, key1]"})); } - /* + @Test public void testMapKey3() throws Exception { pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); @@ -1211,7 +1211,7 @@ public class TestPruneColumn extends Tes assertFalse(iter.hasNext()); assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"})); - }*/ + } @Test public void testMapKey4() throws Exception {