On a second thought... This is the current proposal API

```

<T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer, final
Aggregator<? super K, ? super V, T> aggregator, final Serde<T>
aggValueSerde)

```


If we do not have the initializer in the first co-group it might be a bit
awkward for users to specify the aggregator that returns a typed <T> value?
Maybe it is still better to put these two functions in the same api?



Guozhang

On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> This suggestion lgtm. I would vote for the first alternative than adding
> it to the `KStreamBuilder` though.
>
> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io>
> wrote:
>
>> I have a minor suggestion to make the API a little bit more symmetric.
>> I feel it would make more sense to move the initializer and serde to the
>> final aggregate statement, since the serde only applies to the state
>> store,
>> and the initializer doesn't bear any relation to the first group in
>> particular. It would end up looking like this:
>>
>> KTable<K, CG> cogrouped =
>>     grouped1.cogroup(aggregator1)
>>             .cogroup(grouped2, aggregator2)
>>             .cogroup(grouped3, aggregator3)
>>             .aggregate(initializer1, aggValueSerde, storeName1);
>>
>> Alternatively, we could move the the first cogroup() method to
>> KStreamBuilder, similar to how we have .merge()
>> and end up with an api that would be even more symmetric.
>>
>> KStreamBuilder.cogroup(grouped1, aggregator1)
>>               .cogroup(grouped2, aggregator2)
>>               .cogroup(grouped3, aggregator3)
>>               .aggregate(initializer1, aggValueSerde, storeName1);
>>
>> This doesn't have to be a blocker, but I thought it would make the API
>> just
>> a tad cleaner.
>>
>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Kyle,
>> >
>> > Thanks a lot for the updated KIP. It looks good to me.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:
>> >
>> > > This makes much more sense to me. +1
>> > >
>> > > > On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <
>> winkelman.k...@gmail.com>
>> > > wrote:
>> > > >
>> > > > I have updated the KIP and my PR. Let me know what you think.
>> > > > To created a cogrouped stream just call cogroup on a KgroupedStream
>> and
>> > > > supply the initializer, aggValueSerde, and an aggregator. Then
>> continue
>> > > > adding kgroupedstreams and aggregators. Then call one of the many
>> > > aggregate
>> > > > calls to create a KTable.
>> > > >
>> > > > Thanks,
>> > > > Kyle
>> > > >
>> > > > On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
>> > > >
>> > > >> Hi Kyle,
>> > > >>
>> > > >> Thanks for the update. I think just one initializer makes sense as
>> it
>> > > >> should only be called once per key and generally it is just going
>> to
>> > > create
>> > > >> a new instance of whatever the Aggregate class is.
>> > > >>
>> > > >> Cheers,
>> > > >> Damian
>> > > >>
>> > > >> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <
>> winkelman.k...@gmail.com
>> > >
>> > > >> wrote:
>> > > >>
>> > > >>> Hello all,
>> > > >>>
>> > > >>> I have spent some more time on this and the best alternative I
>> have
>> > > come
>> > > >> up
>> > > >>> with is:
>> > > >>> KGroupedStream has a single cogroup call that takes an initializer
>> > and
>> > > an
>> > > >>> aggregator.
>> > > >>> CogroupedKStream has a cogroup call that takes additional
>> > groupedStream
>> > > >>> aggregator pairs.
>> > > >>> CogroupedKStream has multiple aggregate methods that create the
>> > > different
>> > > >>> stores.
>> > > >>>
>> > > >>> I plan on updating the kip but I want people's input on if we
>> should
>> > > have
>> > > >>> the initializer be passed in once at the beginning or if we should
>> > > >> instead
>> > > >>> have the initializer be required for each call to one of the
>> > aggregate
>> > > >>> calls. The first makes more sense to me but doesnt allow the user
>> to
>> > > >>> specify different initializers for different tables.
>> > > >>>
>> > > >>> Thanks,
>> > > >>> Kyle
>> > > >>>
>> > > >>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <
>> winkelman.k...@gmail.com>
>> > > >>> wrote:
>> > > >>>
>> > > >>>> Yea I really like that idea I'll see what I can do to update the
>> kip
>> > > >> and
>> > > >>>> my pr when I have some time. I'm not sure how well creating the
>> > > >>>> kstreamaggregates will go though because at that point I will
>> have
>> > > >> thrown
>> > > >>>> away the type of the values. It will be type safe I just may
>> need to
>> > > >> do a
>> > > >>>> little forcing.
>> > > >>>>
>> > > >>>> Thanks,
>> > > >>>> Kyle
>> > > >>>>
>> > > >>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com>
>> > wrote:
>> > > >>>>
>> > > >>>>> Kyle,
>> > > >>>>>
>> > > >>>>> Thanks for the explanations, my previous read on the wiki
>> examples
>> > > was
>> > > >>>>> wrong.
>> > > >>>>>
>> > > >>>>> So I guess my motivation should be "reduced" to: can we move the
>> > > >> window
>> > > >>>>> specs param from "KGroupedStream#cogroup(..)" to
>> > > >>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
>> > > >>>>>
>> > > >>>>> 1. minor: we can reduce the #.generics in CogroupedKStream from
>> 3
>> > to
>> > > >> 2.
>> > > >>>>> 2. major: this is for extensibility of the APIs, and since we
>> are
>> > > >>> removing
>> > > >>>>> the "Evolving" annotations on Streams it may be harder to
>> change it
>> > > >>> again
>> > > >>>>> in the future. The extended use cases are that people wanted to
>> > have
>> > > >>>>> windowed running aggregates on different granularities, e.g.
>> "give
>> > me
>> > > >>> the
>> > > >>>>> counts per-minute, per-hour, per-day and per-week", and today in
>> > DSL
>> > > >> we
>> > > >>>>> need to specify that case in multiple aggregate operators, which
>> > gets
>> > > >> a
>> > > >>>>> state store / changelog, etc. And it is possible to optimize it
>> as
>> > > >> well
>> > > >>> to
>> > > >>>>> a single state store. Its implementation would be tricky as you
>> > need
>> > > >> to
>> > > >>>>> contain different lengthed windows within your window store but
>> > just
>> > > >>> from
>> > > >>>>> the public API point of view, it could be specified as:
>> > > >>>>>
>> > > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
>> > > >>>>> "state-store-name");
>> > > >>>>>
>> > > >>>>> table1 = stream.aggregate(/*per-minute window*/)
>> > > >>>>> table2 = stream.aggregate(/*per-hour window*/)
>> > > >>>>> table3 = stream.aggregate(/*per-day window*/)
>> > > >>>>>
>> > > >>>>> while underlying we are only using a single store
>> > "state-store-name"
>> > > >> for
>> > > >>>>> it.
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> Although this feature is out of the scope of this KIP, I'd like
>> to
>> > > >>> discuss
>> > > >>>>> if we can "leave the door open" to make such changes without
>> > > modifying
>> > > >>> the
>> > > >>>>> public APIs .
>> > > >>>>>
>> > > >>>>> Guozhang
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
>> > > >>> winkelman.k...@gmail.com
>> > > >>>>>>
>> > > >>>>> wrote:
>> > > >>>>>
>> > > >>>>>> I allow defining a single window/sessionwindow one time when
>> you
>> > > >> make
>> > > >>>>> the
>> > > >>>>>> cogroup call from a KGroupedStream. From then on you are using
>> the
>> > > >>>>> cogroup
>> > > >>>>>> call from with in CogroupedKStream which doesnt accept any
>> > > >> additional
>> > > >>>>>> windows/sessionwindows.
>> > > >>>>>>
>> > > >>>>>> Is this what you meant by your question or did I misunderstand?
>> > > >>>>>>
>> > > >>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com>
>> > > >> wrote:
>> > > >>>>>>
>> > > >>>>>> Another question that came to me is on "window alignment": from
>> > the
>> > > >>> KIP
>> > > >>>>> it
>> > > >>>>>> seems you are allowing users to specify a (potentially
>> different)
>> > > >>> window
>> > > >>>>>> spec in each co-grouped input stream. So if these window specs
>> are
>> > > >>>>>> different how should we "align" them with different input
>> > streams? I
>> > > >>>>> think
>> > > >>>>>> it is more natural to only specify on window spec in the
>> > > >>>>>>
>> > > >>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> And remove it from the cogroup() functions. WDYT?
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> Guozhang
>> > > >>>>>>
>> > > >>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <
>> > wangg...@gmail.com>
>> > > >>>>> wrote:
>> > > >>>>>>
>> > > >>>>>>> Thanks for the proposal Kyle, this is a quite common use case
>> to
>> > > >>>>> support
>> > > >>>>>>> such multi-way table join (i.e. N source tables with N
>> aggregate
>> > > >>> func)
>> > > >>>>>> with
>> > > >>>>>>> a single store and N+1 serdes, I have seen lots of people
>> using
>> > > >> the
>> > > >>>>>>> low-level PAPI to achieve this goal.
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
>> > > >>>>>> winkelman.k...@gmail.com
>> > > >>>>>>>> wrote:
>> > > >>>>>>>
>> > > >>>>>>>> I like your point about not handling other cases such as
>> count
>> > > >> and
>> > > >>>>>> reduce.
>> > > >>>>>>>>
>> > > >>>>>>>> I think that reduce may not make sense because reduce assumes
>> > > >> that
>> > > >>>>> the
>> > > >>>>>>>> input values are the same as the output values. With cogroup
>> > > >> there
>> > > >>>>> may
>> > > >>>>>> be
>> > > >>>>>>>> multiple different input types and then your output type
>> cant be
>> > > >>>>>> multiple
>> > > >>>>>>>> different things. In the case where you have all matching
>> value
>> > > >>> types
>> > > >>>>>> you
>> > > >>>>>>>> can do KStreamBuilder#merge followed by the reduce.
>> > > >>>>>>>>
>> > > >>>>>>>> As for count I think it is possible to call count on all the
>> > > >>>>> individual
>> > > >>>>>>>> grouped streams and then do joins. Otherwise we could maybe
>> make
>> > > >> a
>> > > >>>>>> special
>> > > >>>>>>>> call in groupedstream for this case. Because in this case we
>> > dont
>> > > >>>>> need
>> > > >>>>>> to
>> > > >>>>>>>> do type checking on the values. It could be similar to the
>> > > >> current
>> > > >>>>> count
>> > > >>>>>>>> methods but accept a var args of additonal grouped streams as
>> > > >> well
>> > > >>>>> and
>> > > >>>>>>>> make
>> > > >>>>>>>> sure they have a key type of K.
>> > > >>>>>>>>
>> > > >>>>>>>> The way I have put the kip together is to ensure that we do
>> type
>> > > >>>>>> checking.
>> > > >>>>>>>> I don't see a way we could group them all first and then
>> make a
>> > > >>> call
>> > > >>>>> to
>> > > >>>>>>>> count, reduce, or aggregate because with aggregate they would
>> > > >> need
>> > > >>> to
>> > > >>>>>> pass
>> > > >>>>>>>> a list of aggregators and we would have no way of type
>> checking
>> > > >>> that
>> > > >>>>>> they
>> > > >>>>>>>> match the grouped streams.
>> > > >>>>>>>>
>> > > >>>>>>>> Thanks,
>> > > >>>>>>>> Kyle
>> > > >>>>>>>>
>> > > >>>>>>>> On May 19, 2017 11:42 AM, "Xavier Léauté" <
>> xav...@confluent.io>
>> > > >>>>> wrote:
>> > > >>>>>>>>
>> > > >>>>>>>>> Sorry to jump on this thread so late. I agree this is a very
>> > > >>> useful
>> > > >>>>>>>>> addition and wanted to provide an additional use-case and
>> some
>> > > >>> more
>> > > >>>>>>>>> comments.
>> > > >>>>>>>>>
>> > > >>>>>>>>> This is actually a very common analytics use-case in the
>> > > >> ad-tech
>> > > >>>>>>>> industry.
>> > > >>>>>>>>> The typical setup will have an auction stream, an impression
>> > > >>>>> stream,
>> > > >>>>>>>> and a
>> > > >>>>>>>>> click stream. Those three streams need to be combined to
>> > > >> compute
>> > > >>>>>>>> aggregate
>> > > >>>>>>>>> statistics (e.g. impression statistics, and click-through
>> > > >> rates),
>> > > >>>>>> since
>> > > >>>>>>>>> most of the attributes of interest are only present the
>> auction
>> > > >>>>>> stream.
>> > > >>>>>>>>>
>> > > >>>>>>>>> A simple way to do this is to co-group all the streams by
>> the
>> > > >>>>> auction
>> > > >>>>>>>> key,
>> > > >>>>>>>>> and process updates to the co-group as events for each
>> stream
>> > > >>> come
>> > > >>>>> in,
>> > > >>>>>>>>> keeping only one value from each stream before sending
>> > > >> downstream
>> > > >>>>> for
>> > > >>>>>>>>> further processing / aggregation.
>> > > >>>>>>>>>
>> > > >>>>>>>>> One could view the result of that co-group operation as a
>> > > >>> "KTable"
>> > > >>>>>> with
>> > > >>>>>>>>> multiple values per key. The key being the grouping key, and
>> > > >> the
>> > > >>>>>> values
>> > > >>>>>>>>> consisting of one value per stream.
>> > > >>>>>>>>>
>> > > >>>>>>>>> What I like about Kyle's approach is that allows elegant
>> > > >>>>> co-grouping
>> > > >>>>>> of
>> > > >>>>>>>>> multiple streams without having to worry about the number of
>> > > >>>>> streams,
>> > > >>>>>>>> and
>> > > >>>>>>>>> avoids dealing with Tuple types or other generic interfaces
>> > > >> that
>> > > >>>>> could
>> > > >>>>>>>> get
>> > > >>>>>>>>> messy if we wanted to preserve all the value types in the
>> > > >>> resulting
>> > > >>>>>>>>> co-grouped stream.
>> > > >>>>>>>>>
>> > > >>>>>>>>> My only concern is that we only allow the cogroup +
>> aggregate
>> > > >>>>> combined
>> > > >>>>>>>>> operation. This forces the user to build their own tuple
>> > > >>>>> serialization
>> > > >>>>>>>>> format if they want to preserve the individual input stream
>> > > >>> values
>> > > >>>>> as
>> > > >>>>>> a
>> > > >>>>>>>>> group. It also deviates quite a bit from our approach in
>> > > >>>>>> KGroupedStream
>> > > >>>>>>>>> which offers other operations, such as count and reduce,
>> which
>> > > >>>>> should
>> > > >>>>>>>> also
>> > > >>>>>>>>> be applicable to a co-grouped stream.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Overall I still think this is a really useful addition, but
>> I
>> > > >>> feel
>> > > >>>>> we
>> > > >>>>>>>>> haven't spend much time trying to explore alternative DSLs
>> that
>> > > >>>>> could
>> > > >>>>>>>> maybe
>> > > >>>>>>>>> generalize better or match our existing syntax more closely.
>> > > >>>>>>>>>
>> > > >>>>>>>>> On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <
>> > > >>>>>> winkelman.k...@gmail.com
>> > > >>>>>>>>>
>> > > >>>>>>>>> wrote:
>> > > >>>>>>>>>
>> > > >>>>>>>>>> Eno, is there anyone else that is an expert in the kafka
>> > > >>> streams
>> > > >>>>>> realm
>> > > >>>>>>>>> that
>> > > >>>>>>>>>> I should reach out to for input?
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> I believe Damian Guy is still planning on reviewing this
>> more
>> > > >>> in
>> > > >>>>>> depth
>> > > >>>>>>>>> so I
>> > > >>>>>>>>>> will wait for his inputs before continuing.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> On May 9, 2017 7:30 AM, "Eno Thereska" <
>> > > >> eno.there...@gmail.com
>> > > >>>>
>> > > >>>>>>>> wrote:
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>> Thanks Kyle, good arguments.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Eno
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <
>> > > >>>>>>>> winkelman.k...@gmail.com
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> *- minor: could you add an exact example (similar to what
>> > > >>>>> Jay’s
>> > > >>>>>>>>> example
>> > > >>>>>>>>>>> is,
>> > > >>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super
>> > > >>>>>> concrete?*
>> > > >>>>>>>>>>>> I have added a more concrete example to the KIP.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> *- my main concern is that we’re exposing this
>> > > >> optimization
>> > > >>>>> to
>> > > >>>>>> the
>> > > >>>>>>>>> DSL.
>> > > >>>>>>>>>>> In
>> > > >>>>>>>>>>>> an ideal world, an optimizer would take the existing DSL
>> > > >>> and
>> > > >>>>> do
>> > > >>>>>>>> the
>> > > >>>>>>>>>> right
>> > > >>>>>>>>>>>> thing under the covers (create just one state store,
>> > > >>> arrange
>> > > >>>>> the
>> > > >>>>>>>>> nodes
>> > > >>>>>>>>>>>> etc). The original DSL had a bunch of small, composable
>> > > >>>>> pieces
>> > > >>>>>>>>> (group,
>> > > >>>>>>>>>>>> aggregate, join) that this proposal groups together. I’d
>> > > >>>>> like to
>> > > >>>>>>>> hear
>> > > >>>>>>>>>>> your
>> > > >>>>>>>>>>>> thoughts on whether it’s possible to do this optimization
>> > > >>>>> with
>> > > >>>>>> the
>> > > >>>>>>>>>>> current
>> > > >>>>>>>>>>>> DSL, at the topology builder level.*
>> > > >>>>>>>>>>>> You would have to make a lot of checks to understand if
>> > > >> it
>> > > >>> is
>> > > >>>>>> even
>> > > >>>>>>>>>>> possible
>> > > >>>>>>>>>>>> to make this optimization:
>> > > >>>>>>>>>>>> 1. Make sure they are all KTableKTableOuterJoins
>> > > >>>>>>>>>>>> 2. None of the intermediate KTables are used for anything
>> > > >>>>> else.
>> > > >>>>>>>>>>>> 3. None of the intermediate stores are used. (This may be
>> > > >>>>>>>> impossible
>> > > >>>>>>>>>>>> especially if they use KafkaStreams#store after the
>> > > >>> topology
>> > > >>>>> has
>> > > >>>>>>>>>> already
>> > > >>>>>>>>>>>> been built.)
>> > > >>>>>>>>>>>> You would then need to make decisions during the
>> > > >>>>> optimization:
>> > > >>>>>>>>>>>> 1. Your new initializer would the composite of all the
>> > > >>>>>> individual
>> > > >>>>>>>>>>>> initializers and the valueJoiners.
>> > > >>>>>>>>>>>> 2. I am having a hard time thinking about how you would
>> > > >>> turn
>> > > >>>>> the
>> > > >>>>>>>>>>>> aggregators and valueJoiners into an aggregator that
>> > > >> would
>> > > >>>>> work
>> > > >>>>>> on
>> > > >>>>>>>>> the
>> > > >>>>>>>>>>>> final object, but this may be possible.
>> > > >>>>>>>>>>>> 3. Which state store would you use? The ones declared
>> > > >> would
>> > > >>>>> be
>> > > >>>>>> for
>> > > >>>>>>>>> the
>> > > >>>>>>>>>>>> aggregate values. None of the declared ones would be
>> > > >>>>> guaranteed
>> > > >>>>>> to
>> > > >>>>>>>>> hold
>> > > >>>>>>>>>>> the
>> > > >>>>>>>>>>>> final object. This would mean you must created a new
>> > > >> state
>> > > >>>>> store
>> > > >>>>>>>> and
>> > > >>>>>>>>>> not
>> > > >>>>>>>>>>>> created any of the declared ones.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> The main argument I have against it is even if it could
>> > > >> be
>> > > >>>>> done
>> > > >>>>>> I
>> > > >>>>>>>>> don't
>> > > >>>>>>>>>>>> know that we would want to have this be an optimization
>> > > >> in
>> > > >>>>> the
>> > > >>>>>>>>>> background
>> > > >>>>>>>>>>>> because the user would still be required to think about
>> > > >> all
>> > > >>>>> of
>> > > >>>>>> the
>> > > >>>>>>>>>>>> intermediate values that they shouldn't need to worry
>> > > >> about
>> > > >>>>> if
>> > > >>>>>>>> they
>> > > >>>>>>>>>> only
>> > > >>>>>>>>>>>> care about the final object.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> In my opinion cogroup is a common enough case that it
>> > > >>> should
>> > > >>>>> be
>> > > >>>>>>>> part
>> > > >>>>>>>>> of
>> > > >>>>>>>>>>> the
>> > > >>>>>>>>>>>> composable pieces (group, aggregate, join) because we
>> > > >> want
>> > > >>> to
>> > > >>>>>>>> allow
>> > > >>>>>>>>>>> people
>> > > >>>>>>>>>>>> to join more than 2 or more streams in an easy way. Right
>> > > >>>>> now I
>> > > >>>>>>>> don't
>> > > >>>>>>>>>>> think
>> > > >>>>>>>>>>>> we give them ways of handling this use case easily.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> *-I think there will be scope for several such
>> > > >>> optimizations
>> > > >>>>> in
>> > > >>>>>>>> the
>> > > >>>>>>>>>>> future
>> > > >>>>>>>>>>>> and perhaps at some point we need to think about
>> > > >> decoupling
>> > > >>>>> the
>> > > >>>>>>>> 1:1
>> > > >>>>>>>>>>> mapping
>> > > >>>>>>>>>>>> from the DSL into the physical topology.*
>> > > >>>>>>>>>>>> I would argue that cogroup is not just an optimization it
>> > > >>> is
>> > > >>>>> a
>> > > >>>>>> new
>> > > >>>>>>>>> way
>> > > >>>>>>>>>>> for
>> > > >>>>>>>>>>>> the users to look at accomplishing a problem that
>> > > >> requires
>> > > >>>>>>>> multiple
>> > > >>>>>>>>>>>> streams. I may sound like a broken record but I don't
>> > > >> think
>> > > >>>>>> users
>> > > >>>>>>>>>> should
>> > > >>>>>>>>>>>> have to build the N-1 intermediate tables and deal with
>> > > >>> their
>> > > >>>>>>>>>>> initializers,
>> > > >>>>>>>>>>>> serdes and stores if all they care about is the final
>> > > >>> object.
>> > > >>>>>>>>>>>> Now if for example someone uses cogroup but doesn't
>> > > >> supply
>> > > >>>>>>>> additional
>> > > >>>>>>>>>>>> streams and aggregators this case is equivalent to a
>> > > >> single
>> > > >>>>>>>> grouped
>> > > >>>>>>>>>>> stream
>> > > >>>>>>>>>>>> making an aggregate call. This case is what I view an
>> > > >>>>>> optimization
>> > > >>>>>>>>> as,
>> > > >>>>>>>>>> we
>> > > >>>>>>>>>>>> could remove the KStreamCogroup and act as if there was
>> > > >>> just
>> > > >>>>> a
>> > > >>>>>>>> call
>> > > >>>>>>>>> to
>> > > >>>>>>>>>>>> KGroupedStream#aggregate instead of calling
>> > > >>>>>>>> KGroupedStream#cogroup.
>> > > >>>>>>>>> (I
>> > > >>>>>>>>>>>> would prefer to just write a warning saying that this is
>> > > >>> not
>> > > >>>>> how
>> > > >>>>>>>>>> cogroup
>> > > >>>>>>>>>>> is
>> > > >>>>>>>>>>>> to be used.)
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Thanks,
>> > > >>>>>>>>>>>> Kyle
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <
>> > > >>>>>>>> eno.there...@gmail.com
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Hi Kyle,
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Thanks for the KIP again. A couple of comments:
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> - minor: could you add an exact example (similar to what
>> > > >>>>> Jay’s
>> > > >>>>>>>>> example
>> > > >>>>>>>>>>> is,
>> > > >>>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super
>> > > >>>>>> concrete?
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> - my main concern is that we’re exposing this
>> > > >> optimization
>> > > >>>>> to
>> > > >>>>>> the
>> > > >>>>>>>>> DSL.
>> > > >>>>>>>>>>> In
>> > > >>>>>>>>>>>>> an ideal world, an optimizer would take the existing DSL
>> > > >>>>> and do
>> > > >>>>>>>> the
>> > > >>>>>>>>>>> right
>> > > >>>>>>>>>>>>> thing under the covers (create just one state store,
>> > > >>> arrange
>> > > >>>>>> the
>> > > >>>>>>>>> nodes
>> > > >>>>>>>>>>>>> etc). The original DSL had a bunch of small, composable
>> > > >>>>> pieces
>> > > >>>>>>>>> (group,
>> > > >>>>>>>>>>>>> aggregate, join) that this proposal groups together. I’d
>> > > >>>>> like
>> > > >>>>>> to
>> > > >>>>>>>>> hear
>> > > >>>>>>>>>>> your
>> > > >>>>>>>>>>>>> thoughts on whether it’s possible to do this
>> > > >> optimization
>> > > >>>>> with
>> > > >>>>>>>> the
>> > > >>>>>>>>>>> current
>> > > >>>>>>>>>>>>> DSL, at the topology builder level.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> I think there will be scope for several such
>> > > >> optimizations
>> > > >>>>> in
>> > > >>>>>> the
>> > > >>>>>>>>>> future
>> > > >>>>>>>>>>>>> and perhaps at some point we need to think about
>> > > >>> decoupling
>> > > >>>>> the
>> > > >>>>>>>> 1:1
>> > > >>>>>>>>>>> mapping
>> > > >>>>>>>>>>>>> from the DSL into the physical topology.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Thanks
>> > > >>>>>>>>>>>>> Eno
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <
>> > > >> j...@confluent.io>
>> > > >>>>>> wrote:
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> I haven't digested the proposal but the use case is
>> > > >>> pretty
>> > > >>>>>>>> common.
>> > > >>>>>>>>> An
>> > > >>>>>>>>>>>>>> example would be the "customer 360" or "unified
>> > > >> customer
>> > > >>>>>>>> profile"
>> > > >>>>>>>>> use
>> > > >>>>>>>>>>>>> case
>> > > >>>>>>>>>>>>>> we often use. In that use case you have a dozen systems
>> > > >>>>> each
>> > > >>>>>> of
>> > > >>>>>>>>> which
>> > > >>>>>>>>>>> has
>> > > >>>>>>>>>>>>>> some information about your customer (account details,
>> > > >>>>>> settings,
>> > > >>>>>>>>>>> billing
>> > > >>>>>>>>>>>>>> info, customer service contacts, purchase history,
>> > > >> etc).
>> > > >>>>> Your
>> > > >>>>>>>> goal
>> > > >>>>>>>>> is
>> > > >>>>>>>>>>> to
>> > > >>>>>>>>>>>>>> join/munge these into a single profile record for each
>> > > >>>>>> customer
>> > > >>>>>>>>> that
>> > > >>>>>>>>>>> has
>> > > >>>>>>>>>>>>>> all the relevant info in a usable form and is
>> > > >> up-to-date
>> > > >>>>> with
>> > > >>>>>>>> all
>> > > >>>>>>>>> the
>> > > >>>>>>>>>>>>>> source systems. If you implement that with kstreams as
>> > > >> a
>> > > >>>>>>>> sequence
>> > > >>>>>>>>> of
>> > > >>>>>>>>>>>>> joins
>> > > >>>>>>>>>>>>>> i think today we'd fully materialize N-1 intermediate
>> > > >>>>> tables.
>> > > >>>>>>>> But
>> > > >>>>>>>>>>> clearly
>> > > >>>>>>>>>>>>>> you only need a single stage to group all these things
>> > > >>> that
>> > > >>>>>> are
>> > > >>>>>>>>>> already
>> > > >>>>>>>>>>>>>> co-partitioned. A distributed database would do this
>> > > >>> under
>> > > >>>>> the
>> > > >>>>>>>>> covers
>> > > >>>>>>>>>>>>> which
>> > > >>>>>>>>>>>>>> is arguably better (at least when it does the right
>> > > >>> thing)
>> > > >>>>> and
>> > > >>>>>>>>>> perhaps
>> > > >>>>>>>>>>> we
>> > > >>>>>>>>>>>>>> could do the same thing but I'm not sure we know the
>> > > >>>>>>>> partitioning
>> > > >>>>>>>>> so
>> > > >>>>>>>>>> we
>> > > >>>>>>>>>>>>> may
>> > > >>>>>>>>>>>>>> need an explicit cogroup command that impllies they are
>> > > >>>>>> already
>> > > >>>>>>>>>>>>>> co-partitioned.
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> -Jay
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
>> > > >>>>>>>>>>> winkelman.k...@gmail.com
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>> Yea thats a good way to look at it.
>> > > >>>>>>>>>>>>>>> I have seen this type of functionality in a couple
>> > > >> other
>> > > >>>>>>>> platforms
>> > > >>>>>>>>>>> like
>> > > >>>>>>>>>>>>>>> spark and pig.
>> > > >>>>>>>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/
>> > > >>>>>>>>>>>>> PairRDDFunctions.html
>> > > >>>>>>>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_
>> > > >>>>>>>>>>>>> cogroup_operator.htm
>> > > >>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <
>> > > >>>>> damian....@gmail.com>
>> > > >>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>> Hi Kyle,
>> > > >>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>> If i'm reading this correctly it is like an N way
>> > > >> outer
>> > > >>>>>> join?
>> > > >>>>>>>> So
>> > > >>>>>>>>> an
>> > > >>>>>>>>>>>>> input
>> > > >>>>>>>>>>>>>>>> on any stream will always produce a new aggregated
>> > > >>> value
>> > > >>>>> -
>> > > >>>>>> is
>> > > >>>>>>>>> that
>> > > >>>>>>>>>>>>>>> correct?
>> > > >>>>>>>>>>>>>>>> Effectively, each Aggregator just looks up the
>> > > >> current
>> > > >>>>>> value,
>> > > >>>>>>>>>>>>> aggregates
>> > > >>>>>>>>>>>>>>>> and forwards the result.
>> > > >>>>>>>>>>>>>>>> I need to look into it and think about it a bit more,
>> > > >>>>> but it
>> > > >>>>>>>>> seems
>> > > >>>>>>>>>>> like
>> > > >>>>>>>>>>>>>>> it
>> > > >>>>>>>>>>>>>>>> could be a useful optimization.
>> > > >>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
>> > > >>>>>>>>>> winkelman.k...@gmail.com
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> I sure can. I have added the following description
>> > > >> to
>> > > >>> my
>> > > >>>>>>>> KIP. If
>> > > >>>>>>>>>>> this
>> > > >>>>>>>>>>>>>>>>> doesn't help let me know and I will take some more
>> > > >>> time
>> > > >>>>> to
>> > > >>>>>>>>> build a
>> > > >>>>>>>>>>>>>>>> diagram
>> > > >>>>>>>>>>>>>>>>> and make more of a step by step description:
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> Example with Current API:
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> KTable<K, V1> table1 =
>> > > >>>>>>>>>>>>>>>>> builder.stream("topic1").groupByKey().aggregate(
>> > > >>>>>> initializer1
>> > > >>>>>>>> ,
>> > > >>>>>>>>>>>>>>>> aggregator1,
>> > > >>>>>>>>>>>>>>>>> aggValueSerde1, storeName1);
>> > > >>>>>>>>>>>>>>>>> KTable<K, V2> table2 =
>> > > >>>>>>>>>>>>>>>>> builder.stream("topic2").groupByKey().aggregate(
>> > > >>>>>> initializer2
>> > > >>>>>>>> ,
>> > > >>>>>>>>>>>>>>>> aggregator2,
>> > > >>>>>>>>>>>>>>>>> aggValueSerde2, storeName2);
>> > > >>>>>>>>>>>>>>>>> KTable<K, V3> table3 =
>> > > >>>>>>>>>>>>>>>>> builder.stream("topic3").groupByKey().aggregate(
>> > > >>>>>> initializer3
>> > > >>>>>>>> ,
>> > > >>>>>>>>>>>>>>>> aggregator3,
>> > > >>>>>>>>>>>>>>>>> aggValueSerde3, storeName3);
>> > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2,
>> > > >>>>>>>>>>>>>>>>> joinerOneAndTwo).outerJoin(table3,
>> > > >>>>> joinerOneTwoAndThree);
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> As you can see this creates 3 StateStores, requires
>> > > >> 3
>> > > >>>>>>>>>> initializers,
>> > > >>>>>>>>>>>>>>> and 3
>> > > >>>>>>>>>>>>>>>>> aggValueSerdes. This also adds the pressure to user
>> > > >> to
>> > > >>>>>> define
>> > > >>>>>>>>> what
>> > > >>>>>>>>>>> the
>> > > >>>>>>>>>>>>>>>>> intermediate values are going to be (V1, V2, V3).
>> > > >> They
>> > > >>>>> are
>> > > >>>>>>>> left
>> > > >>>>>>>>>>> with a
>> > > >>>>>>>>>>>>>>>>> couple choices, first to make V1, V2, and V3 all the
>> > > >>>>> same
>> > > >>>>>> as
>> > > >>>>>>>> CG
>> > > >>>>>>>>>> and
>> > > >>>>>>>>>>>>> the
>> > > >>>>>>>>>>>>>>>> two
>> > > >>>>>>>>>>>>>>>>> joiners are more like mergers, or second make them
>> > > >>>>>>>> intermediate
>> > > >>>>>>>>>>> states
>> > > >>>>>>>>>>>>>>>> such
>> > > >>>>>>>>>>>>>>>>> as Topic1Map, Topic2Map, and Topic3Map and the
>> > > >> joiners
>> > > >>>>> use
>> > > >>>>>>>> those
>> > > >>>>>>>>>> to
>> > > >>>>>>>>>>>>>>> build
>> > > >>>>>>>>>>>>>>>>> the final aggregate CG value. This is something the
>> > > >>> user
>> > > >>>>>>>> could
>> > > >>>>>>>>>> avoid
>> > > >>>>>>>>>>>>>>>>> thinking about with this KIP.
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> When a new input arrives lets say at "topic1" it
>> > > >> will
>> > > >>>>> first
>> > > >>>>>>>> go
>> > > >>>>>>>>>>> through
>> > > >>>>>>>>>>>>>>> a
>> > > >>>>>>>>>>>>>>>>> KStreamAggregate grabbing the current aggregate from
>> > > >>>>>>>> storeName1.
>> > > >>>>>>>>>> It
>> > > >>>>>>>>>>>>>>> will
>> > > >>>>>>>>>>>>>>>>> produce this in the form of the first intermediate
>> > > >>> value
>> > > >>>>>> and
>> > > >>>>>>>> get
>> > > >>>>>>>>>>> sent
>> > > >>>>>>>>>>>>>>>>> through a KTableKTableOuterJoin where it will look
>> > > >> up
>> > > >>>>> the
>> > > >>>>>>>>> current
>> > > >>>>>>>>>>>>> value
>> > > >>>>>>>>>>>>>>>> of
>> > > >>>>>>>>>>>>>>>>> the key in storeName2. It will use the first joiner
>> > > >> to
>> > > >>>>>>>> calculate
>> > > >>>>>>>>>> the
>> > > >>>>>>>>>>>>>>>> second
>> > > >>>>>>>>>>>>>>>>> intermediate value, which will go through an
>> > > >>> additional
>> > > >>>>>>>>>>>>>>>>> KTableKTableOuterJoin. Here it will look up the
>> > > >>> current
>> > > >>>>>>>> value of
>> > > >>>>>>>>>> the
>> > > >>>>>>>>>>>>>>> key
>> > > >>>>>>>>>>>>>>>> in
>> > > >>>>>>>>>>>>>>>>> storeName3 and use the second joiner to build the
>> > > >>> final
>> > > >>>>>>>>> aggregate
>> > > >>>>>>>>>>>>>>> value.
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> If you think through all possibilities for incoming
>> > > >>>>> topics
>> > > >>>>>>>> you
>> > > >>>>>>>>>> will
>> > > >>>>>>>>>>>>> see
>> > > >>>>>>>>>>>>>>>>> that no matter which topic it comes in through all
>> > > >>> three
>> > > >>>>>>>> stores
>> > > >>>>>>>>>> are
>> > > >>>>>>>>>>>>>>>> queried
>> > > >>>>>>>>>>>>>>>>> and all of the joiners must get used.
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> Topology wise for N incoming streams this creates N
>> > > >>>>>>>>>>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins,
>> > > >> and
>> > > >>>>> N-1
>> > > >>>>>>>>>>>>>>>>> KTableKTableJoinMergers.
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> Example with Proposed API:
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V1> grouped1 =
>> > > >>>>> builder.stream("topic1").
>> > > >>>>>>>>>>>>>>> groupByKey();
>> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V2> grouped2 =
>> > > >>>>> builder.stream("topic2").
>> > > >>>>>>>>>>>>>>> groupByKey();
>> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V3> grouped3 =
>> > > >>>>> builder.stream("topic3").
>> > > >>>>>>>>>>>>>>> groupByKey();
>> > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped =
>> > > >>> grouped1.cogroup(initializer1,
>> > > >>>>>>>>>>> aggregator1,
>> > > >>>>>>>>>>>>>>>>> aggValueSerde1, storeName1)
>> > > >>>>>>>>>>>>>>>>>      .cogroup(grouped2, aggregator2)
>> > > >>>>>>>>>>>>>>>>>      .cogroup(grouped3, aggregator3)
>> > > >>>>>>>>>>>>>>>>>      .aggregate();
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> As you can see this creates 1 StateStore, requires 1
>> > > >>>>>>>>> initializer,
>> > > >>>>>>>>>>> and
>> > > >>>>>>>>>>>>> 1
>> > > >>>>>>>>>>>>>>>>> aggValueSerde. The user no longer has to worry about
>> > > >>> the
>> > > >>>>>>>>>>> intermediate
>> > > >>>>>>>>>>>>>>>>> values and the joiners. All they have to think about
>> > > >>> is
>> > > >>>>> how
>> > > >>>>>>>> each
>> > > >>>>>>>>>>>>> stream
>> > > >>>>>>>>>>>>>>>>> impacts the creation of the final CG object.
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> When a new input arrives lets say at "topic1" it
>> > > >> will
>> > > >>>>> first
>> > > >>>>>>>> go
>> > > >>>>>>>>>>> through
>> > > >>>>>>>>>>>>>>> a
>> > > >>>>>>>>>>>>>>>>> KStreamAggreagte and grab the current aggregate from
>> > > >>>>>>>> storeName1.
>> > > >>>>>>>>>> It
>> > > >>>>>>>>>>>>>>> will
>> > > >>>>>>>>>>>>>>>>> add its incoming object to the aggregate, update the
>> > > >>>>> store
>> > > >>>>>>>> and
>> > > >>>>>>>>>> pass
>> > > >>>>>>>>>>>>> the
>> > > >>>>>>>>>>>>>>>> new
>> > > >>>>>>>>>>>>>>>>> aggregate on. This new aggregate goes through the
>> > > >>>>>>>> KStreamCogroup
>> > > >>>>>>>>>>> which
>> > > >>>>>>>>>>>>>>> is
>> > > >>>>>>>>>>>>>>>>> pretty much just a pass through processor and you
>> > > >> are
>> > > >>>>> done.
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> Topology wise for N incoming streams the new api
>> > > >> will
>> > > >>>>> only
>> > > >>>>>>>> every
>> > > >>>>>>>>>>>>>>> create N
>> > > >>>>>>>>>>>>>>>>> KStreamAggregates and 1 KStreamCogroup.
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
>> > > >>>>>>>>>>>>> matth...@confluent.io
>> > > >>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>> Kyle,
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow,
>> > > >>>>> but I
>> > > >>>>>>>> could
>> > > >>>>>>>>>> not
>> > > >>>>>>>>>>>>>>>>>> follow completely. Could you maybe add a more
>> > > >>> concrete
>> > > >>>>>>>> example,
>> > > >>>>>>>>>>> like
>> > > >>>>>>>>>>>>>>> 3
>> > > >>>>>>>>>>>>>>>>>> streams with 3 records each (plus expected result),
>> > > >>> and
>> > > >>>>>> show
>> > > >>>>>>>>> the
>> > > >>>>>>>>>>>>>>>>>> difference between current way to to implement it
>> > > >> and
>> > > >>>>> the
>> > > >>>>>>>>>> proposed
>> > > >>>>>>>>>>>>>>> API?
>> > > >>>>>>>>>>>>>>>>>> This could also cover the internal processing to
>> > > >> see
>> > > >>>>> what
>> > > >>>>>>>> store
>> > > >>>>>>>>>>> calls
>> > > >>>>>>>>>>>>>>>>>> would be required for both approaches etc.
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>> I think, it's pretty advanced stuff you propose,
>> > > >> and
>> > > >>> it
>> > > >>>>>>>> would
>> > > >>>>>>>>>> help
>> > > >>>>>>>>>>> to
>> > > >>>>>>>>>>>>>>>>>> understand it better.
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>> Thanks a lot!
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>> -Matthias
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>> > > >>>>>>>>>>>>>>>>>>> I have made a pull request. It can be found here.
>> > > >>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
>> > > >>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>> I plan to write some more unit tests for my
>> > > >> classes
>> > > >>>>> and
>> > > >>>>>> get
>> > > >>>>>>>>>> around
>> > > >>>>>>>>>>>>>>> to
>> > > >>>>>>>>>>>>>>>>>>> writing documentation for the public api
>> > > >> additions.
>> > > >>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>> One thing I was curious about is during the
>> > > >>>>>>>>>>>>>>>>>> KCogroupedStreamImpl#aggregate
>> > > >>>>>>>>>>>>>>>>>>> method I pass null to the KGroupedStream#
>> > > >>>>>>>>> repartitionIfRequired
>> > > >>>>>>>>>>>>>>>> method.
>> > > >>>>>>>>>>>>>>>>> I
>> > > >>>>>>>>>>>>>>>>>>> can't supply the store name because if more than
>> > > >> one
>> > > >>>>>>>> grouped
>> > > >>>>>>>>>>> stream
>> > > >>>>>>>>>>>>>>>>>>> repartitions an error is thrown. Is there some
>> > > >> name
>> > > >>>>> that
>> > > >>>>>>>>> someone
>> > > >>>>>>>>>>>>>>> can
>> > > >>>>>>>>>>>>>>>>>>> recommend or should I leave the null and allow it
>> > > >> to
>> > > >>>>> fall
>> > > >>>>>>>> back
>> > > >>>>>>>>>> to
>> > > >>>>>>>>>>>>>>> the
>> > > >>>>>>>>>>>>>>>>>>> KGroupedStream.name?
>> > > >>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>> Should this be expanded to handle grouped tables?
>> > > >>> This
>> > > >>>>>>>> would
>> > > >>>>>>>>> be
>> > > >>>>>>>>>>>>>>>> pretty
>> > > >>>>>>>>>>>>>>>>>> easy
>> > > >>>>>>>>>>>>>>>>>>> for a normal aggregate but one allowing session
>> > > >>> stores
>> > > >>>>>> and
>> > > >>>>>>>>>>> windowed
>> > > >>>>>>>>>>>>>>>>>> stores
>> > > >>>>>>>>>>>>>>>>>>> would required KTableSessionWindowAggregate and
>> > > >>>>>>>>>>>>>>> KTableWindowAggregate
>> > > >>>>>>>>>>>>>>>>>>> implementations.
>> > > >>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>> Thanks,
>> > > >>>>>>>>>>>>>>>>>>> Kyle
>> > > >>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <
>> > > >>>>>>>>> eno.there...@gmail.com>
>> > > >>>>>>>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
>> > > >>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>> Eno
>> > > >>>>>>>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <
>> > > >>>>>>>>> damian....@gmail.com>
>> > > >>>>>>>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>> Hi Kyle,
>> > > >>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that i haven't
>> > > >> had
>> > > >>>>> the
>> > > >>>>>>>>> chance
>> > > >>>>>>>>>> to
>> > > >>>>>>>>>>>>>>>> look
>> > > >>>>>>>>>>>>>>>>>> at
>> > > >>>>>>>>>>>>>>>>>>>>> the KIP yet, but will schedule some time to look
>> > > >>>>> into
>> > > >>>>>> it
>> > > >>>>>>>>>>>>>>> tomorrow.
>> > > >>>>>>>>>>>>>>>>> For
>> > > >>>>>>>>>>>>>>>>>>>> the
>> > > >>>>>>>>>>>>>>>>>>>>> implementation, can you raise a PR against kafka
>> > > >>>>> trunk
>> > > >>>>>>>> and
>> > > >>>>>>>>>> mark
>> > > >>>>>>>>>>>>>>> it
>> > > >>>>>>>>>>>>>>>> as
>> > > >>>>>>>>>>>>>>>>>>>> WIP?
>> > > >>>>>>>>>>>>>>>>>>>>> It will be easier to review what you have done.
>> > > >>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>> Thanks,
>> > > >>>>>>>>>>>>>>>>>>>>> Damian
>> > > >>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
>> > > >>>>>>>>>>>>>>>> winkelman.k...@gmail.com
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw
>> > > >> some
>> > > >>>>>>>> attention
>> > > >>>>>>>>> to
>> > > >>>>>>>>>>> my
>> > > >>>>>>>>>>>>>>>> KIP
>> > > >>>>>>>>>>>>>>>>>> as
>> > > >>>>>>>>>>>>>>>>>>>> I
>> > > >>>>>>>>>>>>>>>>>>>>>> haven't heard from anyone in a couple days.
>> > > >> This
>> > > >>>>> is my
>> > > >>>>>>>>> first
>> > > >>>>>>>>>>> KIP
>> > > >>>>>>>>>>>>>>>> and
>> > > >>>>>>>>>>>>>>>>>> my
>> > > >>>>>>>>>>>>>>>>>>>>>> first large contribution to the project so I'm
>> > > >>>>> sure I
>> > > >>>>>>>> did
>> > > >>>>>>>>>>>>>>>> something
>> > > >>>>>>>>>>>>>>>>>>>> wrong.
>> > > >>>>>>>>>>>>>>>>>>>>>> ;)
>> > > >>>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
>> > > >>>>>>>>>>>>>>>> winkelman.k...@gmail.com>
>> > > >>>>>>>>>>>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>>>> Hello all,
>> > > >>>>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate
>> > > >> discussion
>> > > >>>>> about
>> > > >>>>>>>>> adding
>> > > >>>>>>>>>>>>>>>>> cogroup
>> > > >>>>>>>>>>>>>>>>>> to
>> > > >>>>>>>>>>>>>>>>>>>>>>> the streams DSL.
>> > > >>>>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>>>> Please find the KIP here:
>> > > >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/
>> > > >>>>>> confluence/display/KAFKA/KIP-
>> > > >>>>>>>>>>>>>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup
>> > > >>>>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>>>> Please find my initial implementation here:
>> > > >>>>>>>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>> > > >>>>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
>> > > >>>>>>>>>>>>>>>>>>>>>>> Kyle Winkelman
>> > > >>>>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> --
>> > > >>>>>>> -- Guozhang
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> --
>> > > >>>>>> -- Guozhang
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> --
>> > > >>>>> -- Guozhang
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Reply via email to