Hi,

sorry, I am late to the party.

I have a couple of comments:

(1)
I would prefer Client* instead of Node* in the names. In Kafka Streams we do not really have the concept of node but we have the concept of client (admittedly, we sometimes also use instance). I would like to avoid introducing a new term to basically describe the Streams client. I know that we already have a ClientState but that would be in a different package.

(2)
Did you consider to use Instant instead of long as return type of followupRebalanceDeadline()? Instant is a bit more flexible and readable as a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in interface NodeAssignment.

(3)
Did you consider to use an enum instead of class AssignedTask? As far as I understand not all combinations are possible. A stateless standby task does not exist. An enum with values STATELESS, STATEFUL, STANDBY would be clearer. Or even better instead of two methods in AssignedTask that return a boolean you could have one method -- say type() -- that returns the enum.

(4)
Does the return type of assignment need to be a map from task ID to AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks with AssignedTask containing the task ID?

(5)
I there a semantic difference between *State and *Metadata? I was wondering whether ApplicationMetadata could also be ApplicationState for the sake of consistency.

Best,
Bruno


On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
Cool, looks good to me!

Seems like there is no further feedback, so maybe we can start to call for
a vote?

However, since as noted we are setting aside time to discuss this during
the sync next Thursday, we can also wait until after that meeting to
officially kick off the vote.

On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai <desai.p.ro...@gmail.com> wrote:

Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor is
clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally. I
think its worth adding an interface that lets the plugin observe the final
assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.

On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai <desai.p.ro...@gmail.com>
wrote:

Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:

1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation
behind
it, does our controller also want to change how the partitions->tasks
mapping is formed?
You're right that this is out of place. I've removed this method as it's
not needed by the task assignor.

2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in
the
ApplicationMetadata class. If we consider them as some default util
functions, how about introducing moving those into their own static util
methods to separate from the ApplicationMetadata “fact objects” ?
Agreed. Updated in the latest revision of the kip. These have been moved
to TaskAssignorUtils

3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users
use
whatever struts / their own classes even, if they like. WDYT?
Agreed. Updated in the latest version of the kip.

1. For the API, thoughts on changing the method signature to return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just have a
default implementation class that people can extend if they don't want to
implement every method.
Based on some other discussion, I actually decided to get rid of the
plugin interface, and instead use config to specify individual plugin
behaviour. So the method you're referring to is no longer part of the
proposal.

3. Speaking of ApplicationMetadata, the javadoc says it's read only but
theres methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.
I've moved those methods to a util class. They're really utility methods
the assignor might want to call to do some default or optimized
assignment
for some cases like rack-awareness.

4. We should consider wrapping UUID in a ProcessID class so that we
control
the interface (there are a few places where UUID is directly used).
I like it. Updated the proposal.

5. What does NodeState#newAssignmentForNode() do? I thought the point
was
for the plugin to make the assignment? Is that the result of the default
logic?
It doesn't need to be part of the interface. I've removed it.

re 2/6:

I generally agree with these points, but I'd rather hash that out in a PR
than in the KIP review, as it'll be clearer what gets used how. It seems
to
me (committers please correct me if I'm wrong) that as long as we're on
the
same page about what information the interfaces are returning, that's ok
at
this level of discussion.

On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai <desai.p.ro...@gmail.com>
wrote:

Hello All,

I'd like to start a discussion on KIP-924 (

https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
)
which proposes an interface to allow users to plug into the streams
partition assignor. The motivation section in the KIP goes into some
more
detail on why we think this is a useful addition. Thanks in advance for
your feedback!

Best Regards,

Rohan




Reply via email to