Hi John,

Thanks for your reply. I updated the KIP to reflect the changes we discussed in the thread today. #1 is duly noted; I learned from the examples Sophie sent earlier! =)

In the new version, I also explain why IQ and joins will not work with the new interface and describe the mitigation. The proposal now states specifically that we are solving the unneeded repartition problem for Kafka Streams applications that do not also use IQ or joins. In the concerns section, the proposal notes that adding a reverse mapping would make the new interface compatible with IQ and joins again, but that this is subject to demand.

Let me know what you think. Thanks!
Shay

On Wed, Jul 26, 2023 at 2:35 PM John Roesler <vvcep...@apache.org> wrote:

> Hello Shay,
>
> Thanks for the KIP!
>
> I just took a look in preparation to vote, and there are two small-ish things that I'd like to fix first. Apologies if this stuff has already come up in the discussion thread; I only skimmed it.
>
> 1. The KIP only mentions the name of the method instead of providing a code snippet showing exactly what the method signature will be in the interface. Normally, KIPs do the latter because it removes all ambiguity from the proposal. It also gives you an opportunity to write down the Javadoc you would add to the method instead of just mentioning the points that you plan to document.
>
> 2. The KIP lists some concerns, but not what you will do to mitigate them. For example, the concern about IQ not behaving correctly. Will you disable the use of the implicit partitioner downstream of one of these cancellations? Or provide a new interface to supply the "reverse mapping" you mentioned? Or include documentation in the Javadoc for how to deal with the situation? I think there is a range of options for each of those concerns, and we should state up front what we plan to do.
>
> Thanks again!
> -John
>
> On 2023/07/24 20:33:05 Sophie Blee-Goldman wrote:
> > Thanks Shay! You and Matthias have convinced me, I'm happy with the current proposal. I think once you make the minor updates to the KIP document this will be ready for voting again.
> >
> > Cheers,
> > Sophie
> >
> > On Mon, Jul 24, 2023 at 8:26 AM Shay Lin <lqxs...@gmail.com> wrote:
> >
> > > Hi Sophie and Matthias, thanks for your comments and replies.
> > >
> > > 1. Scope of change: KStreams only or KStreams/KTable
> > > I took some time to digest your points, looking through how KStreams triggers repartitions today. I noticed that `repartitionRequired` is a flag in KStreamImpl etc. and not in KTableImpl etc. Looking further, in the case of KTable, instead of passing in a boolean flag, a repartition node `TableRepartitionMapNode` is created directly. I went back and referenced the two issue tickets KAFKA-10844 and KAFKA-4835; both requests were focused on KStreams, i.e. on not repartitioning when the input streams are already correctly keyed. Is it possible that in the case of KTable, users always intend to repartition (change the key) when they call aggregate? -- (this was written before I saw Matthias's comment)
> > >
> > > Overall, based on the tickets, I see the benefit of doing a contained change focusing on KStreams, i.e. repartitionRequired, which would solve the pain points nicely. If we run into similar complaints/optimization requests for KTable down the line, we can address them on top of this (let me know if we have these requests already, I might just be negligent).
> > >
> > > 2. API: markAsPartitioned() vs config
> > > If we go with the KStreams-only scope, markAsPartitioned() is more adequate, i.e. it maps nicely to repartitionRequired. There is a list of NamedOperations that may or may not trigger a repartition depending on their context (KStreams or KTable), which would make the implementation more confusing.
> > >
> > > 3. KIP documentation: Thanks for providing the links to previous KIPs. I will be adding the three use cases and javadoc. I will also document the risks as they relate to IQ and Join.
> > >
> > > Best,
> > > Shay
> > >
> > > On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > > > I agree that it could easily be misused. There are a few Jira tickets for cases when people want to "cancel" a repartition step. I would hope those tickets are linked to the KIP (if not, we should do this, and maybe even c&p those cases as motivation into the KIP itself)?
> > > >
> > > > It's always a tricky question to what extent we want to guide users, and to what extent we need to give levers for advanced cases (and how to design those levers...) It's for sure a good idea to call out "use with care" in the JavaDocs for the new method.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
> > > > > I guess I felt a bit uneasy about how this could be used/abused while reading the KIP, but if we truly believe this is an advanced feature, I'm fine with the way things currently are. It doesn't feel like the best API, but it does seem to be the best *possible* API given the way things are.
> > > > >
> > > > > W.r.t the KTable notes, that all makes sense to me. I just wanted to lay out all the potential cases to make sure we had our bases covered.
> > > > >
> > > > > I still think an example or two would help, but the only thing I will actually wait on before feeling comfortable enough to vote on this would be a clear method signature (and maybe sample javadocs) in the "Public Interfaces" section.
> > > > >
> > > > > Thanks again for the KIP Shay! Hope I haven't dragged it out too much
> > > > >
> > > > > On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > >
> > > > >> Some thoughts about the API question.
> > > > >>
> > > > >>
> > > > >>>> A. kstream.groupBy(...).aggregate(...)
> > > > >>
> > > > >> This can be rewritten as
> > > > >>
> > > > >> kstream.selectKey(...)
> > > > >>        .markAsRepartitioned()
> > > > >>        .groupByKey()
> > > > >>        .aggregate()
> > > > >>
> > > > >> Given that `markAsRepartitioned` is an advanced feature, I think it would be ok?
> > > > >>
> > > > >>
> > > > >>>> B. ktable.groupBy(...).aggregate(...)
> > > > >>
> > > > >> For KTable aggregation, not sure how useful it would be? In the end, a table aggregation only makes sense if we pick something from the value, i.e., we indeed change the key?
> > > > >>
> > > > >>
> > > > >>>> C. kstream.selectKey(...).join(ktable)
> > > > >>
> > > > >> We can just insert a `markAsRepartitioned()` after `selectKey` to avoid repartitioning of the left input KStream.
> > > > >>
> > > > >>
> > > > >>> KStream.selectKey(...).toTable().join(...)
> > > > >>
> > > > >> Not sure if I understand what you are trying to say with this example? In the end, `selectKey(...).toTable()` would repartition. If I know that one can upsert directly, one inserts a `markAsRepartitioned()` in between.
> > > > >>
> > > > >>
> > > > >> In general, the use case seems to be that the key is not in the right "format", or there is no key, but data was partitioned by a value-attribute upstream and we just want to extract this value-attribute into the key. Both seem to be KStream cases?
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >>
> > > > >> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
> > > > >>> Hey Shay, while I don't have any specific concerns about the new public API in this KIP, I'd like to better understand how this feature will work before I vote.
> > > > >>> We should document the behavior of this new operator clearly in the KIP as well -- you don't necessarily need to write the complete javadocs up front, but it should be possible for a user to read the KIP and then understand how this feature will work and how they would need to apply it.
> > > > >>>
> > > > >>> To that end, I recommend framing this proposal with a few examples to help clarify the semantics. When and where can you apply the markAsPartitioned() operator? Some suggestions below.
> > > > >>>
> > > > >>> Specific notes:
> > > > >>>
> > > > >>> 1. The KIP opens with "Each key changing operation in Kafka Streams (selectKey, map, transform, etc.) now leads to automatic repartition before an aggregation." We should change "aggregation" to "stateful operation" as this is true for things like joins as well as aggregations.
> > > > >>> 2. The callout on IQ makes me a bit uncomfortable -- basically it says this should not be a concern "if we use markAsPartitioned correctly". Does this mean if we, the devs implementing this, write the feature correctly? Or is it saying that this won't be a problem as long as "we", the users of this feature, use it correctly? Just wondering if you've put any thought into how this would work yet (I personally have not)
> > > > >>> 3. The KIP should lay out the proposed API exactly, even if there's only one new method.
> > > > >>> Check out this KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL> (or this KIP <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>) for a good reference on what the Public Interfaces section should include
> > > > >>> 4. Regarding the proposed API itself, I wonder if KStream is really the most appropriate interface for the new operator. A repartition can be triggered on just a KTable. Here's where some examples would help. Perhaps we could focus on these three cases:
> > > > >>>
> > > > >>> A. kstream.groupBy(...).aggregate(...)
> > > > >>> B. ktable.groupBy(...).aggregate(...)
> > > > >>> C. kstream.selectKey(...).join(ktable)
> > > > >>>
> > > > >>> I'm sure someone will correct me if I'm missing any additional vital examples, but at the very least, these are the three to consider: either a KStream or KTable followed by a groupBy/aggregation, or a KStream with key-changing operator followed by a join. Note that you could have something like KStream.selectKey(...).toTable().join(...) as well, but since there are no pure key-changing operators (like #selectKey) on KTables, only groupBy() which must always be followed by aggregation, this 4th case can be reduced to an example like C of a KStream with key-changing operation and downstream join -- ie there's no way to do this without #toTable which is more like syntactic sugar for the purposes of this repartitioning discussion.
> > > > >>>
> > > > >>> I worry that making this a DSL operator on KStream is too generic, and we would also need to add it to KTable for example B, despite KTables not having any true pure key-changing operators outside of #groupBy. Would we throw an exception if you invoked #markAsPartitioned on a KTable that wasn't followed by a groupBy? If you have multiple key-changing operators, would you need to add markAsPartitioned after each one? If not, what are the semantics of that? These are the main questions that got me thinking here, and will definitely need to be clarified in the KIP if we do go with the current proposal. But I wanted to throw out another idea for an API I think would help with some of this awkwardness by having clearly defined semantics:
> > > > >>>
> > > > >>> Fundamentally it seems to me that these issues arise from the fact that "being partitioned" is conceptually a property of other operations applied to a KStream/KTable, rather than an operation itself. So rather than making this a DSL operator itself, what if we added it to the Grouped and various Joined configuration classes? It would allow us to more carefully hit only the relevant parts of the DSL, so there are no questions about whether/when to throw errors when the operator is incorrectly applied -- there would be no way to apply it incorrectly. The main drawback I can think of is simply that this touches on a larger surface area of the API.
> > > > >>> I personally don't believe this is a good enough reason to make it a DSL operator as one could make that argument for nearly any kind of KStream or KTable operator configuration going forward, and would explode the KStream/KTable API surface area instead. Perhaps this was discussed during the previous iteration of this KIP, or I'm missing something here, so I just wanted to put this out there and see what people think
> > > > >>>
> > > > >>> Either way, thanks for picking up this KIP. It's been a long time coming :)
> > > > >>>
> > > > >>> -Sophie
> > > > >>>
> > > > >>> On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com> wrote:
> > > > >>>
> > > > >>>> Hi all,
> > > > >>>>
> > > > >>>> It's been a few days so I went ahead with editing the KIP, the main change is on the method name
> > > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling .
> > > > >>>> I will follow up with a VOTE separately.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Shay
> > > > >>>>
> > > > >>>> On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > >>>>
> > > > >>>>> Shay,
> > > > >>>>>
> > > > >>>>> thanks for picking up this KIP. It's a pity that the discussion stalled for such a long time.
> > > > >>>>>
> > > > >>>>> As expressed previously, I am happy with the name `markAsPartitioned()` and also believe it's ok to just document the impact and leave it to the user to do the right thing.
> > > > >>>>>
> > > > >>>>> If we really get a lot of users that ask about it, because they did not do the right thing, we could still add something (eg, a reverse-mapper function) in a follow-up KIP. But we don't know if it's necessary; thus, making a small incremental step sounds like a good approach to me.
> > > > >>>>>
> > > > >>>>> Let's see if others agree or not.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> -Matthias
> > > > >>>>>
> > > > >>>>> On 6/28/23 5:29 PM, Shay Lin wrote:
> > > > >>>>>> Hi all,
> > > > >>>>>>
> > > > >>>>>> Great discussion thread. May I take this KIP up? If it’s alright, my plan is to update the KIP with the operator `markAsPartitioned()`.
> > > > >>>>>>
> > > > >>>>>> As you have discussed and pointed out, there are implications for downstream joins or aggregation operations. Still, the operator is intended for advanced users, so my two cents is that it would be a valuable addition nonetheless. We could add this as a caution/consideration as part of the Javadoc.
> > > > >>>>>>
> > > > >>>>>> Let me know, thanks.
> > > > >>>>>> Shay
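
P.S. For anyone reading the thread without the code open, here is a rough sketch of the `repartitionRequired` idea discussed above. It is a toy model in plain Java and not Kafka Streams code: `ToyStream`, `source`, and `groupByKeyAndAggregate` are hypothetical names made up for illustration, and the real method signature and Javadoc are whatever the KIP's "Public Interfaces" section ends up specifying. The sketch only assumes that a key-changing step sets a flag, that the proposed `markAsPartitioned()` clears it, and that a downstream aggregation adds a repartition step only while the flag is set.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RepartitionFlagDemo {

    /** Toy stand-in for a stream (hypothetical): records steps plus one boolean flag. */
    static final class ToyStream {
        private final boolean repartitionRequired;
        private final List<String> steps;

        private ToyStream(boolean repartitionRequired, List<String> steps) {
            this.repartitionRequired = repartitionRequired;
            this.steps = steps;
        }

        /** A freshly read stream is assumed to be partitioned by its key already. */
        static ToyStream source(String topic) {
            List<String> steps = new ArrayList<>();
            steps.add("source:" + topic);
            return new ToyStream(false, steps);
        }

        /** Returns a new stream with one more step and the given flag value. */
        private ToyStream next(boolean flag, String step) {
            List<String> copy = new ArrayList<>(steps);
            copy.add(step);
            return new ToyStream(flag, copy);
        }

        /** A key-changing operation: downstream stateful operations must repartition. */
        ToyStream selectKey() {
            return next(true, "selectKey");
        }

        /** The user asserts the data is already partitioned correctly; the flag is cleared. */
        ToyStream markAsPartitioned() {
            return next(false, "markAsPartitioned");
        }

        /** A stateful operation: adds a repartition step only if the flag is still set. */
        List<String> groupByKeyAndAggregate() {
            List<String> plan = new ArrayList<>(steps);
            if (repartitionRequired) {
                plan.add("repartition");
            }
            plan.add("aggregate");
            return Collections.unmodifiableList(plan);
        }
    }

    public static void main(String[] args) {
        // prints [source:events, selectKey, repartition, aggregate]
        System.out.println(ToyStream.source("events").selectKey().groupByKeyAndAggregate());
        // prints [source:events, selectKey, markAsPartitioned, aggregate]
        System.out.println(
            ToyStream.source("events").selectKey().markAsPartitioned().groupByKeyAndAggregate());
    }
}
```

The model deliberately leaves out the IQ and join concerns raised in the thread: nothing here checks that the user's assertion is true, which is exactly why the method would need a "use with care" note.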