Thanks Shay! You and Matthias have convinced me, I'm happy with the current
proposal. I think once you make the minor
updates to the KIP document this will be ready for voting again.

Cheers,
Sophie

On Mon, Jul 24, 2023 at 8:26 AM Shay Lin <lqxs...@gmail.com> wrote:

> Hi Sophie and Matthias, thanks for your comments and replies.
>
> 1. Scope of change: KStreams only or KStreams/KTable
> I took some time to digest your points, looking through how KStreams
> triggers repartitions today. I noticed that `repartitionRequired`is a flag
> in KStreamImpl etc and not in KTableImpl etc. When I look further, in the
> case of KTable, instead of passing in a boolean flag, a repartition node `
> TableRepartitionMapNode` is directly created. I went back and referenced
> the two issue tickets KAFKA-10844 and KAFKA-4835, both requests were
> focused on KStreams, i.e. not to change the partition why the input streams
> are already correctly keyed. Is it possible that in the case of KTable,
> users always intend to repartition (change key) when they call on
> aggregate? -- (this was written before I saw Matthias's comment)
>
> Overall, based on the tickets, I see the benefit of doing a contained
> change focusing on KStreams, i.e. repartitionRequired, which would solve
> the pain points nicely. If we ran into similar complaints/optimization
> requests for KTable down the line, we can address them on top of this(let
> me know if we have these requests already, I might just be negligent).
>
> 2. API: markAsPartitioned() vs config
> If we go with the KStreams only scope, markAsPartition() is more
> adequate, i.e. maps nicely to repartitionRequired. There is a list of
> NamedOperations that may or may not trigger repartition based on its
> context(KStreams or KTable) which would make the implementation more
> confusing.
>
> 3. KIP documentation: Thanks for providing the links to previous KIPs. I
> will be adding the three use cases and javadoc. I will also document the
> risks when it relates to IQ and Join.
>
> Best,
> Shay
>
> On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > I agree that it could easily be misused. There is a few Jira tickets for
> > cases when people want to "cancel" a repartition step. I would hope
> > those tickets are linked to the KIP (if not, we should do this, and
> > maybe even c&p those cases as motivation into the KIP itself)?
> >
> > It's always a tricky question to what extend we want to guide users, and
> > to what extend we need to give levers for advances case (and how to
> > design those levers...) It's for sure a good idea to call out "use with
> > case" in the JavaDocs for the new method.
> >
> >
> > -Matthias
> >
> > On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
> > > I guess I felt a bit uneasy about how this could be used/abused while
> > > reading the KIP, but if we truly believe this is an advanced feature,
> I'm
> > > fine with the way things currently are. It doesn't feel like the best
> > API,
> > > but it does seem to be the best *possible* API given the way things
> are.
> > >
> > > W.r.t the KTable notes, that all makes sense to me. I just wanted to
> lay
> > > out all the potential cases to make sure we had our bases covered.
> > >
> > > I still think an example or two would help, but the only thing I will
> > > actually wait on before feeling comfortable enough to vote on this
> would
> > be
> > > a clear method signature (and maybe sample javadocs) in the "Public
> > > Interfaces" section.
> > >
> > > Thanks again for the KIP Shay! Hope I haven't dragged it out too much
> > >
> > > On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax <mj...@apache.org>
> > wrote:
> > >
> > >> Some thought about the API question.
> > >>
> > >>
> > >>>> A. kstream.groupBy(...).aggregate(...)
> > >>
> > >> This can be re-writtten as
> > >>
> > >> kstream.selectKey(...)
> > >>          .markAsRepartitioned()
> > >>          .groupByKey()
> > >>          .aggregate()
> > >>
> > >> Given that `markAsRepartitoned` is an advanced feature, I think it
> would
> > >> be ok?
> > >>
> > >>
> > >>>> B. ktable.groupBy(...).aggregate(...)
> > >>
> > >> For KTable aggregation, not sure how useful it would be? In the end,
> an
> > >> table aggregation does only make sense if we pick something from the
> > >> value, ie, we indeed change the key?
> > >>
> > >>
> > >>>> C. kstream.selectKey(...).join(ktable)
> > >>
> > >> We can just insert a `markAsRepartitioned()` after `selectKey` to
> avoid
> > >> repartitioning of the left input KStream.
> > >>
> > >>
> > >>> KStream.selectKey(...).toTable().join(...)
> > >>
> > >> Not sure if I understand what you try to say with this example? In the
> > >> end, `selectKey(...).toTable()` would repartiton. If I know that one
> can
> > >> upsert directly, one inserts a `markAsRepartitioned()` in between.
> > >>
> > >>
> > >> In general, the use case seems to be that the key is not in the right
> > >> "format", or there is no key, but data was partitioned by a
> > >> value-attribute upstream and we just want to extract this
> > >> value-attribute into the key. Both seems to be KStream cases?
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
> > >>> Hey Shay, while I don't have any specific concerns about the new
> public
> > >> API
> > >>> in this KIP, I'd like to better understand how this feature will work
> > >>> before I vote. We should document the behavior of this new operator
> > >> clearly
> > >>> in the KIP as well -- you don't necessarily need to write the
> complete
> > >>> javadocs up front, but it should be possible for a user to read the
> KIP
> > >> and
> > >>> then understand how this feature will work and how they would need to
> > >> apply
> > >>> it.
> > >>>
> > >>> To that end, I recommend framing this proposal with a few examples to
> > >> help
> > >>> clarify the semantics. When and where can you apply the
> > >> markAsPartitioned()
> > >>> operator? Some suggestions below.
> > >>>
> > >>> Specific notes:
> > >>>
> > >>> 1. The KIP opens with "Each key changing operation in Kafka Streams
> > >>> (selectKey, map, transform, etc.) now leads to automatic repartition
> > >> before
> > >>> an aggregation." We should change "aggregation" to "stateful
> operation"
> > >> as
> > >>> this is true for things like joins as well as aggregations
> > >>> 2. The callout on IQ makes me a bit uncomfortable -- basically it
> says
> > >> this
> > >>> should not be a concern "if we use markAsPartitioned correctly". Does
> > >> this
> > >>> mean if we, the devs implementing this, write the feature correctly?
> Or
> > >> is
> > >>> it saying that this won't be a problem as long as "we", the users of
> > this
> > >>> feature, use it correctly"? Just wondering if you've put any thought
> > into
> > >>> how this would work yet (I personally have not)
> > >>> 3. The KIP should lay out the proposed API exactly, even if there's
> > only
> > >>> one new method. Check out this KIP
> > >>> <
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > >>>
> > >>> (or this KIP
> > >>> <
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808
> > >>> )
> > >>> for a good reference on what the Public Interfaces section should
> > include
> > >>> 4. Regarding the proposed API itself, I wonder if KStream is really
> the
> > >>> most appropriate interface for the new operator. A repartition can be
> > >>> triggered on just a KTable. Here's where some examples would help.
> > >> Perhaps
> > >>> we could focus on these three cases:
> > >>>
> > >>> A. kstream.groupBy(...).aggregate(...)
> > >>> B. ktable.groupBy(...).aggregate(...)
> > >>> C. kstream.selectKey(...).join(ktable)
> > >>>
> > >>> I'm sure someone will correct me if I'm missing any additional vital
> > >>> examples, but at the very least, these are the three to consider:
> > either
> > >> a
> > >>> KStream or KTable followed by a groupBy/aggregation, or a KStream
> with
> > >>> key-changing operator followed by a join. Note that you could have
> > >>> something like KStream.selectKey(...).toTable().join(...) as well,
> but
> > >>> since there are no pure key-changing operators (like #selectKey) on
> > >>> KTables, only groupBy() which must always be followed by aggregation,
> > >> this
> > >>> 4th case can be reduced to an example like C of a KStream with
> > >> key-changing
> > >>> operation and downstream join -- ie there's no way to do this without
> > >>> #toTable which is more like syntactic sugar for the purposes of this
> > >>> repartitioning discussion.
> > >>>
> > >>> I worry that making this a DSL operator on KStream is too generic,
> and
> > we
> > >>> would also need to add it to KTable for example B, despite KTables
> not
> > >>> having any true pure key-changing operators outside of #groupBy.
> Would
> > we
> > >>> throw an exception if you invoked #markAsPartitioned on a KTable that
> > >>> wasn't followed by a groupBy? If you have multiple key-changing
> > >> operators,
> > >>> would you need to add markAsPartitioned after each one? If not, what
> > are
> > >>> the semantics of that?  These are the main questions that got me
> > thinking
> > >>> here, and will definitely need to be clarified in the KIP if we do go
> > >> with
> > >>> the current proposal. But I wanted to throw out another idea for an
> > API I
> > >>> think would help with some of this awkwardness by having clearly
> > defined
> > >>> semantics:
> > >>>
> > >>> Fundamentally it seems to me that these issues are arising from that
> > >> "being
> > >>> partitioned" is conceptually a property of other operations applied
> to
> > a
> > >>> KStream/KTable, rather than an operation itself. So rather than
> making
> > >> this
> > >>> a DSL operator itself, what if we added it to the Grouped and various
> > >>> Joined configuration classes? It would allow us to more carefully hit
> > >> only
> > >>> the relevant parts of the DSL, so there are no questions about
> > >> whether/when
> > >>> to throw errors when the operator is incorrectly applied -- there
> would
> > >> be
> > >>> no way to apply it incorrectly. The main drawback I can think of is
> > >> simply
> > >>> that this touches on a larger surface area of the API. I personally
> > don't
> > >>> believe this is a good enough reason to make it a DSL operator as one
> > >> could
> > >>> make that argument for nearly any kind of KStream or KTable operator
> > >>> configuration going forward, and would explode the KStream/KTable API
> > >>> surface area instead. Perhaps this was discussed during the previous
> > >>> iteration of this KIP, or I'm missing something here, so I just
> wanted
> > to
> > >>> put this out there and see what people think
> > >>>
> > >>> Either way, thanks for picking up this KIP. It's been a long time
> > coming
> > >> :)
> > >>>
> > >>> -Sophie
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> It's been a few days so I went ahead with editing the KIP, the main
> > >> change
> > >>>> is on the method name
> > >>>>
> > >>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> > >>>> .
> > >>>> I will follow up with a VOTE separately.
> > >>>>
> > >>>> Best,
> > >>>> Shay
> > >>>>
> > >>>> On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <mj...@apache.org>
> > >> wrote:
> > >>>>
> > >>>>> Shay,
> > >>>>>
> > >>>>> thanks for picking up this KIP. It's a pity that the discussion
> > stalled
> > >>>>> for such a long time.
> > >>>>>
> > >>>>> As expressed previously, I am happy with the name
> > `markAsPartitioned()`
> > >>>>> and also believe it's ok to just document the impact and leave it
> to
> > >> the
> > >>>>> user to do the right thing.
> > >>>>>
> > >>>>> If we really get a lot of users that ask about it, because they did
> > not
> > >>>>> do the right thing, we could still add something (eg, a
> > reverse-mapper
> > >>>>> function) in a follow-up KIP. But we don't know if it's necessary;
> > >> thus,
> > >>>>> making a small incremental step sounds like a good approach to me.
> > >>>>>
> > >>>>> Let's see if others agree or not.
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 6/28/23 5:29 PM, Shay Lin wrote:
> > >>>>>> Hi all,
> > >>>>>>
> > >>>>>> Great discussion thread. May I take this KIP up? If it’s alright
> my
> > >>>> plan
> > >>>>> is
> > >>>>>> to update the KIP with the operator `markAsPartitioned()`.
> > >>>>>>
> > >>>>>> As you have discussed and pointed out, there are implications to
> > >>>>> downstream
> > >>>>>> joins or aggregation operations. Still, the operator is intended
> for
> > >>>>>> advanced users so my two cents is it would be a valuable addition
> > >>>>>> nonetheless. We could add this as a caution/consideration as part
> of
> > >>>> the
> > >>>>>> java doc.
> > >>>>>>
> > >>>>>> Let me know, thanks.
> > >>>>>> Shay
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>

Reply via email to