Author: daijy Date: Sun Sep 26 21:23:38 2010 New Revision: 1001524 URL: http://svn.apache.org/viewvc?rev=1001524&view=rev Log: PIG-1644: New logical plan: Plan.connect with position is misused in some places
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Sun Sep 26 21:23:38 2010 @@ -207,6 +207,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1644: New logical plan: Plan.connect with position is misused in some +places (daijy) + PIG-1643: join fails for a query with input having 'load using pigstorage without schema' + 'foreach' (daijy) Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java Sun Sep 26 21:23:38 2010 @@ -316,5 +316,118 @@ public abstract class BaseOperatorPlan i return ""; } return os.toString(); - } + } + + @Override + public void replace(Operator oldOperator, Operator newOperator) throws FrontendException { + add(newOperator); + + List<Operator> preds = getPredecessors(oldOperator); + if (preds!=null) { + List<Operator> predsCopy = new ArrayList<Operator>(); + predsCopy.addAll(preds); + for (int i=0;i<predsCopy.size();i++) { + Operator pred = predsCopy.get(i); + Pair<Integer, Integer> pos = disconnect(pred, oldOperator); + connect(pred, pos.first, newOperator, i); + } + } + + List<Operator> succs = getSuccessors(oldOperator); + if (succs!=null) { + List<Operator> succsCopy = new ArrayList<Operator>(); + succsCopy.addAll(succs); + for (int i=0;i<succsCopy.size();i++) { + Operator succ = succsCopy.get(i); + Pair<Integer, Integer> pos = disconnect(oldOperator, succ); + connect(newOperator, i, succ, pos.second); + } + } + + remove(oldOperator); + } + + // We assume if node has multiple inputs, it only has one output; + // if node has multiple outputs, it only has one input. + // Otherwise, we don't know how to connect inputs to outputs. 
+ // This assumption is true for logical plan/physical plan, and most MR plan + @Override + public void removeAndReconnect(Operator operatorToRemove) throws FrontendException { + List<Operator> predsCopy = null; + if (getPredecessors(operatorToRemove)!=null && getPredecessors(operatorToRemove).size()!=0) { + predsCopy = new ArrayList<Operator>(); + predsCopy.addAll(getPredecessors(operatorToRemove)); + } + + List<Operator> succsCopy = null; + if (getSuccessors(operatorToRemove)!=null && getSuccessors(operatorToRemove).size()!=0) { + succsCopy = new ArrayList<Operator>(); + succsCopy.addAll(getSuccessors(operatorToRemove)); + } + + if (predsCopy!=null && predsCopy.size()>1 && succsCopy!=null && succsCopy.size()>1) { + throw new FrontendException("Cannot remove and reconnect node with multiple inputs/outputs", 2256); + } + + if (predsCopy!=null && predsCopy.size()>1) { + // node has multiple inputs, it can only have one output (or no output) + // reconnect inputs to output + Operator succ = null; + Pair<Integer, Integer> pos2 = null; + if (succsCopy!=null) { + succ = succsCopy.get(0); + pos2 = disconnect(operatorToRemove, succ); + } + for (Operator pred : predsCopy) { + Pair<Integer, Integer> pos1 = disconnect(pred, operatorToRemove); + if (succ!=null) { + connect(pred, pos1.first, succ, pos2.second); + } + } + } else if (succsCopy!=null && succsCopy.size()>1) { + // node has multiple outputs, it can only have one input (or no input) + // reconnect input to outputs + Operator pred = null; + Pair<Integer, Integer> pos1 = null; + if (predsCopy!=null) { + pred = predsCopy.get(0); + pos1 = disconnect(pred, operatorToRemove); + } + for (Operator succ : succsCopy) { + Pair<Integer, Integer> pos2 = disconnect(operatorToRemove, succ); + if (pred!=null) { + connect(pred, pos1.first, succ, pos2.second); + } + } + } else { + // Only have one input/output + Operator pred = null; + Pair<Integer, Integer> pos1 = null; + if (predsCopy!=null) { + pred = predsCopy.get(0); + pos1 = 
disconnect(pred, operatorToRemove); + } + + Operator succ = null; + Pair<Integer, Integer> pos2 = null; + if (succsCopy!=null) { + succ = succsCopy.get(0); + pos2 = disconnect(operatorToRemove, succ); + } + + if (pred!=null && succ!=null) { + connect(pred, pos1.first, succ, pos2.second); + } + } + + remove(operatorToRemove); + } + + @Override + public void insertBetween(Operator pred, Operator operatorToInsert, Operator succ) throws FrontendException { + add(operatorToInsert); + Pair<Integer, Integer> pos = disconnect(pred, succ); + connect(pred, pos.first, operatorToInsert, 0); + connect(operatorToInsert, 0, succ, pos.second); + } } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java Sun Sep 26 21:23:38 2010 @@ -160,4 +160,30 @@ public interface OperatorPlan { * @throws FrontendException */ public boolean isEqual( OperatorPlan other ) throws FrontendException; + + /** + * This method replaces the oldOperator with the newOperator, making all connections + * to the new operator in the place of the old operator + * @param oldOperator operator to be replaced + * @param newOperator operator to replace + * @throws FrontendException + */ + public void replace(Operator oldOperator, Operator newOperator) throws FrontendException; + + /** + * This method removes a node operatorToRemove. 
It also connects all its successors to + the predecessor/connects all its predecessors to the successor + * @param operatorToRemove operator to remove + * @throws FrontendException + */ + public void removeAndReconnect(Operator operatorToRemove) throws FrontendException; + + /** + * This method inserts node operatorToInsert between pred and succ. Neither pred nor succ can be null + * @param pred predecessor of inserted node after this method + * @param operatorToInsert operator to insert + * @param succ successor of inserted node after this method + * @throws FrontendException + */ + public void insertBetween(Operator pred, Operator operatorToInsert, Operator succ) throws FrontendException; } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java Sun Sep 26 21:23:38 2010 @@ -162,7 +162,7 @@ public class OperatorSubPlan implements @Override public void removeSoftLink(Operator from, Operator to) { - throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan"); + throw new UnsupportedOperationException("removeSoftLink() can not be called on OperatorSubPlan"); } @Override @@ -173,5 +173,26 @@ public class OperatorSubPlan implements @Override public List<Operator> getSoftLinkSuccessors(Operator op) { return basePlan.getSoftLinkSuccessors(op); - } + } + + @Override + public void insertBetween(Operator pred, Operator operatorToInsert, Operator succ) + throws FrontendException { + throw new UnsupportedOperationException("insertBetween() can not be called on OperatorSubPlan"); + + } + + @Override + public void removeAndReconnect(Operator operatorToRemove) + throws 
FrontendException { + throw new UnsupportedOperationException("removeAndReconnect() can not be called on OperatorSubPlan"); + + } + + @Override + public void replace(Operator oldOperator, Operator newOperator) + throws FrontendException { + throw new UnsupportedOperationException("replace() can not be called on OperatorSubPlan"); + + } } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java Sun Sep 26 21:23:38 2010 @@ -86,9 +86,7 @@ public class ForeachInnerPlanVisitor ext org.apache.pig.newplan.Operator newPred = innerOpsMap.get(pred); if (newPred.getPlan().getSuccessors(newPred)!=null) { org.apache.pig.newplan.Operator newSucc = newOp.getPlan().getSuccessors(newPred).get(0); - Pair<Integer, Integer> pair = newOp.getPlan().disconnect(newPred, newSucc); - newOp.getPlan().connect(newPred, newOp); - newOp.getPlan().connect(newOp, pair.first, newSucc, pair.second); + newOp.getPlan().insertBetween(newPred, newOp, newSucc); } else { newOp.getPlan().connect(newPred, newOp); Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java Sun Sep 26 21:23:38 2010 @@ -115,11 +115,11 @@ public class Util { List<Operator> next = plan.getSuccessors(op); if (next != 
null) { LogicalRelationalOperator nextOp = (LogicalRelationalOperator)next.get(branch); - Pair<Integer, Integer> pos = plan.disconnect(op, nextOp); - plan.connect(foreach, pos.first, nextOp, pos.second); + plan.insertBetween(op, foreach, nextOp); + } + else { + plan.connect(op, foreach); } - - plan.connect(op, foreach); LogicalPlan innerPlan = new LogicalPlan(); foreach.setInnerPlan(innerPlan); Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Sun Sep 26 21:23:38 2010 @@ -171,9 +171,7 @@ public class ColumnPruneVisitor extends // add foreach to the base plan p.add(foreach); - Pair<Integer,Integer> disconnectedPos = p.disconnect(load, next); - p.connect(load, disconnectedPos.first.intValue(), foreach, 0 ); - p.connect(foreach, 0, next, disconnectedPos.second.intValue()); + p.insertBetween(load, foreach, next); LogicalPlan innerPlan = new LogicalPlan(); foreach.setInnerPlan(innerPlan); Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java Sun Sep 26 21:23:38 2010 @@ -260,13 +260,8 @@ public class FilterAboveForeach extends * -- And ForEach is FilterPred */ - 
Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach); - Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter); - Pair<Integer, Integer> filterSucPlaces = currentPlan.disconnect(filter, filterSuc); - - currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second); - currentPlan.connect(filter, filterSucPlaces.first, foreach, forEachPredPlaces.second); - currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second); + currentPlan.removeAndReconnect(filter); + currentPlan.insertBetween(forEachPred, filter, foreach); subPlan.add(forEachPred); subPlan.add(foreach); Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java Sun Sep 26 21:23:38 2010 @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.Pair; import org.apache.pig.newplan.logical.relational.LOCogroup; import org.apache.pig.newplan.logical.relational.LOCross; import org.apache.pig.newplan.logical.relational.LODistinct; @@ -118,16 +119,11 @@ public class LimitOptimizer extends Rule if (pred instanceof LOForEach) { // We can safely move LOLimit up - // Get operator before LOFilter + // Get operator before LOForEach Operator prepredecessor = currentPlan.getPredecessors(pred) - .get(0); - Operator succ = currentPlan.getSuccessors(limit).get(0); - currentPlan.disconnect(prepredecessor, pred); - currentPlan.disconnect(pred, limit); - 
currentPlan.disconnect(limit, succ); - currentPlan.connect(prepredecessor, limit); - currentPlan.connect(limit, pred); - currentPlan.connect(pred, succ); + .get(0); + currentPlan.removeAndReconnect(limit); + currentPlan.insertBetween(prepredecessor, limit, pred); } else if (pred instanceof LOCross || pred instanceof LOUnion) { // Limit can be duplicated, and the new instance pushed in front // of an operator for the following operators @@ -146,10 +142,7 @@ public class LimitOptimizer extends Rule } else { newLimit = new LOLimit((LogicalPlan) currentPlan, limit .getLimit()); - currentPlan.add(newLimit); - currentPlan.disconnect(prepredecessor, pred); - currentPlan.connect(prepredecessor, newLimit); - currentPlan.connect(newLimit, pred); + currentPlan.insertBetween(prepredecessor, newLimit, pred); } } } else if (pred instanceof LOSort) { @@ -161,11 +154,7 @@ public class LimitOptimizer extends Rule .getLimit() : limit.getLimit()); // remove the limit - Operator succ = currentPlan.getSuccessors(limit).get(0); - currentPlan.disconnect(sort, limit); - currentPlan.disconnect(limit, succ); - currentPlan.connect(sort, succ); - currentPlan.remove(limit); + currentPlan.removeAndReconnect(limit); } else if (pred instanceof LOLimit) { // Limit is merged into another LOLimit LOLimit beforeLimit = (LOLimit) pred; @@ -174,11 +163,7 @@ public class LimitOptimizer extends Rule .getLimit() : limit.getLimit()); // remove the limit - Operator succ = currentPlan.getSuccessors(limit).get(0); - currentPlan.disconnect(beforeLimit, limit); - currentPlan.disconnect(limit, succ); - currentPlan.connect(beforeLimit, succ); - currentPlan.remove(limit); + currentPlan.removeAndReconnect(limit); } else if (pred instanceof LOSplitOutput) { // Limit and OrderBy (LOSort) can be separated by split List<Operator> grandparants = currentPlan.getPredecessors(pred); @@ -197,13 +182,7 @@ public class LimitOptimizer extends Rule sort.getUserFunc()); newSort.setLimit(limit.getLimit()); - Operator succ = 
currentPlan.getSuccessors(limit).get(0); - currentPlan.disconnect(pred, limit); - currentPlan.disconnect(limit, succ); - currentPlan.add(newSort); - currentPlan.connect(pred, newSort); - currentPlan.connect(newSort, succ); - currentPlan.remove(limit); + currentPlan.replace(limit, newSort); } } } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java Sun Sep 26 21:23:38 2010 @@ -75,24 +75,21 @@ public class MergeFilter extends Rule { if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) { LOFilter next = (LOFilter)succeds.get(0); combineFilterCond(filter, next); - Pair<Integer, Integer> p1 = currentPlan.disconnect(filter, next); - List<Operator> ll = currentPlan.getSuccessors(next); - if (ll!= null && ll.size()>0) { - Operator op = ll.get(0); - Pair<Integer, Integer> p2 = currentPlan.disconnect(next, op); - currentPlan.connect(filter, p1.first, op, p2.second); - subPlan.add(op); - } + List<Operator> succs = currentPlan.getSuccessors(next); + if (succs!=null && succs.size()>0) { + subPlan.add(succs.get(0)); + } + // Since we remove next, we need to merge soft link into filter List<Operator> nextSoftPreds = currentPlan.getSoftLinkPredecessors(next); if (nextSoftPreds!=null) { for (Operator softPred : nextSoftPreds) { + currentPlan.removeSoftLink(softPred, next); currentPlan.createSoftLink(softPred, filter); } } - - currentPlan.remove(next); + currentPlan.removeAndReconnect(next); } Iterator<Operator> iter = filter.getFilterPlan().getOperators(); Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Sun Sep 26 21:23:38 2010 @@ -179,7 +179,7 @@ public class MergeForEach extends Rule { Operator exp2NextToSink = newExpPlan.getPredecessors(exp2Sink).get(0); Pair<Integer, Integer> pos = newExpPlan.disconnect(exp2NextToSink, exp2Sink); newExpPlan.remove(exp2Sink); - newExpPlan.connect(exp2NextToSink, pos.first, exp1Source, pos.second); + newExpPlan.connect(exp2NextToSink, pos.first, exp1Source, 0); } else { newExpPlan.remove(exp2Sink); Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Sun Sep 26 21:23:38 2010 @@ -42,6 +42,7 @@ import org.apache.pig.newplan.PColFilter import org.apache.pig.newplan.optimizer.Rule; import org.apache.pig.newplan.optimizer.Transformer; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.Pair; public class PartitionFilterOptimizer extends Rule { private String[] partitionKeys; @@ -157,17 +158,8 @@ public class PartitionFilterOptimizer ex } catch (IOException e) { throw new FrontendException( e ); } - if(pColFilterFinder.isFilterRemovable()) { - // 
remove this filter from the plan - Operator from = currentPlan.getPredecessors( loFilter ).get( 0 ); - currentPlan.disconnect( from, loFilter ); - List<Operator> succs = currentPlan.getSuccessors( loFilter ); - if( succs != null ) { - Operator to = succs.get( 0 ); - currentPlan.disconnect( loFilter, to ); - currentPlan.connect( from, to ); - } - currentPlan.remove( loFilter ); + if(pColFilterFinder.isFilterRemovable()) { + currentPlan.removeAndReconnect( loFilter ); } } } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Sun Sep 26 21:23:38 2010 @@ -211,17 +211,17 @@ public class PushDownForEachFlatten exte LOForEach foreach = (LOForEach)matched.getSources().get(0); Operator next = currentPlan.getSuccessors( foreach ).get(0); if( next instanceof LOSort ) { - Operator pred = currentPlan.getPredecessors( foreach ).get( 0 ); + currentPlan.removeAndReconnect(foreach); + List<Operator> succs = currentPlan.getSuccessors( next ); - currentPlan.disconnect( pred, foreach ); - currentPlan.disconnect( foreach, next ); - currentPlan.connect( pred, next ); - currentPlan.connect( next, foreach ); if( succs != null ) { - for( Operator succ : succs ) { - currentPlan.disconnect( next, succ ); - currentPlan.connect( foreach, succ ); + List<Operator> succsCopy = new ArrayList<Operator>(); + succsCopy.addAll(succs); + for( Operator succ : succsCopy ) { + currentPlan.insertBetween(next, foreach, succ); } + } else { + currentPlan.connect( next, foreach ); } } else if( next instanceof LOCross || next instanceof 
LOJoin ) { List<Operator> preds = currentPlan.getPredecessors( next ); @@ -275,9 +275,7 @@ public class PushDownForEachFlatten exte currentPlan.connect( next, newForeach ); } else { opAfterX = succs.get( 0 ); - Pair<Integer, Integer> pos = currentPlan.disconnect( next, opAfterX ); - currentPlan.connect( next, pos.first, newForeach, pos.second ); - currentPlan.connect( newForeach, opAfterX ); + currentPlan.insertBetween(next, newForeach, opAfterX); } // Finally remove flatten flags from the original foreach and regenerate schemas for those impacted. Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java Sun Sep 26 21:23:38 2010 @@ -207,8 +207,6 @@ public class PushUpFilter extends Rule { Operator predecessor = this.findNonFilterPredecessor( filter ); subPlan.add( predecessor) ; - // Disconnect the filter in the plan without removing it from the plan. 
- Operator predec = currentPlan.getPredecessors( filter ).get( 0 ); Operator succed; if (currentPlan.getSuccessors(filter)!=null) @@ -216,14 +214,12 @@ public class PushUpFilter extends Rule { else succed = null; - Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter); - if (succed!=null) { subPlan.add(succed); - Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed); - currentPlan.connect(predec, p1.first, succed, p2.second); } + currentPlan.removeAndReconnect(filter); + if( predecessor instanceof LOSort || predecessor instanceof LODistinct || ( predecessor instanceof LOCogroup && currentPlan.getPredecessors( predecessor ).size() == 1 ) ) { // For sort, put the filter in front of it. @@ -322,9 +318,7 @@ public class PushUpFilter extends Rule { // Insert the filter in between the given two operators. private void insertFilter(Operator prev, Operator predecessor, LOFilter filter) throws FrontendException { - Pair<Integer, Integer> p3 = currentPlan.disconnect( prev, predecessor ); - currentPlan.connect( prev, p3.first, filter, 0 ); - currentPlan.connect( filter, 0, predecessor, p3.second ); + currentPlan.insertBetween(prev, filter, predecessor); } // Identify those among preds that will need to have a filter between it and the predecessor. 
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java Sun Sep 26 21:23:38 2010 @@ -89,9 +89,7 @@ public class SplitFilter extends Rule { if (succeds != null) { succed = succeds.get(0); subPlan.add(succed); - Pair<Integer, Integer> p = currentPlan.disconnect(filter, succed); - currentPlan.connect(filter2, 0, succed, p.second); - currentPlan.connect(filter, p.first, filter2, 0); + currentPlan.insertBetween(filter, filter2, succed); } else { currentPlan.connect(filter, 0, filter2, 0); } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java Sun Sep 26 21:23:38 2010 @@ -120,10 +120,7 @@ public abstract class TypeCastInserter e // Insert the foreach into the plan and patch up the plan. 
Operator next = currentPlan.getSuccessors(op).get(0); - Pair<Integer,Integer> disconnectedPos = currentPlan.disconnect(op, next); - currentPlan.add(foreach); - currentPlan.connect(op, disconnectedPos.first.intValue(), foreach, 0 ); - currentPlan.connect(foreach, 0, next, disconnectedPos.second.intValue()); + currentPlan.insertBetween(op, foreach, next); List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>(); LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[s.size()]); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java Sun Sep 26 21:23:38 2010 @@ -46,6 +46,7 @@ import org.apache.pig.newplan.logical.ex import org.apache.pig.newplan.logical.relational.LOFilter; import org.apache.pig.newplan.logical.relational.LOJoin; import org.apache.pig.newplan.logical.relational.LOLoad; +import org.apache.pig.newplan.logical.relational.LOSplit; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor; import org.apache.pig.newplan.logical.relational.LogicalSchema; @@ -1538,5 +1539,351 @@ public class TestNewPlanOperatorPlan ext assertTrue(D1.isEqual(D2)); } - + @Test + public void testReplace1() throws FrontendException { + // has multiple inputs + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator load2 = new SillyOperator("load2", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator join1 = new SillyOperator("join1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + 
plan.add(load1); + plan.add(load2); + plan.add(filter1); + plan.add(filter2); + plan.add(join1); + plan.connect(load1, join1); + plan.connect(load2, filter1); + plan.connect(filter1, join1); + plan.connect(join1, filter2); + + Operator join2 = new SillyOperator("join2", plan); + plan.replace(join1, join2); + + List<Operator> preds = plan.getPredecessors(join2); + assert(preds.size()==2); + assert(preds.contains(load1)); + assert(preds.contains(filter1)); + + List<Operator> succs = plan.getSuccessors(join2); + assert(succs.size()==1); + assert(succs.contains(filter2)); + } + + @Test + public void testReplace2() throws FrontendException { + // has multiple outputs + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator split1 = new SillyOperator("split1", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + plan.add(load1); + plan.add(split1); + plan.add(filter1); + plan.add(filter2); + plan.connect(load1, split1); + plan.connect(split1, filter1); + plan.connect(split1, filter2); + + Operator split2 = new SillyOperator("split2", plan); + plan.replace(split1, split2); + + List<Operator> preds = plan.getPredecessors(split2); + assert(preds.size()==1); + assert(preds.contains(load1)); + + List<Operator> succs = plan.getSuccessors(split2); + assert(succs.size()==2); + assert(succs.contains(filter1)); + assert(succs.contains(filter2)); + } + + @Test + public void testReplace3() throws FrontendException { + // single input/output + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + plan.add(load1); + plan.add(filter1); + plan.add(filter2); + plan.connect(load1, filter1); + plan.connect(filter1, filter2); + + Operator filter3 = new SillyOperator("filter3", plan); + plan.replace(filter1, filter3); + + 
List<Operator> preds = plan.getPredecessors(filter3); + assert(preds.size()==1); + assert(preds.contains(load1)); + + List<Operator> succs = plan.getSuccessors(filter3); + assert(succs.size()==1); + assert(succs.contains(filter2)); + } + + @Test + public void testReplace4() throws FrontendException { + // output is null + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + plan.add(load1); + plan.add(filter1); + plan.add(filter2); + plan.connect(load1, filter1); + plan.connect(filter1, filter2); + + Operator filter3 = new SillyOperator("filter3", plan); + plan.replace(filter2, filter3); + + List<Operator> preds = plan.getPredecessors(filter3); + assert(preds.size()==1); + assert(preds.contains(filter1)); + + List<Operator> succs = plan.getSuccessors(filter3); + assert(succs==null); + } + + @Test + public void testReplace5() throws FrontendException { + // input is null + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + plan.add(load1); + plan.add(filter1); + plan.add(filter2); + plan.connect(load1, filter1); + plan.connect(filter1, filter2); + + Operator load2 = new SillyOperator("load2", plan); + plan.replace(load1, load2); + + List<Operator> preds = plan.getPredecessors(load2); + assert(preds==null); + + List<Operator> succs = plan.getSuccessors(load2); + assert(succs.size()==1); + assert(succs.contains(filter1)); + } + + @Test + public void testReplace6() throws FrontendException { + // has multiple inputs/outputs + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator load2 = new SillyOperator("load2", plan); + Operator filter1 = new SillyOperator("filter1", plan); + // fake operator to take multiple inputs/outputs + 
Operator fake1 = new SillyOperator("fake1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + Operator filter3 = new SillyOperator("filter3", plan); + plan.add(load1); + plan.add(load2); + plan.add(filter1); + plan.add(filter2); + plan.add(filter3); + plan.add(fake1); + plan.connect(load1, fake1); + plan.connect(load2, filter1); + plan.connect(filter1, fake1); + plan.connect(fake1, filter2); + plan.connect(fake1, filter3); + + Operator fake2 = new SillyOperator("fake2", plan); + plan.replace(fake1, fake2); + + List<Operator> preds = plan.getPredecessors(fake2); + assert(preds.size()==2); + assert(preds.contains(load1)); + assert(preds.contains(filter1)); + + List<Operator> succs = plan.getSuccessors(fake2); + assert(succs.size()==2); + assert(succs.contains(filter2)); + assert(succs.contains(filter3)); + } + + @Test + public void testRemove1() throws FrontendException { + // single input/output + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator load2 = new SillyOperator("load2", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator join1 = new SillyOperator("join1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + plan.add(load1); + plan.add(load2); + plan.add(filter1); + plan.add(filter2); + plan.add(join1); + plan.connect(load1, join1); + plan.connect(load2, filter1); + plan.connect(filter1, join1); + plan.connect(join1, filter2); + + plan.removeAndReconnect(filter1); + + List<Operator> preds = plan.getPredecessors(join1); + assert(preds.size()==2); + assert(preds.contains(load2)); + } + + @Test + public void testRemove2() throws FrontendException { + // input is null + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator load2 = new SillyOperator("load2", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator join1 = new SillyOperator("join1", plan); + Operator filter2 = new 
SillyOperator("filter2", plan); + plan.add(load1); + plan.add(load2); + plan.add(filter1); + plan.add(filter2); + plan.add(join1); + plan.connect(load1, join1); + plan.connect(load2, filter1); + plan.connect(filter1, join1); + plan.connect(join1, filter2); + + plan.removeAndReconnect(load1); + + List<Operator> preds = plan.getPredecessors(join1); + assert(preds.size()==1); + assert(preds.contains(filter1)); + + plan.removeAndReconnect(filter1); + preds = plan.getPredecessors(join1); + assert(preds.size()==1); + assert(preds.contains(load2)); + } + + @Test + public void testRemove3() throws FrontendException { + // output is null + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + plan.add(load1); + plan.add(filter1); + plan.add(filter2); + plan.connect(load1, filter1); + plan.connect(filter1, filter2); + + plan.removeAndReconnect(filter2); + + List<Operator> succs = plan.getSuccessors(filter2); + assert(succs==null); + } + + @Test + public void testRemove4() throws FrontendException { + // has multiple inputs + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator load2 = new SillyOperator("load2", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator join1 = new SillyOperator("join1", plan); + Operator fake1 = new SillyOperator("fake1", plan); + plan.add(load1); + plan.add(load2); + plan.add(filter1); + plan.add(join1); + plan.connect(load1, join1); + plan.connect(load2, filter1); + plan.connect(filter1, join1); + plan.connect(join1, fake1); + + plan.removeAndReconnect(join1); + + List<Operator> preds = plan.getPredecessors(fake1); + assert(preds.size()==2); + assert(preds.contains(load1)); + assert(preds.contains(filter1)); + } + + @Test + public void testRemove5() throws FrontendException { + // has multiple outputs + SillyPlan plan = new 
SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator split1 = new SillyOperator("split1", plan); + Operator split2 = new SillyOperator("split2", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + plan.add(load1); + plan.add(split1); + plan.add(filter1); + plan.add(filter2); + plan.connect(load1, split1); + plan.connect(split1, split2); + plan.connect(split2, filter1); + plan.connect(split2, filter2); + + plan.removeAndReconnect(split2); + + List<Operator> succs = plan.getSuccessors(split1); + assert(succs.size()==2); + assert(succs.contains(filter1)); + assert(succs.contains(filter2)); + } + + @Test + public void testRemove6() throws FrontendException { + // has multiple inputs/outputs + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator load2 = new SillyOperator("load2", plan); + Operator fake1 = new SillyOperator("fake1", plan); + Operator filter1 = new SillyOperator("filter1", plan); + Operator filter2 = new SillyOperator("filter2", plan); + plan.add(load1); + plan.add(load2); + plan.add(fake1); + plan.add(filter1); + plan.add(filter2); + plan.connect(load1, fake1); + plan.connect(load2, fake1); + plan.connect(fake1, filter1); + plan.connect(fake1, filter2); + + try { + plan.removeAndReconnect(fake1); + fail(); + } catch (FrontendException e) { + assertTrue(e.getErrorCode()==2256); + } + } + + @Test + public void testInsertBetween1() throws FrontendException { + // single input + SillyPlan plan = new SillyPlan(); + Operator load1 = new SillyOperator("load1", plan); + Operator filter1 = new SillyOperator("filter1", plan); + plan.add(load1); + plan.add(filter1); + plan.connect(load1, filter1); + + Operator filter2 = new SillyOperator("filter2", plan); + plan.insertBetween(load1, filter2, filter1); + + List<Operator> succs = plan.getSuccessors(filter2); + assert(succs.size()==1); + assert(succs.contains(filter1)); + + 
List<Operator> preds = plan.getPredecessors(filter2); + assert(preds.size()==1); + assert(preds.contains(load1)); + } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1001524&r1=1001523&r2=1001524&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Sun Sep 26 21:23:38 2010 @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; @@ -29,6 +30,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import org.apache.hadoop.fs.Path; import org.apache.log4j.FileAppender; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -38,6 +40,7 @@ import org.apache.pig.FilterFunc; import org.apache.pig.PigServer; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor; import org.junit.After; @@ -1872,5 +1875,44 @@ public class TestPruneColumn extends Tes assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"})); } + + // See PIG-1644 + @Test + public void testSplitOutputWithForEach() throws Exception { + Path output1 = FileLocalizer.getTemporaryPath(pigServer.getPigContext()); + Path output2 = FileLocalizer.getTemporaryPath(pigServer.getPigContext()); + pigServer.setBatchOn(); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);"); + pigServer.registerQuery("B = foreach A 
generate a0, a1, a2;"); + pigServer.registerQuery("store B into '" + Util.generateURI(output1.toString(), pigServer.getPigContext()) + "';"); + pigServer.registerQuery("C = order B by a2;"); + pigServer.registerQuery("D = foreach C generate a2;"); + pigServer.registerQuery("store D into '" + Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';"); + pigServer.executeBatch(); + + BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString()))); + String line = reader1.readLine(); + assertTrue(line.equals("1\t2\t3")); + + line = reader1.readLine(); + assertTrue(line.equals("2\t3\t4")); + + assertTrue(reader1.readLine()==null); + + BufferedReader reader2 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output2.toString()))); + line = reader2.readLine(); + assertTrue(line.equals("3")); + + line = reader2.readLine(); + assertTrue(line.equals("4")); + + assertTrue(reader2.readLine()==null); + + assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $3"})); + + reader1.close(); + reader2.close(); + } + }