Author: olga Date: Thu Aug 14 15:20:11 2008 New Revision: 686064 URL: http://svn.apache.org/viewvc?rev=686064&view=rev Log: PIG-375: addition of implicit split
Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=686064&r1=686063&r2=686064&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu Aug 14 15:20:11 2008 @@ -47,6 +47,7 @@ import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.ExpressionOperator; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.LOVisitor; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder; @@ -59,7 +60,9 @@ import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.CompilationMessageCollector; +import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.SplitIntroducer; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.WrappedIOException; import org.apache.pig.impl.util.PropertiesUtil; @@ -535,7 +538,9 @@ // Set the logical plan values correctly in all the operators PlanSetter ps = new PlanSetter(lp); ps.visit(); - + + (new SplitIntroducer(lp)).introduceImplSplits(); + // run through validator CompilationMessageCollector collector = new CompilationMessageCollector() ; FrontendException caught = null; Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java?rev=686064&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java Thu Aug 14 15:20:11 2008
@@ -0,0 +1,137 @@
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+
+/**
+ * Makes implicit splits in a logical plan explicit. Wherever an operator that
+ * is not itself an LOSplit or LOSplitOutput has more than one successor, an
+ * LOSplit is inserted after it and every former successor is re-attached
+ * behind its own LOSplitOutput whose condition is the constant true.
+ * <p>
+ * Although this class extends PlanWalker it never calls visit() on anything;
+ * use {@link #introduceImplSplits()} instead of {@link #walk(PlanVisitor)}.
+ */
+public class SplitIntroducer extends PlanWalker<LogicalOperator, LogicalPlan> {
+    private final NodeIdGenerator nodeIdGen;
+
+    /**
+     * @param plan the logical plan to rewrite in place
+     */
+    public SplitIntroducer(LogicalPlan plan) {
+        super(plan);
+        nodeIdGen = NodeIdGenerator.getGenerator();
+    }
+
+    private long getNextId(String scope) {
+        return nodeIdGen.getNextNodeId(scope);
+    }
+
+    @Override
+    public PlanWalker<LogicalOperator, LogicalPlan> spawnChildWalker(LogicalPlan plan) {
+        return new SplitIntroducer(plan);
+    }
+
+    /**
+     * Walks the plan from its roots and inserts a split after every operator,
+     * root or not, that fans out to more than one successor.
+     *
+     * @throws VisitorException if the plan raises a PlanException while being rewired
+     */
+    public void introduceImplSplits() throws VisitorException {
+        List<LogicalOperator> roots = mPlan.getRoots();
+        if (roots == null) {
+            return;
+        }
+        // Identity-keyed so an operator reachable along several paths is
+        // handled once, without relying on how operators define equals().
+        Map<LogicalOperator, Boolean> visited = new IdentityHashMap<LogicalOperator, Boolean>();
+        // Iterate over a copy: processNode() adds operators to the plan.
+        for (LogicalOperator root : copySucs(roots)) {
+            processNode(root, visited);
+        }
+    }
+
+    /**
+     * This method is to conform to the interface.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws VisitorException {
+        throw new VisitorException(
+            "This method is not to be used. This Walker does not call any visit() methods. It only alters the plan by introducing implicit splits if necessary.");
+    }
+
+    /**
+     * Inserts a split after node when needed, then continues with the
+     * operators that were its successors before any rewiring.
+     */
+    private void processNode(LogicalOperator node, Map<LogicalOperator, Boolean> visited)
+            throws VisitorException {
+        if (visited.put(node, Boolean.TRUE) != null) {
+            return;
+        }
+        List<LogicalOperator> current = mPlan.getSuccessors(node);
+        if (current == null || current.isEmpty()) {
+            return;
+        }
+        // Snapshot first: the edges of node are rewired below.
+        List<LogicalOperator> sucs = copySucs(current);
+        boolean explicitSplit = node instanceof LOSplit || node instanceof LOSplitOutput;
+        if (!explicitSplit && sucs.size() > 1) {
+            insertSplit(node, sucs);
+        }
+        for (LogicalOperator suc : sucs) {
+            processNode(suc, visited);
+        }
+    }
+
+    /**
+     * Rewires from so that it feeds a new LOSplit, which in turn feeds each
+     * former successor through its own LOSplitOutput.
+     */
+    private void insertSplit(LogicalOperator from, List<LogicalOperator> sucs)
+            throws VisitorException {
+        disconnect(from, sucs);
+        String scope = from.getOperatorKey().scope;
+        LOSplit splitOp = new LOSplit(mPlan, new OperatorKey(scope, getNextId(scope)),
+                new ArrayList<LogicalOperator>());
+        mPlan.add(splitOp);
+        try {
+            mPlan.connect(from, splitOp);
+            int index = 0;
+            for (LogicalOperator operator : sucs) {
+                LogicalPlan condPlan = new LogicalPlan();
+                LOConst cnst = new LOConst(mPlan, new OperatorKey(scope, getNextId(scope)), true);
+                cnst.setType(DataType.BOOLEAN);
+                condPlan.add(cnst);
+                LOSplitOutput splitOutput = new LOSplitOutput(mPlan,
+                        new OperatorKey(scope, getNextId(scope)), index++, condPlan);
+                splitOp.addOutput(splitOutput);
+                mPlan.add(splitOutput);
+                mPlan.connect(splitOp, splitOutput);
+                mPlan.connect(splitOutput, operator);
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    private List<LogicalOperator> copySucs(List<LogicalOperator> successors) {
+        return new ArrayList<LogicalOperator>(successors);
+    }
+
+    private void disconnect(LogicalOperator from, List<LogicalOperator> successors) {
+        for (LogicalOperator operator : successors) {
+            mPlan.disconnect(from, operator);
+        }
+    }
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java?rev=686064&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java Thu Aug 14 15:20:11 2008
@@ -0,0 +1,86 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that aliases consumed by more than one statement still produce
+ * complete output once implicit splits have been introduced.
+ */
+public class TestImplicitSplit extends TestCase {
+    private static final int LOOP_SIZE = 20;
+
+    private PigServer pigServer;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /** The multiply-consumed alias is the load itself, a root of the plan. */
+    @Test
+    public void testImplicitSplit() throws Exception {
+        File tmpFile = createInput();
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+        pigServer.registerQuery("B = filter A by $0<=10;");
+        pigServer.registerQuery("C = filter A by $0>10;");
+        pigServer.registerQuery("D = union B,C;");
+        assertEquals(LOOP_SIZE, countTuples("D"));
+    }
+
+    /** The multiply-consumed alias B sits below the load, not at a root. */
+    @Test
+    public void testImplicitSplitBelowRoot() throws Exception {
+        File tmpFile = createInput();
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+        pigServer.registerQuery("B = filter A by $0<=20;");
+        pigServer.registerQuery("C = filter B by $0<=10;");
+        pigServer.registerQuery("D = filter B by $0>10;");
+        pigServer.registerQuery("E = union C,D;");
+        assertEquals(LOOP_SIZE, countTuples("E"));
+    }
+
+    /** Writes the integers 1..LOOP_SIZE, one per line, to a temporary file. */
+    private File createInput() throws Exception {
+        File tmpFile = File.createTempFile("test", ".txt");
+        tmpFile.deleteOnExit();
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        try {
+            for (int i = 1; i <= LOOP_SIZE; i++) {
+                ps.println(i);
+            }
+        } finally {
+            ps.close();
+        }
+        return tmpFile;
+    }
+
+    /** Drains the iterator for alias and returns how many tuples it yielded. */
+    private int countTuples(String alias) throws Exception {
+        Iterator<Tuple> iter = pigServer.openIterator(alias);
+        if (!iter.hasNext()) {
+            fail("No Output received");
+        }
+        int cnt = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            ++cnt;
+        }
+        return cnt;
+    }
+}