Thanks Bruno! I can provide a bit of context behind some of these
decisions but I just want to say up front that I agree with every single one
of your points, though I am going to push back a bit on the first one.

[1] The idea here is to help avoid some confusion around the overloaded
term "client", which can mean either "an instance of Kafka Streams" or
"a consumer/producer client". The problem is that the former applies to
the entire Streams process and therefore should be interpreted as "all
of the StreamThread on an instance" whereas the latter is typically used
interchangeably to mean the consumer client in the consumer group,
which implies a scope of just a single StreamThread on an instance.
The "Node" name here was an attempt to clear this up, since differentiating
between instance and thread level is critical to understanding and properly
implementing the custom assignor.

I do see what you mean about there not being a concept of Node in the
Kafka Streams codebase, and that we usually do use "instance" when we
need to differentiate between consumer client/one StreamThread and
Kafka Streams client/all StreamThreads. As I'm typing this I'm convincing
myself even more that we shouldn't just use "Client" without further
distinction, but I'm not sure "Node" has to be the answer either.

Could we replace "Node" with "KafkaStreamsClient" or is that too wordy?
I honestly do still like Node personally, and don't see what's wrong with
introducing a new term since the "node" terminology is used heavily
on the broker side and it means effectively the same thing in theory.
But if we can't compromise between "Node" and "Client" then maybe
we can settle on "Instance"? (Does feel a bit wordy too...maybe "Process"?)

[2] Good catch(es). Makes sense to me

[3] Totally agree, a single enum makes way more sense

[4] Here again I can provide some background -- this is actually following
a pattern that we used when refactoring the old PartitionAssignor into
the new (at the time) Consumer PartitionAssignor interface. The idea was
to wrap the return type to protect the assign method in case we ever wanted
to add something to what was returned, such as metadata for the entire
group. This way we could avoid a massively disruptive deprecation-and-
migration cycle for everyone who implements a custom assignor.
That said, I just checked the GroupAssignment class we added for this
in the ConsumerPartitionAssignor interface, and to this day we've never
added anything other that the map of consumer client to assignment.

So maybe that was overly cautious. I'd be ok with flattening this map out.
I guess the question is just, can we imagine any case in which we might
want the custom assignor to return additional metadata? To be honest
I think this might be more likely than with the plain consumer client case,
but again, I'm totally fine with just flattening it to a plain map return
type

[5] I guess not. I think ApplicationMetadata was added during the initial
KIP discussion so that's probably why it doesn't follow the same naming
pattern. Personally I'm fine either way (I do think ApplicationMetadata
sounds a bit better but that's not a good enough reason :P)

Thanks Bruno!

