Then what about the other interfaces, like Combine.perKey(GlobalCombineFn) and Combine.perKey(PerKeyCombineFn)? Why don't all of them have this requirement?
On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
> For it to be considered a combiner, the function needs to be associative
> and commutative.
>
> The issue is that from an API perspective it would be easy to have a
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
> people in the data processing world expect that this
> parallelization/optimization is performed, and thus exposing such a method
> would be dangerous as it would be breaking users' expectations, so from the
> design perspective it is a hard requirement. If PCollections ever become
> ordered or gain other properties, these requirements may loosen, but it
> seems unlikely in the short term.
>
> At this point, I think you're looking for a MapElements to which you pass a
> SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
> OutputT>> which can delegate to a SerializableFunction<Iterable<InputT>,
> OutputT> should be trivial.
>
>
> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> Thanks for the thorough explanation. I see the benefits of such a
> function.
> My follow-up question is whether this is a hard requirement.
> There are computations that don't satisfy this (I think it's the monoid
> rule) but are possible and easier to write with
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
> difficult to provide an underlying CombineFn.
>
>
> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> Combine.perKey takes a single SerializableFunction which knows how to
> convert from Iterable<V> to V.
>
> It turns out that many runners implement optimizations which allow them to
> run the combine operation across several machines to parallelize the work
> and potentially reduce the amount of data they store during a GBK.
> To be able to do such an optimization, you actually need to have
> three functions:
> InputT -> AccumulatorT: Creates the intermediate representation which
> allows for associative combining
> Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
> AccumulatorT -> OutputT: Extracts the output
>
> In the case of Combine.perKey with a SerializableFunction, you're providing
> Iterable<AccumulatorT> -> AccumulatorT and the other two functions are the
> identity functions.
>
> Supporting a Combine.perKey which can go from Iterable<InputT>
> -> OutputT would require that this occur within a single machine,
> removing the parallelization benefits that runners provide, and for almost
> all cases that is not a good idea.
>
> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
> > and output to be of the same type while `Combine.PerKey` doesn't have
> > this restriction.
> >
> > Thanks,
> > Manu
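
As a rough illustration of the three functions described in the 9:47 AM reply above, here is a minimal plain-Java sketch that computes a mean. It deliberately does not use the Beam API: MeanCombineSketch, Accum, toAccum, merge, extract, and combinePartition are hypothetical names chosen for this example, and the two lists in main stand in for values that a runner might have placed on two different machines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these names are part of Beam.
final class MeanCombineSketch {

    // AccumulatorT: a running sum and count. Merging these is associative
    // and commutative, so partial results can be combined in any grouping.
    static final class Accum {
        final long sum;
        final long count;

        Accum(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // InputT -> AccumulatorT
    static Accum toAccum(int input) {
        return new Accum(input, 1);
    }

    // Iterable<AccumulatorT> -> AccumulatorT
    static Accum merge(Iterable<Accum> accums) {
        long sum = 0;
        long count = 0;
        for (Accum a : accums) {
            sum += a.sum;
            count += a.count;
        }
        return new Accum(sum, count);
    }

    // AccumulatorT -> OutputT (returns 0.0 for an empty accumulator)
    static double extract(Accum a) {
        return a.count == 0 ? 0.0 : (double) a.sum / a.count;
    }

    // Stands in for one worker pre-combining its share of the values.
    static Accum combinePartition(List<Integer> values) {
        List<Accum> accums = new ArrayList<>();
        for (int v : values) {
            accums.add(toAccum(v));
        }
        return merge(accums);
    }

    public static void main(String[] args) {
        Accum first = combinePartition(Arrays.asList(1, 2, 3));
        Accum second = combinePartition(Arrays.asList(4));
        // Merge the partial accumulators, then extract the final output.
        System.out.println(extract(merge(Arrays.asList(first, second))));
    }
}
```

Merging the per-partition accumulators gives 2.5 for the inputs 1, 2, 3 and 4, whereas averaging the two partial means (2.0 and 4.0) would give 3.0. This is one way to see why a mean cannot be supplied directly as an Iterable<V> -> V function and needs a separate accumulator type.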