[ https://issues.apache.org/jira/browse/PIG-697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12721975#action_12721975 ]
Santhosh Srinivasan commented on PIG-697:
-----------------------------------------

1. Some operators do not have any internal state that requires rewiring. Examples of such operators include LOStream, LOCross, etc.
2. I think that the additional walking should be removed. I added a TODO as I was not sure why it was added in the first place.
3. Yes, it will be added as part of the next patch.

> Proposed improvements to pig's optimizer
> ----------------------------------------
>
>                 Key: PIG-697
>                 URL: https://issues.apache.org/jira/browse/PIG-697
>             Project: Pig
>          Issue Type: Bug
>          Components: impl
>            Reporter: Alan Gates
>            Assignee: Santhosh Srinivasan
>         Attachments: OptimizerPhase1.patch, OptimizerPhase1_part2.patch,
>                      OptimizerPhase2.patch, OptimizerPhase3_parrt1-1.patch,
>                      OptimizerPhase3_parrt1.patch, OptimizerPhase3_part2_3.patch
>
>
> I propose the following changes to pig optimizer, plan, and operator
> functionality to support more robust optimization:
> 1) Remove the required array from Rule.  This will change rules so that they
> only match exact patterns instead of allowing missing elements in the pattern.
> This has the downside that if a given rule applies to two patterns (say
> Load->Filter->Group, Load->Group) you have to write two rules.  But it has
> the upside that the resulting rules know exactly what they are getting.  The
> original intent of the required array was to reduce the number of rules that
> needed to be written.  But the resulting rules have to do a lot of work to
> understand the operators they are working with.  With exact matches only,
> each rule will know exactly the operators it is working on and can apply the
> logic of shifting the operators around.  All four of the existing rules set
> all entries of required to true, so removing this will have no effect on them.
> 2) Change PlanOptimizer.optimize to iterate over the rules until there are no
> conversions or a certain number of iterations has been reached.
> Currently the function is:
> {code}
> public final void optimize() throws OptimizerException {
>     RuleMatcher matcher = new RuleMatcher();
>     for (Rule rule : mRules) {
>         if (matcher.match(rule)) {
>             // It matches the pattern.  Now check if the transformer
>             // approves as well.
>             List<List<O>> matches = matcher.getAllMatches();
>             for (List<O> match : matches) {
>                 if (rule.transformer.check(match)) {
>                     // The transformer approves.
>                     rule.transformer.transform(match);
>                 }
>             }
>         }
>     }
> }
> {code}
> It would change to be:
> {code}
> public final void optimize() throws OptimizerException {
>     RuleMatcher matcher = new RuleMatcher();
>     boolean sawMatch;
>     int numIterations = 0;
>     do {
>         sawMatch = false;
>         for (Rule rule : mRules) {
>             if (matcher.match(rule)) {
>                 // It matches the pattern.  Now check if the transformer
>                 // approves as well.
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match : matches) {
>                     if (rule.transformer.check(match)) {
>                         // The transformer approves.
>                         sawMatch = true;
>                         rule.transformer.transform(match);
>                     }
>                 }
>             }
>         }
>         // Not sure if 1000 is the right number of iterations, maybe it
>         // should be configurable so that large scripts don't stop too
>         // early.
>     } while (sawMatch && numIterations++ < 1000);
> }
> {code}
> The reason for limiting the number of iterations is to avoid infinite loops.
> The reason for iterating over the rules is so that each rule can be applied
> multiple times as necessary.  This allows us to write simple rules, mostly
> swaps between neighboring operators, without worrying that we get the plan
> right in one pass.
> For example, we might have a plan that looks like:
> Load->Join->Filter->Foreach, and we want to optimize it to
> Load->Foreach->Filter->Join.  With two simple rules (swap filter and join and
> swap foreach and filter), applied iteratively, we can get from the initial to
> final plan, without needing to understand the big picture of the entire plan.
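To make the iterate-until-no-change idea concrete, here is a minimal, self-contained sketch. It is not Pig code: the plan is modeled as a plain list of operator names, and `IterativeSwapSketch`, `SwapRule`, and `exampleRules` are hypothetical names invented for this illustration. The third rule (join/foreach) is also an addition of the sketch; in this simplified list model, the two swaps named above stall before reaching the final order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: a linear plan is modeled as a list of operator names.
class IterativeSwapSketch {

    /** Swaps `first` and `second` wherever they appear adjacent in that order. */
    static final class SwapRule {
        final String first;
        final String second;

        SwapRule(String first, String second) {
            this.first = first;
            this.second = second;
        }

        /** Applies the rule to every adjacent match; returns true if the plan changed. */
        boolean apply(List<String> plan) {
            boolean changed = false;
            for (int i = 0; i + 1 < plan.size(); i++) {
                if (plan.get(i).equals(first) && plan.get(i + 1).equals(second)) {
                    Collections.swap(plan, i, i + 1);
                    changed = true;
                }
            }
            return changed;
        }
    }

    /**
     * Runs all rules repeatedly until a full pass changes nothing or
     * maxIterations passes have run. Returns the number of passes made.
     */
    static int optimize(List<String> plan, List<SwapRule> rules, int maxIterations) {
        int numIterations = 0;
        boolean sawMatch;
        do {
            sawMatch = false;
            for (SwapRule rule : rules) {
                if (rule.apply(plan)) {
                    sawMatch = true;
                }
            }
            numIterations++;
        } while (sawMatch && numIterations < maxIterations);
        return numIterations;
    }

    /** Assumed rule set for the example; the join/foreach rule is this sketch's addition. */
    static List<SwapRule> exampleRules() {
        return Arrays.asList(
                new SwapRule("Join", "Filter"),
                new SwapRule("Filter", "Foreach"),
                new SwapRule("Join", "Foreach"));
    }

    public static void main(String[] args) {
        List<String> plan =
                new ArrayList<>(Arrays.asList("Load", "Join", "Filter", "Foreach"));
        int passes = optimize(plan, exampleRules(), 1000);
        System.out.println(plan + " after " + passes + " passes");
    }
}
```

With these three rules the list ends up as Load, Foreach, Filter, Join. Each rule only looks at one adjacent pair, and the iteration cap guards against rule sets that would otherwise undo each other forever.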
> 3) Add three calls to OperatorPlan:
> {code}
> /**
>  * Swap two operators in a plan.  Both of the operators must have single
>  * inputs and single outputs.
>  * @param first operator
>  * @param second operator
>  * @throws PlanException if either operator is not single input and output.
>  */
> public void swap(E first, E second) throws PlanException {
>     ...
> }
> /**
>  * Push one operator in front of another.  This function is for use when
>  * the first operator has multiple inputs.  The caller can specify
>  * which input of the first operator the second operator should be pushed to.
>  * @param first operator, assumed to have multiple inputs.
>  * @param second operator, will be pushed in front of first
>  * @param inputNum, indicates which input of the first operator the second
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if inputNum does not exist for first operator
>  */
> public void pushBefore(E first, E second, int inputNum) throws PlanException {
>     ...
> }
> /**
>  * Push one operator after another.  This function is for use when the second
>  * operator has multiple outputs.  The caller can specify which output of the
>  * second operator the first operator should be pushed to.
>  * @param first operator, will be pushed after the second operator
>  * @param second operator, assumed to have multiple outputs
>  * @param outputNum indicates which output of the second operator the first
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if outputNum does not exist for second operator
>  */
> public void pushAfter(E first, E second, int outputNum) throws PlanException {
>     ...
> }
> {code}
> The rules in the optimizer can use these three functions, along with the
> existing insertBetween(), replace(), and removeAndReconnect() calls to
> operate on the plan.
> 4) Add a new call to Operator:
> {code}
> /**
>  * Make any necessary changes to a node based on a change of position in the
>  * plan.  This allows operators to rewire their projections, etc. when they
>  * are relocated in a plan.
>  * @param oldPred Operator that was previously the predecessor.
>  * @param newPred Operator that will now be the predecessor.
>  * @throws PlanException
>  */
> public abstract void rewire(Operator oldPred, Operator newPred)
>     throws PlanException;
> {code}
> This method will be called by swap, pushBefore, pushAfter, insertBetween,
> replace, and removeAndReconnect in OperatorPlan whenever an operator is moved
> around so that the operator has a chance to make any necessary changes.
> 5) Add new calls to LogicalOperator and PhysicalOperator
> {code}
> /**
>  * A struct detailing how a projection is altered by an operator.
>  */
> public class ProjectionMap {
>     /**
>      * Quick way for an operator to note that its input and output are the
>      * same.
>      */
>     public boolean noChange;
>     /**
>      * Map of field changes, with keys being the output fields of the
>      * operator and values being the input fields.  Fields are numbered from
>      * 0.  So for a foreach operator derived from
>      * 'B = foreach A generate $0, $2, $3, udf($1)'
>      * would produce a mapping of 0->0, 1->2, 2->3
>      */
>     public Map<Integer, Integer> mappedFields;
>     /**
>      * List of fields removed from the input.  This includes fields that were
>      * transformed, and thus are no longer the same fields.  Using the
>      * example foreach given under mappedFields, this list would contain '1'.
>      */
>     public List<Integer> removedFields;
>     /**
>      * List of fields in the output of this operator that were created by this
>      * operator.  Using the example foreach given under mappedFields, this
>      * list would contain '3'.
>      */
>     public List<Integer> addedFields;
> }
> /**
>  * Produce a map describing how this operator modifies its projection.
>  * @return ProjectionMap null indicates it does not know how the projection
>  * changes, for example a join of two inputs where one input does not have
>  * a schema.
>  */
> public abstract ProjectionMap getProjectionMap();
> /**
>  * Get a list of fields that this operator requires.  This is not necessarily
>  * equivalent to the list of fields the operator projects.  For example,
>  * a filter will project anything passed to it, but requires only the fields
>  * explicitly referenced in its filter expression.
>  * @return list of fields, numbered from 0.
>  */
> public abstract List<Integer> getRequiredFields();
> {code}
> These methods will be called by optimizer rules to determine whether or not a
> swap can be done (for example, you can't swap two operators if the second one
> uses a field added by the first), and once the swap is done they will be used
> by rewire to understand how to map projections in the operators.
> 6) It's not clear that the RuleMatcher, in its current form, will work with
> rules that are not linear.  That is, it matches rules that look like:
> Operators {Foreach, Filter}
> Edges {0->1}
> But I don't know if it will match rules that look like:
> Operators {Scan, Scan, Join}
> Edges {0->2, 1->2}
> For the optimizer to be able to determine join types and operations with
> splits, it will have to be able to do that.
> Examples of types of rules that this optimizer could support:
> 1) Pushing filters in front of joins.
> 2) Pushing foreachs with flattens (which thus greatly expand the data) down
> the tree past filters, joins, etc.
> 3) Pushing type casting used for schemas in loads down to the point where the
> field is actually used.
> 4) Deciding when to do fragment/replicate join or sort/merge join instead of
> the standard hash join.
> 5) The current optimizations: pushing limit up the tree, making implicit
> splits explicit, merging load and stream where possible, using the combiner.
> 6) Merge filters or foreachs where possible
> In particular the combiner optimizer hopefully can be completely rewritten to
> use the optimizer framework to make decisions about how to rework physical
> plans to push work into the combiner.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
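As a rough illustration of how a rule might use the calls from item 5 to decide whether a swap is legal, here is a small sketch. It is an assumption-laden stand-in, not the Pig implementation: `SwapCheckSketch`, `canSwap`, `rewireRequired`, and `exampleForeachMap` are hypothetical names, and the nested `ProjectionMap` only mirrors the fields listed in the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a swap check built on the proposed ProjectionMap.
class SwapCheckSketch {

    /** Simplified stand-in for the ProjectionMap described in item 5. */
    static final class ProjectionMap {
        boolean noChange;
        Map<Integer, Integer> mappedFields = new HashMap<>(); // output -> input
        List<Integer> removedFields = new ArrayList<>();
        List<Integer> addedFields = new ArrayList<>();
    }

    /**
     * Returns true if the second operator can move in front of the first:
     * every field it requires (numbered in the first operator's output)
     * must pass through the first operator unchanged.
     */
    static boolean canSwap(ProjectionMap firstMap, List<Integer> secondRequired) {
        if (firstMap == null) {
            return false; // projection unknown, so be conservative
        }
        if (firstMap.noChange) {
            return true;
        }
        for (Integer field : secondRequired) {
            if (firstMap.addedFields.contains(field)
                    || !firstMap.mappedFields.containsKey(field)) {
                return false;
            }
        }
        return true;
    }

    /** Translates required fields into the first operator's input numbering. */
    static List<Integer> rewireRequired(ProjectionMap firstMap, List<Integer> secondRequired) {
        if (firstMap.noChange) {
            return new ArrayList<>(secondRequired);
        }
        List<Integer> rewired = new ArrayList<>();
        for (Integer field : secondRequired) {
            rewired.add(firstMap.mappedFields.get(field));
        }
        return rewired;
    }

    /** Map for 'B = foreach A generate $0, $2, $3, udf($1)' from the proposal. */
    static ProjectionMap exampleForeachMap() {
        ProjectionMap map = new ProjectionMap();
        map.mappedFields.put(0, 0);
        map.mappedFields.put(1, 2);
        map.mappedFields.put(2, 3);
        map.removedFields.add(1);
        map.addedFields.add(3);
        return map;
    }

    public static void main(String[] args) {
        ProjectionMap foreach = exampleForeachMap();
        // A filter on $1 of B only needs a field that passes through the foreach.
        System.out.println(canSwap(foreach, Arrays.asList(1)));
        // A filter on $3 of B uses the udf result, which the foreach created.
        System.out.println(canSwap(foreach, Arrays.asList(3)));
        System.out.println(rewireRequired(foreach, Arrays.asList(1)));
    }
}
```

In this sketch a filter on `$1` of B may be pushed in front of the foreach and would then refer to `$2` of A, while a filter on `$3` may not, because that field is added by the foreach. Treating a null map as "do not swap" follows the proposal's note that null means the operator does not know how its projection changes.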