Author: thejas Date: Fri Aug 13 22:45:46 2010 New Revision: 985392 URL: http://svn.apache.org/viewvc?rev=985392&view=rev Log: PIG-1448: Detach tuple from inner plans of physical operator (thejas)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Aug 13 22:45:46 2010 @@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu IMPROVEMENTS +PIG-1448: Detach tuple from inner plans of physical operator (thejas) + PIG-965: PERFORMANCE: optimize common case in matches (PORegex) (ankit.modi via olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Fri Aug 13 22:45:46 2010 @@ -84,6 +84,12 @@ public class PODemux extends PhysicalOpe * The leaf of the current pipeline */ private PhysicalOperator curLeaf = null; + + + /** + * The current pipeline plan + */ + private PhysicalPlan curPlan = null; /* * Indicating if this operator is in a combiner. @@ -192,7 +198,8 @@ public class PODemux extends PhysicalOpe } else { if (getNext) { - + if(curPlan != null) + curPlan.detachInput(); Result inp = processInput(); if (inp.returnStatus == POStatus.STATUS_EOP) { @@ -238,6 +245,8 @@ public class PODemux extends PhysicalOpe if (processedSet.cardinality() == myPlans.size()) { curLeaf = null; + if(curPlan != null) + curPlan.detachInput(); Result inp = processInput(); if (inp.returnStatus == POStatus.STATUS_OK) { attachInputWithIndex((Tuple)inp.result); @@ -296,22 +305,21 @@ public class PODemux extends PhysicalOpe // is expected by the inner plans, as well as the index of the associated // inner plan. PigNullableWritable fld = (PigNullableWritable)res.get(0); - // choose an inner plan to run based on the index set by // the POLocalRearrange operator and passed to this operator // by POMultiQueryPackage int index = fld.getIndex(); index &= idxPart; - - PhysicalPlan pl = myPlans.get(index); - if (!(pl.getRoots().get(0) instanceof PODemux)) { + + curPlan = myPlans.get(index); + if (!(curPlan.getRoots().get(0) instanceof PODemux)) { res.set(0, fld.getValueAsPigType()); } - - myPlans.get(index).attachInput(res); - return myPlans.get(index).getLeaves().get(0); + + curPlan.attachInput(res); + return curPlan.getLeaves().get(0); } - + /** * Sets a flag indicating if this operator is * in a combiner. Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Aug 13 22:45:46 2010 @@ -259,7 +259,7 @@ public class POFRJoin extends PhysicalOp Tuple key = TupleFactory.getInstance().newTuple(1); key.set(0, lrOutTuple.get(1)); Tuple value = getValueTuple(lr, lrOutTuple); - + lr.detachInput(); // Configure the for each operator with the relevant bags int i = -1; boolean noMatch = false; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Fri Aug 13 22:45:46 2010 @@ -146,6 +146,7 @@ public class POFilter extends PhysicalOp } */ res = comOp.getNext(dummyBool); + plan.detachInput(); if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) return res; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Fri Aug 13 22:45:46 2010 @@ -118,8 +118,7 @@ public class POLocalRearrange extends Ph private int mProjectedColsMapSize = 0; private int mSecondaryProjectedColsMapSize = 0; - private Tuple lrOutput; - + private boolean useSecondaryKey = false; // By default, we strip keys from the value. @@ -144,7 +143,6 @@ public class POLocalRearrange extends Ph secondaryLeafOps = new ArrayList<ExpressionOperator>(); mProjectedColsMap = new HashMap<Integer, Integer>(); mSecondaryProjectedColsMap = new HashMap<Integer, Integer>(); - lrOutput = mTupleFactory.newTuple(3); } @Override @@ -222,7 +220,6 @@ public class POLocalRearrange extends Ph // indices and hence would go to different invocation of reduce() this.index = multiQuery ? (byte)(index | PigNullableWritable.mqFlag) : (byte)index; } - lrOutput.set(0, Byte.valueOf(this.index)); } public boolean isDistinct() { @@ -378,11 +375,22 @@ public class POLocalRearrange extends Ph res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result); res.returnStatus = POStatus.STATUS_OK; + detachPlans(plans); + + if(secondaryPlans != null) + detachPlans(secondaryPlans); + return res; } return inp; } + private void detachPlans(List<PhysicalPlan> plans) { + for (PhysicalPlan ep : plans) { + ep.detachInput(); + } + } + protected Object getKeyFromResult(List<Result> resLst, byte type) throws ExecException { Object key; if(resLst.size()>1){ @@ -412,6 +420,8 @@ public class POLocalRearrange extends Ph } protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{ + Tuple lrOutput = mTupleFactory.newTuple(3); + lrOutput.set(0, Byte.valueOf(this.index)); //Construct key Object key; Object secondaryKey=null; @@ -673,13 +683,6 @@ public class POLocalRearrange extends Ph clone.secondaryKeyType = secondaryKeyType; clone.useSecondaryKey = useSecondaryKey; clone.index = index; - try { - clone.lrOutput.set(0, index); - } catch (ExecException e) { - CloneNotSupportedException cnse = new CloneNotSupportedException("Problem with setting index of output."); - cnse.initCause(e); - throw cnse; - } // Needs to be called as setDistinct so that the fake index tuple gets // created. clone.setDistinct(mIsDistinct); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Fri Aug 13 22:45:46 2010 @@ -461,7 +461,7 @@ public class POMergeCogroup extends Phys String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly"; throw new ExecException(errMsg,errCode,PigException.BUG); } - + lr.detachInput(); return mTupleFactory.newTuple(((Tuple)lrOut.result).getAll()); } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Aug 13 22:45:46 2010 @@ -419,6 +419,7 @@ public class POMergeJoin extends Physica } } else { Result res = rightPipelineLeaf.getNext(dummyTuple); + rightPipelineLeaf.detachInput(); switch(res.returnStatus){ case POStatus.STATUS_OK: return res; @@ -461,6 +462,7 @@ public class POMergeJoin extends Physica POLocalRearrange lr = LRs[lrIdx]; lr.attachInput((Tuple)inp.result); Result lrOut = lr.getNext(dummyTuple); + lr.detachInput(); if(lrOut.returnStatus!=POStatus.STATUS_OK){ int errCode = 2167; String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly"; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Fri Aug 13 22:45:46 2010 @@ -240,6 +240,7 @@ public class POMultiQueryPackage extends pack.attachInput(curKey, tupIter); Result res = pack.getNext(t); + pack.detachInput(); Tuple tuple = (Tuple)res.result; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=985392&r1=985391&r2=985392&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java Fri Aug 13 22:45:46 2010 @@ -108,6 +108,8 @@ public class POOptimizedForEach extends } if(res.returnStatus==POStatus.STATUS_EOP) { processingPlan = false; + for(PhysicalPlan plan : inputPlans) + plan.detachInput(); return res; } if(res.returnStatus==POStatus.STATUS_ERR) {