This suggestion LGTM. I would vote for the first alternative rather than
adding it to the `KStreamBuilder`, though.
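
For anyone skimming the thread, here is a rough toy model of the call shape
being discussed, with the initializer supplied once at the final aggregate
step. It is only a sketch under assumptions: `CogroupSketch`, `Grouped`,
`Cogrouped` and `Aggregator` are hypothetical in-memory stand-ins, not the
Kafka Streams classes or the KIP's actual signatures, and serdes, store names
and windowing are left out entirely.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Toy in-memory model of the call shape only; none of these are Kafka Streams types.
final class CogroupSketch {

    // Hypothetical aggregator: folds one input value into the shared aggregate.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // Hypothetical stand-in for a grouped stream: an ordered list of (key, value) records.
    static final class Grouped<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        Grouped<K, V> add(K key, V value) {
            records.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
            return this;
        }
    }

    // Collects (grouped stream, aggregator) pairs; the initializer arrives only in aggregate().
    static final class Cogrouped<K, A> {
        private final List<BiConsumer<Map<K, A>, Supplier<A>>> steps = new ArrayList<>();

        <V> Cogrouped<K, A> cogroup(Grouped<K, V> grouped, Aggregator<K, V, A> aggregator) {
            // Each step folds one input stream into the single shared store.
            steps.add((store, initializer) -> {
                for (Map.Entry<K, V> entry : grouped.records) {
                    K key = entry.getKey();
                    // The initializer runs once per key, the first time that key is seen.
                    A current = store.containsKey(key) ? store.get(key) : initializer.get();
                    store.put(key, aggregator.apply(key, entry.getValue(), current));
                }
            });
            return this;
        }

        // Simplification: streams are replayed one after another, not interleaved by arrival time.
        Map<K, A> aggregate(Supplier<A> initializer) {
            Map<K, A> store = new LinkedHashMap<>();
            for (BiConsumer<Map<K, A>, Supplier<A>> step : steps) {
                step.accept(store, initializer);
            }
            return store;
        }
    }

    // Builder-style entry point, mirroring the second alternative in the quoted mail.
    static <K, V, A> Cogrouped<K, A> cogroup(Grouped<K, V> grouped, Aggregator<K, V, A> aggregator) {
        return new Cogrouped<K, A>().cogroup(grouped, aggregator);
    }

    static Map<String, Integer> demo() {
        Grouped<String, Integer> grouped1 = new Grouped<String, Integer>().add("a", 1).add("b", 2);
        Grouped<String, String> grouped2 = new Grouped<String, String>().add("a", "xyz");
        Aggregator<String, Integer, Integer> aggregator1 = (key, value, agg) -> agg + value;
        Aggregator<String, String, Integer> aggregator2 = (key, value, agg) -> agg + value.length();
        return cogroup(grouped1, aggregator1)
                .cogroup(grouped2, aggregator2)
                .aggregate(() -> 0);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {a=4, b=2}
    }
}
```

The point of the sketch is only that two inputs with different value types
share one result type and one initializer, which is why a single initializer
on the last call reads naturally.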

On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:

> I have a minor suggestion to make the API a little bit more symmetric.
> I feel it would make more sense to move the initializer and serde to the
> final aggregate statement, since the serde only applies to the state store,
> and the initializer doesn't bear any relation to the first group in
> particular. It would end up looking like this:
>
> KTable<K, CG> cogrouped =
>     grouped1.cogroup(aggregator1)
>             .cogroup(grouped2, aggregator2)
>             .cogroup(grouped3, aggregator3)
>             .aggregate(initializer1, aggValueSerde, storeName1);
>
> Alternatively, we could move the first cogroup() method to
> KStreamBuilder, similar to how we have .merge(),
> and end up with an API that would be even more symmetric.
>
> KStreamBuilder.cogroup(grouped1, aggregator1)
>               .cogroup(grouped2, aggregator2)
>               .cogroup(grouped3, aggregator3)
>               .aggregate(initializer1, aggValueSerde, storeName1);
>
> This doesn't have to be a blocker, but I thought it would make the API just
> a tad cleaner.
>
> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Kyle,
> >
> > Thanks a lot for the updated KIP. It looks good to me.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:
> >
> > > This makes much more sense to me. +1
> > >
> > > > On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <
> winkelman.k...@gmail.com>
> > > wrote:
> > > >
> > > > I have updated the KIP and my PR. Let me know what you think.
> > > > > To create a cogrouped stream, just call cogroup on a KGroupedStream and
> > > > > supply the initializer, aggValueSerde, and an aggregator. Then continue
> > > > > adding KGroupedStreams and aggregators. Then call one of the many
> > > > > aggregate calls to create a KTable.
> > > >
> > > > Thanks,
> > > > Kyle
> > > >
> > > > On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
> > > >
> > > >> Hi Kyle,
> > > >>
> > > > >> Thanks for the update. I think just one initializer makes sense, as it
> > > > >> should only be called once per key and generally it is just going to
> > > > >> create a new instance of whatever the Aggregate class is.
> > > >>
> > > >> Cheers,
> > > >> Damian
> > > >>
> > > >> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <
> winkelman.k...@gmail.com
> > >
> > > >> wrote:
> > > >>
> > > >>> Hello all,
> > > >>>
> > > > >>> I have spent some more time on this and the best alternative I have
> > > > >>> come up with is:
> > > > >>> KGroupedStream has a single cogroup call that takes an initializer
> > > > >>> and an aggregator.
> > > > >>> CogroupedKStream has a cogroup call that takes additional
> > > > >>> groupedStream aggregator pairs.
> > > > >>> CogroupedKStream has multiple aggregate methods that create the
> > > > >>> different stores.
> > > > >>>
> > > > >>> I plan on updating the KIP but I want people's input on whether we
> > > > >>> should have the initializer be passed in once at the beginning or if
> > > > >>> we should instead have the initializer be required for each call to
> > > > >>> one of the aggregate calls. The first makes more sense to me but
> > > > >>> doesn't allow the user to specify different initializers for
> > > > >>> different tables.
> > > >>>
> > > >>> Thanks,
> > > >>> Kyle
> > > >>>
> > > >>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <
> winkelman.k...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > > >>>> Yeah, I really like that idea. I'll see what I can do to update the
> > > > >>>> KIP and my PR when I have some time. I'm not sure how well creating
> > > > >>>> the KStreamAggregates will go, though, because at that point I will
> > > > >>>> have thrown away the type of the values. It will be type safe; I
> > > > >>>> just may need to do a little forcing.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Kyle
> > > >>>>
> > > >>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com>
> > wrote:
> > > >>>>
> > > >>>>> Kyle,
> > > >>>>>
> > > > >>>>> Thanks for the explanations, my previous read on the wiki examples
> > > > >>>>> was wrong.
> > > > >>>>>
> > > > >>>>> So I guess my motivation should be "reduced" to: can we move the
> > > > >>>>> window specs param from "KGroupedStream#cogroup(..)" to
> > > > >>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
> > > > >>>>>
> > > > >>>>> 1. minor: we can reduce the number of generics in CogroupedKStream
> > > > >>>>> from 3 to 2.
> > > > >>>>> 2. major: this is for extensibility of the APIs, and since we are
> > > > >>>>> removing the "Evolving" annotations on Streams it may be harder to
> > > > >>>>> change it again in the future. The extended use case is that people
> > > > >>>>> want windowed running aggregates at different granularities, e.g.
> > > > >>>>> "give me the counts per-minute, per-hour, per-day and per-week".
> > > > >>>>> Today in the DSL we need to specify that case with multiple
> > > > >>>>> aggregate operators, each of which gets its own state store /
> > > > >>>>> changelog, etc. It is possible to optimize this down to a single
> > > > >>>>> state store. Its implementation would be tricky, as you need to
> > > > >>>>> hold windows of different lengths within your window store, but
> > > > >>>>> just from the public API point of view, it could be specified as:
> > > >>>>>
> > > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
> > > >>>>> "state-store-name");
> > > >>>>>
> > > >>>>> table1 = stream.aggregate(/*per-minute window*/)
> > > >>>>> table2 = stream.aggregate(/*per-hour window*/)
> > > >>>>> table3 = stream.aggregate(/*per-day window*/)
> > > >>>>>
> > > > >>>>> while underneath we are only using a single store "state-store-name"
> > > > >>>>> for it.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Although this feature is out of the scope of this KIP, I'd like to
> > > > >>>>> discuss if we can "leave the door open" to make such changes without
> > > > >>>>> modifying the public APIs.
> > > >>>>>
> > > >>>>> Guozhang
> > > >>>>>
> > > >>>>>
> > > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
> > > >>> winkelman.k...@gmail.com
> > > >>>>>>
> > > >>>>> wrote:
> > > >>>>>
> > > > >>>>>> I allow defining a single window/sessionwindow one time when you
> > > > >>>>>> make the cogroup call from a KGroupedStream. From then on you are
> > > > >>>>>> using the cogroup call from within CogroupedKStream, which doesn't
> > > > >>>>>> accept any additional windows/sessionwindows.
> > > >>>>>>
> > > >>>>>> Is this what you meant by your question or did I misunderstand?
> > > >>>>>>
> > > >>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com>
> > > >> wrote:
> > > >>>>>>
> > > > >>>>>> Another question that came to me is on "window alignment": from the
> > > > >>>>>> KIP it seems you are allowing users to specify a (potentially
> > > > >>>>>> different) window spec in each co-grouped input stream. So if these
> > > > >>>>>> window specs are different, how should we "align" them with
> > > > >>>>>> different input streams? I think it is more natural to only specify
> > > > >>>>>> one window spec in the
> > > >>>>>>
> > > >>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> And remove it from the cogroup() functions. WDYT?
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Guozhang
> > > >>>>>>
> > > >>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <
> > wangg...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>>
> > > > >>>>>>> Thanks for the proposal Kyle, this is a quite common use case to
> > > > >>>>>>> support such multi-way table join (i.e. N source tables with N
> > > > >>>>>>> aggregate func) with a single store and N+1 serdes, I have seen
> > > > >>>>>>> lots of people using the low-level PAPI to achieve this goal.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
> > > >>>>>> winkelman.k...@gmail.com
> > > >>>>>>>> wrote:
> > > >>>>>>>
> > > > >>>>>>>> I like your point about not handling other cases such as count
> > > > >>>>>>>> and reduce.
> > > > >>>>>>>>
> > > > >>>>>>>> I think that reduce may not make sense because reduce assumes
> > > > >>>>>>>> that the input values are the same type as the output values.
> > > > >>>>>>>> With cogroup there may be multiple different input types and then
> > > > >>>>>>>> your output type can't be multiple different things. In the case
> > > > >>>>>>>> where you have all matching value types you can do
> > > > >>>>>>>> KStreamBuilder#merge followed by the reduce.
> > > > >>>>>>>>
> > > > >>>>>>>> As for count, I think it is possible to call count on all the
> > > > >>>>>>>> individual grouped streams and then do joins. Otherwise we could
> > > > >>>>>>>> maybe make a special call in groupedstream for this case, because
> > > > >>>>>>>> in this case we don't need to do type checking on the values. It
> > > > >>>>>>>> could be similar to the current count methods but accept a
> > > > >>>>>>>> varargs of additional grouped streams as well and make sure they
> > > > >>>>>>>> have a key type of K.
> > > > >>>>>>>>
> > > > >>>>>>>> The way I have put the KIP together is to ensure that we do type
> > > > >>>>>>>> checking. I don't see a way we could group them all first and
> > > > >>>>>>>> then make a call to count, reduce, or aggregate because with
> > > > >>>>>>>> aggregate they would need to pass a list of aggregators and we
> > > > >>>>>>>> would have no way of type checking that they match the grouped
> > > > >>>>>>>> streams.
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Kyle
> > > >>>>>>>>
> > > >>>>>>>> On May 19, 2017 11:42 AM, "Xavier Léauté" <
> xav...@confluent.io>
> > > >>>>> wrote:
> > > >>>>>>>>
> > > > >>>>>>>>> Sorry to jump on this thread so late. I agree this is a very
> > > > >>>>>>>>> useful addition and wanted to provide an additional use-case and
> > > > >>>>>>>>> some more comments.
> > > > >>>>>>>>>
> > > > >>>>>>>>> This is actually a very common analytics use-case in the ad-tech
> > > > >>>>>>>>> industry. The typical setup will have an auction stream, an
> > > > >>>>>>>>> impression stream, and a click stream. Those three streams need
> > > > >>>>>>>>> to be combined to compute aggregate statistics (e.g. impression
> > > > >>>>>>>>> statistics, and click-through rates), since most of the
> > > > >>>>>>>>> attributes of interest are only present in the auction stream.
> > > > >>>>>>>>>
> > > > >>>>>>>>> A simple way to do this is to co-group all the streams by the
> > > > >>>>>>>>> auction key, and process updates to the co-group as events for
> > > > >>>>>>>>> each stream come in, keeping only one value from each stream
> > > > >>>>>>>>> before sending downstream for further processing / aggregation.
> > > > >>>>>>>>>
> > > > >>>>>>>>> One could view the result of that co-group operation as a
> > > > >>>>>>>>> "KTable" with multiple values per key. The key being the grouping
> > > > >>>>>>>>> key, and the values consisting of one value per stream.
> > > > >>>>>>>>>
> > > > >>>>>>>>> What I like about Kyle's approach is that it allows elegant
> > > > >>>>>>>>> co-grouping of multiple streams without having to worry about
> > > > >>>>>>>>> the number of streams, and avoids dealing with Tuple types or
> > > > >>>>>>>>> other generic interfaces that could get messy if we wanted to
> > > > >>>>>>>>> preserve all the value types in the resulting co-grouped stream.
> > > > >>>>>>>>>
> > > > >>>>>>>>> My only concern is that we only allow the cogroup + aggregate
> > > > >>>>>>>>> combined operation. This forces the user to build their own
> > > > >>>>>>>>> tuple serialization format if they want to preserve the
> > > > >>>>>>>>> individual input stream values as a group. It also deviates
> > > > >>>>>>>>> quite a bit from our approach in KGroupedStream which offers
> > > > >>>>>>>>> other operations, such as count and reduce, which should also be
> > > > >>>>>>>>> applicable to a co-grouped stream.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Overall I still think this is a really useful addition, but I
> > > > >>>>>>>>> feel we haven't spent much time trying to explore alternative
> > > > >>>>>>>>> DSLs that could maybe generalize better or match our existing
> > > > >>>>>>>>> syntax more closely.
> > > >>>>>>>>>
> > > >>>>>>>>> On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <
> > > >>>>>> winkelman.k...@gmail.com
> > > >>>>>>>>>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > > >>>>>>>>>> Eno, is there anyone else that is an expert in the Kafka
> > > > >>>>>>>>>> Streams realm that I should reach out to for input?
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I believe Damian Guy is still planning on reviewing this more
> > > > >>>>>>>>>> in depth so I will wait for his inputs before continuing.
> > > >>>>>>>>>>
> > > >>>>>>>>>> On May 9, 2017 7:30 AM, "Eno Thereska" <
> > > >> eno.there...@gmail.com
> > > >>>>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Thanks Kyle, good arguments.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Eno
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <
> > > >>>>>>>> winkelman.k...@gmail.com
> > > >>>>>>>>>>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> *- minor: could you add an exact example (similar to what Jay’s
> > > > >>>>>>>>>>>> example is, or like your Spark/Pig pointers had) to make this
> > > > >>>>>>>>>>>> super concrete?*
> > > > >>>>>>>>>>>> I have added a more concrete example to the KIP.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> *- my main concern is that we’re exposing this optimization to
> > > > >>>>>>>>>>>> the DSL. In an ideal world, an optimizer would take the existing
> > > > >>>>>>>>>>>> DSL and do the right thing under the covers (create just one
> > > > >>>>>>>>>>>> state store, arrange the nodes etc). The original DSL had a
> > > > >>>>>>>>>>>> bunch of small, composable pieces (group, aggregate, join) that
> > > > >>>>>>>>>>>> this proposal groups together. I’d like to hear your thoughts on
> > > > >>>>>>>>>>>> whether it’s possible to do this optimization with the current
> > > > >>>>>>>>>>>> DSL, at the topology builder level.*
> > > > >>>>>>>>>>>> You would have to make a lot of checks to understand if it is
> > > > >>>>>>>>>>>> even possible to make this optimization:
> > > > >>>>>>>>>>>> 1. Make sure they are all KTableKTableOuterJoins.
> > > > >>>>>>>>>>>> 2. None of the intermediate KTables are used for anything else.
> > > > >>>>>>>>>>>> 3. None of the intermediate stores are used. (This may be
> > > > >>>>>>>>>>>> impossible, especially if they use KafkaStreams#store after the
> > > > >>>>>>>>>>>> topology has already been built.)
> > > > >>>>>>>>>>>> You would then need to make decisions during the optimization:
> > > > >>>>>>>>>>>> 1. Your new initializer would be the composite of all the
> > > > >>>>>>>>>>>> individual initializers and the valueJoiners.
> > > > >>>>>>>>>>>> 2. I am having a hard time thinking about how you would turn the
> > > > >>>>>>>>>>>> aggregators and valueJoiners into an aggregator that would work
> > > > >>>>>>>>>>>> on the final object, but this may be possible.
> > > > >>>>>>>>>>>> 3. Which state store would you use? The ones declared would be
> > > > >>>>>>>>>>>> for the aggregate values. None of the declared ones would be
> > > > >>>>>>>>>>>> guaranteed to hold the final object. This would mean you must
> > > > >>>>>>>>>>>> create a new state store and not create any of the declared
> > > > >>>>>>>>>>>> ones.
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> The main argument I have against it is that even if it could be
> > > > >>>>>>>>>>>> done, I don't know that we would want to have this be an
> > > > >>>>>>>>>>>> optimization in the background, because the user would still be
> > > > >>>>>>>>>>>> required to think about all of the intermediate values that they
> > > > >>>>>>>>>>>> shouldn't need to worry about if they only care about the final
> > > > >>>>>>>>>>>> object.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> In my opinion cogroup is a common enough case that it should be
> > > > >>>>>>>>>>>> part of the composable pieces (group, aggregate, join) because
> > > > >>>>>>>>>>>> we want to allow people to join more than 2 streams in an easy
> > > > >>>>>>>>>>>> way. Right now I don't think we give them ways of handling this
> > > > >>>>>>>>>>>> use case easily.
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> *-I think there will be scope for several such optimizations in
> > > > >>>>>>>>>>>> the future and perhaps at some point we need to think about
> > > > >>>>>>>>>>>> decoupling the 1:1 mapping from the DSL into the physical
> > > > >>>>>>>>>>>> topology.*
> > > > >>>>>>>>>>>> I would argue that cogroup is not just an optimization; it is a
> > > > >>>>>>>>>>>> new way for the users to look at accomplishing a problem that
> > > > >>>>>>>>>>>> requires multiple streams. I may sound like a broken record but
> > > > >>>>>>>>>>>> I don't think users should have to build the N-1 intermediate
> > > > >>>>>>>>>>>> tables and deal with their initializers, serdes and stores if
> > > > >>>>>>>>>>>> all they care about is the final object.
> > > > >>>>>>>>>>>> Now if for example someone uses cogroup but doesn't supply
> > > > >>>>>>>>>>>> additional streams and aggregators, this case is equivalent to a
> > > > >>>>>>>>>>>> single grouped stream making an aggregate call. This case is
> > > > >>>>>>>>>>>> what I would view as an optimization: we could remove the
> > > > >>>>>>>>>>>> KStreamCogroup and act as if there was just a call to
> > > > >>>>>>>>>>>> KGroupedStream#aggregate instead of calling
> > > > >>>>>>>>>>>> KGroupedStream#cogroup. (I would prefer to just write a warning
> > > > >>>>>>>>>>>> saying that this is not how cogroup is to be used.)
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>> Kyle
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <
> > > >>>>>>>> eno.there...@gmail.com
> > > >>>>>>>>>>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Kyle,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for the KIP again. A couple of comments:
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> - minor: could you add an exact example (similar to what Jay’s
> > > > >>>>>>>>>>>>> example is, or like your Spark/Pig pointers had) to make this
> > > > >>>>>>>>>>>>> super concrete?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> - my main concern is that we’re exposing this optimization to
> > > > >>>>>>>>>>>>> the DSL. In an ideal world, an optimizer would take the
> > > > >>>>>>>>>>>>> existing DSL and do the right thing under the covers (create
> > > > >>>>>>>>>>>>> just one state store, arrange the nodes etc). The original DSL
> > > > >>>>>>>>>>>>> had a bunch of small, composable pieces (group, aggregate,
> > > > >>>>>>>>>>>>> join) that this proposal groups together. I’d like to hear your
> > > > >>>>>>>>>>>>> thoughts on whether it’s possible to do this optimization with
> > > > >>>>>>>>>>>>> the current DSL, at the topology builder level.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I think there will be scope for several such optimizations in
> > > > >>>>>>>>>>>>> the future and perhaps at some point we need to think about
> > > > >>>>>>>>>>>>> decoupling the 1:1 mapping from the DSL into the physical
> > > > >>>>>>>>>>>>> topology.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks
> > > >>>>>>>>>>>>> Eno
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <
> > > >> j...@confluent.io>
> > > >>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I haven't digested the proposal but the use case is pretty
> > > > >>>>>>>>>>>>>> common. An example would be the "customer 360" or "unified
> > > > >>>>>>>>>>>>>> customer profile" use case we often use. In that use case you
> > > > >>>>>>>>>>>>>> have a dozen systems each of which has some information about
> > > > >>>>>>>>>>>>>> your customer (account details, settings, billing info,
> > > > >>>>>>>>>>>>>> customer service contacts, purchase history, etc). Your goal
> > > > >>>>>>>>>>>>>> is to join/munge these into a single profile record for each
> > > > >>>>>>>>>>>>>> customer that has all the relevant info in a usable form and
> > > > >>>>>>>>>>>>>> is up-to-date with all the source systems. If you implement
> > > > >>>>>>>>>>>>>> that with kstreams as a sequence of joins I think today we'd
> > > > >>>>>>>>>>>>>> fully materialize N-1 intermediate tables. But clearly you
> > > > >>>>>>>>>>>>>> only need a single stage to group all these things that are
> > > > >>>>>>>>>>>>>> already co-partitioned. A distributed database would do this
> > > > >>>>>>>>>>>>>> under the covers, which is arguably better (at least when it
> > > > >>>>>>>>>>>>>> does the right thing), and perhaps we could do the same thing,
> > > > >>>>>>>>>>>>>> but I'm not sure we know the partitioning so we may need an
> > > > >>>>>>>>>>>>>> explicit cogroup command that implies they are already
> > > > >>>>>>>>>>>>>> co-partitioned.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> -Jay
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
> > > >>>>>>>>>>> winkelman.k...@gmail.com
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Yeah, that's a good way to look at it.
> > > > >>>>>>>>>>>>>>> I have seen this type of functionality in a couple of other
> > > > >>>>>>>>>>>>>>> platforms like Spark and Pig.
> > > > >>>>>>>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> > > > >>>>>>>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <
> > > >>>>> damian....@gmail.com>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi Kyle,
> > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> If I'm reading this correctly it is like an N-way outer
> > > > >>>>>>>>>>>>>>>> join? So an input on any stream will always produce a new
> > > > >>>>>>>>>>>>>>>> aggregated value - is that correct?
> > > > >>>>>>>>>>>>>>>> Effectively, each Aggregator just looks up the current
> > > > >>>>>>>>>>>>>>>> value, aggregates and forwards the result.
> > > > >>>>>>>>>>>>>>>> I need to look into it and think about it a bit more, but it
> > > > >>>>>>>>>>>>>>>> seems like it could be a useful optimization.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
> > > >>>>>>>>>> winkelman.k...@gmail.com
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> I sure can. I have added the following description to my
> > > > >>>>>>>>>>>>>>>>> KIP. If this doesn't help let me know and I will take some
> > > > >>>>>>>>>>>>>>>>> more time to build a diagram and make more of a step by
> > > > >>>>>>>>>>>>>>>>> step description:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Example with Current API:
> > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> KTable<K, V1> table1 =
> > > > >>>>>>>>>>>>>>>>>     builder.stream("topic1").groupByKey()
> > > > >>>>>>>>>>>>>>>>>            .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> > > > >>>>>>>>>>>>>>>>> KTable<K, V2> table2 =
> > > > >>>>>>>>>>>>>>>>>     builder.stream("topic2").groupByKey()
> > > > >>>>>>>>>>>>>>>>>            .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> > > > >>>>>>>>>>>>>>>>> KTable<K, V3> table3 =
> > > > >>>>>>>>>>>>>>>>>     builder.stream("topic3").groupByKey()
> > > > >>>>>>>>>>>>>>>>>            .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> > > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped =
> > > > >>>>>>>>>>>>>>>>>     table1.outerJoin(table2, joinerOneAndTwo)
> > > > >>>>>>>>>>>>>>>>>           .outerJoin(table3, joinerOneTwoAndThree);
> > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> As you can see this creates 3 StateStores, requires 3
> > > > >>>>>>>>>>>>>>>>> initializers, and 3 aggValueSerdes. This also puts pressure
> > > > >>>>>>>>>>>>>>>>> on the user to define what the intermediate values are
> > > > >>>>>>>>>>>>>>>>> going to be (V1, V2, V3). They are left with a couple of
> > > > >>>>>>>>>>>>>>>>> choices: first, to make V1, V2, and V3 all the same as CG,
> > > > >>>>>>>>>>>>>>>>> so the two joiners are more like mergers; or second, to
> > > > >>>>>>>>>>>>>>>>> make them intermediate states such as Topic1Map, Topic2Map,
> > > > >>>>>>>>>>>>>>>>> and Topic3Map, so the joiners use those to build the final
> > > > >>>>>>>>>>>>>>>>> aggregate CG value. This is something the user could avoid
> > > > >>>>>>>>>>>>>>>>> thinking about with this KIP.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will
> > > > >>>>>>>>>>>>>>>>> first go through a KStreamAggregate, grabbing the current
> > > > >>>>>>>>>>>>>>>>> aggregate from storeName1. It will produce this in the form
> > > > >>>>>>>>>>>>>>>>> of the first intermediate value and get sent through a
> > > > >>>>>>>>>>>>>>>>> KTableKTableOuterJoin, where it will look up the current
> > > > >>>>>>>>>>>>>>>>> value of the key in storeName2. It will use the first
> > > > >>>>>>>>>>>>>>>>> joiner to calculate the second intermediate value, which
> > > > >>>>>>>>>>>>>>>>> will go through an additional KTableKTableOuterJoin. Here
> > > > >>>>>>>>>>>>>>>>> it will look up the current value of the key in storeName3
> > > > >>>>>>>>>>>>>>>>> and use the second joiner to build the final aggregate
> > > > >>>>>>>>>>>>>>>>> value.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> If you think through all possibilities for incoming topics
> > > > >>>>>>>>>>>>>>>>> you will see that no matter which topic it comes in
> > > > >>>>>>>>>>>>>>>>> through, all three stores are queried and all of the
> > > > >>>>>>>>>>>>>>>>> joiners must get used.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Topology-wise, for N incoming streams this creates N
> > > > >>>>>>>>>>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> > > > >>>>>>>>>>>>>>>>> KTableKTableJoinMergers.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Example with Proposed API:
> > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped =
> > > > >>>>>>>>>>>>>>>>>     grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> > > > >>>>>>>>>>>>>>>>>             .cogroup(grouped2, aggregator2)
> > > > >>>>>>>>>>>>>>>>>             .cogroup(grouped3, aggregator3)
> > > > >>>>>>>>>>>>>>>>>             .aggregate();
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> As you can see this creates 1 StateStore, requires 1 initializer, and 1
> > > >>>>>>>>>>>>>>>>> aggValueSerde. The user no longer has to worry about the intermediate
> > > >>>>>>>>>>>>>>>>> values and the joiners. All they have to think about is how each stream
> > > >>>>>>>>>>>>>>>>> impacts the creation of the final CG object.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through
> > > >>>>>>>>>>>>>>>>> a KStreamAggregate and grab the current aggregate from storeName1. It
> > > >>>>>>>>>>>>>>>>> will add its incoming object to the aggregate, update the store and pass
> > > >>>>>>>>>>>>>>>>> the new aggregate on. This new aggregate goes through the KStreamCogroup,
> > > >>>>>>>>>>>>>>>>> which is pretty much just a pass-through processor, and you are done.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Topology wise for N incoming streams the new api will only ever create N
> > > >>>>>>>>>>>>>>>>> KStreamAggregates and 1 KStreamCogroup.
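
The single-store flow described above can be sketched in plain Java with no Kafka dependency. This is only an illustration under stated assumptions: `CogroupSketch`, `SharedStore`, `Aggregator`, and `process` are hypothetical names, a `HashMap` stands in for the state store behind storeName1, and none of it is taken from the KIP-150 implementation. Each input stream brings its own aggregator, but all of them read and update the same per-key aggregate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class CogroupSketch {

    // Hypothetical per-stream aggregator: folds one record into the shared aggregate.
    interface Aggregator<K, V, CG> {
        CG apply(K key, V value, CG aggregate);
    }

    // Hypothetical stand-in for the single state store shared by all cogrouped streams.
    static final class SharedStore<K, CG> {
        private final Map<K, CG> store = new HashMap<>();
        private final Supplier<CG> initializer;

        SharedStore(Supplier<CG> initializer) {
            this.initializer = initializer;
        }

        // Read the current aggregate (or initialize it), apply the aggregator of the
        // stream the record arrived on, write the result back, and return it so it
        // can be forwarded downstream.
        <V> CG process(K key, V value, Aggregator<K, V, CG> aggregator) {
            CG current = store.containsKey(key) ? store.get(key) : initializer.get();
            CG updated = aggregator.apply(key, value, current);
            store.put(key, updated);
            return updated;
        }
    }

    public static void main(String[] args) {
        // One initializer and one store, no matter how many input streams there are.
        SharedStore<String, String> store = new SharedStore<>(() -> "");

        // One aggregator per input stream; the value types differ, the aggregate type does not.
        Aggregator<String, Integer, String> aggregator1 = (k, v, agg) -> agg + "[topic1:" + v + "]";
        Aggregator<String, Double, String> aggregator2 = (k, v, agg) -> agg + "[topic2:" + v + "]";
        Aggregator<String, Boolean, String> aggregator3 = (k, v, agg) -> agg + "[topic3:" + v + "]";

        System.out.println(store.process("key", 1, aggregator1));
        System.out.println(store.process("key", 2.5, aggregator2));
        System.out.println(store.process("key", true, aggregator3));
    }
}
```

Whichever topic a record arrives on, the sketch performs exactly one store read and one store write, which is the contrast with the outer-join approach where all three stores are queried.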
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io>
> > > >>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Kyle,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not
> > > >>>>>>>>>>>>>>>>>> follow completely. Could you maybe add a more concrete example, like 3
> > > >>>>>>>>>>>>>>>>>> streams with 3 records each (plus expected result), and show the
> > > >>>>>>>>>>>>>>>>>> difference between the current way to implement it and the proposed API?
> > > >>>>>>>>>>>>>>>>>> This could also cover the internal processing to see what store calls
> > > >>>>>>>>>>>>>>>>>> would be required for both approaches etc.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I think, it's pretty advanced stuff you propose, and it would help to
> > > >>>>>>>>>>>>>>>>>> understand it better.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Thanks a lot!
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> > > >>>>>>>>>>>>>>>>>>> I have made a pull request. It can be found here.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I plan to write some more unit tests for my classes and get around to
> > > >>>>>>>>>>>>>>>>>>> writing documentation for the public api additions.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> One thing I was curious about is during the KCogroupedStreamImpl#aggregate
> > > >>>>>>>>>>>>>>>>>>> method I pass null to the KGroupedStream#repartitionIfRequired method. I
> > > >>>>>>>>>>>>>>>>>>> can't supply the store name because if more than one grouped stream
> > > >>>>>>>>>>>>>>>>>>> repartitions an error is thrown. Is there some name that someone can
> > > >>>>>>>>>>>>>>>>>>> recommend or should I leave the null and allow it to fall back to the
> > > >>>>>>>>>>>>>>>>>>> KGroupedStream.name?
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Should this be expanded to handle grouped tables? This would be pretty
> > > >>>>>>>>>>>>>>>>>>> easy for a normal aggregate but one allowing session stores and windowed
> > > >>>>>>>>>>>>>>>>>>> stores would require KTableSessionWindowAggregate and
> > > >>>>>>>>>>>>>>>>>>> KTableWindowAggregate implementations.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>> Kyle
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Eno
> > > >>>>>>>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Hi Kyle,
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to look at
> > > >>>>>>>>>>>>>>>>>>>>> the KIP yet, but will schedule some time to look into it tomorrow. For
> > > >>>>>>>>>>>>>>>>>>>>> the implementation, can you raise a PR against kafka trunk and mark it
> > > >>>>>>>>>>>>>>>>>>>>> as WIP? It will be easier to review what you have done.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>> Damian
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com>
> > > >>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention to my KIP as
> > > >>>>>>>>>>>>>>>>>>>>>> I haven't heard from anyone in a couple days. This is my first KIP and
> > > >>>>>>>>>>>>>>>>>>>>>> my first large contribution to the project so I'm sure I did something
> > > >>>>>>>>>>>>>>>>>>>>>> wrong. ;)
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com>
> > > >>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Hello all,
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding cogroup to
> > > >>>>>>>>>>>>>>>>>>>>>>> the streams DSL.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Please find the KIP here:
> > > >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Please find my initial implementation here:
> > > >>>>>>>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>>>> Kyle Winkelman
> > > >>>>>>>>>>>>>>>>>>>>>>>



-- 
-- Guozhang
