You are right, it might work - I didn't think about using maps. I'm curious what the overhead of using them would be, though. I'll try it out tomorrow and let you know.
Thanks a lot,
Piotr

________________________________
From: Josh Wills <[email protected]>
To: [email protected]; Peter Knap <[email protected]>
Sent: Wednesday, December 12, 2012 12:15 AM
Subject: Re: Combiner question

If your secondary key is a string (or if you wouldn't mind treating it as a string), then a combiner strategy can still work for you. Something like:

    PTable<K, Map<String, Pair<Integer, Collection<Float>>>> pt = ...

w/a PType of

    tableOf(strings(), maps(pairs(ints(), collections(floats()))))

and I would strongly recommend using

    import static o.a.c.types.avro.Avros.*

in order to make that compact to express and fast to run. Then your combiner could do the aggregations on the Map<String, Pair<Integer, Collection<Float>>> entries to compute the averages for each secondary key (reducing the IO) while still passing all of the values for the same primary key to the same reducer.

That was a pattern that Sawzall supported that I always really liked and would like to have in Crunch as well. What do you think?

J

On Tue, Dec 11, 2012 at 10:04 PM, Peter Knap <[email protected]> wrote:

>Hi Josh,
>
>Thanks for the quick reply. Here is my problem:
>
>My mappers will produce a lot of records with the same key, which I will
>aggregate in the reducers. To cut down on the I/O I wanted to apply some
>aggregation on the map side. At the same time, on the reducer side I want to
>aggregate across the mappers' output and produce the final aggregation & format
>transformation. For example, my mapper output will be:
>
>Key: <main key> Value: <secondary key> <val1> ... <val N>
>
>I can aggregate (average) data for records with the same <main key> <secondary
>key> by having the combiner produce:
>
>Key: <main key> Value: <secondary key> <avg(val1)> ... <avg(val N)>
>
>This reduces the amount of I/O a lot.
>
>Now my reducer will use just <main key> to produce the final output:
>
><main key> <secondary key> <avg(val1)> ... <avg(val N)> |
><secondary key> <avg(val1)> ... <avg(val N)> | .........
>
>I was hoping to have just one M/R job to do it. But all I could come up with was:
>
>PTable<K, V> myTable = ...;
>myTable.groupByKey()
>  .combineValues(CombineFn/Aggregator to do the combine step)
>  .groupByKey()
>  .parallelDo(DoFn to aggregate & transform result of CombineFn to another
>format for output)
>
>But that's 2 M/R jobs.
>
>Thanks,
>Piotr
>
>________________________________
>From: Josh Wills <[email protected]>
>To: [email protected]; Peter Knap <[email protected]>
>Sent: Tuesday, December 11, 2012 11:44 PM
>Subject: Re: Combiner question
>
>Hey Peter,
>
>We might need some more details on what you're trying to do. You're allowed to
>add additional parallelDo operations after the combineValues operation, e.g.,
>
>PTable<K, V> myTable = ...;
>myTable.groupByKey()
>  .combineValues(CombineFn/Aggregator to do the combine step)
>  .parallelDo(DoFn to transform result of CombineFn to another format for
>output)
>
>is perfectly valid.
>
>J
>
>On Tue, Dec 11, 2012 at 9:41 PM, Peter Knap <[email protected]> wrote:
>
>>Hi guys,
>>
>>I started a small POC with crunch as a replacement for the current python
>>implementation and I ran into a problem with using combiners. How would one
>>specify a combiner which is different from the reducer? I know that's not a
>>typical case, but I want to have partial optimization on the map side, and at
>>the same time the output format from the reducer is different from that of the
>>combiner, so I need two distinct classes. From looking at the code I can't
>>figure out how to do it. Any help would be greatly appreciated.
>>
>>Thanks,
>>Piotr

--
Director of Data Science
Cloudera
Twitter: @josh_wills
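
For illustration only, here is a minimal plain-Java sketch of the map-based combiner idea from this thread. It deliberately does not use the Crunch API, so it can run on its own. It assumes that the Integer in Pair<Integer, Collection<Float>> is a record count and that the Collection<Float> holds per-column running sums; the thread does not spell this out, but carrying counts is what allows partial results from different mappers to be merged into a correct average. The names MapCombinerSketch, Partial, combine and averages are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class MapCombinerSketch {

    /** Partial aggregate for one secondary key: record count plus per-column sums (assumed layout). */
    static final class Partial {
        final int count;
        final float[] sums;

        Partial(int count, float[] sums) {
            this.count = count;
            this.sums = sums;
        }

        /** Adds two partials; assumes both have the same number of columns. */
        Partial plus(Partial other) {
            float[] merged = new float[sums.length];
            for (int i = 0; i < sums.length; i++) {
                merged[i] = sums[i] + other.sums[i];
            }
            return new Partial(count + other.count, merged);
        }
    }

    /** Combiner step: merge all maps seen for one primary key, secondary key by secondary key. */
    static Map<String, Partial> combine(Iterable<Map<String, Partial>> values) {
        Map<String, Partial> out = new HashMap<>();
        for (Map<String, Partial> m : values) {
            for (Map.Entry<String, Partial> e : m.entrySet()) {
                out.merge(e.getKey(), e.getValue(), Partial::plus);
            }
        }
        return out;
    }

    /** Reducer step: turn the merged sums into per-column averages for each secondary key. */
    static Map<String, float[]> averages(Map<String, Partial> combined) {
        Map<String, float[]> out = new HashMap<>();
        for (Map.Entry<String, Partial> e : combined.entrySet()) {
            Partial p = e.getValue();
            float[] avg = new float[p.sums.length];
            for (int i = 0; i < avg.length; i++) {
                avg[i] = p.sums[i] / p.count;
            }
            out.put(e.getKey(), avg);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two single-record maps, as two map-side outputs for the same primary key might look.
        Map<String, Partial> first = new HashMap<>();
        first.put("s1", new Partial(1, new float[] {2f, 4f}));
        Map<String, Partial> second = new HashMap<>();
        second.put("s1", new Partial(1, new float[] {4f, 8f}));

        Map<String, float[]> result = averages(combine(Arrays.asList(first, second)));
        for (Map.Entry<String, float[]> e : result.entrySet()) {
            System.out.println(e.getKey() + " " + Arrays.toString(e.getValue()));
        }
    }
}
```

Because combine is associative and commutative, the same function could in principle be applied both map-side and reduce-side, with the averaging and output formatting left to a later step after the single shuffle. How this would map onto a Crunch CombineFn and DoFn is not shown here.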
