Very good catch, Matthias. I updated the KIP to state that the new
DSLOperation will return a new, mutated KStream.
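
To make the fan-out case below concrete, here is a minimal toy sketch of
the semantics as I read them. It is not the Kafka Streams implementation:
`ToyStream`, `needsRepartition()` and the class name are hypothetical
stand-ins, and the only idea borrowed from the real code is a
`repartitionRequired`-style flag. It assumes the operator returns a new
stream object and leaves the upstream object untouched.

```java
/** Toy model only; NOT the real Kafka Streams classes. All names are hypothetical. */
public class FanOutSketch {

    /** Immutable stand-in for a KStream that carries a repartitionRequired flag. */
    static final class ToyStream {
        final String name;
        final boolean repartitionRequired;

        ToyStream(String name, boolean repartitionRequired) {
            this.name = name;
            this.repartitionRequired = repartitionRequired;
        }

        /** Key-changing operation: the result is flagged as needing a repartition. */
        ToyStream map() {
            return new ToyStream(name + ".map", true);
        }

        /** Returns a NEW stream with the flag cleared; `this` is left unchanged. */
        ToyStream markAsPartitioned() {
            return new ToyStream(name + ".markAsPartitioned", false);
        }

        /** What a downstream stateful operation (aggregate, join) would check. */
        boolean needsRepartition() {
            return repartitionRequired;
        }
    }

    public static void main(String[] args) {
        ToyStream myStream = new ToyStream("source", false).map();

        // Branch 1: marked as partitioned, then aggregated.
        boolean aggregateRepartitions = myStream.markAsPartitioned().needsRepartition();

        // Branch 2: joined directly from the unmarked upstream object.
        boolean joinRepartitions = myStream.needsRepartition();

        System.out.println("aggregate repartitions: " + aggregateRepartitions);
        System.out.println("join repartitions: " + joinRepartitions);
    }
}
```

Under that assumption, the marked branch skips the repartition while the
join on the original `myStream` still gets one.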

Thank you,
Shay

On Wed, Jul 26, 2023 at 6:13 PM Matthias J. Sax <mj...@apache.org> wrote:

> One last question. What should happen for the following case:
>
> KStream myStream = builder.stream(...).map(...);
> myStream.markAsPartitioned().groupByKey().aggregate(...);
> myStream.join(...);
>
> The question is about the "fan-out" pattern. `myStream`, which is marked
> as partitioned, is fed into two downstream operations. Thus, it's
> clear that the aggregation won't trigger a repartition. However, the
> fan-out happens before `markAsPartitioned` and thus I would assume
> that the join would trigger a repartitioning?
>
> This question is important, because if we follow what I said above,
> `markAsPartitioned` returns a new KStream object rather than mutating the
> upstream KStream object, and those are semantically two different things.
> It also has an impact on how we need to implement the feature. The KIP
> should explicitly explain this case.
>
>
> -Matthias
>
> On 7/26/23 4:58 PM, Shay Lin wrote:
> > Hi John,
> >
> > Thanks for your reply. I updated the KIP to reflect the changes we
> > discussed in the thread today.
> > #1 is duly noted, I learned from the examples Sophie sent earlier! =)
> >
> > In the new version, I also talked about why IQ and joins will not work
> > with the interface and talked about the mitigation. The proposal
> > now specifically states we are solving the unneeded repartition problem
> > when IQ or joins do not coexist in the Kafka Streams application. In the
> > concerns section, the proposal notes that a reverse mapping would make
> > this new interface compatible with IQ and joins again, but that is
> > subject to demand.
> >
> > Let me know what you think. Thanks!
> > Shay
> >
> >
> >
> > On Wed, Jul 26, 2023 at 2:35 PM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Hello Shay,
> >>
> >> Thanks for the KIP!
> >>
> >> I just took a look in preparation to vote, and there are two small-ish
> >> things that I'd like to fix first. Apologies if this stuff has already
> >> come up in the discussion thread; I only skimmed it.
> >>
> >> 1. The KIP only mentions the name of the method instead of providing a
> >> code snippet showing exactly what the method signature will be in the
> >> interface. Normally, KIPs do the latter because it removes all ambiguity
> >> from the proposal. It also gives you an opportunity to write down the
> >> Javadoc you would add to the method instead of just mentioning the
> >> points that you plan to document.
> >>
> >> 2. The KIP lists some concerns, but not what you will do to mitigate
> >> them. For example, the concern about IQ not behaving correctly. Will you
> >> disable the use of the implicit partitioner downstream of one of these
> >> cancellations? Or provide a new interface to supply the "reverse
> >> mapping" you mentioned? Or include documentation in the Javadoc for how
> >> to deal with the situation? I think there are a range of options for
> >> each of those concerns, and we should state up front what we plan to do.
> >>
> >> Thanks again!
> >> -John
> >>
> >> On 2023/07/24 20:33:05 Sophie Blee-Goldman wrote:
> >>> Thanks Shay! You and Matthias have convinced me, I'm happy with the
> >>> current proposal. I think once you make the minor
> >>> updates to the KIP document this will be ready for voting again.
> >>>
> >>> Cheers,
> >>> Sophie
> >>>
> >>> On Mon, Jul 24, 2023 at 8:26 AM Shay Lin <lqxs...@gmail.com> wrote:
> >>>
> >>>> Hi Sophie and Matthias, thanks for your comments and replies.
> >>>>
> >>>> 1. Scope of change: KStreams only or KStreams/KTable
> >>>> I took some time to digest your points, looking through how KStreams
> >>>> triggers repartitions today. I noticed that `repartitionRequired` is a
> >>>> flag in KStreamImpl etc. and not in KTableImpl etc. When I look further,
> >>>> in the case of KTable, instead of passing in a boolean flag, a
> >>>> repartition node `TableRepartitionMapNode` is directly created. I went
> >>>> back and referenced the two issue tickets KAFKA-10844 and KAFKA-4835;
> >>>> both requests were focused on KStreams, i.e. not changing the
> >>>> partitioning when the input streams are already correctly keyed. Is it
> >>>> possible that in the case of KTable, users always intend to repartition
> >>>> (change key) when they call aggregate? -- (this was written before I
> >>>> saw Matthias's comment)
> >>>>
> >>>> Overall, based on the tickets, I see the benefit of doing a contained
> >>>> change focusing on KStreams, i.e. repartitionRequired, which would
> >>>> solve the pain points nicely. If we run into similar
> >>>> complaints/optimization requests for KTable down the line, we can
> >>>> address them on top of this (let me know if we have these requests
> >>>> already, I might just be negligent).
> >>>>
> >>>> 2. API: markAsPartitioned() vs config
> >>>> If we go with the KStreams-only scope, markAsPartitioned() is more
> >>>> adequate, i.e. it maps nicely to repartitionRequired. There is a list of
> >>>> NamedOperations that may or may not trigger a repartition based on their
> >>>> context (KStreams or KTable), which would make the implementation more
> >>>> confusing.
> >>>>
> >>>> 3. KIP documentation: Thanks for providing the links to previous KIPs.
> >>>> I will be adding the three use cases and javadoc. I will also document
> >>>> the risks as they relate to IQ and Join.
> >>>>
> >>>> Best,
> >>>> Shay
> >>>>
> >>>> On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>
> >>>>> I agree that it could easily be misused. There are a few Jira tickets
> >>>>> for cases when people want to "cancel" a repartition step. I would hope
> >>>>> those tickets are linked to the KIP (if not, we should do this, and
> >>>>> maybe even c&p those cases as motivation into the KIP itself)?
> >>>>>
> >>>>> It's always a tricky question to what extent we want to guide users,
> >>>>> and to what extent we need to give levers for advanced cases (and how to
> >>>>> design those levers...). It's for sure a good idea to call out "use
> >>>>> with care" in the JavaDocs for the new method.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
> >>>>>> I guess I felt a bit uneasy about how this could be used/abused
> >>>>>> while reading the KIP, but if we truly believe this is an advanced
> >>>>>> feature, I'm fine with the way things currently are. It doesn't feel
> >>>>>> like the best API, but it does seem to be the best *possible* API
> >>>>>> given the way things are.
> >>>>>>
> >>>>>> W.r.t the KTable notes, that all makes sense to me. I just wanted
> >>>>>> to lay out all the potential cases to make sure we had our bases
> >>>>>> covered.
> >>>>>>
> >>>>>> I still think an example or two would help, but the only thing I
> >>>>>> will actually wait on before feeling comfortable enough to vote on
> >>>>>> this would be a clear method signature (and maybe sample javadocs)
> >>>>>> in the "Public Interfaces" section.
> >>>>>>
> >>>>>> Thanks again for the KIP Shay! Hope I haven't dragged it out too
> >>>>>> much
> >>>>>>
> >>>>>> On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>
> >>>>>>> Some thoughts about the API question.
> >>>>>>>
> >>>>>>>
> >>>>>>>>> A. kstream.groupBy(...).aggregate(...)
> >>>>>>>
> >>>>>>> This can be rewritten as
> >>>>>>>
> >>>>>>> kstream.selectKey(...)
> >>>>>>>           .markAsPartitioned()
> >>>>>>>           .groupByKey()
> >>>>>>>           .aggregate(...)
> >>>>>>>
> >>>>>>> Given that `markAsPartitioned` is an advanced feature, I think it
> >>>>>>> would be ok?
> >>>>>>>
> >>>>>>>
> >>>>>>>>> B. ktable.groupBy(...).aggregate(...)
> >>>>>>>
> >>>>>>> For KTable aggregation, not sure how useful it would be? In the
> >>>>>>> end, a table aggregation only makes sense if we pick something from
> >>>>>>> the value, i.e., we indeed change the key?
> >>>>>>>
> >>>>>>>
> >>>>>>>>> C. kstream.selectKey(...).join(ktable)
> >>>>>>>
> >>>>>>> We can just insert a `markAsPartitioned()` after `selectKey` to
> >>>>>>> avoid repartitioning of the left input KStream.
> >>>>>>>
> >>>>>>>
> >>>>>>>> KStream.selectKey(...).toTable().join(...)
> >>>>>>>
> >>>>>>> Not sure if I understand what you try to say with this example?
> >>>>>>> In the end, `selectKey(...).toTable()` would repartition. If I know
> >>>>>>> that one can upsert directly, one inserts a `markAsPartitioned()` in
> >>>>>>> between.
> >>>>>>>
> >>>>>>>
> >>>>>>> In general, the use case seems to be that the key is not in the
> >>>>>>> right "format", or there is no key, but data was partitioned by a
> >>>>>>> value-attribute upstream and we just want to extract this
> >>>>>>> value-attribute into the key. Both seem to be KStream cases?
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
> >>>>>>>> Hey Shay, while I don't have any specific concerns about the new
> >>>>>>>> public API in this KIP, I'd like to better understand how this
> >>>>>>>> feature will work before I vote. We should document the behavior of
> >>>>>>>> this new operator clearly in the KIP as well -- you don't
> >>>>>>>> necessarily need to write the complete javadocs up front, but it
> >>>>>>>> should be possible for a user to read the KIP and then understand
> >>>>>>>> how this feature will work and how they would need to apply it.
> >>>>>>>>
> >>>>>>>> To that end, I recommend framing this proposal with a few
> >>>>>>>> examples to help clarify the semantics. When and where can you
> >>>>>>>> apply the markAsPartitioned() operator? Some suggestions below.
> >>>>>>>>
> >>>>>>>> Specific notes:
> >>>>>>>>
> >>>>>>>> 1. The KIP opens with "Each key changing operation in Kafka
> >>>>>>>> Streams (selectKey, map, transform, etc.) now leads to automatic
> >>>>>>>> repartition before an aggregation." We should change "aggregation"
> >>>>>>>> to "stateful operation" as this is true for things like joins as
> >>>>>>>> well as aggregations
> >>>>>>>> 2. The callout on IQ makes me a bit uncomfortable -- basically it
> >>>>>>>> says this should not be a concern "if we use markAsPartitioned
> >>>>>>>> correctly". Does this mean if we, the devs implementing this, write
> >>>>>>>> the feature correctly? Or is it saying that this won't be a problem
> >>>>>>>> as long as "we", the users of this feature, use it correctly? Just
> >>>>>>>> wondering if you've put any thought into how this would work yet (I
> >>>>>>>> personally have not)
> >>>>>>>> 3. The KIP should lay out the proposed API exactly, even if
> >>>>>>>> there's only one new method. Check out this KIP
> >>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
> >>>>>>>> (or this KIP
> >>>>>>>> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
> >>>>>>>> for a good reference on what the Public Interfaces section should
> >>>>>>>> include
> >>>>>>>> 4. Regarding the proposed API itself, I wonder if KStream is
> >>>>>>>> really the most appropriate interface for the new operator. A
> >>>>>>>> repartition can be triggered on just a KTable. Here's where some
> >>>>>>>> examples would help. Perhaps we could focus on these three cases:
> >>>>>>>>
> >>>>>>>> A. kstream.groupBy(...).aggregate(...)
> >>>>>>>> B. ktable.groupBy(...).aggregate(...)
> >>>>>>>> C. kstream.selectKey(...).join(ktable)
> >>>>>>>>
> >>>>>>>> I'm sure someone will correct me if I'm missing any additional
> >>>>>>>> vital examples, but at the very least, these are the three to
> >>>>>>>> consider: either a KStream or KTable followed by a
> >>>>>>>> groupBy/aggregation, or a KStream with key-changing operator
> >>>>>>>> followed by a join. Note that you could have something like
> >>>>>>>> KStream.selectKey(...).toTable().join(...) as well, but since there
> >>>>>>>> are no pure key-changing operators (like #selectKey) on KTables,
> >>>>>>>> only groupBy() which must always be followed by aggregation, this
> >>>>>>>> 4th case can be reduced to an example like C of a KStream with
> >>>>>>>> key-changing operation and downstream join -- ie there's no way to
> >>>>>>>> do this without #toTable which is more like syntactic sugar for the
> >>>>>>>> purposes of this repartitioning discussion.
> >>>>>>>>
> >>>>>>>> I worry that making this a DSL operator on KStream is too
> >>>>>>>> generic, and we would also need to add it to KTable for example B,
> >>>>>>>> despite KTables not having any true pure key-changing operators
> >>>>>>>> outside of #groupBy. Would we throw an exception if you invoked
> >>>>>>>> #markAsPartitioned on a KTable that wasn't followed by a groupBy?
> >>>>>>>> If you have multiple key-changing operators, would you need to add
> >>>>>>>> markAsPartitioned after each one? If not, what are the semantics of
> >>>>>>>> that? These are the main questions that got me thinking here, and
> >>>>>>>> will definitely need to be clarified in the KIP if we do go with
> >>>>>>>> the current proposal. But I wanted to throw out another idea for an
> >>>>>>>> API I think would help with some of this awkwardness by having
> >>>>>>>> clearly defined semantics:
> >>>>>>>>
> >>>>>>>> Fundamentally it seems to me that these issues arise from the
> >>>>>>>> fact that "being partitioned" is conceptually a property of other
> >>>>>>>> operations applied to a KStream/KTable, rather than an operation
> >>>>>>>> itself. So rather than making this a DSL operator itself, what if
> >>>>>>>> we added it to the Grouped and various Joined configuration
> >>>>>>>> classes? It would allow us to more carefully hit only the relevant
> >>>>>>>> parts of the DSL, so there are no questions about whether/when to
> >>>>>>>> throw errors when the operator is incorrectly applied -- there
> >>>>>>>> would be no way to apply it incorrectly. The main drawback I can
> >>>>>>>> think of is simply that this touches on a larger surface area of
> >>>>>>>> the API. I personally don't believe this is a good enough reason to
> >>>>>>>> make it a DSL operator as one could make that argument for nearly
> >>>>>>>> any kind of KStream or KTable operator configuration going forward,
> >>>>>>>> and would explode the KStream/KTable API surface area instead.
> >>>>>>>> Perhaps this was discussed during the previous iteration of this
> >>>>>>>> KIP, or I'm missing something here, so I just wanted to put this
> >>>>>>>> out there and see what people think
> >>>>>>>>
> >>>>>>>> Either way, thanks for picking up this KIP. It's been a long time
> >>>>>>>> coming :)
> >>>>>>>>
> >>>>>>>> -Sophie
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> It's been a few days so I went ahead with editing the KIP; the
> >>>>>>>>> main change is on the method name:
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> >>>>>>>>> I will follow up with a VOTE separately.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Shay
> >>>>>>>>>
> >>>>>>>>> On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Shay,
> >>>>>>>>>>
> >>>>>>>>>> thanks for picking up this KIP. It's a pity that the discussion
> >>>>>>>>>> stalled for such a long time.
> >>>>>>>>>>
> >>>>>>>>>> As expressed previously, I am happy with the name
> >>>>>>>>>> `markAsPartitioned()` and also believe it's ok to just document the
> >>>>>>>>>> impact and leave it to the user to do the right thing.
> >>>>>>>>>>
> >>>>>>>>>> If we really get a lot of users that ask about it, because
> >>>>>>>>>> they did not do the right thing, we could still add something (eg,
> >>>>>>>>>> a reverse-mapper function) in a follow-up KIP. But we don't know if
> >>>>>>>>>> it's necessary; thus, making a small incremental step sounds like a
> >>>>>>>>>> good approach to me.
> >>>>>>>>>>
> >>>>>>>>>> Let's see if others agree or not.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 6/28/23 5:29 PM, Shay Lin wrote:
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> Great discussion thread. May I take this KIP up? If it’s
> >>>>>>>>>>> alright, my plan is to update the KIP with the operator
> >>>>>>>>>>> `markAsPartitioned()`.
> >>>>>>>>>>>
> >>>>>>>>>>> As you have discussed and pointed out, there are implications
> >>>>>>>>>>> to downstream joins or aggregation operations. Still, the operator
> >>>>>>>>>>> is intended for advanced users so my two cents is it would be a
> >>>>>>>>>>> valuable addition nonetheless. We could add this as a
> >>>>>>>>>>> caution/consideration as part of the java doc.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know, thanks.
> >>>>>>>>>>> Shay
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
