Damian Guy, could you let me know if you plan to review this further? There
is no rush, but if you don't have any additional comments I could start the
voting and finish my WIP PR.

Thanks,
Kyle

On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

> Eno, is there anyone else that is an expert in the kafka streams realm
> that I should reach out to for input?
>
> I believe Damian Guy is still planning on reviewing this more in depth so
> I will wait for his inputs before continuing.
>
> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>
>> Thanks Kyle, good arguments.
>>
>> Eno
>>
>> > On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com>
>> wrote:
>> >
>> > *- minor: could you add an exact example (similar to what Jay’s example
>> is,
>> > or like your Spark/Pig pointers had) to make this super concrete?*
>> > I have added a more concrete example to the KIP.
>> >
>> > *- my main concern is that we’re exposing this optimization to the DSL.
>> In
>> > an ideal world, an optimizer would take the existing DSL and do the
>> right
>> > thing under the covers (create just one state store, arrange the nodes
>> > etc). The original DSL had a bunch of small, composable pieces (group,
>> > aggregate, join) that this proposal groups together. I’d like to hear
>> your
>> > thoughts on whether it’s possible to do this optimization with the
>> current
>> > DSL, at the topology builder level.*
>> > You would have to make a lot of checks to understand if it is even
>> possible
>> > to make this optimization:
>> > 1. Make sure they are all KTableKTableOuterJoins
>> > 2. None of the intermediate KTables are used for anything else.
>> > 3. None of the intermediate stores are used. (This may be impossible
>> > especially if they use KafkaStreams#store after the topology has already
>> > been built.)
>> > You would then need to make decisions during the optimization:
>> > 1. Your new initializer would be the composite of all the individual
>> > initializers and the valueJoiners.
>> > 2. I am having a hard time thinking about how you would turn the
>> > aggregators and valueJoiners into an aggregator that would work on the
>> > final object, but this may be possible.
>> > 3. Which state store would you use? The ones declared would be for the
>> > aggregate values. None of the declared ones would be guaranteed to hold
>> the
>> > final object. This would mean you must create a new state store and not
>> > create any of the declared ones.
>> >
>> > The main argument I have against it is even if it could be done I don't
>> > know that we would want to have this be an optimization in the
>> background
>> > because the user would still be required to think about all of the
>> > intermediate values that they shouldn't need to worry about if they only
>> > care about the final object.
>> >
>> > In my opinion cogroup is a common enough case that it should be part of
>> the
>> > composable pieces (group, aggregate, join) because we want to allow
>> people
>> > to join 2 or more streams in an easy way. Right now I don't
>> think
>> > we give them ways of handling this use case easily.
>> >
>> > *-I think there will be scope for several such optimizations in the
>> future
>> > and perhaps at some point we need to think about decoupling the 1:1
>> mapping
>> > from the DSL into the physical topology.*
>> > I would argue that cogroup is not just an optimization it is a new way
>> for
>> > the users to look at accomplishing a problem that requires multiple
>> > streams. I may sound like a broken record but I don't think users should
>> > have to build the N-1 intermediate tables and deal with their
>> initializers,
>> > serdes and stores if all they care about is the final object.
>> > Now if for example someone uses cogroup but doesn't supply additional
>> > streams and aggregators this case is equivalent to a single grouped
>> stream
>> > making an aggregate call. This case is what I view as an optimization:
>> we
>> > could remove the KStreamCogroup and act as if there was just a call to
>> > KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I
>> > would prefer to just write a warning saying that this is not how
>> cogroup is
>> > to be used.)
>> >
>> > Thanks,
>> > Kyle
>> >
>> > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>> >
>> >> Hi Kyle,
>> >>
>> >> Thanks for the KIP again. A couple of comments:
>> >>
>> >> - minor: could you add an exact example (similar to what Jay’s example
>> is,
>> >> or like your Spark/Pig pointers had) to make this super concrete?
>> >>
>> >> - my main concern is that we’re exposing this optimization to the DSL.
>> In
>> >> an ideal world, an optimizer would take the existing DSL and do the
>> right
>> >> thing under the covers (create just one state store, arrange the nodes
>> >> etc). The original DSL had a bunch of small, composable pieces (group,
>> >> aggregate, join) that this proposal groups together. I’d like to hear
>> your
>> >> thoughts on whether it’s possible to do this optimization with the
>> current
>> >> DSL, at the topology builder level.
>> >>
>> >> I think there will be scope for several such optimizations in the
>> future
>> >> and perhaps at some point we need to think about decoupling the 1:1
>> mapping
>> >> from the DSL into the physical topology.
>> >>
>> >> Thanks
>> >> Eno
>> >>
>> >>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
>> >>>
>> >>> I haven't digested the proposal but the use case is pretty common. An
>> >>> example would be the "customer 360" or "unified customer profile" use
>> >> case
>> >>> we often use. In that use case you have a dozen systems each of which
>> has
>> >>> some information about your customer (account details, settings,
>> billing
>> >>> info, customer service contacts, purchase history, etc). Your goal is
>> to
>> >>> join/munge these into a single profile record for each customer that
>> has
>> >>> all the relevant info in a usable form and is up-to-date with all the
>> >>> source systems. If you implement that with kstreams as a sequence of
>> >> joins
>> >>> I think today we'd fully materialize N-1 intermediate tables. But
>> clearly
>> >>> you only need a single stage to group all these things that are
>> already
>> >>> co-partitioned. A distributed database would do this under the covers
>> >> which
>> >>> is arguably better (at least when it does the right thing) and
>> perhaps we
>> >>> could do the same thing but I'm not sure we know the partitioning so
>> we
>> >> may
>> >>> need an explicit cogroup command that implies they are already
>> >>> co-partitioned.
>> >>>
>> >>> -Jay
>> >>>
>> >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
>> winkelman.k...@gmail.com
>> >>>
>> >>> wrote:
>> >>>
>> >>>> Yeah, that's a good way to look at it.
>> >>>> I have seen this type of functionality in a couple other platforms
>> like
>> >>>> spark and pig.
>> >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/
>> >> PairRDDFunctions.html
>> >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_
>> >> cogroup_operator.htm
>> >>>>
>> >>>>
>> >>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
>> >>>>
>> >>>>> Hi Kyle,
>> >>>>>
>> >>>>> If I'm reading this correctly it is like an N way outer join? So an
>> >> input
>> >>>>> on any stream will always produce a new aggregated value - is that
>> >>>> correct?
>> >>>>> Effectively, each Aggregator just looks up the current value,
>> >> aggregates
>> >>>>> and forwards the result.
>> >>>>> I need to look into it and think about it a bit more, but it seems
>> like
>> >>>> it
>> >>>>> could be a useful optimization.
>> >>>>>
>> >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
>> winkelman.k...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> I sure can. I have added the following description to my KIP. If
>> this
>> >>>>>> doesn't help let me know and I will take some more time to build a
>> >>>>> diagram
>> >>>>>> and make more of a step by step description:
>> >>>>>>
>> >>>>>> Example with Current API:
>> >>>>>>
>> >>>>>> KTable<K, V1> table1 =
>> >>>>>> builder.stream("topic1").groupByKey().aggregate(initializer1,
>> >>>>> aggregator1,
>> >>>>>> aggValueSerde1, storeName1);
>> >>>>>> KTable<K, V2> table2 =
>> >>>>>> builder.stream("topic2").groupByKey().aggregate(initializer2,
>> >>>>> aggregator2,
>> >>>>>> aggValueSerde2, storeName2);
>> >>>>>> KTable<K, V3> table3 =
>> >>>>>> builder.stream("topic3").groupByKey().aggregate(initializer3,
>> >>>>> aggregator3,
>> >>>>>> aggValueSerde3, storeName3);
>> >>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2,
>> >>>>>> joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
>> >>>>>>
>> >>>>>> As you can see this creates 3 StateStores, requires 3 initializers,
>> >>>> and 3
>> >>>>>> aggValueSerdes. This also puts pressure on the user to define what
>> the
>> >>>>>> intermediate values are going to be (V1, V2, V3). They are left
>> with a
>> >>>>>> couple choices, first to make V1, V2, and V3 all the same as CG and
>> >> the
>> >>>>> two
>> >>>>>> joiners are more like mergers, or second make them intermediate
>> states
>> >>>>> such
>> >>>>>> as Topic1Map, Topic2Map, and Topic3Map and the joiners use those to
>> >>>> build
>> >>>>>> the final aggregate CG value. This is something the user could
>> avoid
>> >>>>>> thinking about with this KIP.
>> >>>>>>
>> >>>>>> When a new input arrives, let's say at "topic1", it will first go
>> through
>> >>>> a
>> >>>>>> KStreamAggregate grabbing the current aggregate from storeName1. It
>> >>>> will
>> >>>>>> produce this in the form of the first intermediate value and get
>> sent
>> >>>>>> through a KTableKTableOuterJoin where it will look up the current
>> >> value
>> >>>>> of
>> >>>>>> the key in storeName2. It will use the first joiner to calculate
>> the
>> >>>>> second
>> >>>>>> intermediate value, which will go through an additional
>> >>>>>> KTableKTableOuterJoin. Here it will look up the current value of
>> the
>> >>>> key
>> >>>>> in
>> >>>>>> storeName3 and use the second joiner to build the final aggregate
>> >>>> value.
>> >>>>>>
>> >>>>>> If you think through all possibilities for incoming topics you will
>> >> see
>> >>>>>> that no matter which topic it comes in through all three stores are
>> >>>>> queried
>> >>>>>> and all of the joiners must get used.
>> >>>>>>
>> >>>>>> Topology wise for N incoming streams this creates N
>> >>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
>> >>>>>> KTableKTableJoinMergers.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> Example with Proposed API:
>> >>>>>>
>> >>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").
>> >>>> groupByKey();
>> >>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").
>> >>>> groupByKey();
>> >>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
>> >>>> groupByKey();
>> >>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1,
>> aggregator1,
>> >>>>>> aggValueSerde1, storeName1)
>> >>>>>>       .cogroup(grouped2, aggregator2)
>> >>>>>>       .cogroup(grouped3, aggregator3)
>> >>>>>>       .aggregate();
>> >>>>>>
>> >>>>>> As you can see this creates 1 StateStore, requires 1 initializer,
>> and
>> >> 1
>> >>>>>> aggValueSerde. The user no longer has to worry about the
>> intermediate
>> >>>>>> values and the joiners. All they have to think about is how each
>> >> stream
>> >>>>>> impacts the creation of the final CG object.
>> >>>>>>
>> >>>>>> When a new input arrives, let's say at "topic1", it will first go
>> through
>> >>>> a
>> >>>>>> KStreamAggregate and grab the current aggregate from storeName1. It
>> >>>> will
>> >>>>>> add its incoming object to the aggregate, update the store and pass
>> >> the
>> >>>>> new
>> >>>>>> aggregate on. This new aggregate goes through the KStreamCogroup
>> which
>> >>>> is
>> >>>>>> pretty much just a pass through processor and you are done.
>> >>>>>>
>> >>>>>> Topology wise for N incoming streams the new API will only ever
>> >>>> create N
>> >>>>>> KStreamAggregates and 1 KStreamCogroup.
>> >>>>>>
>> >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
>> >> matth...@confluent.io
>> >>>>>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Kyle,
>> >>>>>>>
>> >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could
>> not
>> >>>>>>> follow completely. Could you maybe add a more concrete example,
>> like
>> >>>> 3
>> >>>>>>> streams with 3 records each (plus expected result), and show the
>> >>>>>>> difference between the current way to implement it and the proposed
>> >>>> API?
>> >>>>>>> This could also cover the internal processing to see what store
>> calls
>> >>>>>>> would be required for both approaches etc.
>> >>>>>>>
>> >>>>>>> I think, it's pretty advanced stuff you propose, and it would
>> help to
>> >>>>>>> understand it better.
>> >>>>>>>
>> >>>>>>> Thanks a lot!
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> -Matthias
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>> >>>>>>>> I have made a pull request. It can be found here.
>> >>>>>>>>
>> >>>>>>>> https://github.com/apache/kafka/pull/2975
>> >>>>>>>>
>> >>>>>>>> I plan to write some more unit tests for my classes and get
>> around
>> >>>> to
>> >>>>>>>> writing documentation for the public api additions.
>> >>>>>>>>
>> >>>>>>>> One thing I was curious about is during the
>> >>>>>>> KCogroupedStreamImpl#aggregate
>> >>>>>>>> method I pass null to the KGroupedStream#repartitionIfRequired
>> >>>>> method.
>> >>>>>> I
>> >>>>>>>> can't supply the store name because if more than one grouped
>> stream
>> >>>>>>>> repartitions an error is thrown. Is there some name that someone
>> >>>> can
>> >>>>>>>> recommend or should I leave the null and allow it to fall back to
>> >>>> the
>> >>>>>>>> KGroupedStream.name?
>> >>>>>>>>
>> >>>>>>>> Should this be expanded to handle grouped tables? This would be
>> >>>>> pretty
>> >>>>>>> easy
>> >>>>>>>> for a normal aggregate but one allowing session stores and
>> windowed
>> >>>>>>> stores
>> >>>>>>>> would require KTableSessionWindowAggregate and
>> >>>> KTableWindowAggregate
>> >>>>>>>> implementations.
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>> Kyle
>> >>>>>>>>
>> >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com>
>> >>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> I’ll look as well asap, sorry, been swamped.
>> >>>>>>>>>
>> >>>>>>>>> Eno
>> >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com>
>> >>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Kyle,
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance
>> to
>> >>>>> look
>> >>>>>>> at
>> >>>>>>>>>> the KIP yet, but will schedule some time to look into it
>> >>>> tomorrow.
>> >>>>>> For
>> >>>>>>>>> the
>> >>>>>>>>>> implementation, can you raise a PR against kafka trunk and mark
>> >>>> it
>> >>>>> as
>> >>>>>>>>> WIP?
>> >>>>>>>>>> It will be easier to review what you have done.
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks,
>> >>>>>>>>>> Damian
>> >>>>>>>>>>
>> >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
>> >>>>> winkelman.k...@gmail.com
>> >>>>>>>
>> >>>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> I am replying to this in hopes it will draw some attention to
>> my
>> >>>>> KIP
>> >>>>>>> as
>> >>>>>>>>> I
>> >>>>>>>>>>> haven't heard from anyone in a couple days. This is my first
>> KIP
>> >>>>> and
>> >>>>>>> my
>> >>>>>>>>>>> first large contribution to the project so I'm sure I did
>> >>>>> something
>> >>>>>>>>> wrong.
>> >>>>>>>>>>> ;)
>> >>>>>>>>>>>
>> >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
>> >>>>> winkelman.k...@gmail.com>
>> >>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hello all,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding
>> >>>>>> cogroup
>> >>>>>>> to
>> >>>>>>>>>>>> the streams DSL.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Please find the KIP here:
>> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Please find my initial implementation here:
>> >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>> Kyle Winkelman
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>
>> >>
>>
>>
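
For readers following the thread, here is a minimal sketch of the processing model described above for the proposed API: every input stream applies its own aggregator to one shared aggregate kept in a single store, with a single initializer. It does not use the Kafka Streams API. SharedStore, process, and demo are hypothetical names invented for this illustration, and a plain HashMap stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

class CogroupSketch {
    /** Hypothetical stand-in for the one state store and one initializer. */
    static final class SharedStore<K, CG> {
        private final Map<K, CG> store = new HashMap<>();
        private final Supplier<CG> initializer;

        SharedStore(Supplier<CG> initializer) {
            this.initializer = initializer;
        }

        /** Look up the current aggregate, apply one stream's aggregator, store the result. */
        <V> CG process(K key, V value, BiFunction<V, CG, CG> aggregator) {
            CG current = store.containsKey(key) ? store.get(key) : initializer.get();
            CG updated = aggregator.apply(value, current);
            store.put(key, updated);
            return updated; // would be forwarded downstream
        }

        CG get(K key) {
            return store.get(key);
        }
    }

    /** Two input "topics" with different value types feed the same aggregate. */
    static String demo() {
        SharedStore<String, String> store = new SharedStore<>(() -> "");
        BiFunction<String, String, String> aggregator1 = (v, agg) -> agg + "[t1:" + v + "]";
        BiFunction<Integer, String, String> aggregator2 = (v, agg) -> agg + "[t2:" + v + "]";
        store.process("customer-1", "alice", aggregator1); // record from topic1
        store.process("customer-1", 42, aggregator2);      // record from topic2
        return store.get("customer-1");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [t1:alice][t2:42]
    }
}
```

Each record touches the store once and no intermediate value types or joiners appear, which is the contrast with the chained outerJoin version drawn in the thread.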
