Another reason for the serde not to be in the first cogroup call is that
the serde should not be required if you pass a StateStoreSupplier to
aggregate().

Regarding the aggregated type <T>, I don't see why the initializer should be
favored over the aggregator to define the type. In my mind, separating the
initializer into the last aggregate call clearly indicates that the
initializer is independent of any of the aggregators or streams and that we
don't wait for grouped1 events to initialize the co-group.
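
To make the shape of this concrete, here is a minimal, self-contained sketch.
It is a toy in-memory model and not Kafka Streams code: Grouped, Cogrouped and
Table are hypothetical stand-ins for KGroupedStream, CogroupedKStream and
KTable, and the serde and store arguments are left out entirely. It only
illustrates two points: an explicitly typed aggregator is enough to fix <T> in
the first cogroup call, and an initializer supplied in the final aggregate
call is applied whichever stream delivers the first event for a key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only; every type below is a hypothetical stand-in, not the real API.
final class CogroupSketch {

    interface Initializer<T> { T apply(); }

    interface Aggregator<K, V, T> { T apply(K key, V value, T aggregate); }

    // Stand-in for a KGroupedStream; the name only identifies the input.
    static final class Grouped<K, V> {
        final String name;
        Grouped(String name) { this.name = name; }

        // First cogroup call: the aggregator alone determines T.
        <T> Cogrouped<K, T> cogroup(Aggregator<? super K, ? super V, T> aggregator) {
            return new Cogrouped<K, T>().cogroup(this, aggregator);
        }
    }

    // Stand-in for a CogroupedKStream: collects one aggregator per input.
    static final class Cogrouped<K, T> {
        private final Map<String, Aggregator<K, Object, T>> aggregators = new HashMap<>();

        @SuppressWarnings("unchecked")
        <V> Cogrouped<K, T> cogroup(Grouped<K, V> stream,
                                    Aggregator<? super K, ? super V, T> aggregator) {
            // The value type is erased here, so it is cast back when applied.
            aggregators.put(stream.name,
                (key, value, agg) -> aggregator.apply(key, (V) value, agg));
            return this;
        }

        // The initializer is supplied once, at the end of the chain.
        Table<K, T> aggregate(Initializer<T> initializer) {
            return new Table<>(initializer, aggregators);
        }
    }

    // Stand-in for the resulting KTable, backed by a plain map.
    static final class Table<K, T> {
        private final Initializer<T> initializer;
        private final Map<String, Aggregator<K, Object, T>> aggregators;
        private final Map<K, T> store = new HashMap<>();

        Table(Initializer<T> initializer, Map<String, Aggregator<K, Object, T>> aggregators) {
            this.initializer = initializer;
            this.aggregators = aggregators;
        }

        // Initialize on the first event for a key, from any of the inputs.
        <V> void process(Grouped<K, V> stream, K key, V value) {
            T current = store.containsKey(key) ? store.get(key) : initializer.apply();
            store.put(key, aggregators.get(stream.name).apply(key, value, current));
        }

        T get(K key) { return store.get(key); }
    }

    static List<String> demo() {
        Grouped<String, Integer> grouped1 = new Grouped<>("grouped1");
        Grouped<String, String> grouped2 = new Grouped<>("grouped2");

        // Declaring the aggregator types explicitly pins T to List<String>.
        Aggregator<String, Integer, List<String>> aggregator1 =
            (key, value, agg) -> { agg.add("g1:" + value); return agg; };
        Aggregator<String, String, List<String>> aggregator2 =
            (key, value, agg) -> { agg.add("g2:" + value); return agg; };

        Table<String, List<String>> table = grouped1
            .cogroup(aggregator1)
            .cogroup(grouped2, aggregator2)
            .aggregate(ArrayList::new);

        // The first event for "k" arrives on grouped2, not grouped1.
        table.process(grouped2, "k", "a");
        table.process(grouped1, "k", 1);
        return table.get("k");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [g2:a, g1:1]
    }
}
```

In this sketch an untyped inline lambda in the first cogroup call may need an
explicit type witness for T, which I think is the awkwardness Guozhang is
pointing at; declaring the aggregator with its full type avoids it.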

On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com> wrote:

> On a second thought... This is the current proposal API
>
>
> ```
>
> <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer, final
> Aggregator<? super K, ? super V, T> aggregator, final Serde<T>
> aggValueSerde)
>
> ```
>
>
> If we do not have the initializer in the first co-group it might be a bit
> awkward for users to specify the aggregator that returns a typed <T> value?
> Maybe it is still better to put these two functions in the same api?
>
>
>
> Guozhang
>
> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > This suggestion lgtm. I would vote for the first alternative rather than
> > adding it to the `KStreamBuilder`, though.
> >
> > On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io>
> > wrote:
> >
> >> I have a minor suggestion to make the API a little bit more symmetric.
> >> I feel it would make more sense to move the initializer and serde to the
> >> final aggregate statement, since the serde only applies to the state
> >> store, and the initializer doesn't bear any relation to the first group
> >> in particular. It would end up looking like this:
> >>
> >> KTable<K, CG> cogrouped =
> >>     grouped1.cogroup(aggregator1)
> >>             .cogroup(grouped2, aggregator2)
> >>             .cogroup(grouped3, aggregator3)
> >>             .aggregate(initializer1, aggValueSerde, storeName1);
> >>
> >> Alternatively, we could move the first cogroup() method to
> >> KStreamBuilder, similar to how we have .merge(),
> >> and end up with an API that would be even more symmetric.
> >>
> >> KStreamBuilder.cogroup(grouped1, aggregator1)
> >>               .cogroup(grouped2, aggregator2)
> >>               .cogroup(grouped3, aggregator3)
> >>               .aggregate(initializer1, aggValueSerde, storeName1);
> >>
> >> This doesn't have to be a blocker, but I thought it would make the API
> >> just a tad cleaner.
> >>
> >> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >> > Kyle,
> >> >
> >> > Thanks a lot for the updated KIP. It looks good to me.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com>
> wrote:
> >> >
> >> > > This makes much more sense to me. +1
> >> > >
> >> > > > On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <
> >> winkelman.k...@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > I have updated the KIP and my PR. Let me know what you think.
> >> > > > To create a cogrouped stream, just call cogroup on a KGroupedStream
> >> > > > and supply the initializer, aggValueSerde, and an aggregator. Then
> >> > > > continue adding KGroupedStreams and aggregators. Then call one of
> >> > > > the many aggregate calls to create a KTable.
> >> > > >
> >> > > > Thanks,
> >> > > > Kyle
> >> > > >
> >> > > > On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com>
> wrote:
> >> > > >
> >> > > >> Hi Kyle,
> >> > > >>
> >> > > >> Thanks for the update. I think just one initializer makes sense,
> >> > > >> as it should only be called once per key and generally it is just
> >> > > >> going to create a new instance of whatever the Aggregate class is.
> >> > > >>
> >> > > >> Cheers,
> >> > > >> Damian
> >> > > >>
> >> > > >> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <
> >> winkelman.k...@gmail.com
> >> > >
> >> > > >> wrote:
> >> > > >>
> >> > > >>> Hello all,
> >> > > >>>
> >> > > >>> I have spent some more time on this and the best alternative I
> >> > > >>> have come up with is:
> >> > > >>> KGroupedStream has a single cogroup call that takes an initializer
> >> > > >>> and an aggregator.
> >> > > >>> CogroupedKStream has a cogroup call that takes additional
> >> > > >>> groupedStream aggregator pairs.
> >> > > >>> CogroupedKStream has multiple aggregate methods that create the
> >> > > >>> different stores.
> >> > > >>>
> >> > > >>> I plan on updating the KIP, but I want people's input on whether
> >> > > >>> we should have the initializer be passed in once at the beginning
> >> > > >>> or instead have the initializer be required for each call to one
> >> > > >>> of the aggregate calls. The first makes more sense to me but
> >> > > >>> doesn't allow the user to specify different initializers for
> >> > > >>> different tables.
> >> > > >>>
> >> > > >>> Thanks,
> >> > > >>> Kyle
> >> > > >>>
> >> > > >>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <
> >> winkelman.k...@gmail.com>
> >> > > >>> wrote:
> >> > > >>>
> >> > > >>>> Yeah, I really like that idea. I'll see what I can do to update
> >> > > >>>> the KIP and my PR when I have some time. I'm not sure how well
> >> > > >>>> creating the KStreamAggregates will go, though, because at that
> >> > > >>>> point I will have thrown away the type of the values. It will be
> >> > > >>>> type safe; I just may need to do a little forcing.
> >> > > >>>>
> >> > > >>>> Thanks,
> >> > > >>>> Kyle
> >> > > >>>>
> >> > > >>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com>
> >> > wrote:
> >> > > >>>>
> >> > > >>>>> Kyle,
> >> > > >>>>>
> >> > > >>>>> Thanks for the explanations, my previous read on the wiki examples
> >> > > >>>>> was wrong.
> >> > > >>>>>
> >> > > >>>>> So I guess my motivation should be "reduced" to: can we move the
> >> > > >>>>> window specs param from "KGroupedStream#cogroup(..)" to
> >> > > >>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
> >> > > >>>>>
> >> > > >>>>> 1. minor: we can reduce the number of generics in CogroupedKStream
> >> > > >>>>> from 3 to 2.
> >> > > >>>>> 2. major: this is for extensibility of the APIs, and since we are
> >> > > >>>>> removing the "Evolving" annotations on Streams it may be harder to
> >> > > >>>>> change it again in the future. The extended use cases are that
> >> > > >>>>> people wanted to have windowed running aggregates on different
> >> > > >>>>> granularities, e.g. "give me the counts per-minute, per-hour,
> >> > > >>>>> per-day and per-week", and today in the DSL we need to specify
> >> > > >>>>> that case in multiple aggregate operators, each of which gets a
> >> > > >>>>> state store / changelog, etc. It is possible to optimize this
> >> > > >>>>> down to a single state store. Its implementation would be tricky,
> >> > > >>>>> as you need to contain windows of different lengths within your
> >> > > >>>>> window store, but just from the public API point of view, it
> >> > > >>>>> could be specified as:
> >> > > >>>>>
> >> > > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
> >> > > >>>>> "state-store-name");
> >> > > >>>>>
> >> > > >>>>> table1 = stream.aggregate(/*per-minute window*/)
> >> > > >>>>> table2 = stream.aggregate(/*per-hour window*/)
> >> > > >>>>> table3 = stream.aggregate(/*per-day window*/)
> >> > > >>>>>
> >> > > >>>>> while underlying we are only using a single store
> >> > > >>>>> "state-store-name" for it.
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>> Although this feature is out of the scope of this KIP, I'd like to
> >> > > >>>>> discuss if we can "leave the door open" to make such changes
> >> > > >>>>> without modifying the public APIs.
> >> > > >>>>>
> >> > > >>>>> Guozhang
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
> >> > > >>> winkelman.k...@gmail.com
> >> > > >>>>>>
> >> > > >>>>> wrote:
> >> > > >>>>>
> >> > > >>>>>> I allow defining a single window/sessionwindow one time when you
> >> > > >>>>>> make the cogroup call from a KGroupedStream. From then on you are
> >> > > >>>>>> using the cogroup call from within CogroupedKStream, which doesn't
> >> > > >>>>>> accept any additional windows/sessionwindows.
> >> > > >>>>>>
> >> > > >>>>>> Is this what you meant by your question or did I misunderstand?
> >> > > >>>>>>
> >> > > >>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com
> >
> >> > > >> wrote:
> >> > > >>>>>>
> >> > > >>>>>> Another question that came to me is on "window alignment": from
> >> > > >>>>>> the KIP it seems you are allowing users to specify a (potentially
> >> > > >>>>>> different) window spec in each co-grouped input stream. So if
> >> > > >>>>>> these window specs are different, how should we "align" them with
> >> > > >>>>>> different input streams? I think it is more natural to only
> >> > > >>>>>> specify one window spec in the
> >> > > >>>>>>
> >> > > >>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>> And remove it from the cogroup() functions. WDYT?
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>> Guozhang
> >> > > >>>>>>
> >> > > >>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <
> >> > wangg...@gmail.com>
> >> > > >>>>> wrote:
> >> > > >>>>>>
> >> > > >>>>>>> Thanks for the proposal Kyle, this is a quite common use case: to
> >> > > >>>>>>> support such a multi-way table join (i.e. N source tables with N
> >> > > >>>>>>> aggregate funcs) with a single store and N+1 serdes. I have seen
> >> > > >>>>>>> lots of people using the low-level PAPI to achieve this goal.
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
> >> > > >>>>>> winkelman.k...@gmail.com
> >> > > >>>>>>>> wrote:
> >> > > >>>>>>>
> >> > > >>>>>>>> I like your point about not handling other cases such as count
> >> > > >>>>>>>> and reduce.
> >> > > >>>>>>>>
> >> > > >>>>>>>> I think that reduce may not make sense because reduce assumes
> >> > > >>>>>>>> that the input values are the same type as the output values.
> >> > > >>>>>>>> With cogroup there may be multiple different input types, and
> >> > > >>>>>>>> your output type can't be multiple different things. In the case
> >> > > >>>>>>>> where you have all matching value types you can do
> >> > > >>>>>>>> KStreamBuilder#merge followed by the reduce.
> >> > > >>>>>>>>
> >> > > >>>>>>>> As for count, I think it is possible to call count on all the
> >> > > >>>>>>>> individual grouped streams and then do joins. Otherwise we could
> >> > > >>>>>>>> maybe make a special call in groupedstream for this case, because
> >> > > >>>>>>>> in this case we don't need to do type checking on the values. It
> >> > > >>>>>>>> could be similar to the current count methods but accept a
> >> > > >>>>>>>> varargs of additional grouped streams as well and make sure they
> >> > > >>>>>>>> have a key type of K.
> >> > > >>>>>>>>
> >> > > >>>>>>>> The way I have put the KIP together is to ensure that we do type
> >> > > >>>>>>>> checking. I don't see a way we could group them all first and
> >> > > >>>>>>>> then make a call to count, reduce, or aggregate, because with
> >> > > >>>>>>>> aggregate they would need to pass a list of aggregators and we
> >> > > >>>>>>>> would have no way of type checking that they match the grouped
> >> > > >>>>>>>> streams.
> >> > > >>>>>>>>
> >> > > >>>>>>>> Thanks,
> >> > > >>>>>>>> Kyle
> >> > > >>>>>>>>
> >> > > >>>>>>>> On May 19, 2017 11:42 AM, "Xavier Léauté" <
> >> xav...@confluent.io>
> >> > > >>>>> wrote:
> >> > > >>>>>>>>
> >> > > >>>>>>>>> Sorry to jump on this thread so late. I agree this is a
> very
> >> > > >>> useful
> >> > > >>>>>>>>> addition and wanted to provide an additional use-case and
> >> some
> >> > > >>> more
> >> > > >>>>>>>>> comments.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> This is actually a very common analytics use-case in the ad-tech
> >> > > >>>>>>>>> industry. The typical setup will have an auction stream, an
> >> > > >>>>>>>>> impression stream, and a click stream. Those three streams need
> >> > > >>>>>>>>> to be combined to compute aggregate statistics (e.g. impression
> >> > > >>>>>>>>> statistics, and click-through rates), since most of the
> >> > > >>>>>>>>> attributes of interest are only present in the auction stream.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> A simple way to do this is to co-group all the streams by
> >> the
> >> > > >>>>> auction
> >> > > >>>>>>>> key,
> >> > > >>>>>>>>> and process updates to the co-group as events for each
> >> stream
> >> > > >>> come
> >> > > >>>>> in,
> >> > > >>>>>>>>> keeping only one value from each stream before sending
> >> > > >> downstream
> >> > > >>>>> for
> >> > > >>>>>>>>> further processing / aggregation.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> One could view the result of that co-group operation as a
> >> > > >>> "KTable"
> >> > > >>>>>> with
> >> > > >>>>>>>>> multiple values per key. The key being the grouping key,
> and
> >> > > >> the
> >> > > >>>>>> values
> >> > > >>>>>>>>> consisting of one value per stream.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> What I like about Kyle's approach is that it allows elegant
> >> > > >>>>>>>>> co-grouping of multiple streams without having to worry about
> >> > > >>>>>>>>> the number of streams, and avoids dealing with Tuple types or
> >> > > >>>>>>>>> other generic interfaces that could get messy if we wanted to
> >> > > >>>>>>>>> preserve all the value types in the resulting co-grouped stream.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> My only concern is that we only allow the cogroup +
> >> aggregate
> >> > > >>>>> combined
> >> > > >>>>>>>>> operation. This forces the user to build their own tuple
> >> > > >>>>> serialization
> >> > > >>>>>>>>> format if they want to preserve the individual input
> stream
> >> > > >>> values
> >> > > >>>>> as
> >> > > >>>>>> a
> >> > > >>>>>>>>> group. It also deviates quite a bit from our approach in
> >> > > >>>>>> KGroupedStream
> >> > > >>>>>>>>> which offers other operations, such as count and reduce,
> >> which
> >> > > >>>>> should
> >> > > >>>>>>>> also
> >> > > >>>>>>>>> be applicable to a co-grouped stream.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Overall I still think this is a really useful addition, but I
> >> > > >>>>>>>>> feel we haven't spent much time trying to explore alternative
> >> > > >>>>>>>>> DSLs that could maybe generalize better or match our existing
> >> > > >>>>>>>>> syntax more closely.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <
> >> > > >>>>>> winkelman.k...@gmail.com
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> wrote:
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>> Eno, is there anyone else that is an expert in the kafka
> >> > > >>> streams
> >> > > >>>>>> realm
> >> > > >>>>>>>>> that
> >> > > >>>>>>>>>> I should reach out to for input?
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> I believe Damian Guy is still planning on reviewing this
> >> more
> >> > > >>> in
> >> > > >>>>>> depth
> >> > > >>>>>>>>> so I
> >> > > >>>>>>>>>> will wait for his inputs before continuing.
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> On May 9, 2017 7:30 AM, "Eno Thereska" <
> >> > > >> eno.there...@gmail.com
> >> > > >>>>
> >> > > >>>>>>>> wrote:
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>>> Thanks Kyle, good arguments.
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>> Eno
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <
> >> > > >>>>>>>> winkelman.k...@gmail.com
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> *- minor: could you add an exact example (similar to
> what
> >> > > >>>>> Jay’s
> >> > > >>>>>>>>> example
> >> > > >>>>>>>>>>> is,
> >> > > >>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super
> >> > > >>>>>> concrete?*
> >> > > >>>>>>>>>>>> I have added a more concrete example to the KIP.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> *- my main concern is that we’re exposing this
> >> > > >> optimization
> >> > > >>>>> to
> >> > > >>>>>> the
> >> > > >>>>>>>>> DSL.
> >> > > >>>>>>>>>>> In
> >> > > >>>>>>>>>>>> an ideal world, an optimizer would take the existing
> DSL
> >> > > >>> and
> >> > > >>>>> do
> >> > > >>>>>>>> the
> >> > > >>>>>>>>>> right
> >> > > >>>>>>>>>>>> thing under the covers (create just one state store,
> >> > > >>> arrange
> >> > > >>>>> the
> >> > > >>>>>>>>> nodes
> >> > > >>>>>>>>>>>> etc). The original DSL had a bunch of small, composable
> >> > > >>>>> pieces
> >> > > >>>>>>>>> (group,
> >> > > >>>>>>>>>>>> aggregate, join) that this proposal groups together.
> I’d
> >> > > >>>>> like to
> >> > > >>>>>>>> hear
> >> > > >>>>>>>>>>> your
> >> > > >>>>>>>>>>>> thoughts on whether it’s possible to do this
> optimization
> >> > > >>>>> with
> >> > > >>>>>> the
> >> > > >>>>>>>>>>> current
> >> > > >>>>>>>>>>>> DSL, at the topology builder level.*
> >> > > >>>>>>>>>>>> You would have to make a lot of checks to understand if it is
> >> > > >>>>>>>>>>>> even possible to make this optimization:
> >> > > >>>>>>>>>>>> 1. Make sure they are all KTableKTableOuterJoins.
> >> > > >>>>>>>>>>>> 2. None of the intermediate KTables are used for anything else.
> >> > > >>>>>>>>>>>> 3. None of the intermediate stores are used. (This may be
> >> > > >>>>>>>>>>>> impossible, especially if they use KafkaStreams#store after the
> >> > > >>>>>>>>>>>> topology has already been built.)
> >> > > >>>>>>>>>>>> You would then need to make decisions during the optimization:
> >> > > >>>>>>>>>>>> 1. Your new initializer would be the composite of all the
> >> > > >>>>>>>>>>>> individual initializers and the valueJoiners.
> >> > > >>>>>>>>>>>> 2. I am having a hard time thinking about how you would turn
> >> > > >>>>>>>>>>>> the aggregators and valueJoiners into an aggregator that would
> >> > > >>>>>>>>>>>> work on the final object, but this may be possible.
> >> > > >>>>>>>>>>>> 3. Which state store would you use? The ones declared would be
> >> > > >>>>>>>>>>>> for the aggregate values. None of the declared ones would be
> >> > > >>>>>>>>>>>> guaranteed to hold the final object. This would mean you must
> >> > > >>>>>>>>>>>> create a new state store and not create any of the declared
> >> > > >>>>>>>>>>>> ones.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> The main argument I have against it is even if it could
> >> > > >> be
> >> > > >>>>> done
> >> > > >>>>>> I
> >> > > >>>>>>>>> don't
> >> > > >>>>>>>>>>>> know that we would want to have this be an optimization
> >> > > >> in
> >> > > >>>>> the
> >> > > >>>>>>>>>> background
> >> > > >>>>>>>>>>>> because the user would still be required to think about
> >> > > >> all
> >> > > >>>>> of
> >> > > >>>>>> the
> >> > > >>>>>>>>>>>> intermediate values that they shouldn't need to worry
> >> > > >> about
> >> > > >>>>> if
> >> > > >>>>>>>> they
> >> > > >>>>>>>>>> only
> >> > > >>>>>>>>>>>> care about the final object.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> In my opinion cogroup is a common enough case that it should be
> >> > > >>>>>>>>>>>> part of the composable pieces (group, aggregate, join), because
> >> > > >>>>>>>>>>>> we want to allow people to join 2 or more streams in an easy
> >> > > >>>>>>>>>>>> way. Right now I don't think we give them ways of handling this
> >> > > >>>>>>>>>>>> use case easily.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> *-I think there will be scope for several such
> >> > > >>> optimizations
> >> > > >>>>> in
> >> > > >>>>>>>> the
> >> > > >>>>>>>>>>> future
> >> > > >>>>>>>>>>>> and perhaps at some point we need to think about
> >> > > >> decoupling
> >> > > >>>>> the
> >> > > >>>>>>>> 1:1
> >> > > >>>>>>>>>>> mapping
> >> > > >>>>>>>>>>>> from the DSL into the physical topology.*
> >> > > >>>>>>>>>>>> I would argue that cogroup is not just an optimization
> it
> >> > > >>> is
> >> > > >>>>> a
> >> > > >>>>>> new
> >> > > >>>>>>>>> way
> >> > > >>>>>>>>>>> for
> >> > > >>>>>>>>>>>> the users to look at accomplishing a problem that
> >> > > >> requires
> >> > > >>>>>>>> multiple
> >> > > >>>>>>>>>>>> streams. I may sound like a broken record but I don't
> >> > > >> think
> >> > > >>>>>> users
> >> > > >>>>>>>>>> should
> >> > > >>>>>>>>>>>> have to build the N-1 intermediate tables and deal with
> >> > > >>> their
> >> > > >>>>>>>>>>> initializers,
> >> > > >>>>>>>>>>>> serdes and stores if all they care about is the final
> >> > > >>> object.
> >> > > >>>>>>>>>>>> Now if for example someone uses cogroup but doesn't
> >> > > >> supply
> >> > > >>>>>>>> additional
> >> > > >>>>>>>>>>>> streams and aggregators this case is equivalent to a
> >> > > >> single
> >> > > >>>>>>>> grouped
> >> > > >>>>>>>>>>> stream
> >> > > >>>>>>>>>>>> making an aggregate call. This case is what I view an
> >> > > >>>>>> optimization
> >> > > >>>>>>>>> as,
> >> > > >>>>>>>>>> we
> >> > > >>>>>>>>>>>> could remove the KStreamCogroup and act as if there was
> >> > > >>> just
> >> > > >>>>> a
> >> > > >>>>>>>> call
> >> > > >>>>>>>>> to
> >> > > >>>>>>>>>>>> KGroupedStream#aggregate instead of calling
> >> > > >>>>>>>> KGroupedStream#cogroup.
> >> > > >>>>>>>>> (I
> >> > > >>>>>>>>>>>> would prefer to just write a warning saying that this
> is
> >> > > >>> not
> >> > > >>>>> how
> >> > > >>>>>>>>>> cogroup
> >> > > >>>>>>>>>>> is
> >> > > >>>>>>>>>>>> to be used.)
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Thanks,
> >> > > >>>>>>>>>>>> Kyle
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <
> >> > > >>>>>>>> eno.there...@gmail.com
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Hi Kyle,
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Thanks for the KIP again. A couple of comments:
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> - minor: could you add an exact example (similar to
> what
> >> > > >>>>> Jay’s
> >> > > >>>>>>>>> example
> >> > > >>>>>>>>>>> is,
> >> > > >>>>>>>>>>>>> or like your Spark/Pig pointers had) to make this
> super
> >> > > >>>>>> concrete?
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> - my main concern is that we’re exposing this
> >> > > >> optimization
> >> > > >>>>> to
> >> > > >>>>>> the
> >> > > >>>>>>>>> DSL.
> >> > > >>>>>>>>>>> In
> >> > > >>>>>>>>>>>>> an ideal world, an optimizer would take the existing
> DSL
> >> > > >>>>> and do
> >> > > >>>>>>>> the
> >> > > >>>>>>>>>>> right
> >> > > >>>>>>>>>>>>> thing under the covers (create just one state store,
> >> > > >>> arrange
> >> > > >>>>>> the
> >> > > >>>>>>>>> nodes
> >> > > >>>>>>>>>>>>> etc). The original DSL had a bunch of small,
> composable
> >> > > >>>>> pieces
> >> > > >>>>>>>>> (group,
> >> > > >>>>>>>>>>>>> aggregate, join) that this proposal groups together.
> I’d
> >> > > >>>>> like
> >> > > >>>>>> to
> >> > > >>>>>>>>> hear
> >> > > >>>>>>>>>>> your
> >> > > >>>>>>>>>>>>> thoughts on whether it’s possible to do this
> >> > > >> optimization
> >> > > >>>>> with
> >> > > >>>>>>>> the
> >> > > >>>>>>>>>>> current
> >> > > >>>>>>>>>>>>> DSL, at the topology builder level.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> I think there will be scope for several such
> >> > > >> optimizations
> >> > > >>>>> in
> >> > > >>>>>> the
> >> > > >>>>>>>>>> future
> >> > > >>>>>>>>>>>>> and perhaps at some point we need to think about
> >> > > >>> decoupling
> >> > > >>>>> the
> >> > > >>>>>>>> 1:1
> >> > > >>>>>>>>>>> mapping
> >> > > >>>>>>>>>>>>> from the DSL into the physical topology.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Thanks
> >> > > >>>>>>>>>>>>> Eno
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <
> >> > > >> j...@confluent.io>
> >> > > >>>>>> wrote:
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> I haven't digested the proposal but the use case is
> >> > > >>> pretty
> >> > > >>>>>>>> common.
> >> > > >>>>>>>>> An
> >> > > >>>>>>>>>>>>>> example would be the "customer 360" or "unified
> >> > > >> customer
> >> > > >>>>>>>> profile"
> >> > > >>>>>>>>> use
> >> > > >>>>>>>>>>>>> case
> >> > > >>>>>>>>>>>>>> we often use. In that use case you have a dozen
> systems
> >> > > >>>>> each
> >> > > >>>>>> of
> >> > > >>>>>>>>> which
> >> > > >>>>>>>>>>> has
> >> > > >>>>>>>>>>>>>> some information about your customer (account
> details,
> >> > > >>>>>> settings,
> >> > > >>>>>>>>>>> billing
> >> > > >>>>>>>>>>>>>> info, customer service contacts, purchase history,
> >> > > >> etc).
> >> > > >>>>> Your
> >> > > >>>>>>>> goal
> >> > > >>>>>>>>> is
> >> > > >>>>>>>>>>> to
> >> > > >>>>>>>>>>>>>> join/munge these into a single profile record for
> each
> >> > > >>>>>> customer
> >> > > >>>>>>>>> that
> >> > > >>>>>>>>>>> has
> >> > > >>>>>>>>>>>>>> all the relevant info in a usable form and is
> >> > > >> up-to-date
> >> > > >>>>> with
> >> > > >>>>>>>> all
> >> > > >>>>>>>>> the
> >> > > >>>>>>>>>>>>>> source systems. If you implement that with kstreams
> as
> >> > > >> a
> >> > > >>>>>>>> sequence
> >> > > >>>>>>>>> of
> >> > > >>>>>>>>>>>>> joins
> >> > > >>>>>>>>>>>>>> i think today we'd fully materialize N-1 intermediate
> >> > > >>>>> tables.
> >> > > >>>>>>>> But
> >> > > >>>>>>>>>>> clearly
> >> > > >>>>>>>>>>>>>> you only need a single stage to group all these
> things
> >> > > >>> that
> >> > > >>>>>> are
> >> > > >>>>>>>>>> already
> >> > > >>>>>>>>>>>>>> co-partitioned. A distributed database would do this
> >> > > >>> under
> >> > > >>>>> the
> >> > > >>>>>>>>> covers
> >> > > >>>>>>>>>>>>> which
> >> > > >>>>>>>>>>>>>> is arguably better (at least when it does the right
> >> > > >>> thing)
> >> > > >>>>> and
> >> > > >>>>>>>>>> perhaps
> >> > > >>>>>>>>>>> we
> >> > > >>>>>>>>>>>>>> could do the same thing, but I'm not sure we know the
> >> > > >>>>>>>>>>>>>> partitioning, so we may need an explicit cogroup command that
> >> > > >>>>>>>>>>>>>> implies they are already co-partitioned.
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> -Jay
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
> >> > > >>>>>>>>>>> winkelman.k...@gmail.com
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>> Yeah, that's a good way to look at it.
> >> > > >>>>>>>>>>>>>>> I have seen this type of functionality in a couple of other
> >> > > >>>>>>>>>>>>>>> platforms, like Spark and Pig.
> >> > > >>>>>>>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> >> > > >>>>>>>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> >> > > >>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <
> >> > > >>>>> damian....@gmail.com>
> >> > > >>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>> Hi Kyle,
> >> > > >>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>> If i'm reading this correctly it is like an N way
> >> > > >> outer
> >> > > >>>>>> join?
> >> > > >>>>>>>> So
> >> > > >>>>>>>>> an
> >> > > >>>>>>>>>>>>> input
> >> > > >>>>>>>>>>>>>>>> on any stream will always produce a new aggregated
> >> > > >>> value
> >> > > >>>>> -
> >> > > >>>>>> is
> >> > > >>>>>>>>> that
> >> > > >>>>>>>>>>>>>>> correct?
> >> > > >>>>>>>>>>>>>>>> Effectively, each Aggregator just looks up the
> >> > > >> current
> >> > > >>>>>> value,
> >> > > >>>>>>>>>>>>> aggregates
> >> > > >>>>>>>>>>>>>>>> and forwards the result.
> >> > > >>>>>>>>>>>>>>>> I need to look into it and think about it a bit
> more,
> >> > > >>>>> but it
> >> > > >>>>>>>>> seems
> >> > > >>>>>>>>>>> like
> >> > > >>>>>>>>>>>>>>> it
> >> > > >>>>>>>>>>>>>>>> could be a useful optimization.
> >> > > >>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
> >> > > >>>>>>>>>> winkelman.k...@gmail.com
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> I sure can. I have added the following description
> >> > > >> to
> >> > > >>> my
> >> > > >>>>>>>> KIP. If
> >> > > >>>>>>>>>>> this
> >> > > >>>>>>>>>>>>>>>>> doesn't help let me know and I will take some more
> >> > > >>> time
> >> > > >>>>> to
> >> > > >>>>>>>>> build a
> >> > > >>>>>>>>>>>>>>>> diagram
> >> > > >>>>>>>>>>>>>>>>> and make more of a step by step description:
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> Example with Current API:
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
> >> > > >>>>>>>>>>>>>>>>>     .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> >> > > >>>>>>>>>>>>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
> >> > > >>>>>>>>>>>>>>>>>     .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> >> > > >>>>>>>>>>>>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
> >> > > >>>>>>>>>>>>>>>>>     .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> >> > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
> >> > > >>>>>>>>>>>>>>>>>     .outerJoin(table3, joinerOneTwoAndThree);
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> As you can see, this creates 3 StateStores and requires 3
> >> > > >>>>>>>>>>>>>>>>> initializers and 3 aggValueSerdes. This also puts pressure on
> >> > > >>>>>>>>>>>>>>>>> the user to define what the intermediate values are going to
> >> > > >>>>>>>>>>>>>>>>> be (V1, V2, V3). They are left with a couple of choices:
> >> > > >>>>>>>>>>>>>>>>> first, to make V1, V2, and V3 all the same as CG, so that the
> >> > > >>>>>>>>>>>>>>>>> two joiners are more like mergers; or second, to make them
> >> > > >>>>>>>>>>>>>>>>> intermediate states such as Topic1Map, Topic2Map, and
> >> > > >>>>>>>>>>>>>>>>> Topic3Map, with the joiners using those to build the final
> >> > > >>>>>>>>>>>>>>>>> aggregate CG value. This is something the user could avoid
> >> > > >>>>>>>>>>>>>>>>> thinking about with this KIP.
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go
> >> > > >>>>>>>>>>>>>>>>> through a KStreamAggregate, grabbing the current aggregate from
> >> > > >>>>>>>>>>>>>>>>> storeName1. It will produce this in the form of the first
> >> > > >>>>>>>>>>>>>>>>> intermediate value, which is sent through a KTableKTableOuterJoin
> >> > > >>>>>>>>>>>>>>>>> where it will look up the current value of the key in storeName2. It
> >> > > >>>>>>>>>>>>>>>>> will use the first joiner to calculate the second intermediate value,
> >> > > >>>>>>>>>>>>>>>>> which will go through an additional KTableKTableOuterJoin. Here it
> >> > > >>>>>>>>>>>>>>>>> will look up the current value of the key in storeName3 and use the
> >> > > >>>>>>>>>>>>>>>>> second joiner to build the final aggregate value.
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> If you think through all possibilities for incoming topics, you will
> >> > > >>>>>>>>>>>>>>>>> see that no matter which topic a record comes in through, all three
> >> > > >>>>>>>>>>>>>>>>> stores are queried and all of the joiners must get used.
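To make the join path above concrete, here is a minimal plain-Java sketch with no Kafka dependency. The class JoinPathSketch, its fields, and the string-concatenation aggregator and joiner are all hypothetical stand-ins chosen for illustration; the real KTable join processors are more involved. It only models the claim that every record, whichever topic it arrives on, reads all three stores and invokes both joiners.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the current (aggregate + outerJoin) path.
// All names here are hypothetical; nothing below is Kafka Streams API.
final class JoinPathSketch {
    // One store per input topic, standing in for storeName1..storeName3.
    private final List<Map<String, String>> stores = new ArrayList<>();
    int storeReads = 0;
    int joinerCalls = 0;

    JoinPathSketch() {
        for (int i = 0; i < 3; i++) {
            stores.add(new HashMap<>());
        }
    }

    private String read(int store, String key) {
        storeReads++;
        return stores.get(store).get(key);
    }

    // Outer-join style joiner: either side may be missing (null).
    private String join(String left, String right) {
        joinerCalls++;
        return (left == null ? "" : left) + (right == null ? "" : right);
    }

    // Handles one record arriving on topic 1, 2 or 3 and returns the final value.
    String process(int topic, String key, String value) {
        String[] current = new String[3];
        for (int i = 0; i < 3; i++) {
            current[i] = read(i, key); // every store is consulted
        }
        int own = topic - 1;
        // Per-topic aggregation (here: concatenation), written back to its own store.
        current[own] = (current[own] == null ? "" : current[own]) + value;
        stores.get(own).put(key, current[own]);
        String oneAndTwo = join(current[0], current[1]); // stands in for joinerOneAndTwo
        return join(oneAndTwo, current[2]);              // stands in for joinerOneTwoAndThree
    }

    public static void main(String[] args) {
        JoinPathSketch sketch = new JoinPathSketch();
        sketch.process(1, "k", "a");
        sketch.process(3, "k", "c");
        String result = sketch.process(2, "k", "b");
        System.out.println(result + " reads=" + sketch.storeReads
                + " joinerCalls=" + sketch.joinerCalls);
    }
}
```

In this model each call to process costs three store reads and two joiner calls, regardless of the topic argument.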
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> Topology-wise, for N incoming streams this creates N
> >> > > >>>>>>>>>>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> >> > > >>>>>>>>>>>>>>>>> KTableKTableJoinMergers.
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> Example with Proposed API:
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> >> > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped =
> >> > > >>>>>>>>>>>>>>>>>     grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> >> > > >>>>>>>>>>>>>>>>>             .cogroup(grouped2, aggregator2)
> >> > > >>>>>>>>>>>>>>>>>             .cogroup(grouped3, aggregator3)
> >> > > >>>>>>>>>>>>>>>>>             .aggregate();
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> As you can see, this creates 1 StateStore and requires 1 initializer
> >> > > >>>>>>>>>>>>>>>>> and 1 aggValueSerde. The user no longer has to worry about the
> >> > > >>>>>>>>>>>>>>>>> intermediate values and the joiners. All they have to think about is
> >> > > >>>>>>>>>>>>>>>>> how each stream impacts the creation of the final CG object.
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go
> >> > > >>>>>>>>>>>>>>>>> through a KStreamAggregate and grab the current aggregate from
> >> > > >>>>>>>>>>>>>>>>> storeName1. It will add its incoming object to the aggregate, update
> >> > > >>>>>>>>>>>>>>>>> the store, and pass the new aggregate on. This new aggregate goes
> >> > > >>>>>>>>>>>>>>>>> through the KStreamCogroup, which is pretty much just a pass-through
> >> > > >>>>>>>>>>>>>>>>> processor, and you are done.
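The cogroup path described above can be sketched in plain Java, again with no Kafka dependency. CogroupSketch and its members are hypothetical names, and the two-argument BiFunction is a simplification of a per-stream aggregator rather than the actual Kafka Streams interface. The sketch assumes one shared store, one initializer, and one aggregator per input stream, as in the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Plain-Java model of the proposed cogroup path; all names are hypothetical.
final class CogroupSketch<K, CG> {
    // The single shared state store holding the final aggregate per key.
    private final Map<K, CG> store = new HashMap<>();
    private final Supplier<CG> initializer;
    int storeReads = 0;

    CogroupSketch(Supplier<CG> initializer) {
        this.initializer = initializer;
    }

    // Handles one record from one input stream using that stream's aggregator.
    <V> CG process(K key, V value, BiFunction<V, CG, CG> aggregator) {
        storeReads++;
        CG current = store.get(key);
        if (current == null) {
            current = initializer.get(); // first record for this key, from any stream
        }
        CG updated = aggregator.apply(value, current);
        store.put(key, updated);
        return updated; // forwarded downstream unchanged (the pass-through step)
    }

    public static void main(String[] args) {
        CogroupSketch<String, String> cogroup = new CogroupSketch<>(() -> "");
        cogroup.process("k", "a", (v, agg) -> agg + "[t1:" + v + "]");
        String result = cogroup.process("k", 7, (v, agg) -> agg + "[t2:" + v + "]");
        System.out.println(result + " reads=" + cogroup.storeReads);
    }
}
```

Each record costs a single store read and no joiner call in this model, and the input value types can differ per stream while the aggregate type stays fixed.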
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> Topology-wise, for N incoming streams the new API will only ever
> >> > > >>>>>>>>>>>>>>>>> create N KStreamAggregates and 1 KStreamCogroup.
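As a quick arithmetic check of the two processor counts quoted in this message, the helper below evaluates both formulas. TopologyNodeCount is a hypothetical class for illustration only; the formulas are taken directly from the text and are not derived from the Kafka Streams source.

```java
// Hypothetical helper, not part of Kafka Streams: it only evaluates the
// processor-node counts quoted in the message.
final class TopologyNodeCount {

    // Current API: N KStreamAggregates + 2*(N-1) KTableKTableOuterJoins
    // + (N-1) KTableKTableJoinMergers.
    static int currentApi(int n) {
        return n + 2 * (n - 1) + (n - 1);
    }

    // Proposed API: N KStreamAggregates + 1 KStreamCogroup.
    static int proposedApi(int n) {
        return n + 1;
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.printf("N=%d current=%d proposed=%d%n",
                    n, currentApi(n), proposedApi(n));
        }
    }
}
```

For the three-topic example this gives 3 + 4 + 2 = 9 processors with the current API against 3 + 1 = 4 with cogroup, and the gap grows by three per additional stream.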
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >> > > >>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>> Kyle,
> >> > > >>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not
> >> > > >>>>>>>>>>>>>>>>>> follow completely. Could you maybe add a more concrete example, like
> >> > > >>>>>>>>>>>>>>>>>> 3 streams with 3 records each (plus expected result), and show the
> >> > > >>>>>>>>>>>>>>>>>> difference between the current way to implement it and the proposed
> >> > > >>>>>>>>>>>>>>>>>> API? This could also cover the internal processing to see what store
> >> > > >>>>>>>>>>>>>>>>>> calls would be required for both approaches etc.
> >> > > >>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>> I think, it's pretty advanced stuff you propose, and it would help
> >> > > >>>>>>>>>>>>>>>>>> to understand it better.
> >> > > >>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>> Thanks a lot!
> >> > > >>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>> -Matthias
> >> > > >>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> >> > > >>>>>>>>>>>>>>>>>>> I have made a pull request. It can be found here.
> >> > > >>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
> >> > > >>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>> I plan to write some more unit tests for my classes and get around
> >> > > >>>>>>>>>>>>>>>>>>> to writing documentation for the public API additions.
> >> > > >>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>> One thing I was curious about: in the KCogroupedStreamImpl#aggregate
> >> > > >>>>>>>>>>>>>>>>>>> method I pass null to the KGroupedStream#repartitionIfRequired
> >> > > >>>>>>>>>>>>>>>>>>> method. I can't supply the store name because if more than one
> >> > > >>>>>>>>>>>>>>>>>>> grouped stream repartitions, an error is thrown. Is there some name
> >> > > >>>>>>>>>>>>>>>>>>> that someone can recommend, or should I leave the null and allow it
> >> > > >>>>>>>>>>>>>>>>>>> to fall back to the KGroupedStream.name?
> >> > > >>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>> Should this be expanded to handle grouped tables? This would be
> >> > > >>>>>>>>>>>>>>>>>>> pretty easy for a normal aggregate, but one allowing session stores
> >> > > >>>>>>>>>>>>>>>>>>> and windowed stores would require KTableSessionWindowAggregate and
> >> > > >>>>>>>>>>>>>>>>>>> KTableWindowAggregate implementations.
> >> > > >>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>> Thanks,
> >> > > >>>>>>>>>>>>>>>>>>> Kyle
> >> > > >>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> >> > > >>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
> >> > > >>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>> Eno
> >> > > >>>>>>>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
> >> > > >>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>> Hi Kyle,
> >> > > >>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to
> >> > > >>>>>>>>>>>>>>>>>>>>> look at the KIP yet, but will schedule some time to look into it
> >> > > >>>>>>>>>>>>>>>>>>>>> tomorrow. For the implementation, can you raise a PR against kafka
> >> > > >>>>>>>>>>>>>>>>>>>>> trunk and mark it as WIP? It will be easier to review what you
> >> > > >>>>>>>>>>>>>>>>>>>>> have done.
> >> > > >>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>> Thanks,
> >> > > >>>>>>>>>>>>>>>>>>>>> Damian
> >> > > >>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >> > > >>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention to my
> >> > > >>>>>>>>>>>>>>>>>>>>>> KIP as I haven't heard from anyone in a couple days. This is my
> >> > > >>>>>>>>>>>>>>>>>>>>>> first KIP and my first large contribution to the project so I'm
> >> > > >>>>>>>>>>>>>>>>>>>>>> sure I did something wrong. ;)
> >> > > >>>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
> >> > > >>>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>>>> Hello all,
> >> > > >>>>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding
> >> > > >>>>>>>>>>>>>>>>>>>>>>> cogroup to the streams DSL.
> >> > > >>>>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>>>> Please find the KIP here:
> >> > > >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >> > > >>>>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>>>> Please find my initial implementation here:
> >> > > >>>>>>>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> >> > > >>>>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >> > > >>>>>>>>>>>>>>>>>>>>>>> Kyle Winkelman
> >> > > >>>>>>>>>>>>>>>>>>>>>>>
> >> > > >>>>>>> --
> >> > > >>>>>>> -- Guozhang
