[ https://issues.apache.org/jira/browse/PIG-1916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13050201#comment-13050201 ]
Zhijie Shen commented on PIG-1916:
----------------------------------
Here is a report on my progress and the issues I've run into.
First, after adding the nested cross syntax (e.g., changing the grammar source
files and the hooked functions), Pig can now accept a nested cross statement,
generate the logical plan, and reach the step of translating the logical plan
into the physical plan. The major issue here is multiple inputs. The existing
nested operators each take only one input, so the current grammar seems to
assume a single input for nested operators. Cross, however, can take multiple
inputs, so the input syntax of the nested block had to be changed as well.
Translating the logical nested cross into the correct physical operators
appears to be the core of this project. I've spent time investigating the
translation, as well as map/reduce plan compilation and map/reduce job
execution.
During logical plan generation, both top-level and nested cross statements
result in an LOCross instance. LogToPhyTranslationVisitor already has a method
for visiting LOCross, but it only works for a top-level cross, not a nested one.
To see why it doesn't work for a nested cross, here are the two map/reduce plans
generated for a top-level and a nested cross operator, respectively:
1. top-level:
user = load 'user.txt' as (uid, region);
session = load 'session.txt' as (uid, region, duration);
A = group user by uid;
B = group session by uid;
C = cross A, B;
store C into 'test.out';
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-38
Map Plan
Union[tuple] - scope-39
|
|---C: Local Rearrange[tuple]{tuple}(false) - scope-21
| | |
| | Project[int][0] - scope-22
| | |
| | Project[int][1] - scope-23
| |
| |---C: New For Each(true,true)[tuple] - scope-20
| | |
| | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-18
| | |
| | |---Constant(2) - scope-16
| | |
| | |---Constant(0) - scope-17
| | |
| | Project[tuple][*] - scope-19
| |
| |---user: New For Each(false,false)[bag] - scope-5
| | |
| | Project[bytearray][0] - scope-1
| | |
| | Project[bytearray][1] - scope-3
| |
| |---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---C: Local Rearrange[tuple]{tuple}(false) - scope-29
| |
| Project[int][0] - scope-30
| |
| Project[int][1] - scope-31
|
|---C: New For Each(true,true)[tuple] - scope-28
| |
| POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-26
| |
| |---Constant(2) - scope-24
| |
| |---Constant(1) - scope-25
| |
| Project[tuple][*] - scope-27
|
|---session: New For Each(false,false,false)[bag] - scope-13
| |
| Project[bytearray][0] - scope-7
| |
| Project[bytearray][1] - scope-9
| |
| Project[bytearray][2] - scope-11
|
|---session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:org.apache.pig.builtin.PigStorage) - scope-6
--------
Reduce Plan
C: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-35
|
|---POJoinPackage(true,true)[tuple] - scope-40
--------
Global sort: false
----------------
2. nested:
user = load 'user.txt' as (uid, region);
session = load 'session.txt' as (uid, region, duration);
C = cogroup user by uid, session by uid;
D = foreach C {
crossed = cross user, session;
filtered = filter crossed by user::region == session::region;
generate group, user, session;
};
store D into 'test.out';
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-52
Map Plan
Union[tuple] - scope-53
|
|---C: Local Rearrange[tuple]{bytearray}(false) - scope-16
| | |
| | Project[bytearray][0] - scope-17
| |
| |---user: New For Each(false,false)[bag] - scope-5
| | |
| | Project[bytearray][0] - scope-1
| | |
| | Project[bytearray][1] - scope-3
| |
| |---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---C: Local Rearrange[tuple]{bytearray}(false) - scope-18
| |
| Project[bytearray][0] - scope-19
|
|---session: New For Each(false,false,false)[bag] - scope-13
| |
| Project[bytearray][0] - scope-7
| |
| Project[bytearray][1] - scope-9
| |
| Project[bytearray][2] - scope-11
|
|---session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:org.apache.pig.builtin.PigStorage) - scope-6
--------
Reduce Plan
D: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-49
|
|---D: New For Each(false)[bag] - scope-48
| |
| RelationToExpressionProject[bag][*] - scope-20
| |
| |---filtered: Filter[bag] - scope-44
| | |
| | Equal To[boolean] - scope-47
| | |
| | |---Project[bytearray][1] - scope-45
| | |
| | |---Project[bytearray][3] - scope-46
| |
| |---crossed: New For Each(true,true)[tuple] - scope-43
| | |
| | Project[bag][1] - scope-41
| | |
| | Project[bag][2] - scope-42
| |
| |---Package[tuple]{tuple} - scope-24
| |
| |---crossed: Global Rearrange[tuple] - scope-23
| |
| |---crossed: Local Rearrange[tuple]{tuple}(false) - scope-30
| | | |
| | | Project[int][0] - scope-31
| | | |
| | | Project[int][1] - scope-32
| | |
| | |---crossed: New For Each(true,true)[tuple] - scope-29
| | | |
| | | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-27
| | | |
| | | |---Constant(2) - scope-25
| | | |
| | | |---Constant(0) - scope-26
| | | |
| | | Project[tuple][*] - scope-28
| | |
| | |---Project[bag][1] - scope-21
| |
| |---crossed: Local Rearrange[tuple]{tuple}(false) - scope-38
| | |
| | Project[int][0] - scope-39
| | |
| | Project[int][1] - scope-40
| |
| |---crossed: New For Each(true,true)[tuple] - scope-37
| | |
| | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-35
| | |
| | |---Constant(2) - scope-33
| | |
| | |---Constant(1) - scope-34
| | |
| | Project[tuple][*] - scope-36
| |
| |---Project[bag][2] - scope-22
|
|---C: Package[tuple]{bytearray} - scope-15
--------
Global sort: false
----------------
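To make the semantics of the nested example concrete, here is a minimal single-process Java sketch of what the cogroup, nested cross, and filter compute for each uid group. It is not Pig code. The class name NestedCrossSemantics, the String[] tuple representation, and the choice to return every group's filtered bag concatenated in uid order are illustrative assumptions. The filter compares fields 1 and 3 of the crossed tuple, which mirrors the Project[1]/Project[3] comparison in the reduce plan above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, single-process sketch of the nested script's intent; not Pig code.
class NestedCrossSemantics {
    // user tuples are (uid, region); session tuples are (uid, region, duration).
    // Returns the "filtered" bag of every group, concatenated in uid order.
    static List<String[]> crossAndFilter(List<String[]> users, List<String[]> sessions) {
        // Step 1: cogroup user by uid, session by uid -> uid -> [user bag, session bag]
        Map<String, List<List<String[]>>> groups = new TreeMap<>();
        for (String[] u : users) bagsFor(groups, u[0]).get(0).add(u);
        for (String[] s : sessions) bagsFor(groups, s[0]).get(1).add(s);

        List<String[]> filtered = new ArrayList<>();
        for (List<List<String[]>> bags : groups.values()) {
            // Step 2: crossed = cross user, session (local to one group, no shuffle)
            for (String[] u : bags.get(0)) {
                for (String[] s : bags.get(1)) {
                    // user::uid, user::region, session::uid, session::region, session::duration
                    String[] crossed = {u[0], u[1], s[0], s[1], s[2]};
                    // Step 3: filter crossed by user::region == session::region ($1 == $3)
                    if (crossed[1].equals(crossed[3])) filtered.add(crossed);
                }
            }
        }
        return filtered;
    }

    // Returns the [user bag, session bag] pair for a uid, creating it on first use.
    private static List<List<String[]>> bagsFor(Map<String, List<List<String[]>>> groups, String uid) {
        return groups.computeIfAbsent(uid, k -> List.of(new ArrayList<>(), new ArrayList<>()));
    }
}
```

The cross is evaluated separately inside each group, so only users and sessions that share a uid are ever paired.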
The difference is not obvious in the physical plans, because the translation
procedure is the same: local rearrange -> global rearrange -> package ->
physical foreach. However, the difference becomes obvious once the physical
plan is compiled into the map/reduce plan. The physical operators of the
top-level cross are split across the map and reduce stages: the local rearrange
is placed in the map stage; the global rearrange is removed because Hadoop's
shuffle inherently provides its logic; and the package and foreach are merged
into a POJoinPackage, which is placed in the reduce stage.
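The top-level layout can be simulated outside Hadoop. The Java sketch below is an interpretation of the GFCross calls with arguments (2, 0) and (2, 1) in the first plan, not Pig's implementation. Each tuple picks a random slot and is replicated to every composite key that has that slot in its own coordinate, which corresponds to the map-side local rearrange. Each key then crosses its two bags in the reduce stage, which corresponds to the POJoinPackage role. The class name GridCrossSketch, the groups parameter, and the seeded Random are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical simulation of a two-way top-level cross spread over a grid of reduce keys.
// Assumption: this mirrors the idea behind GFCross; it is not Pig's actual code.
class GridCrossSketch {
    static List<String> cross(List<String> left, List<String> right, int groups, long seed) {
        Random rnd = new Random(seed);
        // Stands in for the shuffle: composite key (row, col) -> [left bag, right bag]
        Map<List<Integer>, List<List<String>>> reducers = new HashMap<>();

        // Map side for input 0: pick a random row, replicate across all columns.
        for (String t : left) {
            int row = rnd.nextInt(groups);
            for (int col = 0; col < groups; col++) bagsFor(reducers, row, col).get(0).add(t);
        }
        // Map side for input 1: pick a random column, replicate across all rows.
        for (String t : right) {
            int col = rnd.nextInt(groups);
            for (int row = 0; row < groups; row++) bagsFor(reducers, row, col).get(1).add(t);
        }

        // Reduce side: cross the two bags of each key locally. A (left, right) pair
        // meets only at (row of left, col of right), so each pair is emitted once.
        List<String> out = new ArrayList<>();
        for (List<List<String>> bags : reducers.values())
            for (String l : bags.get(0))
                for (String r : bags.get(1))
                    out.add(l + "," + r);
        return out;
    }

    // Returns the [left bag, right bag] pair for a grid key, creating it on first use.
    private static List<List<String>> bagsFor(Map<List<Integer>, List<List<String>>> m, int row, int col) {
        return m.computeIfAbsent(List.of(row, col), k -> List.of(new ArrayList<>(), new ArrayList<>()));
    }
}
```

The output contains every pair exactly once, whatever slots the random generator picks. Only the split of work across reduce keys depends on the random slots.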
In contrast, the physical plan of the nested cross is not compiled into the
same kind of map/reduce plan. The physical operators of nested commands cannot
be split across the map and reduce stages; they have to be executed locally
within a single stage. As a result, the global rearrange and package operators
end up inside the reduce stage, which causes two problems. First, the global
rearrange has always been assumed to be replaced by Hadoop's merge/shuffle, so
it doesn't implement any data-processing logic of its own. Second, the package
reads its data from Hadoop's reduce function, and in the current implementation
only one package can appear in the reduce stage, because the keyInfo member is
set on only one POPackage instance.
To handle this, LogToPhyTranslationVisitor should distinguish between a
top-level and a nested cross when visiting LOCross (this may require modifying
the logical plan). For a nested cross, a different operator (tentatively named
POCross) needs to be generated in place of POGlobalRearrange and POPackage. It
should provide similar functionality, but operate locally. MRCompiler then
needs a visit method for POCross that attaches it to the reduce stage.
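The following plain-Java sketch shows one way the proposed local operator could do its work. It assumes that the bags for the current group (the Project[bag][1] and Project[bag][2] inputs in the nested plan) are already available in the reduce stage and can be iterated repeatedly. Under that assumption, the cartesian product can be enumerated lazily with an odometer of indices, with no rearrange or shuffle involved. LocalCross is a hypothetical name, and plain Lists stand in for Pig's DataBag and Tuple. This is not the POCross implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical local cross over N in-memory bags; Lists stand in for DataBag/Tuple.
class LocalCross<T> implements Iterator<List<T>> {
    private final List<List<T>> bags;
    private final int[] idx; // current position in each bag (an "odometer")
    private boolean hasNext;

    LocalCross(List<List<T>> bags) {
        this.bags = bags;
        this.idx = new int[bags.size()];
        // The product is empty if there are no bags or any bag is empty.
        this.hasNext = !bags.isEmpty() && bags.stream().noneMatch(List::isEmpty);
    }

    @Override
    public boolean hasNext() {
        return hasNext;
    }

    @Override
    public List<T> next() {
        if (!hasNext) throw new NoSuchElementException();
        // Build the output tuple from the current element of each bag.
        List<T> tuple = new ArrayList<>(bags.size());
        for (int i = 0; i < bags.size(); i++) tuple.add(bags.get(i).get(idx[i]));
        advance();
        return tuple;
    }

    // Increment the last index, carrying into earlier positions like an odometer.
    private void advance() {
        for (int i = bags.size() - 1; i >= 0; i--) {
            if (++idx[i] < bags.get(i).size()) return;
            idx[i] = 0;
        }
        hasNext = false; // every index wrapped around: product exhausted
    }
}
```

For the bags [[1, 2], [a, b]], iteration yields (1, a), (1, b), (2, a), (2, b). If any input bag is empty, it yields nothing, which matches cross semantics. Producing tuples lazily avoids materializing the whole product for large groups.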
Attached is the code I've modified so far.
> Nested cross
> ------------
>
> Key: PIG-1916
> URL: https://issues.apache.org/jira/browse/PIG-1916
> Project: Pig
> Issue Type: New Feature
> Components: impl
> Reporter: Daniel Dai
> Labels: gsoc2011
> Fix For: 0.10
>
>
> It is useful to have cross inside a nested foreach statement. One typical use
> case for nested foreach is that, after cogrouping two relations, we want to
> flatten the records with the same key and do some processing. This is
> naturally achieved by cross. E.g.:
> {code}
> C = cogroup user by uid, session by uid;
> D = foreach C {
> crossed = cross user, session; -- To flatten two input bags
> filtered = filter crossed by user::region == session::region;
> result = foreach filtered generate processSession(user::age, user::gender, session::ip); --Nested foreach Jira: PIG-1631
> generate result;
> }
> {code}
> Without cross, users have to write a UDF that processes the bags user and
> session, which is much harder than writing a UDF that processes flattened
> tuples. This is especially true once we have nested foreach statements
> (PIG-1631).
> This is a candidate project for Google summer of code 2011. More information
> about the program can be found at http://wiki.apache.org/pig/GSoc2011