[ 
https://issues.apache.org/jira/browse/PIG-1916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13050201#comment-13050201
 ] 

Zhijie Shen commented on PIG-1916:
----------------------------------

Here is a report on my progress and the issues I've encountered.

First, by adding the nested cross syntax (i.e., changing the grammar source 
files and the hooked functions), Pig can now accept the nested cross statement, 
generate the logical plan, and reach the step of translating the logical plan 
into the physical plan. The major issue here is multiple inputs. The existing 
nested operators have only one input, so the current grammar seems to assume a 
single input for nested operators. However, cross can accept multiple inputs, 
so the input syntax in the nested block has to be changed as well.

How to translate the logical nested cross into the correct physical operators 
looks like the essential part of this project. I've spent time investigating 
the translation, as well as map/reduce plan compilation and map/reduce job 
execution.

During logical plan generation, both top-level and nested cross statements 
result in a LOCross instance. LogToPhyTranslationVisitor already has a function 
to visit LOCross. However, that function works for the top-level cross but not 
for the nested one. To see why it doesn't work for the nested cross, here are 
the two map/reduce plans generated for the top-level and the nested cross 
operators, respectively:

1. top-level:
user = load 'user.txt' as (uid, region);
session = load 'session.txt' as (uid, region, duration);
A = group user by uid;
B = group session by uid;
C = cross A, B;
store C into 'test.out';

#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-38
Map Plan
Union[tuple] - scope-39
|
|---C: Local Rearrange[tuple]{tuple}(false) - scope-21
|   |   |
|   |   Project[int][0] - scope-22
|   |   |
|   |   Project[int][1] - scope-23
|   |
|   |---C: New For Each(true,true)[tuple] - scope-20
|       |   |
|       |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-18
|       |   |
|       |   |---Constant(2) - scope-16
|       |   |
|       |   |---Constant(0) - scope-17
|       |   |
|       |   Project[tuple][*] - scope-19
|       |
|       |---user: New For Each(false,false)[bag] - scope-5
|           |   |
|           |   Project[bytearray][0] - scope-1
|           |   |
|           |   Project[bytearray][1] - scope-3
|           |
|           |---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---C: Local Rearrange[tuple]{tuple}(false) - scope-29
    |   |
    |   Project[int][0] - scope-30
    |   |
    |   Project[int][1] - scope-31
    |
    |---C: New For Each(true,true)[tuple] - scope-28
        |   |
        |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-26
        |   |
        |   |---Constant(2) - scope-24
        |   |
        |   |---Constant(1) - scope-25
        |   |
        |   Project[tuple][*] - scope-27
        |
        |---session: New For Each(false,false,false)[bag] - scope-13
            |   |
            |   Project[bytearray][0] - scope-7
            |   |
            |   Project[bytearray][1] - scope-9
            |   |
            |   Project[bytearray][2] - scope-11
            |
            |---session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:org.apache.pig.builtin.PigStorage) - scope-6--------
Reduce Plan
C: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-35
|
|---POJoinPackage(true,true)[tuple] - scope-40--------
Global sort: false
----------------

2. nested:
user = load 'user.txt' as (uid, region);
session = load 'session.txt' as (uid, region, duration);
C = cogroup user by uid, session by uid;
D = foreach C {
    crossed = cross user, session;
    filtered = filter crossed by user::region == session::region;
    generate group, user, session;
};
store D into 'test.out';

#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-52
Map Plan
Union[tuple] - scope-53
|
|---C: Local Rearrange[tuple]{bytearray}(false) - scope-16
|   |   |
|   |   Project[bytearray][0] - scope-17
|   |
|   |---user: New For Each(false,false)[bag] - scope-5
|       |   |
|       |   Project[bytearray][0] - scope-1
|       |   |
|       |   Project[bytearray][1] - scope-3
|       |
|       |---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---C: Local Rearrange[tuple]{bytearray}(false) - scope-18
    |   |
    |   Project[bytearray][0] - scope-19
    |
    |---session: New For Each(false,false,false)[bag] - scope-13
        |   |
        |   Project[bytearray][0] - scope-7
        |   |
        |   Project[bytearray][1] - scope-9
        |   |
        |   Project[bytearray][2] - scope-11
        |
        |---session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:org.apache.pig.builtin.PigStorage) - scope-6--------
Reduce Plan
D: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-49
|
|---D: New For Each(false)[bag] - scope-48
    |   |
    |   RelationToExpressionProject[bag][*] - scope-20
    |   |
    |   |---filtered: Filter[bag] - scope-44
    |       |   |
    |       |   Equal To[boolean] - scope-47
    |       |   |
    |       |   |---Project[bytearray][1] - scope-45
    |       |   |
    |       |   |---Project[bytearray][3] - scope-46
    |       |
    |       |---crossed: New For Each(true,true)[tuple] - scope-43
    |           |   |
    |           |   Project[bag][1] - scope-41
    |           |   |
    |           |   Project[bag][2] - scope-42
    |           |
    |           |---Package[tuple]{tuple} - scope-24
    |               |
    |               |---crossed: Global Rearrange[tuple] - scope-23
    |                   |
    |                   |---crossed: Local Rearrange[tuple]{tuple}(false) - scope-30
    |                   |   |   |
    |                   |   |   Project[int][0] - scope-31
    |                   |   |   |
    |                   |   |   Project[int][1] - scope-32
    |                   |   |
    |                   |   |---crossed: New For Each(true,true)[tuple] - scope-29
    |                   |       |   |
    |                   |       |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-27
    |                   |       |   |
    |                   |       |   |---Constant(2) - scope-25
    |                   |       |   |
    |                   |       |   |---Constant(0) - scope-26
    |                   |       |   |
    |                   |       |   Project[tuple][*] - scope-28
    |                   |       |
    |                   |       |---Project[bag][1] - scope-21
    |                   |
    |                   |---crossed: Local Rearrange[tuple]{tuple}(false) - scope-38
    |                       |   |
    |                       |   Project[int][0] - scope-39
    |                       |   |
    |                       |   Project[int][1] - scope-40
    |                       |
    |                       |---crossed: New For Each(true,true)[tuple] - scope-37
    |                           |   |
    |                           |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-35
    |                           |   |
    |                           |   |---Constant(2) - scope-33
    |                           |   |
    |                           |   |---Constant(1) - scope-34
    |                           |   |
    |                           |   Project[tuple][*] - scope-36
    |                           |
    |                           |---Project[bag][2] - scope-22
    |
    |---C: Package[tuple]{bytearray} - scope-15--------
Global sort: false
----------------

The difference in the physical plan is not obvious because the translation 
procedure is the same: local rearrange -> global rearrange -> package -> 
physical foreach. However, when the physical plan is translated into the 
map/reduce plan, the difference becomes obvious. The physical operators 
belonging to the top-level cross are distributed across both the map and reduce 
stages: local rearrange is placed in the map stage; global rearrange is removed 
because its logic is inherently provided by map/reduce; package and foreach are 
combined into POJoinPackage, which is placed in the reduce stage.
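
To make the top-level behaviour concrete, here is a small Python model of the 
idea as I understand it from the plan above. It is only a sketch, not Pig's 
code: the function names and the groups_per_input value are made up, and I am 
assuming that the two constants passed to GFCross are the number of inputs and 
the index of the current input. Each tuple is replicated to a set of grid keys 
on the map side, and tuples that share a key are joined on the reduce side.

```python
import itertools
import random


def gf_cross_keys(num_inputs, input_index, groups_per_input, rng):
    """Yield the grid keys one tuple is replicated to (simplified model)."""
    own = rng.randrange(groups_per_input)
    other_dims = [range(groups_per_input) for _ in range(num_inputs - 1)]
    for rest in itertools.product(*other_dims):
        key = list(rest)
        # The tuple's own dimension is fixed at random; all others are enumerated.
        key.insert(input_index, own)
        yield tuple(key)


def top_level_cross(inputs, groups_per_input=3, seed=0):
    """Model a distributed cross: tag with keys, shuffle by key, join per key."""
    rng = random.Random(seed)
    n = len(inputs)
    # "Map" side: replicate each tuple to its grid keys (the local rearrange step).
    shuffled = {}
    for idx, relation in enumerate(inputs):
        for tup in relation:
            for key in gf_cross_keys(n, idx, groups_per_input, rng):
                shuffled.setdefault(key, [[] for _ in range(n)])[idx].append(tup)
    # "Reduce" side: per key, combine the bags of all inputs (package + foreach).
    out = []
    for bags in shuffled.values():
        for combo in itertools.product(*bags):
            out.append(tuple(itertools.chain.from_iterable(combo)))
    return out
```

Any pair of tuples from two inputs meets at exactly one grid key (the one 
formed by their two random coordinates), so every combination is produced once 
even though the work is spread over several keys.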

On the other hand, the physical plan of the nested cross isn't translated 
into the same map/reduce plan. This is because the physical operators of 
nested commands cannot be distributed across the map and reduce stages. 
Instead, they have to be evaluated locally within a single stage, either map 
or reduce. This is where the problem occurs: the global rearrange and the 
package operators appear inside the reduce stage. First, the global rearrange 
is assumed to be replaced by Hadoop's shuffle/merge, so it does not implement 
any data-processing logic itself. Second, package reads its data from Hadoop's 
reduce function, and in the current implementation only one package can appear 
in the reduce stage, because the keyInfo member is set on only one POPackage 
instance.

Given this, when visiting LOCross in LogToPhyTranslationVisitor, top-level and 
nested cross need to be distinguished (which may require a modification to the 
logical plan). For a nested cross, some other operator (temporarily named 
POCross) needs to be generated to replace POGlobalRearrange and POPackage. It 
should provide similar functionality but operate locally. Afterwards, 
MRCompiler needs a function to visit the POCross instance and attach it to the 
reduce stage.
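
As a rough illustration of what "operate locally" means, here is a Python 
sketch of the behaviour I expect from such an operator. It is not an 
implementation of POCross; local_cross and nested_foreach are hypothetical 
names, and the field positions in the filter (1 and 3) are taken from the 
Project operators in the nested plan above. The bags of one cogroup result are 
crossed in memory, with no rearrange or package step involved.

```python
import itertools


def local_cross(*bags):
    """Cross any number of in-memory bags; each bag is a list of tuples."""
    for combo in itertools.product(*bags):
        # Concatenate the fields of the chosen tuples into one flat tuple.
        yield tuple(itertools.chain.from_iterable(combo))


def nested_foreach(cogrouped):
    """Model of a nested block: cross the two bags, then filter on region."""
    for group, user_bag, session_bag in cogrouped:
        crossed = local_cross(user_bag, session_bag)
        # Field 1 is user::region and field 3 is session::region.
        filtered = [t for t in crossed if t[1] == t[3]]
        yield group, filtered
```

For example, a group with one user ("u1", "us") and the sessions 
("u1", "us", 10) and ("u1", "eu", 5) yields a single filtered tuple, 
("u1", "us", "u1", "us", 10). If either bag is empty, the cross for that group 
is empty as well.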

Attached is the code I've modified up till now.


> Nested cross
> ------------
>
>                 Key: PIG-1916
>                 URL: https://issues.apache.org/jira/browse/PIG-1916
>             Project: Pig
>          Issue Type: New Feature
>          Components: impl
>            Reporter: Daniel Dai
>              Labels: gsoc2011
>             Fix For: 0.10
>
>
> It is useful to have cross inside a nested foreach statement. One typical use 
> case for nested foreach is that, after cogrouping two relations, we want to 
> flatten the records with the same key and do some processing. This is 
> naturally achieved by cross. E.g.:
> {code}
> C = cogroup user by uid, session by uid;
> D = foreach C {
>     crossed = cross user, session; -- To flatten two input bags
>     filtered = filter crossed by user::region == session::region;
>     result = foreach crossed generate processSession(user::age, user::gender, session::ip);  --Nested foreach Jira: PIG-1631
>     generate result;
> }
> {code}
> Without cross, the user has to write a UDF that processes the bags user and 
> session. That is much harder than writing a UDF that processes flattened 
> tuples. This is especially true when we have nested foreach statements 
> (PIG-1631).
> This is a candidate project for Google Summer of Code 2011. More information 
> about the program can be found at http://wiki.apache.org/pig/GSoc2011
