[ 
https://issues.apache.org/jira/browse/PIG-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12977587#action_12977587
 ] 

Thejas M Nair commented on PIG-1735:
------------------------------------

In existing implementation, the optimizer (CombinerOptimizer.java) adds 
combiner to the plan when (co)group has single input. The combiner code is also 
written to handle only a single input (see POCombinerPackage.java) .

Some rough/initial/incomplete thoughts about implementation for (co)group with 
multiple inputs:

Combiner should(/can) be used only when the expressions in the foreach 
following (co)group get inputs from algebraic udfs or group key columns, and 
the algebraic udf arguments are from only one of the input relations.

In the combiner, tuples from each input stream should be grouped/combined 
separately. ie, as if the group-by column is 'group-key-columns + input-index' 
.  POPackage (or a new subclass) will need to support this grouping. A 
POForEach statement will follow the POPackage, it can use a conditional 
operator (POBinCond) with condition on input-index to evaluate corresponding 
udfs.

CombinerOptimizer will need to identify the relational inputs that each 
algebraic udfs gets input from, and add a POForeach with 'Initial' calls to the 
udf for each input in the map physical plan. 

> Use combiner in cogroup
> -----------------------
>
>                 Key: PIG-1735
>                 URL: https://issues.apache.org/jira/browse/PIG-1735
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Thejas M Nair
>            Assignee: Thejas M Nair
>             Fix For: 0.9.0
>
>
> As reported by Scott Carey in PIG-479, combiner does not get used for 
> co-group, even if the functions applied on the bags are algebraic . -
> Quoting from the comment  - 
> "For example, I'm not quite sure why this one doesn't use a combiner - it 
> reads ~350x as much input bytes from HDFS as its reduce output, a combiner 
> would be very effective:
> J = COGROUP
> UV BY (s, d, h, g, p, pa, st) OUTER,
> UC BY (s, d, h, g, p, pa, st) OUTER,
> AT BY (s, d, h, g, p, pa, st) OUTER,
> V BY (s, d, h, g, p, pa, st) OUTER,
> C BY (s, d, h, g, p, pa, st) OUTER;
> OUTPUT = FOREACH J GENERATE
> FLATTEN(group) as (s, d, h, g, p, pa, st),
> COUNT_STAR(C) as c,
> COUNT_STAR(V) as v,
> SUM(AT.p1) as p1,
> SUM(AT.p2) as p2,
> SUM(AT.p3) as p3,
> SUM(UC.q) as ucq,
> SUM(UC.r) as ucr,
> SUM(UV.q) as uvq,
> SUM(UV.r) as uvr;
> "

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to