[ 
https://issues.apache.org/jira/browse/PIG-161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12605422#action_12605422
 ] 

Alan Gates commented on PIG-161:
--------------------------------

As part of the pipeline rework, we have discovered some issues in the original 
design regarding how we handle foreach/generate.  In particular this has
been an issue for how nested plans will work.  The original design looked like 
this:

{noformat}
Foreach {
    Plan nestedPlan; // one plan for the whole nested logic.  All the 
operations in this plan would be relational operators.
}

Generate {
    Plan[] nestedPlans; // one plan for each element that would be generated.  
All operators in this plan would be expression operators.
}
{noformat}

The runtime logic of foreach.getNext() was:
{noformat}
input = predecessor.getNext();
nestedPlan.attach(input);
rc = nestedPlan.getNext();
return rc;
{noformat}

and the runtime logic of generate.getNext() was:
{noformat}
rc = new tuple;
for (i = 0; i < nestedPlans.length; i++) {
    rc[i] = nestedPlans[i].getNext();
}
handle any flattening;
return rc;
{noformat}

This led to a couple of issues.
# Nested plans which are DAGs and not trees (which are quite common) are hard 
to handle.
# Generate was not properly running all tuples through the nested plan before 
passing its input on to the attached plans.  This led to aggregate functions 
getting only the first tuple in a bag instead of all the tuples.  Generate 
could not easily be changed to address this, because it cannot tell when it 
does and does not need to keep pulling for tuples.

To address this issue we propose the following design.

{noformat}
Foreach {
    Plan[] nestedPlans; // one plan for each element in the projection.  May be 
relational or expression operators.
}

Accumulate {
    Plan nestedPlan; // Consists entirely of expression operators.
}
{noformat}

In the case where the nested plan contains a relational operator (which means 
there was an actual nested section in the script), a new relational operator, 
accumulate, will be added to the end of the plan.  Its task will be to 
accumulate tuples from the nested pipeline, construct a bag, and attach that 
bag as the input to any expression operators given as part of the generate 
projection.

So the runtime logic of foreach.getNext() will now be:
{noformat}
input = predecessor.getNext();
rc = new tuple;
for (i = 0; i < nestedPlans.length; i++) {
    nestedPlans[i].attach(input);
    rc[i] = nestedPlans[i].getNext();
}
handle any flattening;
return rc;
{noformat}

and the runtime logic of accumulate.getNext() will be:
{noformat}
input = predecessor.getNext();
nestedPlan.attach(input);
return nestedPlan.getNext();
{noformat}
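To make the pull model concrete, here is a minimal Python sketch of the proposed foreach.getNext() logic (hypothetical class and method names, not Pig's actual Java operators): each input tuple is attached to every nested plan, and one output tuple is assembled from the plans' results.

```python
class Project:
    """Expression operator: returns one field of the attached tuple."""
    def __init__(self, col):
        self.col = col
        self.input = None

    def attach(self, tup):
        self.input = tup

    def get_next(self):
        return self.input[self.col]

class Rows:
    """Stand-in predecessor that streams a fixed list of tuples."""
    def __init__(self, rows):
        self.it = iter(rows)

    def get_next(self):
        return next(self.it, None)

class Foreach:
    """Relational operator holding one nested plan per projected element."""
    def __init__(self, predecessor, nested_plans):
        self.predecessor = predecessor
        self.nested_plans = nested_plans

    def get_next(self):
        inp = self.predecessor.get_next()
        if inp is None:
            return None          # end of input
        rc = []
        for np in self.nested_plans:
            np.attach(inp)       # hand the same input tuple to each plan
            rc.append(np.get_next())
        # (flattening elided)
        return tuple(rc)

fe = Foreach(Rows([(1, 2), (3, 4)]), [Project(1), Project(0)])
print(fe.get_next())  # (2, 1)
print(fe.get_next())  # (4, 3)
```

Note that foreach, not a separate generate operator, now owns the loop over nested plans; each plan here is reduced to a single leaf for brevity.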

For clarity, let us consider a couple of use cases.

Case 1:  no nested section in the script.
{noformat}
A = load 'myfile';
B = group A by $0;
C = foreach B generate group, COUNT($1), SUM($1.$1);
{noformat}

The plans for this will be:

Top level plan:

load -> group -> foreach

The foreach will have three nested plans:

plan 1: project(0)

plan 2: project(1) -> COUNT()

plan 3: project(1) -> project(1) -> SUM()

So for each tuple it gets, foreach will attach it to each of the above three 
plans, and then call getNext() on the leaves of those plans.  It will take
the resulting three values and construct a tuple and return that as the result 
of its getNext().

One thing to note here is that in plan 3, project is being used in two 
different ways.  The first project is handed a tuple and asked to output a 
single field (which happens to be a bag).  The second project is handed a bag 
and asked to output a bag.  The schema of its input bag is {(int, int)} and the 
schema of its output bag is {(int)}.  So project needs to know when it is being 
used to output a bag vs. when it is being asked to output a field from a tuple, 
which may be of any type.
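A small sketch of this dual behavior (plain Python, with a hypothetical project function; bags are modeled as lists of tuples): handed a tuple, project returns a single field; handed a bag, it returns a projected bag.

```python
def project(data, col):
    """Sketch of project's two modes, keyed on the input's type."""
    if isinstance(data, list):                # bag in, bag out
        return [(t[col],) for t in data]      # {(int, int)} -> {(int)}
    return data[col]                          # tuple in, field out (any type)

# plan 3 from Case 1: project(1) -> project(1) -> SUM()
grouped = ("key", [(1, 10), (2, 20)])         # (group, bag of (int, int))
inner_bag = project(grouped, 1)               # first project: field is a bag
sum_input = project(inner_bag, 1)             # second project: bag -> bag
print(sum_input)                              # [(10,), (20,)]
print(sum(t[0] for t in sum_input))           # SUM() would see 30
```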

Case 2:  nested section in the script
{noformat}
A = load 'myfile';
B = group A by $0;
C = foreach B {
    C1 = distinct $1;
    C2 = filter $1 by $1 > 0;
    generate group, COUNT(C1), SUM(C2.$1);
};
{noformat}

The plans for this will be:

Top level plan:

load -> group -> foreach

The foreach will have three nested plans:

plan 1:  project(0)

plan 2: project(1) -> distinct -> accumulate

The accumulate will have a nested plan of: project( * ) -> COUNT()

plan 3: project(1) -> filter -> accumulate

The accumulate will have a nested plan of: project(1) -> SUM()
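The accumulate bookend in plan 2 can be sketched as follows (hypothetical Python names, modeled on the pseudocode above): it drains its relational predecessor into a bag, then attaches the whole bag to its expression plan, so an aggregate like COUNT sees every tuple rather than only the first.

```python
class Distinct:
    """Relational operator: streams the distinct tuples of its input."""
    def __init__(self, rows):
        seen = []
        for t in rows:
            if t not in seen:
                seen.append(t)
        self.it = iter(seen)

    def get_next(self):
        return next(self.it, None)

class Count:
    """Expression operator standing in for the COUNT builtin."""
    def attach(self, bag):
        self.bag = bag

    def get_next(self):
        return len(self.bag)

class Accumulate:
    """Bookend: relational pipeline in, single expression value out."""
    def __init__(self, predecessor, expr_plan):
        self.predecessor = predecessor
        self.expr_plan = expr_plan

    def get_next(self):
        bag = []
        t = self.predecessor.get_next()
        while t is not None:        # drain the nested relational pipeline
            bag.append(t)
            t = self.predecessor.get_next()
        self.expr_plan.attach(bag)  # hand the whole bag to the expression
        return self.expr_plan.get_next()

acc = Accumulate(Distinct([(1,), (2,), (1,), (3,)]), Count())
print(acc.get_next())  # 3
```

This also illustrates the oddity noted below: accumulate is a relational operator whose getNext() returns whatever type its expression plan produces, here an integer.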

So effectively, we're proposing several changes:

# Removal of generate as a relational operator.  Its functionality will be 
absorbed into foreach.
# Splitting of the nested plan into several, one for each element of the 
generate.  This sacrifices some optimizations.  Consider, for example, if the 
script above had instead said C2 = filter C1 by $1 > 0.  Ideally we would 
evaluate distinct only once; under this proposal we would evaluate it twice.  
For now that is ok, because evaluating it once and then splitting the output is 
much more difficult.
# Creation of "bookend" operators.  The project will facilitate the transition 
from expression operators to relational operators at the top of these nested 
plans by taking a bag attached as input and streaming out its tuples one at a 
time (this functionality is already in place, built for the original generate 
implementation).  The accumulate will facilitate the transition from relational 
operators to an expression operator.  One oddity here is that it will be the 
only relational operator that can return a type other than bag; it may return 
any type.

This proposal will entail the following changes:

On the physical side:

# Change project.getNext(Bag) to handle the case where it's given a bag.  In 
this case it should return a bag, stripping the bag to contain only the 
field(s) being projected.
# Change foreach to handle functionality previously in generate, including the 
flattening logic.
# Create an accumulate operator.

On the logical side:
# Changes to project.getSchema() to figure out when project needs to return a 
bag vs. when it may return any type.
# Changes to parsing of foreach to decide when an accumulate is necessary in 
the plan, and when it isn't.
# Changes to foreach.getSchema() to take on much of the previous functionality 
of generate.getSchema().  
# These changes will most likely force changes in the type checker as well.



> Rework physical plan
> --------------------
>
>                 Key: PIG-161
>                 URL: https://issues.apache.org/jira/browse/PIG-161
>             Project: Pig
>          Issue Type: Sub-task
>            Reporter: Alan Gates
>            Assignee: Alan Gates
>         Attachments: arithmeticOperators.patch, BinCondAndNegative.patch, 
> CastAndMapLookUp.patch, incr2.patch, incr3.patch, incr4.patch, incr5.patch, 
> logToPhyTranslator.patch, missingOps.patch, 
> MRCompilerTests_PlansAndOutputs.txt, Phy_AbsClass.patch, physicalOps.patch, 
> physicalOps.patch, physicalOps.patch, physicalOps.patch, 
> physicalOps_latest.patch, POCast.patch, POCast.patch, podistinct.patch, 
> pogenerate.patch, pogenerate.patch, pogenerate.patch, posort.patch, 
> POUserFuncCorrection.patch, 
> TEST-org.apache.pig.test.TestLocalJobSubmission.txt, 
> TEST-org.apache.pig.test.TestLogToPhyCompiler.txt, 
> TEST-org.apache.pig.test.TestLogToPhyCompiler.txt, 
> TEST-org.apache.pig.test.TestMapReduce.txt, 
> TEST-org.apache.pig.test.TestTypeCheckingValidator.txt, 
> TEST-org.apache.pig.test.TestUnion.txt, translator.patch, translator.patch, 
> translator.patch, translator.patch
>
>
> This bug tracks work to rework all of the physical operators as described in 
> http://wiki.apache.org/pig/PigTypesFunctionalSpec

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
