Hi Josh, FYI it worked like a charm. Thanks for your help.
Piotr

________________________________
From: Josh Wills <[email protected]>
To: [email protected]; Peter Knap <[email protected]>
Sent: Wednesday, December 12, 2012 12:30 AM
Subject: Re: Combiner question

Please do, I'll be curious to know if it works.

J

On Tue, Dec 11, 2012 at 10:28 PM, Peter Knap <[email protected]> wrote:

>You are right, it might work; I didn't think about using maps. I'm curious
>what the overhead of using them would be, though. I'll try it out tomorrow
>and let you know.
>
>Thanks a lot,
>Piotr
>
>________________________________
>From: Josh Wills <[email protected]>
>To: [email protected]; Peter Knap <[email protected]>
>Sent: Wednesday, December 12, 2012 12:15 AM
>Subject: Re: Combiner question
>
>If your secondary key is a string (or if you wouldn't mind treating it as a
>string), then a combiner strategy can still work for you. Something like:
>
>PTable<K, Map<String, Pair<Integer, Collection<Float>>>> pt = ...
>
>w/a PType of tableOf(strings(), maps(pairs(ints(), collections(floats())))),
>and I would strongly recommend using import static o.a.c.types.avro.Avros.* in
>order to make that compact to express and fast to run. Then your combiner
>could do the aggregations on the Map<String, Pair<Integer, Collection<Float>>>
>entries to compute the averages for each secondary key (reducing the IO) while
>still passing all of the values for the same primary key to the same reducer.
>That was a pattern that Sawzall supported that I always really liked and would
>like to have in Crunch as well. What do you think?
>
>J
>
>On Tue, Dec 11, 2012 at 10:04 PM, Peter Knap <[email protected]> wrote:
>
>>Hi Josh,
>>
>>Thanks for the quick reply. Here is my problem:
>>
>>My mappers will produce a lot of records with the same key, which I will
>>aggregate in the reducers. To cut down on the I/O, I wanted to apply some
>>aggregation on the map side.
>>At the same time, on the reducer side I want to aggregate across the
>>mappers' output and produce the final aggregation and format transformation.
>>For example, my mapper output will be:
>>
>>Key: <main key> Value: <secondary key> <val1> ... <val N>
>>
>>I can aggregate (average) the data for records with the same <main key> and
>><secondary key> by having the combiner produce:
>>
>>Key: <main key> Value: <secondary key> <avg(val1)> ... <avg(val N)>
>>
>>This reduces the amount of I/O a lot.
>>
>>Now my reducer will use just <main key> to produce the final output:
>>
>><main key> <secondary key> <avg(val1)> ... <avg(val N)> |
>><secondary key> <avg(val1)> ... <avg(val N)> | .........
>>
>>I was hoping to do it in just one M/R job, but all I could come up with was:
>>
>>PTable<K, V> myTable = ...;
>>myTable.groupByKey()
>>    .combineValues(CombineFn/Aggregator to do the combine step)
>>    .groupByKey()
>>    .parallelDo(DoFn to aggregate & transform result of CombineFn to another
>>                format for output)
>>
>>But that's 2 M/R jobs.
>>
>>Thanks,
>>Piotr
>>
>>________________________________
>>From: Josh Wills <[email protected]>
>>To: [email protected]; Peter Knap <[email protected]>
>>Sent: Tuesday, December 11, 2012 11:44 PM
>>Subject: Re: Combiner question
>>
>>Hey Peter,
>>
>>We might need some more details on what you're trying to do. You're allowed
>>to add additional parallelDo operations after the combineValues operation,
>>e.g.,
>>
>>PTable<K, V> myTable = ...;
>>myTable.groupByKey()
>>    .combineValues(CombineFn/Aggregator to do the combine step)
>>    .parallelDo(DoFn to transform result of CombineFn to another format for
>>                output)
>>
>>is perfectly valid.
>>
>>J
>>
>>On Tue, Dec 11, 2012 at 9:41 PM, Peter Knap <[email protected]> wrote:
>>
>>>Hi guys,
>>>
>>>I started a small POC with Crunch as a replacement for the current Python
>>>implementation, and I ran into a problem with using combiners.
>>>How would one specify a combiner that is different from the reducer? I know
>>>that's not a typical case, but I want to have partial optimization on the
>>>map side, and at the same time the output format from the reducer is
>>>different from the combiner's, so I need two distinct classes. From looking
>>>at the code I can't figure out how to do it. Any help would be greatly
>>>appreciated.
>>>
>>>Thanks,
>>>Piotr
>
>--
>Director of Data Science
>Cloudera
>Twitter: @josh_wills

--
Director of Data Science
Cloudera
Twitter: @josh_wills
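For reference, here is a minimal plain-Java sketch of the map-of-partial-aggregates pattern Josh describes. It has no Crunch dependency so it runs on its own, and it assumes the Integer in Pair<Integer, Collection<Float>> is a record count and the Collection<Float> holds per-column sums. The names SecondaryKeyAverages, Partial, combine and format are hypothetical, not Crunch API. Wiring it into a pipeline (wrapping combine in a CombineFn for combineValues, and format in a DoFn for parallelDo) is assumed and not shown. Because the combine step only adds counts and sums, it is associative: it gives the same answer whether it runs once in the reducer or also on the map side. Dividing into averages happens only in the final step.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

/** Hypothetical sketch of combining per-secondary-key partials for one primary key. */
public class SecondaryKeyAverages {

  /** Stand-in for Pair<Integer, Collection<Float>>: record count plus per-column sums. */
  static final class Partial {
    final int count;
    final float[] sums;

    Partial(int count, float[] sums) {
      this.count = count;
      this.sums = sums;
    }

    /** Wraps a single mapper record <val1> ... <valN> as a partial of count 1. */
    static Partial of(float... values) {
      return new Partial(1, values.clone());
    }

    /** Associative merge: add counts and sums column by column (assumes equal N). */
    Partial merge(Partial other) {
      float[] out = new float[sums.length];
      for (int i = 0; i < sums.length; i++) {
        out[i] = sums[i] + other.sums[i];
      }
      return new Partial(count + other.count, out);
    }
  }

  /** Combine step: merges the per-secondary-key maps emitted for one primary key. */
  static Map<String, Partial> combine(Iterable<Map<String, Partial>> maps) {
    Map<String, Partial> acc = new TreeMap<>(); // sorted so output order is stable
    for (Map<String, Partial> m : maps) {
      for (Map.Entry<String, Partial> e : m.entrySet()) {
        acc.merge(e.getKey(), e.getValue(), Partial::merge);
      }
    }
    return acc;
  }

  /** Final reduce-side step: divide sums by counts and format one output line. */
  static String format(String mainKey, Map<String, Partial> combined) {
    StringJoiner groups = new StringJoiner(" | ");
    for (Map.Entry<String, Partial> e : combined.entrySet()) {
      Partial p = e.getValue();
      StringJoiner group = new StringJoiner(" ");
      group.add(e.getKey());
      for (float sum : p.sums) {
        group.add(String.valueOf(sum / p.count));
      }
      groups.add(group.toString());
    }
    return mainKey + " " + groups;
  }

  public static void main(String[] args) {
    // Two "mappers" each pre-combine their own records for primary key "k1".
    Map<String, Partial> mapper1 = combine(Arrays.asList(
        Collections.singletonMap("a", Partial.of(1f, 10f)),
        Collections.singletonMap("a", Partial.of(3f, 30f))));
    Map<String, Partial> mapper2 = combine(Arrays.asList(
        Collections.singletonMap("a", Partial.of(5f, 50f)),
        Collections.singletonMap("b", Partial.of(2f, 4f))));
    // The reducer merges the partial maps and only then computes averages.
    System.out.println(format("k1", combine(Arrays.asList(mapper1, mapper2))));
  }
}
```

With the sample data in main, this prints `k1 a 3.0 30.0 | b 2.0 4.0`. Averaging the per-mapper averages instead would give (2.0 + 5.0) / 2 = 3.5 for the first column of `a` rather than the true 3.0, which is why the combiner carries counts and sums rather than averages.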
