Author: olga Date: Sat Sep 13 09:52:05 2008 New Revision: 694985 URL: http://svn.apache.org/viewvc?rev=694985&view=rev Log: PIG-429: Self join with implicit split has the join output in wrong order
Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=694985&r1=694984&r2=694985&view=diff ============================================================================== --- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Sat Sep 13 09:52:05 2008 @@ -196,3 +196,6 @@ correctly (pradeepk vi olgan) PIG-421: error with complex nested plan (sms via olgan) + + PIG-429: Self join wth implicit split has the join output in wrong order + (pradeepk via olgan) Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java?rev=694985&r1=694984&r2=694985&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java Sat Sep 13 09:52:05 2008 @@ -23,6 +23,7 @@ import org.apache.pig.data.DataType; import org.apache.pig.impl.logicalLayer.LOConst; +import org.apache.pig.impl.logicalLayer.LOPrinter; import org.apache.pig.impl.logicalLayer.LOSplitOutput; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; @@ -61,15 +62,30 @@ try { mPlan.add(splitOp); - - // Find all the successors and disconnect them. Keep our own copy + // Find all the successors and connect appropriately with split + // and splitoutput operators. 
Keep our own copy // of the list, as we're changing the graph by doing these calls // and that will change the list of predecessors. List<LogicalOperator> succs = new ArrayList<LogicalOperator>(mPlan.getSuccessors(nodes.get(0))); int index = -1; + boolean nodeConnectedToSplit = false; for (LogicalOperator succ : succs) { - mPlan.disconnect(nodes.get(0), succ); + if(!nodeConnectedToSplit) { + mPlan.insertBetween(nodes.get(0), splitOp, succ); + // nodes.get(0) should be connected to Split (only once) and + // split -> splitoutput -> successor - this is for the first successor + // for the next successor we just want to connect in the order + // split -> splitoutput -> successor without involving nodes.get(0) + // in the above call we have connected + // nodes.get(0) to split (we will set the flag + // to true later in this loop iteration). Hence in subsequent + // iterations we will only disconnect nodes.get(0) from its + // successor and connect the split-splitoutput chain + // to the successor + } else { + mPlan.disconnect(nodes.get(0), succ); + } LogicalPlan condPlan = new LogicalPlan(); LOConst cnst = new LOConst(mPlan, new OperatorKey(scope, idGen.getNextNodeId(scope)), new Boolean(true)); @@ -79,12 +95,22 @@ new OperatorKey(scope, idGen.getNextNodeId(scope)), ++index, condPlan); splitOp.addOutput(splitOutput); mPlan.add(splitOutput); - mPlan.connect(splitOp, splitOutput); - mPlan.connect(splitOutput, succ); + + if(!nodeConnectedToSplit) { + // node.get(0) should be connected to Split (only once) and + // split to splitoutput to successor - this is for the first successor + // for the next successor we just want to connect in the order + // split - splitoutput - successor. 
+ // the call below is in the first successor case + mPlan.insertBetween(splitOp, splitOutput, succ); + nodeConnectedToSplit = true; + } else { + mPlan.connect(splitOp, splitOutput); + mPlan.connect(splitOutput, succ); + } // Patch up the contained plans of succ fixUpContainedPlans(nodes.get(0), splitOutput, succ, null); } - mPlan.connect(nodes.get(0), splitOp); } catch (Exception e) { throw new OptimizerException(e); }