Very good catch, Matthias. I updated the KIP to state that the new DSLOperation will return a new, mutated KStream.
Thank you,
Shay

On Wed, Jul 26, 2023 at 6:13 PM Matthias J. Sax <mj...@apache.org> wrote:

> One last question. What should happen for the following case:
>
> KStream myStream = build.stream(...).map(...);
> myStream.markAsPartitioned().groupByKey().aggregate(...);
> myStream.join(...)
>
> The question is about the "fan-out" pattern. `myStream`, which is marked
> for partitioning, is fed into two downstream operations. Thus, it's
> clear that the aggregation won't trigger a repartition. However, the
> fan-out happens before `markAsPartitioned` and thus I would assume
> that the join would trigger a repartitioning?
>
> This question is important, because if we follow what I said above,
> `markAsPartitioned` returns a new KStream object, but does mutate the
> upstream KStream object, which are semantically two different things. It
> also has an impact on how we need to implement the feature. The KIP
> should explicitly explain this case.
>
>
> -Matthias
>
> On 7/26/23 4:58 PM, Shay Lin wrote:
> > Hi John,
> >
> > Thanks for your reply. I updated the KIP to reflect the changes we
> > discussed in the thread today.
> > #1 is duly noted, I learned from the examples Sophie sent earlier! =)
> >
> > In the new version, I also talked about why IQ and joins will not work
> > with the interface and talked about the mitigation. The proposal
> > now specifically states we are solving the unneeded repartition problem
> > when IQ or join does not coexist in Kafka Streams. In the concerns
> > section, the proposal talks about how having a reverse mapping would make
> > this new interface compatible with IQ and join again, but is subject to
> > demand.
> >
> > Let me know what you think. Thanks!
> > Shay
> >
> > On Wed, Jul 26, 2023 at 2:35 PM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Hello Shay,
> >>
> >> Thanks for the KIP!
> >>
> >> I just took a look in preparation to vote, and there are two small-ish
> >> things that I'd like to fix first.
> >> Apologies if this stuff has already come
> >> up in the discussion thread; I only skimmed it.
> >>
> >> 1. The KIP only mentions the name of the method instead of providing a
> >> code snippet showing exactly what the method signature will be in the
> >> interface. Normally, KIPs do the latter because it removes all ambiguity
> >> from the proposal. It also gives you an opportunity to write down the
> >> Javadoc you would add to the method instead of just mentioning the
> >> points that you plan to document.
> >>
> >> 2. The KIP lists some concerns, but not what you will do to mitigate
> >> them. For example, the concern about IQ not behaving correctly. Will you
> >> disable the use of the implicit partitioner downstream of one of these
> >> cancellations? Or provide a new interface to supply the "reverse mapping"
> >> you mentioned? Or include documentation in the Javadoc for how to deal
> >> with the situation? I think there are a range of options for each of
> >> those concerns, and we should state up front what we plan to do.
> >>
> >> Thanks again!
> >> -John
> >>
> >> On 2023/07/24 20:33:05 Sophie Blee-Goldman wrote:
> >>> Thanks Shay! You and Matthias have convinced me, I'm happy with the
> >>> current proposal. I think once you make the minor
> >>> updates to the KIP document this will be ready for voting again.
> >>>
> >>> Cheers,
> >>> Sophie
> >>>
> >>> On Mon, Jul 24, 2023 at 8:26 AM Shay Lin <lqxs...@gmail.com> wrote:
> >>>
> >>>> Hi Sophie and Matthias, thanks for your comments and replies.
> >>>>
> >>>> 1. Scope of change: KStreams only or KStreams/KTable
> >>>> I took some time to digest your points, looking through how KStreams
> >>>> triggers repartitions today. I noticed that `repartitionRequired` is a
> >>>> flag in KStreamImpl etc and not in KTableImpl etc.
> >>>> When I look further, in the
> >>>> case of KTable, instead of passing in a boolean flag, a repartition
> >>>> node `TableRepartitionMapNode` is directly created. I went back and
> >>>> referenced the two issue tickets KAFKA-10844 and KAFKA-4835; both
> >>>> requests were focused on KStreams, i.e. not to change the partitioning
> >>>> when the input streams are already correctly keyed. Is it possible that
> >>>> in the case of KTable, users always intend to repartition (change key)
> >>>> when they call aggregate? -- (this was written before I saw Matthias's
> >>>> comment)
> >>>>
> >>>> Overall, based on the tickets, I see the benefit of doing a contained
> >>>> change focusing on KStreams, i.e. repartitionRequired, which would
> >>>> solve the pain points nicely. If we run into similar
> >>>> complaints/optimization requests for KTable down the line, we can
> >>>> address them on top of this (let me know if we have these requests
> >>>> already, I might just be negligent).
> >>>>
> >>>> 2. API: markAsPartitioned() vs config
> >>>> If we go with the KStreams only scope, markAsPartitioned() is more
> >>>> adequate, i.e. maps nicely to repartitionRequired. There is a list of
> >>>> NamedOperations that may or may not trigger repartition based on their
> >>>> context (KStreams or KTable), which would make the implementation more
> >>>> confusing.
> >>>>
> >>>> 3. KIP documentation: Thanks for providing the links to previous KIPs.
> >>>> I will be adding the three use cases and javadoc. I will also document
> >>>> the risks when it relates to IQ and Join.
> >>>>
> >>>> Best,
> >>>> Shay
> >>>>
> >>>> On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>
> >>>>> I agree that it could easily be misused. There are a few Jira tickets
> >>>>> for cases when people want to "cancel" a repartition step.
> >>>>> I would hope
> >>>>> those tickets are linked to the KIP (if not, we should do this, and
> >>>>> maybe even c&p those cases as motivation into the KIP itself)?
> >>>>>
> >>>>> It's always a tricky question to what extent we want to guide users,
> >>>>> and to what extent we need to give levers for advanced cases (and how
> >>>>> to design those levers...) It's for sure a good idea to call out "use
> >>>>> with care" in the JavaDocs for the new method.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
> >>>>>> I guess I felt a bit uneasy about how this could be used/abused while
> >>>>>> reading the KIP, but if we truly believe this is an advanced feature,
> >>>>>> I'm fine with the way things currently are. It doesn't feel like the
> >>>>>> best API, but it does seem to be the best *possible* API given the way
> >>>>>> things are.
> >>>>>>
> >>>>>> W.r.t the KTable notes, that all makes sense to me. I just wanted to
> >>>>>> lay out all the potential cases to make sure we had our bases covered.
> >>>>>>
> >>>>>> I still think an example or two would help, but the only thing I will
> >>>>>> actually wait on before feeling comfortable enough to vote on this
> >>>>>> would be a clear method signature (and maybe sample javadocs) in the
> >>>>>> "Public Interfaces" section.
> >>>>>>
> >>>>>> Thanks again for the KIP Shay! Hope I haven't dragged it out too much
> >>>>>>
> >>>>>> On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>
> >>>>>>> Some thoughts about the API question.
> >>>>>>>
> >>>>>>>
> >>>>>>>>> A. kstream.groupBy(...).aggregate(...)
> >>>>>>>
> >>>>>>> This can be re-written as
> >>>>>>>
> >>>>>>> kstream.selectKey(...)
> >>>>>>>        .markAsPartitioned()
> >>>>>>>        .groupByKey()
> >>>>>>>        .aggregate()
> >>>>>>>
> >>>>>>> Given that `markAsPartitioned` is an advanced feature, I think it
> >>>>>>> would be ok?
> >>>>>>>
> >>>>>>>
> >>>>>>>>> B. ktable.groupBy(...).aggregate(...)
> >>>>>>>
> >>>>>>> For KTable aggregation, not sure how useful it would be? In the end, a
> >>>>>>> table aggregation does only make sense if we pick something from the
> >>>>>>> value, ie, we indeed change the key?
> >>>>>>>
> >>>>>>>
> >>>>>>>>> C. kstream.selectKey(...).join(ktable)
> >>>>>>>
> >>>>>>> We can just insert a `markAsPartitioned()` after `selectKey` to avoid
> >>>>>>> repartitioning of the left input KStream.
> >>>>>>>
> >>>>>>>
> >>>>>>>> KStream.selectKey(...).toTable().join(...)
> >>>>>>>
> >>>>>>> Not sure if I understand what you try to say with this example? In the
> >>>>>>> end, `selectKey(...).toTable()` would repartition. If I know that one
> >>>>>>> can upsert directly, one inserts a `markAsPartitioned()` in between.
> >>>>>>>
> >>>>>>>
> >>>>>>> In general, the use case seems to be that the key is not in the right
> >>>>>>> "format", or there is no key, but data was partitioned by a
> >>>>>>> value-attribute upstream and we just want to extract this
> >>>>>>> value-attribute into the key. Both seem to be KStream cases?
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
> >>>>>>>> Hey Shay, while I don't have any specific concerns about the new
> >>>>>>>> public API in this KIP, I'd like to better understand how this
> >>>>>>>> feature will work before I vote.
> >>>>>>>> We should document the behavior of this new operator
> >>>>>>>> clearly in the KIP as well -- you don't necessarily need to write the
> >>>>>>>> complete javadocs up front, but it should be possible for a user to
> >>>>>>>> read the KIP and then understand how this feature will work and how
> >>>>>>>> they would need to apply it.
> >>>>>>>>
> >>>>>>>> To that end, I recommend framing this proposal with a few examples to
> >>>>>>>> help clarify the semantics. When and where can you apply the
> >>>>>>>> markAsPartitioned() operator? Some suggestions below.
> >>>>>>>>
> >>>>>>>> Specific notes:
> >>>>>>>>
> >>>>>>>> 1. The KIP opens with "Each key changing operation in Kafka Streams
> >>>>>>>> (selectKey, map, transform, etc.) now leads to automatic repartition
> >>>>>>>> before an aggregation." We should change "aggregation" to "stateful
> >>>>>>>> operation" as this is true for things like joins as well as
> >>>>>>>> aggregations
> >>>>>>>> 2. The callout on IQ makes me a bit uncomfortable -- basically it says
> >>>>>>>> this should not be a concern "if we use markAsPartitioned correctly".
> >>>>>>>> Does this mean if we, the devs implementing this, write the feature
> >>>>>>>> correctly? Or is it saying that this won't be a problem as long as
> >>>>>>>> "we", the users of this feature, use it correctly? Just wondering if
> >>>>>>>> you've put any thought into how this would work yet (I personally
> >>>>>>>> have not)
> >>>>>>>> 3. The KIP should lay out the proposed API exactly, even if there's
> >>>>>>>> only one new method.
> >>>>>>>> Check out this KIP
> >>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
> >>>>>>>> (or this KIP
> >>>>>>>> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
> >>>>>>>> for a good reference on what the Public Interfaces section should
> >>>>>>>> include
> >>>>>>>> 4. Regarding the proposed API itself, I wonder if KStream is really
> >>>>>>>> the most appropriate interface for the new operator. A repartition
> >>>>>>>> can be triggered on just a KTable. Here's where some examples would
> >>>>>>>> help. Perhaps we could focus on these three cases:
> >>>>>>>>
> >>>>>>>> A. kstream.groupBy(...).aggregate(...)
> >>>>>>>> B. ktable.groupBy(...).aggregate(...)
> >>>>>>>> C. kstream.selectKey(...).join(ktable)
> >>>>>>>>
> >>>>>>>> I'm sure someone will correct me if I'm missing any additional vital
> >>>>>>>> examples, but at the very least, these are the three to consider:
> >>>>>>>> either a KStream or KTable followed by a groupBy/aggregation, or a
> >>>>>>>> KStream with key-changing operator followed by a join. Note that you
> >>>>>>>> could have something like KStream.selectKey(...).toTable().join(...)
> >>>>>>>> as well, but since there are no pure key-changing operators (like
> >>>>>>>> #selectKey) on KTables, only groupBy() which must always be followed
> >>>>>>>> by aggregation, this 4th case can be reduced to an example like C of
> >>>>>>>> a KStream with key-changing operation and downstream join -- ie
> >>>>>>>> there's no way to do this without #toTable which is more like
> >>>>>>>> syntactic sugar for the purposes of this repartitioning discussion.
> >>>>>>>>
> >>>>>>>> I worry that making this a DSL operator on KStream is too generic,
> >>>>>>>> and we would also need to add it to KTable for example B, despite
> >>>>>>>> KTables not having any true pure key-changing operators outside of
> >>>>>>>> #groupBy. Would we throw an exception if you invoked
> >>>>>>>> #markAsPartitioned on a KTable that wasn't followed by a groupBy? If
> >>>>>>>> you have multiple key-changing operators, would you need to add
> >>>>>>>> markAsPartitioned after each one? If not, what are the semantics of
> >>>>>>>> that? These are the main questions that got me thinking here, and
> >>>>>>>> will definitely need to be clarified in the KIP if we do go with the
> >>>>>>>> current proposal. But I wanted to throw out another idea for an API I
> >>>>>>>> think would help with some of this awkwardness by having clearly
> >>>>>>>> defined semantics:
> >>>>>>>>
> >>>>>>>> Fundamentally it seems to me that these issues arise from the fact
> >>>>>>>> that "being partitioned" is conceptually a property of other
> >>>>>>>> operations applied to a KStream/KTable, rather than an operation
> >>>>>>>> itself. So rather than making this a DSL operator itself, what if we
> >>>>>>>> added it to the Grouped and various Joined configuration classes? It
> >>>>>>>> would allow us to more carefully hit only the relevant parts of the
> >>>>>>>> DSL, so there are no questions about whether/when to throw errors
> >>>>>>>> when the operator is incorrectly applied -- there would be no way to
> >>>>>>>> apply it incorrectly. The main drawback I can think of is simply that
> >>>>>>>> this touches on a larger surface area of the API.
> >>>>>>>> I personally don't
> >>>>>>>> believe this is a good enough reason to make it a DSL operator as one
> >>>>>>>> could make that argument for nearly any kind of KStream or KTable
> >>>>>>>> operator configuration going forward, and would explode the
> >>>>>>>> KStream/KTable API surface area instead. Perhaps this was discussed
> >>>>>>>> during the previous iteration of this KIP, or I'm missing something
> >>>>>>>> here, so I just wanted to put this out there and see what people think
> >>>>>>>>
> >>>>>>>> Either way, thanks for picking up this KIP. It's been a long time
> >>>>>>>> coming :)
> >>>>>>>>
> >>>>>>>> -Sophie
> >>>>>>>>
> >>>>>>>> On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> It's been a few days so I went ahead with editing the KIP, the main
> >>>>>>>>> change is on the method name:
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> >>>>>>>>> I will follow up with a VOTE separately.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Shay
> >>>>>>>>>
> >>>>>>>>> On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Shay,
> >>>>>>>>>>
> >>>>>>>>>> thanks for picking up this KIP. It's a pity that the discussion
> >>>>>>>>>> stalled for such a long time.
> >>>>>>>>>>
> >>>>>>>>>> As expressed previously, I am happy with the name
> >>>>>>>>>> `markAsPartitioned()` and also believe it's ok to just document the
> >>>>>>>>>> impact and leave it to the user to do the right thing.
> >>>>>>>>>>
> >>>>>>>>>> If we really get a lot of users that ask about it, because they did
> >>>>>>>>>> not do the right thing, we could still add something (eg, a
> >>>>>>>>>> reverse-mapper function) in a follow-up KIP. But we don't know if
> >>>>>>>>>> it's necessary; thus, making a small incremental step sounds like a
> >>>>>>>>>> good approach to me.
> >>>>>>>>>>
> >>>>>>>>>> Let's see if others agree or not.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 6/28/23 5:29 PM, Shay Lin wrote:
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> Great discussion thread. May I take this KIP up? If it's alright my
> >>>>>>>>>>> plan is to update the KIP with the operator `markAsPartitioned()`.
> >>>>>>>>>>>
> >>>>>>>>>>> As you have discussed and pointed out, there are implications to
> >>>>>>>>>>> downstream joins or aggregation operations. Still, the operator is
> >>>>>>>>>>> intended for advanced users so my two cents is it would be a
> >>>>>>>>>>> valuable addition nonetheless. We could add this as a
> >>>>>>>>>>> caution/consideration as part of the java doc.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know, thanks.
> >>>>>>>>>>> Shay
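
For readers following the fan-out question raised at the top of the thread, here is a minimal toy model of one possible reading of it: `markAsPartitioned()` returns a new stream object with the repartition flag cleared and leaves the upstream object untouched. Everything in it is an assumption made for illustration. `SketchStream` and all of its methods are hypothetical names, it is not the Kafka Streams API, and it is not taken from the KIP. Only the idea of a boolean `repartitionRequired` flag comes from the thread.

```java
// Toy model only. NOT the Kafka Streams API; all names are hypothetical.
final class SketchStream {
    private final String name;
    private final boolean repartitionRequired;

    private SketchStream(String name, boolean repartitionRequired) {
        this.name = name;
        this.repartitionRequired = repartitionRequired;
    }

    // A freshly read source is assumed to be correctly partitioned by key.
    static SketchStream source(String topic) {
        return new SketchStream(topic, false);
    }

    // Key-changing operation: downstream stateful operations must repartition.
    SketchStream map() {
        return new SketchStream(name + ".map", true);
    }

    // Assumed semantics: returns a NEW node with the flag cleared and does
    // not modify `this`, so other branches of a fan-out are unaffected.
    SketchStream markAsPartitioned() {
        return new SketchStream(name + ".markAsPartitioned", false);
    }

    boolean repartitionRequired() {
        return repartitionRequired;
    }

    // Stateful operations: report whether a repartition step would be added.
    boolean aggregateNeedsRepartition() {
        return repartitionRequired;
    }

    boolean joinNeedsRepartition() {
        return repartitionRequired;
    }

    public static void main(String[] args) {
        SketchStream myStream = SketchStream.source("input").map();
        // Branch 1: marked, then aggregated.
        SketchStream marked = myStream.markAsPartitioned();
        System.out.println("aggregate repartitions: " + marked.aggregateNeedsRepartition());
        // Branch 2: the fan-out happens before the marker, so the join still
        // sees the unmarked stream.
        System.out.println("join repartitions: " + myStream.joinNeedsRepartition());
    }
}
```

Under these assumed semantics the aggregation branch skips the repartition while the join branch still repartitions. If the marker instead mutated the upstream object, both branches would skip it, which is why the thread asks the KIP to state explicitly which behavior is intended.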