If your secondary key is a string (or if you wouldn't mind treating it as a
string), then a combiner strategy can still work for you. Something like:

PTable<K, Map<String, Pair<Integer, Collection<Float>>>> pt = ...

with a PType of tableOf(strings(), maps(pairs(ints(),
collections(floats())))). I would strongly recommend using import
static org.apache.crunch.types.avro.Avros.* to make that compact to
express and fast to run. Then your combiner could do the aggregations on the
Map<String, Pair<Integer, Collection<Float>>> entries to compute the
averages for each secondary key (reducing the IO) while still passing all
of the values for the same primary key to the same reducer. That was a
pattern that Sawzall supported that I always really liked and would like to
have in Crunch as well. What do you think?
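
To make the idea concrete, here is a rough sketch of just the merge step in plain Java, with no Crunch types involved. It rests on assumptions that are not spelled out above: the Integer is read as a record count and the Collection<Float> as per-column running sums, so that partial results can be merged in any order and the division happens only at the end. The names Partial and SecondaryKeyCombiner are made up for illustration; in a real pipeline this logic would sit inside whatever CombineFn you pass to combineValues.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for Pair<Integer, Collection<Float>>: a count plus per-column sums. */
final class Partial {
    final int count;
    final List<Float> sums;

    Partial(int count, List<Float> sums) {
        this.count = count;
        this.sums = sums;
    }

    /** Adds two partials column by column. Associative, so it is safe to apply map-side. */
    Partial plus(Partial other) {
        if (sums.size() != other.sums.size()) {
            throw new IllegalArgumentException("column counts differ");
        }
        List<Float> merged = new ArrayList<>(sums.size());
        for (int i = 0; i < sums.size(); i++) {
            merged.add(sums.get(i) + other.sums.get(i));
        }
        return new Partial(count + other.count, merged);
    }

    /** Final step, meant for the reducer: turn the sums into averages. */
    List<Float> averages() {
        List<Float> out = new ArrayList<>(sums.size());
        for (float s : sums) {
            out.add(s / count);
        }
        return out;
    }
}

final class SecondaryKeyCombiner {
    /** Merges all maps seen for one primary key into a single map keyed by secondary key. */
    static Map<String, Partial> combine(Iterable<Map<String, Partial>> maps) {
        Map<String, Partial> result = new HashMap<>();
        for (Map<String, Partial> m : maps) {
            for (Map.Entry<String, Partial> e : m.entrySet()) {
                // Same secondary key seen before: add counts and sums together.
                result.merge(e.getKey(), e.getValue(), Partial::plus);
            }
        }
        return result;
    }
}
```

Each mapper record would start out as a one-entry map with a count of 1, the combiner and the reducer would both call combine, and only the reducer would call averages() before formatting the output line.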

J


On Tue, Dec 11, 2012 at 10:04 PM, Peter Knap <[email protected]> wrote:

> Hi Josh,
>
> Thanks for the quick reply. Here is my problem:
>
> My mappers will produce a lot of records with the same key which I will
> aggregate in the reducers. To cut down on the i/o I wanted to apply some
> aggregation on the map side. At the same time on the reducer side I want to
> aggregate across mappers output and produce final aggregation & format
> transformation. For example my mapper output will be:
>
> Key: <main key>           Value: <secondary key> <val1> ... <val N>
>
> I can aggregate (average) data for records with the same <main key>
> <secondary key> by having combiner produce:
>
> Key: <main key>           Value: <secondary key> <avg(val1)> ... <avg(val N)>
>
> This reduces the amount of i/o a lot.
>
> Now my reducer will use just <main key> to produce final output :
>
> <main key>                  <secondary key> <avg(val1)> ... <avg(val N)> |
> <secondary key> <avg(val1)> ... <avg(val N)> | .........
>
> I was hoping to have just one M/R job to do it. But all I could come up
> with was:
>
> PTable<K, V> myTable = ...;
> myTable.groupByKey()
>     .combineValues(CombineFn/Aggregator to do the combine step)
>     .groupByKey()
>     .parallelDo(DoFn to aggregate & transform result of CombineFn to
> another format for output)
>
> But that's 2 M/R jobs.
>
> Thanks,
> Piotr
>
>    ------------------------------
> *From:* Josh Wills <[email protected]>
> *To:* [email protected]; Peter Knap <[email protected]>
> *Sent:* Tuesday, December 11, 2012 11:44 PM
> *Subject:* Re: Combiner question
>
> Hey Peter,
>
> We might need some more details on what you're trying to do. You're
> allowed to add additional parallelDo operations after the combineValues
> operation, e.g.,
>
> PTable<K, V> myTable = ...;
> myTable.groupByKey()
>     .combineValues(CombineFn/Aggregator to do the combine step)
>     .parallelDo(DoFn to transform result of CombineFn to another format
> for output)
>
> is perfectly valid.
>
> J
>
>
> On Tue, Dec 11, 2012 at 9:41 PM, Peter Knap <[email protected]> wrote:
>
> Hi guys,
>
> I started a small POC with Crunch as a replacement for the current python
> implementation and I ran into a problem with using combiners. How would one
> specify a combiner which is different from the reducer? I know that's not a
> typical case but I want to have partial optimization on the map side and at
> the same time the output format from the reducer is different from that of
> the combiner, so I need two distinct classes. From looking at the code I
> can't figure out how to do it. Any help would be greatly appreciated.
>
> Thanks,
> Piotr
>
>
>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
