Author: olga Date: Thu Oct 9 14:03:39 2008 New Revision: 703258 URL: http://svn.apache.org/viewvc?rev=703258&view=rev Log: missing files from patch PIG-465
Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=703258&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Thu Oct 9 14:03:39 2008 @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.Pair; + +/** + * This visitor visits the MRPlan and does the following + * for each MROper + * - visits the POPackage in the reduce plan and finds the corresponding + * POLocalRearrange(s) (either in the map plan of the same oper OR + * reduce plan of predecessor MROper). It then annotates the POPackage + * with information about which columns in the "value" are present in the + * "key" and will need to stitched in to the "value" + */ +public class POPackageAnnotator extends MROpPlanVisitor { + + /** + * @param plan MR plan to visit + */ + public POPackageAnnotator(MROperPlan plan) { + super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan)); + } + + @Override + public void visitMROp(MapReduceOper mr) throws VisitorException { + + // POPackage could be present in the combine plan + // OR in the reduce plan. POPostCombinerPackage could + // be present only in the reduce plan. 
Search in these two + // plans accordingly + + if(!mr.combinePlan.isEmpty()) { + PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.combinePlan); + pkgDiscoverer.visit(); + POPackage pkg = pkgDiscoverer.getPkg(); + if(pkg != null) { + handlePackage(mr, pkg); + } + } + + if(!mr.reducePlan.isEmpty()) { + PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.reducePlan); + pkgDiscoverer.visit(); + POPackage pkg = pkgDiscoverer.getPkg(); + if(pkg != null) { + // if the POPackage is actually a POPostCombinerPackage, then we should + // just look for the corresponding LocalRearrange(s) in the combine plan + if(pkg instanceof POPostCombinerPackage) { + if(!patchPackage(mr.combinePlan, pkg)) { + throw new VisitorException("Unexpected problem while trying " + + "to optimize (could not find LORearrange in combine plan)"); + } + } else { + handlePackage(mr, pkg); + } + } + } + + } + + private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException { + // the LocalRearrange(s) could either be in the map of this MapReduceOper + // OR in the reduce of predecessor MapReduceOpers + if(!patchPackage(mr.mapPlan, pkg)) { + // we did not find the LocalRearrange(s) in the map plan + // let's look in the predecessors + List<MapReduceOper> preds = this.mPlan.getPredecessors(mr); + for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) { + MapReduceOper mrOper = it.next(); + if(!patchPackage(mrOper.reducePlan, pkg)) { + throw new VisitorException("Unexpected problem while trying " + + "to optimize (could not find LORearrange in predecessor's reduce plan)"); + } + } + } + } + + private boolean patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException { + LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(plan, pkg); + lrDiscoverer.visit(); + // let our caller know if we managed to patch + // the package + return lrDiscoverer.isLoRearrangeFound(); + } + + /** + * Simple visitor of the "Reduce" physical plan + * which will 
get a reference to the POPacakge + * present in the plan + */ + class PackageDiscoverer extends PhyPlanVisitor { + + private POPackage pkg; + + public PackageDiscoverer(PhysicalPlan plan) { + super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); + } + + /* (non-Javadoc) + * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream) + */ + @Override + public void visitPackage(POPackage pkg) throws VisitorException { + this.pkg = pkg; + }; + + /* (non-Javadoc) + * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage) + */ + @Override + public void visitCombinerPackage(POPostCombinerPackage pkg) + throws VisitorException { + this.pkg = pkg; + } + + /** + * @return the pkg + */ + public POPackage getPkg() { + return pkg; + } + + } + + /** + * Physical Plan visitor which tries to get the + * LocalRearrange(s) present in the plan (if any) and + * annotate the POPackage given to it with the information + * in the LocalRearrange (regarding columns in the "value" + * present in the "key") + */ + class LoRearrangeDiscoverer extends PhyPlanVisitor { + + private boolean loRearrangeFound = false; + private POPackage pkg; + + public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) { + super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); + this.pkg = pkg; + } + + /* (non-Javadoc) + * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream) + */ + @Override + public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException { + loRearrangeFound = true; + Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo; + // annotate the package with 
information from the LORearrange + // update the keyInfo information if already present in the POPackage + keyInfo = pkg.getKeyInfo(); + if(keyInfo == null) + keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); + + keyInfo.put(new Integer(lrearrange.getIndex()), + new Pair<Boolean, Map<Integer, Integer>>( + lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); + pkg.setKeyInfo(keyInfo); + pkg.setKeyTuple(lrearrange.isKeyTuple()); + } + + /** + * @return the loRearrangeFound + */ + public boolean isLoRearrangeFound() { + return loRearrangeFound; + } + + } +} + Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java?rev=703258&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java Thu Oct 9 14:03:39 2008 @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.VisitorException; + +/** + * The local rearrange operator is a part of the co-group + * implementation. It has an embedded physical plan that + * generates tuples of the form (grpKey,(indxed inp Tuple)). 
+ * + */ +public class POLocalRearrangeForIllustrate extends POLocalRearrange { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public POLocalRearrangeForIllustrate(OperatorKey k) { + this(k, -1, null); + } + + public POLocalRearrangeForIllustrate(OperatorKey k, int rp) { + this(k, rp, null); + } + + public POLocalRearrangeForIllustrate(OperatorKey k, List<PhysicalOperator> inp) { + this(k, -1, inp); + } + + public POLocalRearrangeForIllustrate(OperatorKey k, int rp, List<PhysicalOperator> inp) { + super(k, rp, inp); + index = -1; + leafOps = new ArrayList<ExpressionOperator>(); + } + + @Override + public void visit(PhyPlanVisitor v) throws VisitorException { + v.visitLocalRearrangeForIllustrate(this); + } + + @Override + public String name() { + return "Local Rearrange For Illustrate" + "[" + DataType.findTypeName(resultType) + + "]" + "{" + DataType.findTypeName(keyType) + "}" + "(" + + mIsDistinct + ") - " + mKey.toString(); + } + + protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{ + //Construct key + Object key; + if(resLst.size()>1){ + Tuple t = mTupleFactory.newTuple(resLst.size()); + int i=-1; + for(Result res : resLst) + t.set(++i, res.result); + key = t; + } + else{ + key = resLst.get(0).result; + } + + Tuple output = mTupleFactory.newTuple(3); + if (mIsDistinct) { + + //Put the key and the indexed tuple + //in a tuple and return + output.set(0, new Byte((byte)0)); + output.set(1, key); + output.set(2, mFakeTuple); + return output; + } else { + if(isCross){ + for(int i=0;i<plans.size();i++) + value.getAll().remove(0); + } + + //Put the index, key, and value + //in a tuple and return + output.set(0, new Byte(index)); + output.set(1, key); + output.set(2, value); + return output; + } + } + + /** + * Make a deep copy of this operator. 
+ * @throws CloneNotSupportedException + */ + @Override + public POLocalRearrangeForIllustrate clone() throws CloneNotSupportedException { + List<PhysicalPlan> clonePlans = new + ArrayList<PhysicalPlan>(plans.size()); + for (PhysicalPlan plan : plans) { + clonePlans.add(plan.clone()); + } + POLocalRearrangeForIllustrate clone = new POLocalRearrangeForIllustrate(new OperatorKey( + mKey.scope, + NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)), + requestedParallelism); + clone.setPlans(clonePlans); + clone.keyType = keyType; + clone.index = index; + // Needs to be called as setDistinct so that the fake index tuple gets + // created. + clone.setDistinct(mIsDistinct); + return clone; + } + +}