Responding to Bruno first:

(1) I actually think "KafkaStreams" is exactly right here -- for the reason you said, ultimately this is describing a literal instance of the "KafkaStreams" class. Glad we hashed this out! (I saw that Rohan went with StreamsClient, but I also prefer KafkaStreams.)

(4) Rohan is right about what I was saying -- but I'm now realizing that I completely misinterpreted what your concern was. Sorry for the long-winded and ultimately irrelevant answer. I'm completely fine with having the return type be a simple Set with additional info such as TaskId in the AssignedTask class (and I see Rohan already made this change so we're all good)

(5) I don't insist either way :) ApplicationState works for me

On Fri, Apr 19, 2024 at 9:37 PM Matthias J. Sax <mj...@apache.org> wrote:

> One more thing. It might be good to clearly call out, which interfaces a
> user would implement, vs the other ones Kafka Streams implements and
> TaskAssignor only uses.
>
> My understanding is, that users would implement `TaskAssignor`,
> `TaskAssignment`, and `StreamsClientAssignment`.
>
> For `AssignedTask` it seems that users would actually only need to
> instantiate them. Should we add a public constructor?
>
> Also wondering if we should add an empty default implementation for
> `onAssignmentComputed()` as it seems not to be strictly necessary to use
> this method?
>
>
> -Matthias
>
> On 4/19/24 7:30 PM, Matthias J. Sax wrote:
> > Great KIP. I have some minor comments/questions:
> >
> >
> > 100 The KIP says: "In the future, additional plugins can use the same
> > partition.assignor prefix". What does this mean?
> >
> >
> > 101 (nit) The KIP says: "Note that the thread-level assignment will
> > remain an un-configurable internal implementation detail of the
> > partition assignor (see "Rejected Alternatives" for further thoughts and
> > reasoning)." -- When I was reading this the first time, I did not
> > understand it, and it did only become clear later (eg while reading the
> > discussion thread). I think it would be good to be a little bit more
> > explicit, because this is not just some minor thing, but a core design
> > decision (which I, btw, support).
> >
> >
> > 102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).
> >
> >
> > 103 (nit): "new non-internal package" -> replace 'non-internal' with
> > 'public' :)
> >
> >
> > 104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems
> > to be a little bit clumsy? I kinda like the original `finalAssignment()`
> > -- I would also be happy with `onFinalAssignment` to address Bruno's
> > line of thinking (which I think is a good call out). (Btw:
> > `finalAssignment` is still used in the text on the KIP and should also
> > be updated.)
> >
> >
> > 105: Please remove all `private` variables. We should only show public
> > stuff on the KIP. Everything else is an implementation detail.
> >
> >
> > 106: `TaskAssignment#numStreamsClients()` -- why do we need this method?
> > Seems calling `assignment()` gives us a collection and we can just call
> > size() on it to get the same value? -- Also, why do we explicitly call
> > out the overwrite of `toString()`; seems unnecessary?
> >
> >
> > 107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the
> > number of StreamThreads on this client, which is equal to the number of
> > main consumers and represents its overall capacity." -- Given our
> > planned thread refactoring, this might not hold correct for long (and I
> > am sure we will forget to update the JavaDocs later). Talking to Lucas
> > the plan is to cut down `StreamsThread` to host the consumer (and there
> > will be only one, and it won't be configurable any longer), and we would
> > introduce a number of configurable "processing threads". Can/should we
> > build this API in a forward looking manner?
> >
> >
> > 108: Why do we need
> > `StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how
> > this would be useful?
> >
> >
> > 109 `StreamsClientState#consumers`: should we rename this to
> > `#consumerClientIds()`?
> >
> >
> > 110 (nit) `StreamsClientState#previousl[Active|Standby]Tasks`: JavaDoc
> > says 'owned by consumers on this node' -- Should we just say `owned by
> > the Streams client`?
> >
> >
> > 111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer`
> > parameter -- not clear what this is -- I guess it's a consumer's
> > client.id? If yes, should we rename the parameter `consumerClientId`?
> >
> >
> > 112 `ApplicationState`: what is the reason to have `allTasks()` and
> > `statefulTasks()` -- why not have `statelessTasks()` and
> > `statefulTasks()` instead? Or all three?
> >
> >
> > 113 `ApplicationState#computeTaskLags()`: I understand the intent/reason
> > why we have this one, but it seems to be somewhat difficult to use
> > correctly, as it triggers an internal side-effect... Would it be
> > possible to replace this method in favor of passing in a `boolean
> > computeTaskLag` parameter into #streamClientState() instead, what might
> > make it less error prone to use, as it seems the returned
> > `StreamsClient` object would be modified when calling #computeTaskLags()
> > and thus both are related to each other?
> >
> >
> > 114 nit/typo: `ApplicationState#streamsClientStates()` returns
> > `StreamsClientState` not `StreamsClient`.
> >
> >
> > 115 `StreamsAssignorRetryableException`: not sure if I fully understand
> > the purpose of this exception.
> >
> >
> > 116 "No actual changes to functionality": allowing to plug in custom
> > TaskAssignor sounds like adding new functionality. Can we rephrase this?
> >
> >
> >
> > 117: What happens if the returned assignment is "invalid" -- for
> > example, a task might not have been assigned, or is assigned to two
> > nodes? Or a standby is assigned to the same node as its active? Or a
> > `StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if
> > this list of potential issues is complete or not...)
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 4/18/24 2:05 AM, Bruno Cadonna wrote:
> >> Hi Sophie,
> >>
> >> Thanks for the clarifications!
> >>
> >> (1)
> >> What about replacing Node* with KafkaStreams* or StreamsClient*? I
> >> prefer KafkaStreams* since that class represents the Kafka Streams
> >> client. I am also fine with KafkaStreamsClient*. I really would like
> >> to avoid introducing a new term in Kafka Streams for which we already
> >> have an equivalent term even if it is used on the brokers since that
> >> is a different level of abstraction. Additionally, I have never been a
> >> big fan of the term "instance".
> >>
> >> (4)
> >> I think the question is if we need to retrieve assignment metadata by
> >> task for a Kafka client or if it is enough to iterate over the
> >> assigned tasks. Could you explain why we cannot add additional
> >> metadata to the class AssignedTask?
> >> The interface KafkaStreamsAssignment (a.k.a. NodeAssignment ;-) )
> >> could be something like
> >>
> >> public interface NodeAssignment {
> >>     ProcessID processId();
> >>
> >>     Instant followupRebalanceDeadline();
> >>
> >>     Set<AssignedTask> assignment();
> >>
> >>     enum AssignedTaskType {
> >>         STATELESS,
> >>         STATEFUL,
> >>         STANDBY
> >>     }
> >>
> >>     static class AssignedTask {
> >>         AssignedTaskType type();
> >>         TaskId id();
> >>
> >>         ... other metadata needed in future
> >>     }
> >> }
> >> If we need to retrieve assigned task by task ID, maybe it is better to
> >> add methods like assignedFor(TaskId) and not to expose the Map.
> >>
> >> (5)
> >> I am in favor of ApplicationState but I am also fine with
> >> ApplicationMetadata if you insist.
> >>
> >> (6)
> >> Is
> >>
> >> void finalAssignment(GroupAssignment assignment, GroupSubscription
> >> subscription);
> >>
> >> kind of a callback? If yes, would it make sense to call it
> >> onAssignmentComputed()?
> >>
> >>
> >> (7)
> >> What do you think of changing the TaskAssignmentUtils signatures to
> >>
> >> public static TaskAssignment default*Assignment(final ApplicationState
> >> applicationState, final TaskAssignment taskAssignment, ...) {...}
> >>
> >> to avoid mutating the assignment in place?
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >> On 4/17/24 7:50 PM, Sophie Blee-Goldman wrote:
> >>> Thanks Bruno! I can provide a bit of context behind some of these
> >>> decisions but I just want to say up front that I agree with every
> >>> single one of your points, though I am going to push back a bit on
> >>> the first one.
> >>>
> >>> [1] The idea here is to help avoid some confusion around the overloaded
> >>> term "client", which can mean either "an instance of Kafka Streams" or
> >>> "a consumer/producer client". The problem is that the former applies to
> >>> the entire Streams process and therefore should be interpreted as "all
> >>> of the StreamThreads on an instance" whereas the latter is typically
> >>> used interchangeably to mean the consumer client in the consumer group,
> >>> which implies a scope of just a single StreamThread on an instance.
> >>> The "Node" name here was an attempt to clear this up, since
> >>> differentiating between instance and thread level is critical to
> >>> understanding and properly implementing the custom assignor.
> >>>
> >>> I do see what you mean about there not being a concept of Node in the
> >>> Kafka Streams codebase, and that we usually do use "instance" when we
> >>> need to differentiate between consumer client/one StreamThread and
> >>> Kafka Streams client/all StreamThreads. As I'm typing this I'm
> >>> convincing myself even more that we shouldn't just use "Client"
> >>> without further distinction, but I'm not sure "Node" has to be the
> >>> answer either.
> >>>
> >>> Could we replace "Node" with "KafkaStreamsClient" or is that too wordy?
> >>> I honestly do still like Node personally, and don't see what's wrong
> >>> with introducing a new term since the "node" terminology is used
> >>> heavily on the broker side and it means effectively the same thing in
> >>> theory. But if we can't compromise between "Node" and "Client" then
> >>> maybe we can settle on "Instance"? (Does feel a bit wordy too...maybe
> >>> "Process"?)
> >>>
> >>> [2] Good catch(es). Makes sense to me
> >>>
> >>> [3] Totally agree, a single enum makes way more sense
> >>>
> >>> [4] Here again I can provide some background -- this is actually
> >>> following a pattern that we used when refactoring the old
> >>> PartitionAssignor into the new (at the time) Consumer PartitionAssignor
> >>> interface. The idea was to wrap the return type to protect the assign
> >>> method in case we ever wanted to add something to what was returned,
> >>> such as metadata for the entire group. This way we could avoid a
> >>> massively disruptive deprecation-and-migration cycle for everyone who
> >>> implements a custom assignor.
> >>> That said, I just checked the GroupAssignment class we added for this
> >>> in the ConsumerPartitionAssignor interface, and to this day we've never
> >>> added anything other than the map of consumer client to assignment.
> >>>
> >>> So maybe that was overly cautious. I'd be ok with flattening this map
> >>> out. I guess the question is just, can we imagine any case in which we
> >>> might want the custom assignor to return additional metadata? To be
> >>> honest I think this might be more likely than with the plain consumer
> >>> client case, but again, I'm totally fine with just flattening it to a
> >>> plain map return type
> >>>
> >>> [5] I guess not. I think ApplicationMetadata was added during the
> >>> initial KIP discussion so that's probably why it doesn't follow the
> >>> same naming pattern.
> >>> Personally I'm fine either way (I do think ApplicationMetadata
> >>> sounds a bit better but that's not a good enough reason :P)
> >>>
> >>> Thanks Bruno!
> >>>
> >>> On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna <cado...@apache.org>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> sorry, I am late to the party.
> >>>>
> >>>> I have a couple of comments:
> >>>>
> >>>> (1)
> >>>> I would prefer Client* instead of Node* in the names. In Kafka Streams
> >>>> we do not really have the concept of node but we have the concept of
> >>>> client (admittedly, we sometimes also use instance). I would like to
> >>>> avoid introducing a new term to basically describe the Streams client.
> >>>> I know that we already have a ClientState but that would be in a
> >>>> different package.
> >>>>
> >>>> (2)
> >>>> Did you consider to use Instant instead of long as return type of
> >>>> followupRebalanceDeadline()? Instant is a bit more flexible and
> >>>> readable than a plain long, IMO. BTW, you list
> >>>> followupRebalanceDeadline() twice in interface NodeAssignment.
> >>>>
> >>>> (3)
> >>>> Did you consider to use an enum instead of class AssignedTask? As far
> >>>> as I understand not all combinations are possible. A stateless standby
> >>>> task does not exist. An enum with values STATELESS, STATEFUL, STANDBY
> >>>> would be clearer. Or even better instead of two methods in
> >>>> AssignedTask that return a boolean you could have one method -- say
> >>>> type() -- that returns the enum.
> >>>>
> >>>> (4)
> >>>> Does the return type of assignment need to be a map from task ID to
> >>>> AssignedTask? Wouldn't it be enough to be a collection of
> >>>> AssignedTasks with AssignedTask containing the task ID?
> >>>>
> >>>> (5)
> >>>> Is there a semantic difference between *State and *Metadata? I was
> >>>> wondering whether ApplicationMetadata could also be ApplicationState
> >>>> for the sake of consistency.
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>>
> >>>> On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
> >>>>> Cool, looks good to me!
> >>>>>
> >>>>> Seems like there is no further feedback, so maybe we can start to
> >>>>> call for a vote?
> >>>>>
> >>>>> However, since as noted we are setting aside time to discuss this
> >>>>> during the sync next Thursday, we can also wait until after that
> >>>>> meeting to officially kick off the vote.
> >>>>>
> >>>>> On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai <desai.p.ro...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the feedback Sophie!
> >>>>>>
> >>>>>> re1: Totally agree. The fact that it's related to the partition
> >>>>>> assignor is clear from just `task.assignor`. I'll update.
> >>>>>> re3: This is a good point, and something I would find useful
> >>>>>> personally. I think it's worth adding an interface that lets the
> >>>>>> plugin observe the final assignment. I'll add that.
> >>>>>> re4: I like the new `NodeAssignment` type. I'll update the KIP with
> >>>>>> that.
> >>>>>>
> >>>>>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai <desai.p.ro...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks for the feedback so far! I think pretty much all of it is
> >>>>>>> reasonable. I'll reply to it inline:
> >>>>>>>
> >>>>>>>> 1. All the API logic is granular at the Task level, except the
> >>>>>>> previousOwnerForPartition func. I’m not clear what’s the motivation
> >>>>>>> behind it, does our controller also want to change how the
> >>>>>>> partitions->tasks mapping is formed?
> >>>>>>> You're right that this is out of place. I've removed this method as
> >>>>>>> it's not needed by the task assignor.
> >>>>>>>
> >>>>>>>> 2. Just on the API layering itself: it feels a bit weird to have
> >>>>>>> the three built-in functions (defaultStandbyTaskAssignment etc)
> >>>>>>> sitting in the ApplicationMetadata class.
> >>>>>>> If we consider them as some default util functions, how about
> >>>>>>> moving those into their own static util methods to separate from
> >>>>>>> the ApplicationMetadata “fact objects” ?
> >>>>>>> Agreed. Updated in the latest revision of the kip. These have been
> >>>>>>> moved to TaskAssignorUtils
> >>>>>>>
> >>>>>>>> 3. I personally prefer `NodeAssignment` to be a read-only object
> >>>>>>> containing the decisions made by the assignor, including the
> >>>>>>> requestFollowupRebalance flag. For manipulating the half-baked
> >>>>>>> results inside the assignor itself, maybe we can just be flexible
> >>>>>>> to let users use whatever structs / their own classes even, if
> >>>>>>> they like. WDYT?
> >>>>>>> Agreed. Updated in the latest version of the kip.
> >>>>>>>
> >>>>>>>> 1. For the API, thoughts on changing the method signature to
> >>>>>>> return a (non-Optional) TaskAssignor? Then we can either have the
> >>>>>>> default implementation return new HighAvailabilityTaskAssignor or
> >>>>>>> just have a default implementation class that people can extend if
> >>>>>>> they don't want to implement every method.
> >>>>>>> Based on some other discussion, I actually decided to get rid of
> >>>>>>> the plugin interface, and instead use config to specify individual
> >>>>>>> plugin behaviour. So the method you're referring to is no longer
> >>>>>>> part of the proposal.
> >>>>>>>
> >>>>>>>> 3. Speaking of ApplicationMetadata, the javadoc says it's read
> >>>>>>> only but there are methods that return void on it? It's not totally
> >>>>>>> clear to me how that interface is supposed to be used by the
> >>>>>>> assignor. It'd be nice if we could flip that interface such that it
> >>>>>>> becomes part of the output instead of an input to the plugin.
> >>>>>>> I've moved those methods to a util class.
> >>>>>>> They're really utility methods the assignor might want to call to
> >>>>>>> do some default or optimized assignment for some cases like
> >>>>>>> rack-awareness.
> >>>>>>>
> >>>>>>>> 4. We should consider wrapping UUID in a ProcessID class so that
> >>>>>>> we control the interface (there are a few places where UUID is
> >>>>>>> directly used).
> >>>>>>> I like it. Updated the proposal.
> >>>>>>>
> >>>>>>>> 5. What does NodeState#newAssignmentForNode() do? I thought the
> >>>>>>> point was for the plugin to make the assignment? Is that the result
> >>>>>>> of the default logic?
> >>>>>>> It doesn't need to be part of the interface. I've removed it.
> >>>>>>>
> >>>>>>>> re 2/6:
> >>>>>>>
> >>>>>>> I generally agree with these points, but I'd rather hash that out
> >>>>>>> in a PR than in the KIP review, as it'll be clearer what gets used
> >>>>>>> how. It seems to me (committers please correct me if I'm wrong)
> >>>>>>> that as long as we're on the same page about what information the
> >>>>>>> interfaces are returning, that's ok at this level of discussion.
> >>>>>>>
> >>>>>>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai
> >>>>>>> <desai.p.ro...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hello All,
> >>>>>>>>
> >>>>>>>> I'd like to start a discussion on KIP-924 (
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
> >>>>>>>> ) which proposes an interface to allow users to plug into the
> >>>>>>>> streams partition assignor. The motivation section in the KIP goes
> >>>>>>>> into some more detail on why we think this is a useful addition.
> >>>>>>>> Thanks in advance for your feedback!
> >>>>>>>>
> >>>>>>>> Best Regards,
> >>>>>>>>
> >>>>>>>> Rohan
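
For anyone skimming the thread, here is a rough sketch of the shape discussed under (4): the per-client assignment is a plain Set, each AssignedTask carries its own TaskId and a single type enum, and lookup by task ID goes through a helper instead of an exposed Map. This is only an illustration under assumptions: the `TaskId` class below is a hypothetical stand-in for the real Kafka Streams class, and `AssignmentSketch` and the static `assignedFor` helper are names made up for this example, not the KIP's actual API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

public class AssignmentSketch {

    // Hypothetical stand-in for Kafka Streams' TaskId (assumption for this sketch).
    static final class TaskId {
        final int subtopology;
        final int partition;

        TaskId(int subtopology, int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TaskId)) return false;
            TaskId other = (TaskId) o;
            return subtopology == other.subtopology && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(subtopology, partition);
        }
    }

    // One enum instead of two booleans: a stateless standby cannot be expressed.
    enum AssignedTaskType { STATELESS, STATEFUL, STANDBY }

    // Carries its own TaskId, so a Set<AssignedTask> is enough as a return type.
    static final class AssignedTask {
        private final TaskId id;
        private final AssignedTaskType type;

        // Public constructor, since users only instantiate this class.
        public AssignedTask(TaskId id, AssignedTaskType type) {
            this.id = Objects.requireNonNull(id);
            this.type = Objects.requireNonNull(type);
        }

        public TaskId id() { return id; }
        public AssignedTaskType type() { return type; }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof AssignedTask)) return false;
            AssignedTask other = (AssignedTask) o;
            return id.equals(other.id) && type == other.type;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, type);
        }
    }

    // Hypothetical helper: look up a task by ID by scanning the Set.
    static Optional<AssignedTask> assignedFor(Set<AssignedTask> assignment, TaskId id) {
        return assignment.stream().filter(t -> t.id().equals(id)).findFirst();
    }

    public static void main(String[] args) {
        Set<AssignedTask> assignment = new HashSet<>(Arrays.asList(
            new AssignedTask(new TaskId(0, 0), AssignedTaskType.STATEFUL),
            new AssignedTask(new TaskId(0, 1), AssignedTaskType.STANDBY)));

        // Prints the type of task 0_1 if it is assigned to this client.
        System.out.println(assignedFor(assignment, new TaskId(0, 1)).map(AssignedTask::type));
    }
}
```

The linear scan in `assignedFor` is fine for the handful of tasks a single client owns; if that ever turns out to matter, the index can be built internally without changing the Set-based return type.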