This suggestion LGTM. I would vote for the first alternative rather than adding it to the `KStreamBuilder`, though.
On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:

I have a minor suggestion to make the API a little bit more symmetric. I feel it would make more sense to move the initializer and serde to the final aggregate statement, since the serde only applies to the state store, and the initializer doesn't bear any relation to the first group in particular. It would end up looking like this:

    KTable<K, CG> cogrouped =
        grouped1.cogroup(aggregator1)
                .cogroup(grouped2, aggregator2)
                .cogroup(grouped3, aggregator3)
                .aggregate(initializer1, aggValueSerde, storeName1);

Alternatively, we could move the first cogroup() method to KStreamBuilder, similar to how we have .merge(), and end up with an API that would be even more symmetric:

    KStreamBuilder.cogroup(grouped1, aggregator1)
                  .cogroup(grouped2, aggregator2)
                  .cogroup(grouped3, aggregator3)
                  .aggregate(initializer1, aggValueSerde, storeName1);

This doesn't have to be a blocker, but I thought it would make the API just a tad cleaner.

On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:

Kyle,

Thanks a lot for the updated KIP. It looks good to me.

Guozhang

On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:

This makes much more sense to me. +1

On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I have updated the KIP and my PR. Let me know what you think.
To create a cogrouped stream, just call cogroup on a KGroupedStream and supply the initializer, aggValueSerde, and an aggregator. Then continue adding KGroupedStreams and aggregators. Then call one of the many aggregate calls to create a KTable.
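The chain described above can be modelled in a few lines of plain Java, with the initializer moved to the final aggregate call as Xavier suggests. This is a minimal in-memory sketch, not the Kafka Streams API: `SketchCogroup`, `SketchAggregator` and `from` are hypothetical names, lists of key/value pairs stand in for KGroupedStreams, there are no serdes or store names, and records are replayed input by input rather than interleaved by arrival time. It shows two properties discussed in this thread: each (input, aggregator) pair is type-checked on its own, and a single initializer runs once per key, the first time any input sees that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in mirroring the shape of an aggregator:
// (key, value, current aggregate) -> new aggregate.
interface SketchAggregator<K, V, A> {
    A apply(K key, V value, A aggregate);
}

// Hypothetical in-memory model of the suggested chain; NOT the Kafka Streams API.
class SketchCogroup<K, A> {
    // One step per (input, aggregator) pair. V is fixed when each step is created,
    // so the compiler checks every pair independently.
    private final List<BiConsumer<Map<K, A>, Supplier<A>>> steps = new ArrayList<>();

    // Counterpart of grouped1.cogroup(aggregator1).
    static <K, V, A> SketchCogroup<K, A> from(List<Map.Entry<K, V>> input,
                                              SketchAggregator<K, V, A> aggregator) {
        return new SketchCogroup<K, A>().cogroup(input, aggregator);
    }

    // Counterpart of .cogroup(groupedN, aggregatorN).
    <V> SketchCogroup<K, A> cogroup(List<Map.Entry<K, V>> input,
                                    SketchAggregator<K, V, A> aggregator) {
        steps.add((store, initializer) -> {
            for (Map.Entry<K, V> record : input) {
                K key = record.getKey();
                A current = store.containsKey(key) ? store.get(key) : initializer.get();
                store.put(key, aggregator.apply(key, record.getValue(), current));
            }
        });
        return this;
    }

    // Counterpart of .aggregate(initializer, ...): the initializer is supplied once, at the end.
    Map<K, A> aggregate(Supplier<A> initializer) {
        Map<K, A> store = new HashMap<>(); // the single shared "state store"
        for (BiConsumer<Map<K, A>, Supplier<A>> step : steps) {
            step.accept(store, initializer);
        }
        return store;
    }
}
```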
Thanks,
Kyle

On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

Thanks for the update. I think just one initializer makes sense, as it should only be called once per key and generally it is just going to create a new instance of whatever the Aggregate class is.

Cheers,
Damian

On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Hello all,

I have spent some more time on this and the best alternative I have come up with is:
KGroupedStream has a single cogroup call that takes an initializer and an aggregator.
CogroupedKStream has a cogroup call that takes additional groupedStream/aggregator pairs.
CogroupedKStream has multiple aggregate methods that create the different stores.

I plan on updating the KIP, but I want people's input on whether we should have the initializer passed in once at the beginning, or instead have the initializer be required for each call to one of the aggregate methods. The first makes more sense to me, but doesn't allow the user to specify different initializers for different tables.

Thanks,
Kyle

On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Yeah, I really like that idea. I'll see what I can do to update the KIP and my PR when I have some time. I'm not sure how well creating the KStreamAggregates will go, though, because at that point I will have thrown away the type of the values.
It will be type safe; I just may need to do a little forcing.

Thanks,
Kyle

On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

Kyle,

Thanks for the explanations; my previous reading of the wiki examples was wrong.

So I guess my motivation should be "reduced" to: can we move the window specs param from "KGroupedStream#cogroup(..)" to "CogroupedKStream#aggregate(..)"? My motivations are:

1. Minor: we can reduce the number of generics in CogroupedKStream from 3 to 2.
2. Major: this is for extensibility of the APIs, and since we are removing the "Evolving" annotations on Streams, it may be harder to change it again in the future. The extended use cases are that people wanted windowed running aggregates at different granularities, e.g. "give me the counts per-minute, per-hour, per-day and per-week", and today in the DSL we need to specify that case with multiple aggregate operators, each of which gets its own state store / changelog, etc. It is possible to optimize this into a single state store as well. Its implementation would be tricky, as you need to contain windows of different lengths within your window store, but just from the public API point of view, it could be specified as:

    CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");

    table1 = stream.aggregate(/*per-minute window*/);
    table2 = stream.aggregate(/*per-hour window*/);
    table3 = stream.aggregate(/*per-day window*/);

while underneath we are only using a single store, "state-store-name", for it.

Although this feature is out of the scope of this KIP, I'd like to discuss whether we can "leave the door open" to make such changes without modifying the public APIs.

Guozhang

On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I allow defining a single window/session window one time, when you make the cogroup call from a KGroupedStream. From then on you are using the cogroup call from within CogroupedKStream, which doesn't accept any additional windows/session windows.

Is this what you meant by your question, or did I misunderstand?

On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

Another question that came to me is on "window alignment": from the KIP it seems you are allowing users to specify a (potentially different) window spec in each co-grouped input stream. So if these window specs are different, how should we "align" them across the different input streams? I think it is more natural to only specify one window spec in

    KTable<RK, V> CogroupedKStream#aggregate(Windows);

and remove it from the cogroup() functions. WDYT?
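To make the "several granularities, one store" idea concrete, here is a toy in-memory sketch, assuming epoch-aligned tumbling windows (window start = timestamp - timestamp % size). `MultiGranularityCounter` is a hypothetical class, not a Kafka Streams window store; it ignores retention, late records and changelogging, which is where a real implementation gets tricky. Each record updates one entry per configured window size in a single map keyed by (key, window size, window start).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one store holding tumbling-window counts at several granularities.
class MultiGranularityCounter {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;
    static final long DAY = 24 * HOUR;

    private final List<Long> windowSizesMs;
    private final Map<String, Long> store = new HashMap<>(); // the single shared store

    MultiGranularityCounter(List<Long> windowSizesMs) {
        this.windowSizesMs = windowSizesMs;
    }

    // Epoch-aligned tumbling window containing the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Composite store key (assumes record keys contain no '|').
    static String storeKey(String key, long sizeMs, long startMs) {
        return key + "|" + sizeMs + "|" + startMs;
    }

    // One incoming record updates every granularity in the same store.
    void add(String key, long timestampMs) {
        for (long size : windowSizesMs) {
            store.merge(storeKey(key, size, windowStart(timestampMs, size)), 1L, Long::sum);
        }
    }

    // Count for the window of the given size that contains timestampMs.
    long count(String key, long sizeMs, long timestampMs) {
        return store.getOrDefault(storeKey(key, sizeMs, windowStart(timestampMs, sizeMs)), 0L);
    }

    int storeSize() {
        return store.size();
    }
}
```

With three records at 1 s, 61 s and 3601 s, the minute windows see one record each, the first hour window sees two, and the first day window sees all three, all held in six entries of one map.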
Guozhang

On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks for the proposal, Kyle. This is quite a common use case to support: a multi-way table join (i.e. N source tables with N aggregate functions) with a single store and N+1 serdes. I have seen lots of people using the low-level PAPI to achieve this goal.

On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I like your point about not handling other cases such as count and reduce.

I think that reduce may not make sense, because reduce assumes that the input values are the same type as the output values. With cogroup there may be multiple different input types, and your output type can't be multiple different things. In the case where all the value types match, you can do KStreamBuilder#merge followed by the reduce.

As for count, I think it is possible to call count on all the individual grouped streams and then do joins. Otherwise we could maybe make a special call in KGroupedStream for this case, because in this case we don't need to do type checking on the values. It could be similar to the current count methods, but accept varargs of additional grouped streams as well and make sure they have a key type of K.
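Kyle's point about reduce can be seen from the types alone: a reducer is a `BinaryOperator<V>`, so every input has to carry the same V. The sketch below is a hypothetical in-memory analogue of merging same-typed inputs and then reducing per key; plain lists stand in for streams, and `mergeAndReduce` is an illustrative name, not `KStreamBuilder#merge` or `KGroupedStream#reduce`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

// Hypothetical in-memory analogue of "merge, then reduce" for inputs sharing one value type.
class MergeThenReduce {
    // BinaryOperator<V> takes two V and returns a V, so every input must already carry V;
    // with cogroup's mixed input types there is no single V to choose.
    @SafeVarargs
    static <K, V> Map<K, V> mergeAndReduce(BinaryOperator<V> reducer,
                                           List<Map.Entry<K, V>>... inputs) {
        return Arrays.stream(inputs)          // like merging the streams into one
                .flatMap(List::stream)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, reducer));
    }
}
```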
The way I have put the KIP together is to ensure that we do type checking. I don't see a way we could group them all first and then make a call to count, reduce, or aggregate, because with aggregate they would need to pass a list of aggregators and we would have no way of type checking that they match the grouped streams.

Thanks,
Kyle

On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io> wrote:

Sorry to jump on this thread so late. I agree this is a very useful addition and wanted to provide an additional use case and some more comments.

This is actually a very common analytics use case in the ad-tech industry. The typical setup will have an auction stream, an impression stream, and a click stream. Those three streams need to be combined to compute aggregate statistics (e.g. impression statistics and click-through rates), since most of the attributes of interest are only present in the auction stream.

A simple way to do this is to co-group all the streams by the auction key, and process updates to the co-group as events for each stream come in, keeping only one value from each stream before sending downstream for further processing / aggregation.
One could view the result of that co-group operation as a "KTable" with multiple values per key: the key being the grouping key, and the values consisting of one value per stream.

What I like about Kyle's approach is that it allows elegant co-grouping of multiple streams without having to worry about the number of streams, and avoids dealing with Tuple types or other generic interfaces that could get messy if we wanted to preserve all the value types in the resulting co-grouped stream.

My only concern is that we only allow the combined cogroup + aggregate operation. This forces the user to build their own tuple serialization format if they want to preserve the individual input stream values as a group. It also deviates quite a bit from our approach in KGroupedStream, which offers other operations, such as count and reduce, that should also be applicable to a co-grouped stream.

Overall I still think this is a really useful addition, but I feel we haven't spent much time trying to explore alternative DSLs that could maybe generalize better or match our existing syntax more closely.
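Xavier's ad-tech case maps naturally onto a single store with one row per auction. The sketch below assumes the auction, impression and click streams are already co-partitioned by auction id, and keeps only the latest value from each stream. `Auction`, `Impression`, `Click`, `AuctionRow` and `AdTechCogroup` are hypothetical types for illustration; in a cogroup, each `onX` method would play the role of one input's aggregator and the map the role of the shared state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical event types for the three input streams (illustrative only).
final class Auction {
    final String campaign; // an attribute that only the auction stream carries
    Auction(String campaign) { this.campaign = campaign; }
}
final class Impression { }
final class Click { }

// One row per auction key, keeping at most one (the latest) value from each stream.
final class AuctionRow {
    Auction auction;
    Impression impression;
    Click click;
}

// Hypothetical in-memory model of the co-group: one shared map plays the role of the store.
class AdTechCogroup {
    final Map<String, AuctionRow> store = new HashMap<>();

    // Plays the role of the initializer: an empty row the first time an auction id is seen.
    private AuctionRow row(String auctionId) {
        return store.computeIfAbsent(auctionId, id -> new AuctionRow());
    }

    // One "aggregator" per input stream, each with its own value type.
    void onAuction(String auctionId, Auction auction) { row(auctionId).auction = auction; }
    void onImpression(String auctionId, Impression impression) { row(auctionId).impression = impression; }
    void onClick(String auctionId, Click click) { row(auctionId).click = click; }

    // Downstream statistic: click-through rate for one campaign (clicks / impressions).
    double clickThroughRate(String campaign) {
        long impressions = 0;
        long clicks = 0;
        for (AuctionRow r : store.values()) {
            if (r.auction == null || !r.auction.campaign.equals(campaign)) {
                continue; // the campaign is only known once the auction event has arrived
            }
            if (r.impression != null) impressions++;
            if (r.click != null) clicks++;
        }
        return impressions == 0 ? 0.0 : (double) clicks / impressions;
    }
}
```

Events may arrive in any order (an impression before its auction, for example); the row simply fills in as each stream contributes its value.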
On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Eno, is there anyone else who is an expert in the Kafka Streams realm that I should reach out to for input?

I believe Damian Guy is still planning on reviewing this more in depth, so I will wait for his input before continuing.

On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

Thanks Kyle, good arguments.

Eno

On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

*- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?*
I have added a more concrete example to the KIP.

*- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together.
I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.*
You would have to make a lot of checks to understand if it is even possible to make this optimization:
1. Make sure they are all KTableKTableOuterJoins.
2. None of the intermediate KTables are used for anything else.
3. None of the intermediate stores are used. (This may be impossible, especially if they use KafkaStreams#store after the topology has already been built.)
You would then need to make decisions during the optimization:
1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.
The main argument I have against it is that even if it could be done, I don't know that we would want this to be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.

In my opinion, cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join), because we want to allow people to join two or more streams in an easy way. Right now I don't think we give them ways of handling this use case easily.

*- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
I would argue that cogroup is not just an optimization; it is a new way for the users to look at accomplishing a problem that requires multiple streams.
I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.
Now if, for example, someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call. This case is what I view as an optimization: we could remove the KStreamCogroup and act as if there was just a call to KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is to be used.)

Thanks,
Kyle

On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP again. A couple of comments:

- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?

- my main concern is that we’re exposing this optimization to the DSL.
In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.

I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.

Thanks
Eno

On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:

I haven't digested the proposal but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use.
In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with KStreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.
-Jay

On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Yeah, that's a good way to look at it.
I have seen this type of functionality in a couple of other platforms, like Spark and Pig.
https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm

On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

If I'm reading this correctly, it is like an N-way outer join? So an input on any stream will always produce a new aggregated value, is that correct?
Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.

On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I sure can.
I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:

Example with Current API:

    KTable<K, V1> table1 =
        builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1,
            aggValueSerde1, storeName1);
    KTable<K, V2> table2 =
        builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2,
            aggValueSerde2, storeName2);
    KTable<K, V3> table3 =
        builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3,
            aggValueSerde3, storeName3);
    KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
        .outerJoin(table3, joinerOneTwoAndThree);

As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. This also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3).
They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so that the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map, and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value and get sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin.
Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.

If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.

Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.

Example with Proposed API:

    KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
    KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
    KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
    KTable<K, CG> cogrouped =
        grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
                .cogroup(grouped2, aggregator2)
                .cogroup(grouped3, aggregator3)
                .aggregate();

As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.

Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
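Damian's reading of the proposal (each aggregator looks up the current value, aggregates, and forwards the result) and Kyle's walkthrough of the proposed API can be sketched as one processor per input, all sharing a single store. `CogroupProcessorSketch` is a hypothetical class, not a Kafka Streams processor; the aggregator is simplified to `(value, aggregate) -> aggregate`, and the `downstream` callback stands in for the pass-through KStreamCogroup node. Every input record emits a new aggregate, which is the N-way outer-join-like behavior Damian describes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical model of one input's processor in a cogroup; NOT a Kafka Streams class.
class CogroupProcessorSketch<K, V, A> {
    private final Map<K, A> sharedStore;          // one store shared by every input's processor
    private final Supplier<A> initializer;
    private final BiFunction<V, A, A> aggregator; // simplified: (value, aggregate) -> aggregate
    private final BiConsumer<K, A> downstream;    // stands in for the pass-through cogroup node

    CogroupProcessorSketch(Map<K, A> sharedStore, Supplier<A> initializer,
                           BiFunction<V, A, A> aggregator, BiConsumer<K, A> downstream) {
        this.sharedStore = sharedStore;
        this.initializer = initializer;
        this.aggregator = aggregator;
        this.downstream = downstream;
    }

    // Look up the current aggregate, apply this input's aggregator, write it back, forward it.
    void process(K key, V value) {
        A current = sharedStore.containsKey(key) ? sharedStore.get(key) : initializer.get();
        A updated = aggregator.apply(value, current);
        sharedStore.put(key, updated);
        downstream.accept(key, updated); // every input record emits a new aggregate
    }
}
```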

On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Kyle,

thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches, etc.

I think it's pretty advanced stuff you propose, and it would help to understand it better.

Thanks a lot!

-Matthias

On 5/4/17 11:39 AM, Kyle Winkelman wrote:
I have made a pull request. It can be found here.

https://github.com/apache/kafka/pull/2975

I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.

One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name, because if more than one grouped stream repartitions, an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?

Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.

Thanks,
Kyle

On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:

I’ll look as well asap, sorry, been swamped.

Eno

On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP? It will be easier to review what you have done.

Thanks,
Damian

On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days.
This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong. ;)

On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Hello all,

I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.

Please find the KIP here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup

Please find my initial implementation here:
https://github.com/KyleWinkelman/kafka

Thanks,
Kyle Winkelman

--
-- Guozhang