On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi,
>
> sorry, I am late to the party.
>
> I have a couple of comments:
>
> (1)
> I would prefer Client* instead of Node* in the names. In Kafka Streams
> we do not really have the concept of node but we have the concept of
> client (admittedly, we sometimes also use instance). I would like to
> avoid introducing a new term to basically describe the Streams client.
> I know that we already have a ClientState but that would be in a
> different package.
>
> (2)
> Did you consider to use Instant instead of long as return type of
> followupRebalanceDeadline()? Instant is a bit more flexible and readable
> as a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in
> interface NodeAssignment.
>
> (3)
> Did you consider to use an enum instead of class AssignedTask? As far as
> I understand not all combinations are possible. A stateless standby task
> does not exist. An enum with values STATELESS, STATEFUL, STANDBY would
> be clearer. Or even better instead of two methods in AssignedTask that
> return a boolean you could have one method -- say type() -- that returns
> the enum.
>
> (4)
> Does the return type of assignment need to be a map from task ID to
> AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks
> with AssignedTask containing the task ID?
>
> (5)
> I there a semantic difference between *State and *Metadata? I was
> wondering whether ApplicationMetadata could also be ApplicationState for
> the sake of consistency.
>
> Best,
> Bruno
>
>
> On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
> > Cool, looks good to me!
> >
> > Seems like there is no further feedback, so maybe we can start to call
> for
> > a vote?
> >
> > However, since as noted we are setting aside time to discuss this during
> > the sync next Thursday, we can also wait until after that meeting to
> > officially kick off the vote.
> >
> > On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai <desai.p.ro...@gmail.com>
> wrote:
> >
> >> Thanks for the feedback Sophie!
> >>
> >> re1: Totally agree. The fact that it's related to the partition
> assignor is
> >> clear from just `task.assignor`. I'll update.
> >> re3: This is a good point, and something I would find useful
> personally. I
> >> think its worth adding an interface that lets the plugin observe the
> final
> >> assignment. I'll add that.
> >> re4: I like the new `NodeAssignment` type. I'll update the KIP with
> that.
> >>
> >> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai <desai.p.ro...@gmail.com>
> >> wrote:
> >>
> >>> Thanks for the feedback so far! I think pretty much all of it is
> >>> reasonable. I'll reply to it inline:
> >>>
> >>>> 1. All the API logic is granular at the Task level, except the
> >>> previousOwnerForPartition func. I’m not clear what’s the motivation
> >> behind
> >>> it, does our controller also want to change how the partitions->tasks
> >>> mapping is formed?
> >>> You're right that this is out of place. I've removed this method as
> it's
> >>> not needed by the task assignor.
> >>>
> >>>> 2. Just on the API layering itself: it feels a bit weird to have the
> >>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
> >> the
> >>> ApplicationMetadata class. If we consider them as some default util
> >>> functions, how about introducing moving those into their own static
> util
> >>> methods to separate from the ApplicationMetadata “fact objects” ?
> >>> Agreed. Updated in the latest revision of the kip. These have been
> moved
> >>> to TaskAssignorUtils
> >>>
> >>>> 3. I personally prefer `NodeAssignment` to be a read-only object
> >>> containing the decisions made by the assignor, including the
> >>> requestFollowupRebalance flag. For manipulating the half-baked results
> >>> inside the assignor itself, maybe we can just be flexible to let users
> >> use
> >>> whatever struts / their own classes even, if they like. WDYT?
> >>> Agreed. Updated in the latest version of the kip.
> >>>
> >>>> 1. For the API, thoughts on changing the method signature to return a
> >>> (non-Optional) TaskAssignor? Then we can either have the default
> >>> implementation return new HighAvailabilityTaskAssignor or just have a
> >>> default implementation class that people can extend if they don't want
> to
> >>> implement every method.
> >>> Based on some other discussion, I actually decided to get rid of the
> >>> plugin interface, and instead use config to specify individual plugin
> >>> behaviour. So the method you're referring to is no longer part of the
> >>> proposal.
> >>>
> >>>> 3. Speaking of ApplicationMetadata, the javadoc says it's read only
> but
> >>> theres methods that return void on it? It's not totally clear to me how
> >>> that interface is supposed to be used by the assignor. It'd be nice if
> we
> >>> could flip that interface such that it becomes part of the output
> instead
> >>> of an input to the plugin.
> >>> I've moved those methods to a util class. They're really utility
> methods
> >>> the assignor might want to call to do some default or optimized
> >> assignment
> >>> for some cases like rack-awareness.
> >>>
> >>>> 4. We should consider wrapping UUID in a ProcessID class so that we
> >>> control
> >>> the interface (there are a few places where UUID is directly used).
> >>> I like it. Updated the proposal.
> >>>
> >>>> 5. What does NodeState#newAssignmentForNode() do? I thought the point
> >>> was
> >>> for the plugin to make the assignment? Is that the result of the
> default
> >>> logic?
> >>> It doesn't need to be part of the interface. I've removed it.
> >>>
> >>>> re 2/6:
> >>>
> >>> I generally agree with these points, but I'd rather hash that out in a
> PR
> >>> than in the KIP review, as it'll be clearer what gets used how. It
> seems
> >> to
> >>> me (committers please correct me if I'm wrong) that as long as we're on
> >> the
> >>> same page about what information the interfaces are returning, that's
> ok
> >> at
> >>> this level of discussion.
> >>>
> >>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai <desai.p.ro...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hello All,
> >>>>
> >>>> I'd like to start a discussion on KIP-924 (
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
> >> )
> >>>> which proposes an interface to allow users to plug into the streams
> >>>> partition assignor. The motivation section in the KIP goes into some
> >> more
> >>>> detail on why we think this is a useful addition. Thanks in advance
> for
> >>>> your feedback!
> >>>>
> >>>> Best Regards,
> >>>>
> >>>> Rohan
> >>>>
> >>>>
> >>
> >
>

Reply via email to