Responding to Bruno first:

(1) I actually think "KafkaStreams" is exactly right here -- for the reason
you said, ultimately this is describing a literal instance of the
"KafkaStreams" class. Glad we hashed this out! (I saw that Rohan went with
StreamsClient, but I also prefer KafkaStreams.)

(4) Rohan is right about what I was saying -- but I'm now realizing that I
completely misinterpreted what your concern was. Sorry for the long-winded
and ultimately irrelevant answer. I'm completely fine with having the
return type be a simple Set with additional info such as TaskId in the
AssignedTask class (and I see Rohan already made this change so we're all
good)

(5) I don't insist either way :) ApplicationState works for me.

On Fri, Apr 19, 2024 at 9:37 PM Matthias J. Sax <mj...@apache.org> wrote:

> One more thing: it might be good to clearly call out which interfaces a
> user would implement, vs. the other ones that Kafka Streams implements and
> TaskAssignor only uses.
>
> My understanding is that users would implement `TaskAssignor`,
> `TaskAssignment`, and `StreamsClientAssignment`.
>
> For `AssignedTask` it seems that users would actually only need to
> instantiate them. Should we add a public constructor?
>
> I'm also wondering if we should add an empty default implementation for
> `onAssignmentComputed()`, as it does not seem strictly necessary to use
> this method?
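For illustration, here is a minimal sketch of the empty-default idea. `GroupAssignment` and `GroupSubscription` below are simplified stand-ins for the consumer-group types of the same name, and `assign()` is a hypothetical placeholder for the assignor's main method, not the KIP's actual signature.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

class DefaultCallbackSketch {
    // Simplified stand-ins for ConsumerPartitionAssignor.GroupAssignment / GroupSubscription.
    static final class GroupAssignment { }
    static final class GroupSubscription { }

    // Hypothetical assignor interface: only assign() is mandatory.
    interface TaskAssignor {
        // Placeholder for the main assignment method (process ID -> task IDs).
        Map<String, Set<String>> assign();

        // Optional callback with an empty default, so implementers may ignore it.
        default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription) {
        }
    }

    // Compiles without overriding onAssignmentComputed().
    static final class NoOpAssignor implements TaskAssignor {
        @Override
        public Map<String, Set<String>> assign() {
            return Collections.emptyMap();
        }
    }
}
```

Implementers that want to observe the final assignment override the callback; everyone else inherits the no-op.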
>
>
> -Matthias
>
> On 4/19/24 7:30 PM, Matthias J. Sax wrote:
> > Great KIP. I have some minor comments/questions:
> >
> >
> > 100 The KIP says: "In the future, additional plugins can use the same
> > partition.assignor prefix". What does this mean?
> >
> >
> > 101 (nit) The KIP says: "Note that the thread-level assignment will
> > remain an un-configurable internal implementation detail of the
> > partition assignor (see "Rejected Alternatives" for further thoughts and
> > reasoning)." -- When I read this the first time, I did not
> > understand it, and it only became clear later (e.g., while reading the
> > discussion thread). I think it would be good to be a little bit more
> > explicit, because this is not just some minor thing, but a core design
> > decision (which I, btw, support).
> >
> >
> > 102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).
> >
> >
> > 103 (nit): "new non-internal package" -> replace 'non-internal' with
> > 'public' :)
> >
> >
> > 104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems
> > to be a little bit clumsy? I kinda like the original `finalAssignment()`
> > -- I would also be happy with `onFinalAssignment` to address Bruno's
> > line of thinking (which I think is a good call out). (Btw:
> > `finalAssignment` is still used in the text of the KIP and should also
> > be updated.)
> >
> >
> > 105: Please remove all `private` variables. We should only show public
> > stuff on the KIP. Everything else is an implementation detail.
> >
> >
> > 106: `TaskAssignment#numStreamsClients()` -- why do we need this method?
> > It seems calling `assignment()` gives us a collection and we can just call
> > size() on it to get the same value? -- Also, why do we explicitly call
> > out the override of `toString()`; it seems unnecessary?
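To make point 106 concrete: if `assignment()` already returns a collection, the client count is just its size. A sketch with hypothetical, simplified types (process IDs as plain strings), not the KIP's classes:

```java
import java.util.Collection;
import java.util.List;

class TaskAssignmentSizeSketch {
    // Hypothetical stand-in for a per-client assignment; details elided.
    static final class StreamsClientAssignment {
        final String processId;

        StreamsClientAssignment(String processId) {
            this.processId = processId;
        }
    }

    // Hypothetical TaskAssignment that exposes only the collection of client assignments.
    static final class TaskAssignment {
        private final List<StreamsClientAssignment> assignment;

        TaskAssignment(List<StreamsClientAssignment> assignment) {
            this.assignment = List.copyOf(assignment);
        }

        Collection<StreamsClientAssignment> assignment() {
            return assignment;
        }
    }

    // What numStreamsClients() would return, derived without a dedicated method.
    static int numStreamsClients(TaskAssignment taskAssignment) {
        return taskAssignment.assignment().size();
    }
}
```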
> >
> >
> > 107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the
> > number of StreamThreads on this client, which is equal to the number of
> > main consumers and represents its overall capacity." -- Given our
> > planned thread refactoring, this might not hold true for long (and I
> > am sure we will forget to update the JavaDocs later). Talking to Lucas,
> > the plan is to cut down `StreamThread` to host the consumer (and there
> > will be only one, and it won't be configurable any longer), and we would
> > introduce a number of configurable "processing threads". Can/should we
> > build this API in a forward looking manner?
> >
> >
> > 108: Why do we need
> > `StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how
> > this would be useful?
> >
> >
> > 109 `StreamsClientState#consumers`: should we rename this to
> > `#consumerClientIds()`?
> >
> >
> > 110 (nit) `StreamsClientState#previous[Active|Standby]Tasks`: JavaDoc
> > says 'owned by consumers on this node' -- Should we just say `owned by
> > the Streams client`?
> >
> >
> > 111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer`
> > parameter -- not clear what this is -- I guess it's a consumer's
> > client.id? If yes, should we rename the parameter `consumerClientId`?
> >
> >
> > 112 `ApplicationState`: what is the reason to have `allTasks()` and
> > `statefulTasks()` -- why not have `statelessTasks()` and
> > `statefulTasks()` instead? Or all three?
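Whichever pair is exposed, the third set can be derived, which is one argument for exposing only two of them. A sketch, assuming every task is either stateless or stateful and using plain strings such as "0_1" in place of `TaskId`:

```java
import java.util.HashSet;
import java.util.Set;

class TaskSetsSketch {
    // Given allTasks() and statefulTasks(), the stateless set is a set difference.
    static Set<String> statelessTasks(Set<String> allTasks, Set<String> statefulTasks) {
        Set<String> stateless = new HashSet<>(allTasks);
        stateless.removeAll(statefulTasks);
        return stateless;
    }

    // Conversely, allTasks() is the union of the two disjoint sets.
    static Set<String> allTasks(Set<String> statelessTasks, Set<String> statefulTasks) {
        Set<String> all = new HashSet<>(statelessTasks);
        all.addAll(statefulTasks);
        return all;
    }
}
```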
> >
> >
> > 113 `ApplicationState#computeTaskLags()`: I understand the intent/reason
> > why we have this one, but it seems somewhat difficult to use
> > correctly, as it triggers an internal side effect... Would it be
> > possible to replace this method with a `boolean computeTaskLags`
> > parameter passed into #streamsClientStates() instead? That might
> > make it less error-prone to use, since the returned
> > `StreamsClientState` objects would be modified when calling
> > #computeTaskLags(), and thus both are related to each other.
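The difference between the two API styles in point 113 can be sketched as follows. All names and the single-client, map-based representation are hypothetical simplifications; the real lag computation is elided and stood in for by a precomputed map.

```java
import java.util.Map;
import java.util.Optional;

class TaskLagApiSketch {
    // Hypothetical client state: lags are present only if they were computed.
    static final class StreamsClientState {
        private final Map<String, Long> lagsByTask;

        StreamsClientState(Map<String, Long> lagsByTask) {
            this.lagsByTask = Map.copyOf(lagsByTask);
        }

        Optional<Long> lagFor(String taskId) {
            return Optional.ofNullable(lagsByTask.get(taskId));
        }
    }

    // Hypothetical application state; offsetLags stands in for an expensive lag computation.
    static final class ApplicationState {
        private final Map<String, Long> offsetLags;
        private boolean lagsComputed = false; // hidden state flipped by computeTaskLags()

        ApplicationState(Map<String, Long> offsetLags) {
            this.offsetLags = Map.copyOf(offsetLags);
        }

        // Style in the current draft (roughly): a separate call with a side effect
        // that later calls silently depend on.
        void computeTaskLags() {
            lagsComputed = true;
        }

        StreamsClientState streamsClientState() {
            return new StreamsClientState(lagsComputed ? offsetLags : Map.of());
        }

        // Suggested alternative: the requirement is stated in the same call.
        StreamsClientState streamsClientState(boolean computeTaskLags) {
            return new StreamsClientState(computeTaskLags ? offsetLags : Map.of());
        }
    }
}
```

With the flag, forgetting the lag computation is visible at the call site rather than surfacing later as missing data.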
> >
> >
> > 114 nit/typo: `ApplicationState#streamsClientStates()` returns
> > `StreamsClientState` not `StreamsClient`.
> >
> >
> > 115 `StreamsAssignorRetryableException`: not sure if I fully understand
> > the purpose of this exception.
> >
> >
> > 116 "No actual changes to functionality": allowing users to plug in a custom
> > TaskAssignor sounds like adding new functionality. Can we rephrase this?
> >
> >
> >
> > 117: What happens if the returned assignment is "invalid" -- for
> > example, a task might not have been assigned, or is assigned to two
> > nodes? Or a standby is assigned to the same node as its active? Or a
> > `StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if
> > this list of potential issues is complete or not...)
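One way to think about point 117 is as a set of checks run on the returned assignment before it is used. The following is a hypothetical sketch covering the cases listed (unassigned task, task active on two clients, standby co-located with its active, unknown process ID). It is not the KIP's behavior, IDs are plain strings, and what should happen when a check fails is left open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AssignmentValidationSketch {
    // Hypothetical simplified per-client assignment.
    static final class ClientAssignment {
        final String processId;
        final Set<String> activeTasks;
        final Set<String> standbyTasks;

        ClientAssignment(String processId, Set<String> activeTasks, Set<String> standbyTasks) {
            this.processId = processId;
            this.activeTasks = activeTasks;
            this.standbyTasks = standbyTasks;
        }
    }

    // Returns a list of problems; an empty list means these particular checks passed.
    static List<String> validate(Set<String> knownProcessIds,
                                 Set<String> allTasks,
                                 List<ClientAssignment> assignment) {
        List<String> errors = new ArrayList<>();
        Map<String, String> activeOwner = new HashMap<>();

        for (ClientAssignment client : assignment) {
            if (!knownProcessIds.contains(client.processId)) {
                errors.add("unknown process " + client.processId);
            }
            for (String task : client.activeTasks) {
                // putIfAbsent returns the previous owner if the task was already assigned.
                String previous = activeOwner.putIfAbsent(task, client.processId);
                if (previous != null) {
                    errors.add("task " + task + " active on both " + previous + " and " + client.processId);
                }
            }
            for (String task : client.standbyTasks) {
                if (client.activeTasks.contains(task)) {
                    errors.add("standby of " + task + " on the same client as its active: " + client.processId);
                }
            }
        }
        for (String task : allTasks) {
            if (!activeOwner.containsKey(task)) {
                errors.add("task " + task + " has no active owner");
            }
        }
        return errors;
    }
}
```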
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 4/18/24 2:05 AM, Bruno Cadonna wrote:
> >> Hi Sophie,
> >>
> >> Thanks for the clarifications!
> >>
> >> (1)
> >> What about replacing Node* with KafkaStreams* or StreamsClient*? I
> >> prefer KafkaStreams* since that class represents the Kafka Streams
> >> client. I am also fine with KafkaStreamsClient*. I really would like
> >> to avoid introducing a new term in Kafka Streams for which we already
> >> have an equivalent term even if it is used on the brokers since that
> >> is a different level of abstraction. Additionally, I have never been a
> >> big fan of the term "instance".
> >>
> >> (4)
> >> I think the question is whether we need to retrieve assignment metadata by
> >> task for a Kafka client or whether it is enough to iterate over the
> >> assigned tasks. Could you explain why we cannot add additional
> >> metadata to the class AssignedTask?
> >> The interface KafkaStreamsAssignment (a.k.a. NodeAssignment ;-) )
> >> could be something like
> >>
> >> public interface NodeAssignment {
> >>     ProcessID processId();
> >>
> >>     Instant followupRebalanceDeadline();
> >>
> >>     Set<AssignedTask> assignment();
> >>
> >>     enum AssignedTaskType {
> >>         STATELESS,
> >>         STATEFUL,
> >>         STANDBY
> >>     }
> >>
> >>     static class AssignedTask {
> >>         AssignedTaskType type();
> >>         TaskId id();
> >>
> >>         // ... other metadata needed in future
> >>     }
> >> }
> >>
> >> If we need to retrieve assigned task by task ID, maybe it is better to
> >> add methods like assignedFor(TaskId) and not to expose the Map.
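The sketch above, fleshed out into something that compiles, shows how `assignedFor(TaskId)` can sit on top of a plain `Set<AssignedTask>`. This is a sketch only: `TaskId` and the `String` process ID are simplified stand-ins, and `assignedFor` is the hypothetical helper suggested above, not an existing API.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

class AssignedTaskSketch {
    enum AssignedTaskType { STATELESS, STATEFUL, STANDBY }

    // Simplified stand-in for org.apache.kafka.streams.processor.TaskId.
    static final class TaskId {
        final int subtopology;
        final int partition;

        TaskId(int subtopology, int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TaskId)) return false;
            TaskId other = (TaskId) o;
            return subtopology == other.subtopology && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(subtopology, partition);
        }
    }

    // Immutable value: the task ID travels with its type (and any future metadata).
    static final class AssignedTask {
        private final TaskId id;
        private final AssignedTaskType type;

        AssignedTask(TaskId id, AssignedTaskType type) {
            this.id = Objects.requireNonNull(id);
            this.type = Objects.requireNonNull(type);
        }

        TaskId id() { return id; }
        AssignedTaskType type() { return type; }

        @Override
        public boolean equals(Object o) {
            return o instanceof AssignedTask
                && id.equals(((AssignedTask) o).id)
                && type == ((AssignedTask) o).type;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, type);
        }
    }

    interface NodeAssignment {
        String processId(); // stand-in for ProcessID
        Instant followupRebalanceDeadline();
        Set<AssignedTask> assignment();

        // Lookup by task ID without exposing a Map; a linear scan over one client's tasks.
        default Optional<AssignedTask> assignedFor(TaskId id) {
            return assignment().stream().filter(task -> task.id().equals(id)).findFirst();
        }
    }
}
```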
> >>
> >> (5)
> >> I am in favor of ApplicationState, but I am also fine with
> >> ApplicationMetadata if you insist.
> >>
> >> (6)
> >> Is
> >>
> >> void finalAssignment(GroupAssignment assignment, GroupSubscription subscription);
> >>
> >> kind of a callback? If yes, would it make sense to call it
> >> onAssignmentComputed()?
> >>
> >>
> >> (7)
> >> What do you think of changing the TaskAssignmentUtils signatures to
> >>
> >> public static TaskAssignment default*Assignment(final ApplicationState applicationState,
> >>                                                 final TaskAssignment taskAssignment,
> >>                                                 ...) {...}
> >>
> >> to avoid mutating the assignment in place?
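A rough illustration of the non-mutating style: the utility takes the current assignment and returns a new one. `withTask` and the map-based `TaskAssignment` are hypothetical simplifications (the `ApplicationState` parameter and any real placement logic are omitted), not the actual `TaskAssignmentUtils` methods.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ImmutableAssignmentSketch {
    // Hypothetical immutable assignment: process ID -> assigned task IDs.
    static final class TaskAssignment {
        private final Map<String, Set<String>> tasksByClient;

        TaskAssignment(Map<String, Set<String>> tasksByClient) {
            Map<String, Set<String>> copy = new HashMap<>();
            tasksByClient.forEach((client, tasks) -> copy.put(client, Set.copyOf(tasks)));
            this.tasksByClient = Map.copyOf(copy);
        }

        Set<String> tasksFor(String processId) {
            return tasksByClient.getOrDefault(processId, Set.of());
        }

        Map<String, Set<String>> asMap() {
            return tasksByClient;
        }
    }

    // Utility in the style proposed in (7): returns a new assignment and never mutates its input.
    static TaskAssignment withTask(TaskAssignment current, String processId, String taskId) {
        Map<String, Set<String>> next = new HashMap<>();
        current.asMap().forEach((client, tasks) -> next.put(client, new HashSet<>(tasks)));
        next.computeIfAbsent(processId, client -> new HashSet<>()).add(taskId);
        return new TaskAssignment(next);
    }
}
```

Because the input is never changed, a custom assignor can try several utility calls and keep whichever result it prefers.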
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >> On 4/17/24 7:50 PM, Sophie Blee-Goldman wrote:
> >>> Thanks Bruno! I can provide a bit of context behind some of these
> >>> decisions, but I just want to say up front that I agree with every single
> >>> one of your points, though I am going to push back a bit on the first one.
> >>>
> >>> [1] The idea here is to help avoid some confusion around the overloaded
> >>> term "client", which can mean either "an instance of Kafka Streams" or
> >>> "a consumer/producer client". The problem is that the former applies to
> >>> the entire Streams process and therefore should be interpreted as "all
> >>> of the StreamThreads on an instance", whereas the latter is typically used
> >>> interchangeably to mean the consumer client in the consumer group,
> >>> which implies a scope of just a single StreamThread on an instance.
> >>> The "Node" name here was an attempt to clear this up, since
> >>> differentiating between instance and thread level is critical to
> >>> understanding and properly implementing the custom assignor.
> >>>
> >>> I do see what you mean about there not being a concept of Node in the
> >>> Kafka Streams codebase, and that we usually do use "instance" when we
> >>> need to differentiate between consumer client/one StreamThread and
> >>> Kafka Streams client/all StreamThreads. As I'm typing this I'm convincing
> >>> myself even more that we shouldn't just use "Client" without further
> >>> distinction, but I'm not sure "Node" has to be the answer either.
> >>>
> >>> Could we replace "Node" with "KafkaStreamsClient" or is that too wordy?
> >>> I honestly do still like Node personally, and don't see what's wrong with
> >>> introducing a new term since the "node" terminology is used heavily
> >>> on the broker side and it means effectively the same thing in theory.
> >>> But if we can't compromise between "Node" and "Client" then maybe
> >>> we can settle on "Instance"? (Does feel a bit wordy too...maybe
> >>> "Process"?)
> >>>
> >>> [2] Good catch(es). Makes sense to me
> >>>
> >>> [3] Totally agree, a single enum makes way more sense
> >>>
> >>> [4] Here again I can provide some background -- this is actually following
> >>> a pattern that we used when refactoring the old PartitionAssignor into
> >>> the new (at the time) ConsumerPartitionAssignor interface. The idea was
> >>> to wrap the return type to protect the assign method in case we ever wanted
> >>> to add something to what was returned, such as metadata for the entire
> >>> group. This way we could avoid a massively disruptive
> >>> deprecation-and-migration cycle for everyone who implements a custom assignor.
> >>> That said, I just checked the GroupAssignment class we added for this
> >>> in the ConsumerPartitionAssignor interface, and to this day we've never
> >>> added anything other than the map of consumer client to assignment.
> >>>
> >>> So maybe that was overly cautious. I'd be ok with flattening this map out.
> >>> I guess the question is just, can we imagine any case in which we might
> >>> want the custom assignor to return additional metadata? To be honest
> >>> I think this might be more likely than with the plain consumer client case,
> >>> but again, I'm totally fine with just flattening it to a plain map return
> >>> type.
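The trade-off in [4] is between a flat return type and a wrapper object. A sketch with hypothetical names and a simplified `Map<String, Set<String>>` payload (client to task IDs); the wrapper is only loosely modeled on `ConsumerPartitionAssignor.GroupAssignment`:

```java
import java.util.Map;
import java.util.Set;

class ReturnTypeSketch {
    // Option A: flat return type. Returning anything beyond the map later
    // would require a new method signature and a deprecation cycle.
    interface FlatAssignor {
        Map<String, Set<String>> assign();
    }

    // Option B: wrapper return type. New, optional fields could be added to
    // the wrapper later without breaking existing implementers.
    static final class GroupAssignment {
        private final Map<String, Set<String>> tasksByClient;

        GroupAssignment(Map<String, Set<String>> tasksByClient) {
            this.tasksByClient = Map.copyOf(tasksByClient);
        }

        Map<String, Set<String>> tasksByClient() {
            return tasksByClient;
        }
    }

    interface WrappedAssignor {
        GroupAssignment assign();
    }
}
```

The cost of option B is one extra type and one extra call for every implementer, paid even if the wrapper never grows.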
> >>>
> >>> [5] I guess not. I think ApplicationMetadata was added during the initial
> >>> KIP discussion, so that's probably why it doesn't follow the same naming
> >>> pattern. Personally I'm fine either way (I do think ApplicationMetadata
> >>> sounds a bit better but that's not a good enough reason :P)
> >>>
> >>> Thanks Bruno!
> >>>
> >>> On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna <cado...@apache.org>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> sorry, I am late to the party.
> >>>>
> >>>> I have a couple of comments:
> >>>>
> >>>> (1)
> >>>> I would prefer Client* instead of Node* in the names. In Kafka Streams
> >>>> we do not really have the concept of node but we have the concept of
> >>>> client (admittedly, we sometimes also use instance). I would like to
> >>>> avoid introducing a new term to basically describe the Streams client.
> >>>> I know that we already have a ClientState but that would be in a
> >>>> different package.
> >>>>
> >>>> (2)
> >>>> Did you consider using Instant instead of long as the return type of
> >>>> followupRebalanceDeadline()? Instant is a bit more flexible and readable
> >>>> than a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in
> >>>> interface NodeAssignment.
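A small comparison of the two return types discussed in (2). The helper names are hypothetical and only illustrate how the deadline would be computed and checked with each type.

```java
import java.time.Duration;
import java.time.Instant;

class DeadlineSketch {
    // With a long, the unit (ms) and the epoch are conventions the caller must know.
    static long followupDeadlineMs(long nowMs, long delayMs) {
        return nowMs + delayMs;
    }

    // With Instant and Duration, the meaning is carried by the types.
    static Instant followupDeadline(Instant now, Duration delay) {
        return now.plus(delay);
    }

    // Hypothetical check a caller might perform against the deadline.
    static boolean isDue(Instant deadline, Instant now) {
        return !now.isBefore(deadline);
    }
}
```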
> >>>>
> >>>> (3)
> >>>> Did you consider using an enum instead of class AssignedTask? As far as
> >>>> I understand, not all combinations are possible: a stateless standby task
> >>>> does not exist. An enum with values STATELESS, STATEFUL, STANDBY would
> >>>> be clearer. Or even better, instead of two methods in AssignedTask that
> >>>> return a boolean, you could have one method -- say type() -- that returns
> >>>> the enum.
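The argument in (3) can be seen by comparing the two representations. A sketch with hypothetical class names; only the enum values come from the thread.

```java
class TaskTypeSketch {
    // Two booleans admit four combinations; (isStateful=false, isStandby=true)
    // would describe a stateless standby task, which cannot exist.
    static final class BooleanAssignedTask {
        final boolean isStateful;
        final boolean isStandby;

        BooleanAssignedTask(boolean isStateful, boolean isStandby) {
            this.isStateful = isStateful;
            this.isStandby = isStandby;
        }
    }

    // The enum lists exactly the valid cases, so the invalid one cannot be expressed.
    enum AssignedTaskType { STATELESS, STATEFUL, STANDBY }

    // Converting from the boolean form shows the validation the enum makes unnecessary.
    static AssignedTaskType toType(BooleanAssignedTask task) {
        if (task.isStandby) {
            if (!task.isStateful) {
                throw new IllegalArgumentException("a stateless standby task does not exist");
            }
            return AssignedTaskType.STANDBY;
        }
        return task.isStateful ? AssignedTaskType.STATEFUL : AssignedTaskType.STATELESS;
    }
}
```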
> >>>>
> >>>> (4)
> >>>> Does the return type of assignment need to be a map from task ID to
> >>>> AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks
> >>>> with AssignedTask containing the task ID?
> >>>>
> >>>> (5)
> >>>> Is there a semantic difference between *State and *Metadata? I was
> >>>> wondering whether ApplicationMetadata could also be ApplicationState for
> >>>> the sake of consistency.
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>>
> >>>> On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
> >>>>> Cool, looks good to me!
> >>>>>
> >>>>> Seems like there is no further feedback, so maybe we can start to call
> >>>>> for a vote?
> >>>>>
> >>>>> However, since as noted we are setting aside time to discuss this during
> >>>>> the sync next Thursday, we can also wait until after that meeting to
> >>>>> officially kick off the vote.
> >>>>>
> >>>>> On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai <desai.p.ro...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the feedback Sophie!
> >>>>>>
> >>>>>> re1: Totally agree. The fact that it's related to the partition
> >>>>>> assignor is clear from just `task.assignor`. I'll update.
> >>>>>> re3: This is a good point, and something I would find useful
> >>>>>> personally. I think it's worth adding an interface that lets the plugin
> >>>>>> observe the final assignment. I'll add that.
> >>>>>> re4: I like the new `NodeAssignment` type. I'll update the KIP with
> >>>>>> that.
> >>>>>>
> >>>>>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai <desai.p.ro...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks for the feedback so far! I think pretty much all of it is
> >>>>>>> reasonable. I'll reply to it inline:
> >>>>>>>
> >>>>>>>> 1. All the API logic is granular at the Task level, except the
> >>>>>>>> previousOwnerForPartition func. I’m not clear on the motivation
> >>>>>>>> behind it; does our controller also want to change how the
> >>>>>>>> partitions->tasks mapping is formed?
> >>>>>>> You're right that this is out of place. I've removed this method as
> >>>>>>> it's not needed by the task assignor.
> >>>>>>>
> >>>>>>>> 2. Just on the API layering itself: it feels a bit weird to have the
> >>>>>>>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
> >>>>>>>> the ApplicationMetadata class. If we consider them as some default util
> >>>>>>>> functions, how about moving those into their own static util methods
> >>>>>>>> to separate them from the ApplicationMetadata “fact objects”?
> >>>>>>> Agreed. Updated in the latest revision of the KIP. These have been moved
> >>>>>>> to TaskAssignorUtils.
> >>>>>>>
> >>>>>>>> 3. I personally prefer `NodeAssignment` to be a read-only object
> >>>>>>>> containing the decisions made by the assignor, including the
> >>>>>>>> requestFollowupRebalance flag. For manipulating the half-baked results
> >>>>>>>> inside the assignor itself, maybe we can just be flexible and let users
> >>>>>>>> use whatever structs / their own classes even, if they like. WDYT?
> >>>>>>> Agreed. Updated in the latest version of the KIP.
> >>>>>>>
> >>>>>>>> 1. For the API, thoughts on changing the method signature to return a
> >>>>>>>> (non-Optional) TaskAssignor? Then we can either have the default
> >>>>>>>> implementation return new HighAvailabilityTaskAssignor or just have a
> >>>>>>>> default implementation class that people can extend if they don't want
> >>>>>>>> to implement every method.
> >>>>>>> Based on some other discussion, I actually decided to get rid of the
> >>>>>>> plugin interface, and instead use config to specify individual plugin
> >>>>>>> behaviour. So the method you're referring to is no longer part of the
> >>>>>>> proposal.
> >>>>>>>
> >>>>>>>> 3. Speaking of ApplicationMetadata, the javadoc says it's read-only but
> >>>>>>>> there are methods that return void on it? It's not totally clear to me
> >>>>>>>> how that interface is supposed to be used by the assignor. It'd be nice
> >>>>>>>> if we could flip that interface such that it becomes part of the output
> >>>>>>>> instead of an input to the plugin.
> >>>>>>> I've moved those methods to a util class. They're really utility methods
> >>>>>>> the assignor might want to call to do some default or optimized
> >>>>>>> assignment for some cases like rack-awareness.
> >>>>>>>
> >>>>>>>> 4. We should consider wrapping UUID in a ProcessID class so that we
> >>>>>>>> control the interface (there are a few places where UUID is directly
> >>>>>>>> used).
> >>>>>>> I like it. Updated the proposal.
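A minimal sketch of what wrapping `UUID` might look like. The constructor, `random()` factory and `id()` accessor are assumptions for illustration, not the KIP's final API; the point is that callers depend on `ProcessID`, so the underlying representation can change later without touching the public interface.

```java
import java.util.Objects;
import java.util.UUID;

// Sketch of a value class wrapping UUID; members are illustrative assumptions.
final class ProcessID {
    private final UUID id;

    ProcessID(UUID id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    static ProcessID random() {
        return new ProcessID(UUID.randomUUID());
    }

    UUID id() {
        return id;
    }

    // Value semantics: two wrappers of the same UUID are equal.
    @Override
    public boolean equals(Object o) {
        return o instanceof ProcessID && id.equals(((ProcessID) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return id.toString();
    }
}
```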
> >>>>>>>
> >>>>>>>> 5. What does NodeState#newAssignmentForNode() do? I thought the point
> >>>>>>>> was for the plugin to make the assignment? Is that the result of the
> >>>>>>>> default logic?
> >>>>>>> It doesn't need to be part of the interface. I've removed it.
> >>>>>>>
> >>>>>>>> re 2/6:
> >>>>>>>
> >>>>>>> I generally agree with these points, but I'd rather hash that out in a
> >>>>>>> PR than in the KIP review, as it'll be clearer what gets used how. It
> >>>>>>> seems to me (committers please correct me if I'm wrong) that as long as
> >>>>>>> we're on the same page about what information the interfaces are
> >>>>>>> returning, that's ok at this level of discussion.
> >>>>>>>
> >>>>>>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai
> >>>>>>> <desai.p.ro...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello All,
> >>>>>>>>
> >>>>>>>> I'd like to start a discussion on KIP-924 (
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
> >>>>>>>> ) which proposes an interface to allow users to plug into the Streams
> >>>>>>>> partition assignor. The motivation section in the KIP goes into some
> >>>>>>>> more detail on why we think this is a useful addition. Thanks in advance
> >>>>>>>> for your feedback!
> >>>>>>>>
> >>>>>>>> Best Regards,
> >>>>>>>>
> >>>>>>>> Rohan
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
>
