Author: thejas Date: Fri Sep 3 22:30:12 2010 New Revision: 992495 URL: http://svn.apache.org/viewvc?rev=992495&view=rev Log: PIG-1595: casting relation to scalar- problem with handling of data from non PigStorage loaders (adding files missing from previous checkin)
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalkerLPScalar.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ScalarFinder.java Added: hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalkerLPScalar.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalkerLPScalar.java?rev=992495&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalkerLPScalar.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalkerLPScalar.java Fri Sep 3 22:30:12 2010 @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.newplan; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.logical.expression.UserFuncExpression; +import org.apache.pig.newplan.logical.optimizer.ScalarFinder; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; + +/** + * Dependency walker that walks logical plan, it picks the leafs based + * on scalar-alias dependency order + */ +public class DependencyOrderWalkerLPScalar extends DependencyOrderWalker { + + public DependencyOrderWalkerLPScalar(OperatorPlan plan) { + super(plan); + } + + + @Override + protected List<Operator> getSinks() throws FrontendException{ + //find list of scalars and the LogicalRelationalOperator that has them + ScalarFinder sFinder = new ScalarFinder(plan); + sFinder.visit(); + HashMap<UserFuncExpression, LogicalRelationalOperator> scalars = + sFinder.getScalarLOMap(); + + //create a plan with only the sinks from original plan + // that maps dependency between sink nodes based on + // scalar dependency + LogicalPlan sinksPlan = new LogicalPlan(); + for(Map.Entry<UserFuncExpression, LogicalRelationalOperator> e : scalars.entrySet()){ + List<LogicalRelationalOperator> scalarSinks = + new ArrayList<LogicalRelationalOperator>(); + getSinks(e.getValue(), scalarSinks); + + List<LogicalRelationalOperator> sourceSinks = + new ArrayList<LogicalRelationalOperator>(); + getSinks(e.getKey().getImplicitReferencedOperator(), sourceSinks); + + for(LogicalRelationalOperator scalarSink : scalarSinks){ + for(LogicalRelationalOperator sourceSink : sourceSinks ){ + //if the link already exists, don't add again + if(sinksPlan.getSuccessors(sourceSink) != null + && sinksPlan.getSuccessors(sourceSink).contains(scalarSink) + ){ + continue; + } + // add the relationship - scalarSink depends on sourceSink + sinksPlan.add(sourceSink); + sinksPlan.add(scalarSink); + sinksPlan.connect(sourceSink, scalarSink); + } + } + + } + + //list of sink nodes ordered by the scalar dependency order + ArrayList<Operator> orderedSinkNodes = + new ArrayList<Operator>(); + + //keep track of sink nodes that have been added so far + Set<Operator> sinkNodesAdded = new HashSet<Operator>(); + + + //use the plan to get sink operators out first + List<Operator> sources; + do{ + sources = new ArrayList<Operator>(sinksPlan.getSources()); + for(Operator source : sources){ + // add the current sources to list + // and remove them from plan to get new sources + orderedSinkNodes.add(source); + sinkNodesAdded.add(source); + if(sinksPlan.getSuccessors(source) != null){ + //disconnect before removing + List<Operator>succs = new ArrayList<Operator>(sinksPlan.getSuccessors(source)); + for(Operator succ : succs){ + sinksPlan.disconnect(source, succ); + } + } + sinksPlan.remove(source); + } + } + while(sources.size() > 0); + + //add remaining sink nodes from original plan + List<Operator> allSinks = plan.getSinks(); + for(Operator sink : allSinks){ + if(!sinkNodesAdded.contains(sink)){ + orderedSinkNodes.add(sink); + } + } + + return orderedSinkNodes; + } + + + /** + * get all sinks that are successor of LogicalRelationalOperator lop + * and add them to scalarSinks + * @param lop LogicalRelationalOperator + * @param scalarSinks + */ + private void getSinks(LogicalRelationalOperator lop, + List<LogicalRelationalOperator> scalarSinks) { + + List<Operator> succs = lop.getPlan().getSuccessors(lop); + if(succs == null){ + // no successors, this is a sink + scalarSinks.add(lop); + return; + } + for(Operator op : succs){ + getSinks((LogicalRelationalOperator)op, scalarSinks); + } + + } + + +} Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ScalarFinder.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ScalarFinder.java?rev=992495&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ScalarFinder.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ScalarFinder.java Fri Sep 3 22:30:12 2010 @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.newplan.logical.optimizer; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.MultiMap; +import org.apache.pig.newplan.DepthFirstWalker; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.PlanWalker; +import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; +import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor; +import org.apache.pig.newplan.logical.expression.UserFuncExpression; +import org.apache.pig.newplan.logical.relational.LOCogroup; +import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LOForEach; +import org.apache.pig.newplan.logical.relational.LOGenerate; +import org.apache.pig.newplan.logical.relational.LOJoin; +import org.apache.pig.newplan.logical.relational.LOSort; +import org.apache.pig.newplan.logical.relational.LOSplitOutput; +import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor; +import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; + +public class ScalarFinder extends LogicalRelationalNodesVisitor { + + private HashMap<UserFuncExpression, LogicalRelationalOperator> scalars = + new HashMap<UserFuncExpression, LogicalRelationalOperator>(); + + public ScalarFinder(OperatorPlan plan) + throws FrontendException { + super(plan, new DepthFirstWalker(plan)); + } + + public HashMap<UserFuncExpression, LogicalRelationalOperator> getScalarLOMap(){ + return scalars; + } + + @Override + public void visit(LOFilter filter) throws FrontendException { + ScalarFinderInExpPlan sFinder = new ScalarFinderInExpPlan(filter.getFilterPlan(), filter); + sFinder.visit(); + } + + @Override + public void visit(LOJoin join) throws FrontendException { + Collection<LogicalExpressionPlan> joinPlans = join.getExpressionPlanValues(); + for (LogicalExpressionPlan joinPlan : joinPlans) { + ScalarFinderInExpPlan sFinder = new ScalarFinderInExpPlan(joinPlan, join); + sFinder.visit(); + } + } + + @Override + public void visit(LOForEach foreach) throws FrontendException { + OperatorPlan innerPlan = foreach.getInnerPlan(); + PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan); + pushWalker(newWalker); + currentWalker.walk(this); + popWalker(); + } + + @Override + public void visit(LOGenerate gen) throws FrontendException { + List<LogicalExpressionPlan> genPlans = gen.getOutputPlans(); + for (LogicalExpressionPlan genPlan : genPlans) { + ScalarFinderInExpPlan sFinder = new ScalarFinderInExpPlan(genPlan, gen); + sFinder.visit(); + } + } + + @Override + public void visit(LOCogroup loCogroup) throws FrontendException { + MultiMap<Integer, LogicalExpressionPlan> expPlans = loCogroup.getExpressionPlans(); + for (LogicalExpressionPlan expPlan : expPlans.values()) { + ScalarFinderInExpPlan sFinder = new ScalarFinderInExpPlan(expPlan, loCogroup); + sFinder.visit(); + } + } + + + @Override + public void visit(LOSplitOutput loSplitOutput) throws FrontendException { + ScalarFinderInExpPlan sFinder = + new ScalarFinderInExpPlan(loSplitOutput.getFilterPlan(), loSplitOutput); + sFinder.visit(); + } + + + @Override + public void visit(LOSort loSort) throws FrontendException { + List<LogicalExpressionPlan> sortPlans = loSort.getSortColPlans(); + for (LogicalExpressionPlan sortPlan : sortPlans) { + ScalarFinderInExpPlan sFinder = new ScalarFinderInExpPlan(sortPlan, loSort); + sFinder.visit(); + } + } + + + class ScalarFinderInExpPlan extends LogicalExpressionVisitor{ + + + private LogicalRelationalOperator logicalOp; + + protected ScalarFinderInExpPlan(LogicalExpressionPlan lep, LogicalRelationalOperator lo) + throws FrontendException { + super(lep, new DepthFirstWalker(lep)); + this.logicalOp = lo; + } + + public void visit(UserFuncExpression op) throws FrontendException { + if(op.getImplicitReferencedOperator() != null){ + scalars.put(op, logicalOp); + } + } + } + +}