Hi Gouzhang,

Matthias and I did talk about overloading different a type of aggregate
methods in the cogroup that would take in the windows and returns a
windowed KTable. We decided that it would break too much with the current
pattern that was established in the normal KStream. We can revisit this if
you have a different opinion on the tradeoff.

Walker

On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Walker,
>
> On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson <wcarl...@confluent.io>
> wrote:
>
> > Hi Guozhang,
> >
> > 1. I am familiar with the cogroup of spark, it is very similar to
> > their join operator but instead it makes the values iterable. I think
> that
> > the use cases are different enough that it makes sense to specify the
> > aggregator when we do.
> >
> > I like the idea of "absorb" and I think it could be useful. Although I do
> > not think it is as intuitive.
> >
> > If we were to go that route we would either use more processors or do
> > essentially the same thing but would have to store the information
> > required to cogroup inside that KTable. I think this would violate some
> > design principles. I would argue that we should consider adding absorb as
> > well and auto re-write it to use cogroup.
> >
>
> Yeah I think I agree with you about the internal design complexity with
> "absorb"; I was primarily thinking if we can save ourselves from
> introducing 3 more public classes with co-group. But it seems that without
> introducing new classes there's no easy way for us to bound the scope of
> co-grouping (like how many streams will be co-grouped together).
>
> LMK if you have some better ideas: generally speaking the less new public
> interfaces we are introducing to fulfill a new feature the better, so I'd
> push us to think twice and carefully before we go down the route.
>
>
> >
> > 2. We have not considered this thought that would be a convenient
> > operation.
> >
> > 3. There is only one processor made. We are actually having the naming
> > conversation right now in the above thread
> >
> > 4, 5. fair points
> >
> > Walker
> >
> > On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi Walker, thanks for the KIP! I made a pass on the writeup and have
> some
> > > comments below:
> > >
> > > Meta:
> > >
> > > 1. Syntax-wise, I'm wondering if we have compared our current proposal
> > with
> > > Spark's co-group syntax (I know they are targeting for different use
> > cases,
> > > but wondering if their syntax is closer to the join operator), what are
> > the
> > > syntax / semantics trade-off here?
> > >
> > > Just playing a devil's advocate here, if the main motivation is to
> > provide
> > > a more convienent multi-way join syntax, and in order to only have one
> > > materialized store we need to specify the final joined format at the
> > > beginning, then what about the following alternative (with the given
> > > example in your wiki page):
> > >
> > >
> > > KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > > KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > > KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > >
> > > KTable<K, CG> aggregated = grouped1.aggregate(initializer,
> materialized,
> > > aggregator1);
> > >
> > > aggregated.absorb(grouped2, aggregator2);  // I'm just using a random
> > name
> > > on top of my head here
> > >                   .absorb(grouped3, aggregator3);
> > >
> > > In this way, we just add a new API to the KTable to "absorb" new
> streams
> > as
> > > aggregated results without needing to introduce new first citizen
> > classes.
> > >
> > > 2. From the DSL optimization, have we considered if we can auto
> re-write
> > > the user written old fashioned multi-join into this new DSL operator?
> > >
> > > 3. Although it is not needed for the wiki page itself, for internal
> > > implementation how many processor nodes would we create for the new
> > > operator, and how we can allow users to name them?
> > >
> > > Minor:
> > >
> > > 4. In "Public Interfaces", better add the templated generics to
> > > "KGroupedStream" as "KGroupedStream<K, V>".
> > >
> > > 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
> > > e.g. "TimeWindowed*CogroupedKStream*<K, V>".
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > Walker,
> > > >
> > > > I am not sure if I can follow your argument. What do you exactly mean
> > by
> > > >
> > > > > I also
> > > > >> think that in this case it would be better to separate the 2
> option
> > > out
> > > > >> into separate overloads.
> > > >
> > > > Maybe you can give an example what method signature you have in mind?
> > > >
> > > > >> We could take a named parameter from upstream or add an extra
> naming
> > > > option
> > > > >> however I don't really see the advantage that would give.
> > > >
> > > > Are you familiar with KIP-307? Before KIP-307, KS generated all names
> > > > for all Processors. This makes it hard to reason about a Topology if
> > > > it's getting complex. Adding `Named` to the new co-group operator
> would
> > > > actually align with KIP-307.
> > > >
> > > > > It seems to go in
> > > > >> the opposite direction from the cogroup configuration idea you
> > > proposed.
> > > >
> > > > Can you elaborate? Not sure if I can follow.
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 10/24/19 10:20 AM, Walker Carlson wrote:
> > > > > While I like the idea Sophie I don't think that it is necessary. I
> > also
> > > > > think that in this case it would be better to separate the 2 option
> > out
> > > > > into separate overloads.
> > > > > We could take a named parameter from upstream or add an extra
> naming
> > > > option
> > > > > however I don't really see the advantage that would give. It seems
> to
> > > go
> > > > in
> > > > > the opposite direction from the cogroup configuration idea you
> > > proposed.
> > > > >
> > > > > John, I think it could be both. It depends on when you aggregate
> and
> > > what
> > > > > kind of data you have. In the example it is aggregating before
> > joining,
> > > > > there are probably some cases where you could join before
> > aggregating.
> > > > IMHO
> > > > > it would be easier to group all the streams together then perform
> the
> > > one
> > > > > operation that results in a single KTable.
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <
> > > sop...@confluent.io
> > > > >
> > > > > wrote:
> > > > >
> > > > >>> I can personally not see any need to add other configuration
> > > > >> Famous last words?
> > > > >>
> > > > >> Just kidding, 95% confidence is more than enough to  me (and
> better
> > to
> > > > >> optimize for current
> > > > >> design than for hypothetical future changes).
> > > > >>
> > > > >> One last question I have then is about the
> > operator/store/repartition
> > > > >> naming -- seems like
> > > > >> we can name the underlying store/changelog through the
> Materialized
> > > > >> parameter, but do we
> > > > >> also want to include an overload taking a Named parameter for the
> > > > operator
> > > > >> name (as in the
> > > > >> KTable#join variations)?
> > > > >>
> > > > >> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <
> > > matth...@confluent.io>
> > > > >> wrote:
> > > > >>
> > > > >>> Interesting idea, Sophie.
> > > > >>>
> > > > >>> So far, we tried to reuse existing config objects and only add
> new
> > > ones
> > > > >>> when needed to avoid creating "redundant" classes. This is of
> > course
> > > a
> > > > >>> reactive approach (with the drawback to deprecate stuff if we
> > change
> > > > it,
> > > > >>> as you described).
> > > > >>>
> > > > >>> I can personally not see any need to add other configuration
> > > parameters
> > > > >>> atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has
> > > only
> > > > a
> > > > >>> single state store that we need to configure, and reusing
> > > > `Materialized`
> > > > >>> seems to be appropriate.
> > > > >>>
> > > > >>> Also note, that the `Initializer` is a mandatory parameter and
> not
> > a
> > > > >>> configuration and should be passed directly, and not via a
> > > > configuration
> > > > >>> object.
> > > > >>>
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
> > > > >>>> Thanks for the explanation, makes sense to me! As for the API,
> one
> > > > >> other
> > > > >>>> thought I had is might we ever want or need to introduce any
> other
> > > > >>> configs
> > > > >>>> or parameters in the future? Obviously that's difficult to say
> now
> > > (or
> > > > >>>> maybe the
> > > > >>>> answer seems obviously "no") but we seem to often end up needing
> > to
> > > > add
> > > > >>> new
> > > > >>>> overloads and/or deprecate old ones as new features or
> > requirements
> > > > >> come
> > > > >>>> into
> > > > >>>> play.
> > > > >>>>
> > > > >>>> What do you (and others?) think about wrapping the config
> > parameters
> > > > >> (ie
> > > > >>>> everything
> > > > >>>> except the actual grouped streams) in a new config object? For
> > > > example,
> > > > >>> the
> > > > >>>> CogroupedStream#aggregate field could take a single Cogrouped
> > > object,
> > > > >>>> which itself would have an initializer and a materialized. If we
> > > ever
> > > > >>> need
> > > > >>>> to add
> > > > >>>> a new parameter, we can just add it to the Cogrouped class.
> > > > >>>>
> > > > >>>> Also, will the backing store be available for IQ if a
> Materialized
> > > is
> > > > >>>> passed in?
> > > > >>>>
> > > > >>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <
> > > > wcarl...@confluent.io
> > > > >>>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Hi Sophie,
> > > > >>>>>
> > > > >>>>> Thank you for your comments. As for the different methods
> > > signatures
> > > > I
> > > > >>> have
> > > > >>>>> not really considered any other options but  while I do agree
> it
> > is
> > > > >>>>> confusing, I don't see any obvious solutions. The problem is
> that
> > > the
> > > > >>>>> cogroup essentially pairs a group stream with an aggregator and
> > > when
> > > > >> it
> > > > >>> is
> > > > >>>>> first made the method is called on a groupedStream already.
> > However
> > > > >> each
> > > > >>>>> subsequent stream-aggregator pair is added on to a cogroup
> stream
> > > so
> > > > >> it
> > > > >>>>> needs both arguments.
> > > > >>>>>
> > > > >>>>> For the second question you should not need a joiner. The idea
> is
> > > > that
> > > > >>> you
> > > > >>>>> can collect many grouped streams with overlapping key spaces
> and
> > > any
> > > > >>> kind
> > > > >>>>> of value types. Once aggregated its value will be reduced into
> > one
> > > > >> type.
> > > > >>>>> This is why you need only one initializer. Each aggregator will
> > > need
> > > > >> to
> > > > >>>>> integrate the new value with the new object made in the
> > > initializer.
> > > > >>>>> Does that make sense?
> > > > >>>>>
> > > > >>>>> This is a good question and I will include this explanation in
> > the
> > > > kip
> > > > >>> as
> > > > >>>>> well.
> > > > >>>>>
> > > > >>>>> Thanks,
> > > > >>>>> Walker
> > > > >>>>>
> > > > >>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <
> > > > >>> sop...@confluent.io>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hey Walker,
> > > > >>>>>>
> > > > >>>>>> Thanks for the KIP! I have just a couple of questions:
> > > > >>>>>>
> > > > >>>>>> 1) It seems a little awkward to me that with the current API,
> we
> > > > >> have a
> > > > >>>>>> nearly identical
> > > > >>>>>> "add stream to cogroup" method, except for the first which
> has a
> > > > >>>>> different
> > > > >>>>>> signature
> > > > >>>>>> (ie the first stream is joined as stream.cogroup(Aggregator)
> > while
> > > > >> the
> > > > >>>>>> subsequent ones
> > > > >>>>>> are joined as .cogroup(Stream, Aggregator) ). I'm not sure
> what
> > it
> > > > >>> would
> > > > >>>>>> look like exactly,
> > > > >>>>>> but I was just wondering if you'd considered and/or rejected
> any
> > > > >>>>>> alternative APIs?
> > > > >>>>>>
> > > > >>>>>> 2) This might just be my lack of familiarity with "cogroup"
> as a
> > > > >>> concept,
> > > > >>>>>> but with the
> > > > >>>>>> current (non-optimal) API the user seems to have some control
> > over
> > > > >> how
> > > > >>>>>> exactly
> > > > >>>>>> the different streams are joined through the ValueJoiners.
> Would
> > > > this
> > > > >>> new
> > > > >>>>>> cogroup
> > > > >>>>>> simply concatenate the values from the different cogroup
> > streams,
> > > or
> > > > >>>>> could
> > > > >>>>>> users
> > > > >>>>>> potentially pass some kind of Joiner to the cogroup/aggregate
> > > > >> methods?
> > > > >>>>> Or,
> > > > >>>>>> is the
> > > > >>>>>> whole point of cogroups that you no longer ever need to
> specify
> > a
> > > > >>> Joiner?
> > > > >>>>>> If so, you
> > > > >>>>>> should add a short line to the KIP explaining that for those
> of
> > us
> > > > >> who
> > > > >>>>>> aren't fluent
> > > > >>>>>> in cogroup semantics :)
> > > > >>>>>>
> > > > >>>>>> Cheers,
> > > > >>>>>> Sophie
> > > > >>>>>>
> > > > >>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <
> > > > >> wcarl...@confluent.io>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Good catch I updated that.
> > > > >>>>>>>
> > > > >>>>>>> I have made a PR for this KIP
> > > > >>>>>>>
> > > > >>>>>>> I then am splitting it into 3 parts, first cogroup for a
> > > key-value
> > > > >>>>> store
> > > > >>>>>> (
> > > > >>>>>>> here <https://github.com/apache/kafka/pull/7538>), then for
> a
> > > > >>>>>>> timeWindowedStore, and then a sessionWindowedStore + ensuring
> > > > >>>>>> partitioning.
> > > > >>>>>>>
> > > > >>>>>>> Walker
> > > > >>>>>>>
> > > > >>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <
> > > > >>>>> matth...@confluent.io>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Walker,
> > > > >>>>>>>>
> > > > >>>>>>>> thanks for picking up the KIP and reworking it for the
> changed
> > > > API.
> > > > >>>>>>>>
> > > > >>>>>>>> Overall, the updated API suggestions make sense to me. The
> > seem
> > > to
> > > > >>>>>> align
> > > > >>>>>>>> quite nicely with our current API design.
> > > > >>>>>>>>
> > > > >>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the type
> > parameter
> > > > of
> > > > >>>>>>>> `Materialized` should be `V`, not `VR`?
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> -Matthias
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > > >>>>>>>>> here
> > > > >>>>>>>>> is a link
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <
> > > > >>>>>> wcarl...@confluent.io>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hello all,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I have picked up and updated KIP-150. Due to changes to
> the
> > > > >>>>> project
> > > > >>>>>>>> since
> > > > >>>>>>>>>> KIP #150 was written there are a few items that need to be
> > > > >>>>> updated.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> First item that changed is the adoption of the
> Materialized
> > > > >>>>>> parameter.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The second item is the WindowedBy. How the old KIP handles
> > > > >>>>> windowing
> > > > >>>>>>> is
> > > > >>>>>>>>>> that it overloads the aggregate function to take in a
> Window
> > > > >>>>> object
> > > > >>>>>> as
> > > > >>>>>>>> well
> > > > >>>>>>>>>> as the other parameters. The current practice to window
> > > > >>>>>>> grouped-streams
> > > > >>>>>>>> is
> > > > >>>>>>>>>> to call windowedBy and receive a windowed stream object.
> The
> > > > >>>>>> existing
> > > > >>>>>>>>>> interface for a windowed stream made from a grouped stream
> > > will
> > > > >>>>> not
> > > > >>>>>>> work
> > > > >>>>>>>>>> for cogrouped streams. Hence, we have to make new
> interfaces
> > > for
> > > > >>>>>>>> cogrouped
> > > > >>>>>>>>>> windowed streams.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Please take a look, I would like to hear your feedback,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Walker
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Reply via email to