[ 
https://issues.apache.org/jira/browse/PIG-697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Santhosh Srinivasan updated PIG-697:
------------------------------------

    Attachment: OptimizerPhase3_parrt1-1.patch

Attaching patch incorporating the review comments.

> Proposed improvements to pig's optimizer
> ----------------------------------------
>
>                 Key: PIG-697
>                 URL: https://issues.apache.org/jira/browse/PIG-697
>             Project: Pig
>          Issue Type: Bug
>          Components: impl
>            Reporter: Alan Gates
>            Assignee: Santhosh Srinivasan
>         Attachments: OptimizerPhase1.patch, OptimizerPhase1_part2.patch, 
> OptimizerPhase2.patch, OptimizerPhase3_parrt1-1.patch, 
> OptimizerPhase3_parrt1.patch
>
>
> I propose the following changes to pig optimizer, plan, and operator 
> functionality to support more robust optimization:
> 1) Remove the required array from Rule.  This will change rules so that they 
> only match exact patterns instead of allowing missing elements in the pattern.
> This has the downside that if a given rule applies to two patterns (say 
> Load->Filter->Group, Load->Group) you have to write two rules.  But it has 
> the upside that
> the resulting rules know exactly what they are getting.  The original intent 
> of this was to reduce the number of rules that needed to be written.  But the
> resulting rules have do a lot of work to understand the operators they are 
> working with.  With exact matches only, each rule will know exactly the 
> operators it
> is working on and can apply the logic of shifting the operators around.  All 
> four of the existing rules set all entries of required to true, so removing 
> this
> will have no effect on them.
> 2) Change PlanOptimizer.optimize to iterate over the rules until there are no 
> conversions or a certain number of iterations has been reached.  Currently the
> function is:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         for (Rule rule : mRules) {
>             if (matcher.match(rule)) {
>                 // It matches the pattern.  Now check if the transformer
>                 // approves as well.
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches)
>                 {
>                       if (rule.transformer.check(match)) {
>                           // The transformer approves.
>                           rule.transformer.transform(match);
>                       }
>                 }
>             }
>         }
>     }
> {code}
> It would change to be:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         boolean sawMatch;
>         int iterators = 0;
>         do {
>             sawMatch = false;
>             for (Rule rule : mRules) {
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches) {
>                     // It matches the pattern.  Now check if the transformer
>                     // approves as well.
>                     if (rule.transformer.check(match)) {
>                         // The transformer approves.
>                         sawMatch = true;
>                         rule.transformer.transform(match);
>                     }
>                 }
>             }
>             // Not sure if 1000 is the right number of iterations, maybe it
>             // should be configurable so that large scripts don't stop too 
>             // early.
>         } while (sawMatch && numIterations++ < 1000);
>     }
> {code}
> The reason for limiting the number of iterations is to avoid infinite loops.  
> The reason for iterating over the rules is so that each rule can be applied 
> multiple
> times as necessary.  This allows us to write simple rules, mostly swaps 
> between neighboring operators, without worrying that we get the plan right in 
> one pass.
> For example, we might have a plan that looks like:  
> Load->Join->Filter->Foreach, and we want to optimize it to 
> Load->Foreach->Filter->Join.  With two simple
> rules (swap filter and join and swap foreach and filter), applied 
> iteratively, we can get from the initial to final plan, without needing to 
> understanding the
> big picture of the entire plan.
> 3) Add three calls to OperatorPlan:
> {code}
> /**
>  * Swap two operators in a plan.  Both of the operators must have single
>  * inputs and single outputs.
>  * @param first operator
>  * @param second operator
>  * @throws PlanException if either operator is not single input and output.
>  */
> public void swap(E first, E second) throws PlanException {
>     ...
> }
> /**
>  * Push one operator in front of another.  This function is for use when
>  * the first operator has multiple inputs.  The caller can specify
>  * which input of the first operator the second operator should be pushed to.
>  * @param first operator, assumed to have multiple inputs.
>  * @param second operator, will be pushed in front of first
>  * @param inputNum, indicates which input of the first operator the second
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if inputNum does not exist for first operator
>  */
> public void pushBefore(E first, E second, int inputNum) throws PlanException {
>     ...
> }
> /**
>  * Push one operator after another.  This function is for use when the second
>  * operator has multiple outputs.  The caller can specify which output of the
>  * second operator the first operator should be pushed to.
>  * @param first operator, will be pushed after the second operator
>  * @param second operator, assumed to have multiple outputs
>  * @param outputNum indicates which output of the second operator the first 
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if outputNum does not exist for second operator
>  */
> public void pushAfter(E first, E second, int outputNum) throws PlanException {
>     ...
> }
> {code}
> The rules in the optimizer can use these three functions, along with the 
> existing insertBetween(), replace(), and removeAndReconnect() calls to 
> operate on the
> plan.
> 4) Add a new call to Operator:
> {code}
> /**
>  * Make any necessary changes to a node based on a change of position in the
>  * plan.  This allows operators to rewire their projections, etc. when they
>  * are relocated in a plan.
>  * @param oldPred Operator that was previously the predecessor.
>  * @param newPred Operator thwas will now be the predecessor.
>  * @throws PlanException
>  */
> public abstract void rewire(Operator oldPred, Operator newPred) throws 
> PlanException;
> {code}
> This method will be called by the swap, pushBefore, pushAfter, insertBetween, 
> replace, and removeAndReconnect in OperatorPlan whenever an operator is moved
> around so that the operator has a chance to make any necessary changes.  
> 5) Add new calls to LogicalOperator and PhysicalOperator
> {code}
> /**
>  * A struct detailing how a projection is altered by an operator.
>  */
> public class ProjectionMap {
>     /**
>      * Quick way for an operator to note that its input and output are the
>      * same.
>      */
>     public boolean noChange;
>     /**
>      * Map of field changes, with keys being the output fields of the 
>      * operator and values being the input fields.  Fields are numbered from
>      * 0.  So for a foreach operator derived from
>      * 'B = foreach A generate $0, $2, $3, udf($1)' 
>      * would produce a mapping of 0->0, 1->2, 2->3
>      */
>     public Map<Integer, Integer> mappedFields;
>     /**
>      * List of fields removed from the input.  This includes fields that were
>      * transformed, and thus are no longer the same fields.  Using the
>      * example foreach given under mappedFields, this list would contain '1'.
>      */
>     public List<Integer> removedFields;
>     /**
>      * List of fields in the output of this operator that were created by this
>      * operator.  Using the example foreach given under mappedFields, this 
> list
>      * would contain '3'.
>      */
>     public List<Integer> addedFields;
> }
> /**
>  * Produce a map describing how this operator modifies its projection.
>  * @returns ProjectionMap null indicates it does not know how the projection
>  * changes, for example a join of two inputs where one input does not have
>  * a schema.
>  */
> public abstract ProjectionMap getProjectionMap();
> /**
>  * Get a list of fields that this operator requires.  This is not necessarily
>  * equivalent to the list of fields the operator projects.  For example,
>  * a filter will project anything passed to it, but requires only the fields
>  * explicitly referenced in its filter expression.
>  * @return list of fields, numbered from 0.
>  */
> public abstract List<Integer> getRequiredFields();
> {code}
> These calls will be called by optimizer rules to determine whether or not a 
> swap can be done (for example, you can't swap two operators if the second one 
> uses a
> field added by the first), and once the swap is done they will be used by 
> rewire to understand how to map projections in the operators.
> 6)  It's not clear that the RuleMatcher, in its current form, will work with 
> rules that are not linear.  That is, it matches rules that look like:
> Operators {Foreach, Filter}
> Edges {0->1}
> But I don't know if it will match rules that look like:
> Operators {Scan, Scan, Join}
> Edges {0->2, 1->2}
> For the optimizer to be able to determine join types and operations with 
> splits, it will have to be able to do that.
> Examples of types of rules that is optimizer could support:
> 1) Pushing filters in front of joins.
> 2) Pushing foreachs with flattens (which thus greathly expand the data) down 
> the tree past filters, joins, etc.
> 3) Pushing type casting used for schemas in loads down to the point where the 
> field is actually used.
> 4) Deciding when to do fragment/replicate join or sort/merge join instead of 
> the standard hash join.
> 5) The current optimizations:  pushing limit up the tree, making implicit 
> splits explicit, merge load and stream where possible, using the combiner.
> 6) Merge filters or foreachs where possible
> In particular the combiner optimizer hopefully can be completely rewritten to 
> use the optimizer framework to make decisions about how to rework physical 
> plans
> to push work into the combiner.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to