[https://issues.apache.org/jira/browse/PIG-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12977587#action_12977587]
Thejas M Nair commented on PIG-1735:
------------------------------------
In the existing implementation, the optimizer (CombinerOptimizer.java) adds a
combiner to the plan only when the (co)group has a single input. The combiner
code is also written to handle only a single input (see POCombinerPackage.java).
Some rough, initial, and incomplete thoughts about implementing the combiner for
a (co)group with multiple inputs:
The combiner should (or can) be used only when the expressions in the foreach
following the (co)group get their inputs from algebraic UDFs or group key
columns, and the arguments of each algebraic UDF come from only one of the input
relations.
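To make the eligibility rule concrete, here is a minimal Java sketch. The types
(Expr, GroupKey, AlgebraicCall, OtherExpr) are hypothetical stand-ins, not Pig's
logical or physical plan classes, and each algebraic call is assumed to already
carry the index of the (co)group input that each of its arguments is projected
from.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of the expressions generated by a FOREACH that follows a
// multi-input (co)group. These are NOT Pig's plan classes.
final class CombinerEligibility {

    interface Expr {}

    // Projection of the group key columns, e.g. FLATTEN(group).
    record GroupKey() implements Expr {}

    // Call to an algebraic UDF; argInputs lists, for each argument, the index
    // of the (co)group input it is projected from (assumed to be known).
    record AlgebraicCall(String udf, List<Integer> argInputs) implements Expr {}

    // Anything else, e.g. a non-algebraic UDF.
    record OtherExpr(String text) implements Expr {}

    // True when every expression is a group key projection or an algebraic
    // call whose arguments all come from exactly one input relation.
    static boolean canUseCombiner(List<Expr> exprs) {
        for (Expr e : exprs) {
            if (e instanceof GroupKey) {
                continue; // group key columns never block the combiner
            }
            if (e instanceof AlgebraicCall call) {
                // All arguments must reference the same, single input.
                if (Set.copyOf(call.argInputs()).size() != 1) {
                    return false;
                }
                continue;
            }
            return false; // any other expression rules out the combiner
        }
        return true;
    }
}
```

With the example script from this issue, numbering the inputs UV=0, UC=1, AT=2,
V=3, C=4, every generated expression is either FLATTEN(group) or a COUNT_STAR or
SUM over a single input, so this check would pass.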
In the combiner, tuples from each input stream should be grouped/combined
separately, i.e., as if the group-by columns were 'group-key-columns +
input-index'. POPackage (or a new subclass) will need to support this grouping.
A POForEach statement will follow the POPackage; it can use a conditional
operator (POBinCond) with a condition on input-index to evaluate the
corresponding UDFs.
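A minimal Java sketch of this combine step, under simplifying assumptions: each
map output record carries the group key, the input index, and a long[] of
partial results; the combiner groups on (group key, input index); and a
per-input list of merge functions stands in for the POBinCond dispatch on
input-index. MultiInputCombiner, MapOutput, and CombineKey are hypothetical
names, not Pig classes. For COUNT_STAR and SUM, the intermediate step is simply
addition of the partials.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongBinaryOperator;

// Hypothetical sketch, not Pig's POPackage/POForEach/POBinCond.
final class MultiInputCombiner {

    // Map output: group key, index of the (co)group input, partial results.
    record MapOutput(String key, int inputIndex, long[] partials) {}

    // Combiner grouping key: the (co)group key plus the input index.
    record CombineKey(String key, int inputIndex) {}

    // For each input index, one merge function per partial (assumed configured).
    private final Map<Integer, List<LongBinaryOperator>> opsByInput;

    MultiInputCombiner(Map<Integer, List<LongBinaryOperator>> opsByInput) {
        this.opsByInput = opsByInput;
    }

    List<MapOutput> combine(List<MapOutput> records) {
        // LinkedHashMap keeps groups in first-seen order.
        Map<CombineKey, long[]> acc = new LinkedHashMap<>();
        for (MapOutput r : records) {
            CombineKey ck = new CombineKey(r.key(), r.inputIndex());
            long[] current = acc.get(ck);
            if (current == null) {
                acc.put(ck, r.partials().clone());
                continue;
            }
            // Pick the UDFs for this input, like a POBinCond on input-index.
            List<LongBinaryOperator> ops = opsByInput.get(r.inputIndex());
            for (int i = 0; i < ops.size(); i++) {
                current[i] = ops.get(i).applyAsLong(current[i], r.partials()[i]);
            }
        }
        List<MapOutput> out = new ArrayList<>();
        for (Map.Entry<CombineKey, long[]> e : acc.entrySet()) {
            out.add(new MapOutput(e.getKey().key(), e.getKey().inputIndex(), e.getValue()));
        }
        return out;
    }
}
```

Keeping the input index in the combiner's grouping key is what keeps the
partials for UV, UC, AT, V, and C apart; grouping on the (co)group key alone
would merge partials from different relations.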
CombinerOptimizer will need to identify the relational input that each
algebraic UDF gets its input from, and add a POForEach with 'Initial' calls to
the UDFs for each input in the map physical plan.
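On the map side, the per-input 'Initial' calls could be sketched as follows,
assuming each UDF's Initial stage can be modeled as a function from one input
tuple to a long partial (for example, 1 for COUNT_STAR and the field value for
SUM). MapSideInitial and its nested records are hypothetical; in Pig, the
Initial, Intermed, and Final stages of an algebraic UDF are named by the
getInitial(), getIntermed(), and getFinal() methods of the Algebraic interface.

```java
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical sketch of a map-side foreach that runs only the 'Initial'
// functions bound to the input a tuple came from.
final class MapSideInitial {

    // A tuple from one (co)group input: group key plus numeric fields.
    record InputTuple(String key, long[] fields) {}

    // What the map emits: group key, input index, partial results.
    record MapOutput(String key, int inputIndex, long[] partials) {}

    // For each input index, the Initial functions of the UDFs reading it.
    private final Map<Integer, List<ToLongFunction<long[]>>> initialByInput;

    MapSideInitial(Map<Integer, List<ToLongFunction<long[]>>> initialByInput) {
        this.initialByInput = initialByInput;
    }

    MapOutput process(int inputIndex, InputTuple t) {
        // Inputs with no algebraic UDFs emit an empty partials array.
        List<ToLongFunction<long[]>> fns = initialByInput.getOrDefault(inputIndex, List.of());
        long[] partials = new long[fns.size()];
        for (int i = 0; i < fns.size(); i++) {
            partials[i] = fns.get(i).applyAsLong(t.fields());
        }
        return new MapOutput(t.key(), inputIndex, partials);
    }
}
```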
> Use combiner in cogroup
> -----------------------
>
> Key: PIG-1735
> URL: https://issues.apache.org/jira/browse/PIG-1735
> Project: Pig
> Issue Type: Improvement
> Reporter: Thejas M Nair
> Assignee: Thejas M Nair
> Fix For: 0.9.0
>
>
> As reported by Scott Carey in PIG-479, the combiner is not used for
> co-group, even if the functions applied to the bags are algebraic.
> Quoting from the comment:
> "For example, I'm not quite sure why this one doesn't use a combiner - it
> reads ~350x as much input bytes from HDFS as its reduce output, a combiner
> would be very effective:
> J = COGROUP
> UV BY (s, d, h, g, p, pa, st) OUTER,
> UC BY (s, d, h, g, p, pa, st) OUTER,
> AT BY (s, d, h, g, p, pa, st) OUTER,
> V BY (s, d, h, g, p, pa, st) OUTER,
> C BY (s, d, h, g, p, pa, st) OUTER;
> OUTPUT = FOREACH J GENERATE
> FLATTEN(group) as (s, d, h, g, p, pa, st),
> COUNT_STAR(C) as c,
> COUNT_STAR(V) as v,
> SUM(AT.p1) as p1,
> SUM(AT.p2) as p2,
> SUM(AT.p3) as p3,
> SUM(UC.q) as ucq,
> SUM(UC.r) as ucr,
> SUM(UV.q) as uvq,
> SUM(UV.r) as uvr;
> "