One more thing. It might be good to clearly call out which interfaces a user would implement, versus the ones that Kafka Streams implements and a TaskAssignor only uses.

My understanding is that users would implement `TaskAssignor`, `TaskAssignment`, and `StreamsClientAssignment`.

For `AssignedTask` it seems that users would actually only need to instantiate them. Should we add a public constructor?

Also wondering if we should add an empty default implementation for `onAssignmentComputed()` as it seems not to be strictly necessary to use this method?
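
To make the default-implementation question concrete, here is a minimal sketch of an empty default callback. The types are simplified, hypothetical stand-ins (tasks are plain strings), not the interfaces proposed in the KIP; only the names `TaskAssignor`, `TaskAssignment`, and `onAssignmentComputed()` come from the discussion.

```java
import java.util.List;

class DefaultCallbackSketch {

    // Hypothetical stand-in for the KIP's TaskAssignment: just a list of task names.
    record TaskAssignment(List<String> tasks) {}

    // Simplified stand-in for the user-implemented TaskAssignor.
    interface TaskAssignor {
        TaskAssignment assign(List<String> tasks);

        // Empty default: implementors that do not care about the final
        // assignment do not have to override the callback.
        default void onAssignmentComputed(TaskAssignment finalAssignment) {
            // intentionally a no-op
        }
    }

    // An assignor that only implements assign() and inherits the no-op callback.
    static class PassThroughAssignor implements TaskAssignor {
        @Override
        public TaskAssignment assign(List<String> tasks) {
            return new TaskAssignment(List.copyOf(tasks));
        }
    }

    public static void main(String[] args) {
        TaskAssignor assignor = new PassThroughAssignor();
        TaskAssignment assignment = assignor.assign(List.of("0_0", "0_1"));
        assignor.onAssignmentComputed(assignment); // inherited no-op
        System.out.println(assignment.tasks());
    }
}
```

With a default like this, an existing implementation keeps compiling if the callback is renamed or extended later, which is part of why the question matters.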


-Matthias

On 4/19/24 7:30 PM, Matthias J. Sax wrote:
Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same partition.assignor prefix". What does this mean?


101 (nit) The KIP says: "Note that the thread-level assignment will remain an un-configurable internal implementation detail of the partition assignor (see "Rejected Alternatives" for further thoughts and reasoning)." -- When I was reading this the first time, I did not understand it, and it only became clear later (e.g. while reading the discussion thread). I think it would be good to be a little bit more explicit, because this is not just some minor thing, but a core design decision (which I, btw, support).


102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with 'public' :)


104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems to be a little bit clumsy? I kinda like the original `finalAssignment()` -- I would also be happy with `onFinalAssignment` to address Bruno's line of thinking (which I think is a good call out). (Btw: `finalAssignment` is still used in the text on the KIP and should also be updated.)


105: Please remove all `private` variables. We should only show public stuff on the KIP. Everything else is an implementation detail.


106: `TaskAssignment#numStreamsClients()` -- why do we need this method? Seems calling `assignment()` gives us a collection and we can just call size() on it to get the same value? -- Also, why do we explicitly call out the override of `toString()`; seems unnecessary?


107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the number of StreamThreads on this client, which is equal to the number of main consumers and represents its overall capacity." -- Given our planned thread refactoring, this might not hold true for long (and I am sure we will forget to update the JavaDocs later). Talking to Lucas, the plan is to cut down `StreamThread` to host the consumer (and there will be only one, and it won't be configurable any longer), and we would introduce a number of configurable "processing threads". Can/should we build this API in a forward looking manner?


108: Why do we need `StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how this would be useful?


109 `StreamsClientState#consumers`: should we rename this to `#consumerClientIds()`?


110 (nit) `StreamsClientState#previous[Active|Standby]Tasks`: JavaDoc says 'owned by consumers on this node' -- Should we just say `owned by the Streams client`?


111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer` parameter -- not clear what this is -- I guess it's a consumer's client.id? If yes, should we rename the parameter `consumerClientId`?


112 `ApplicationState`: what is the reason to have `allTasks()` and `statefulTasks()` -- why not have `statelessTasks()` and `statefulTasks()` instead? Or all three?


113 `ApplicationState#computeTaskLags()`: I understand the intent/reason why we have this one, but it seems to be somewhat difficult to use correctly, as it triggers an internal side-effect... Would it be possible to replace this method in favor of passing a `boolean computeTaskLag` parameter into #streamsClientStates() instead, which might make it less error prone to use, as it seems the returned `StreamsClient` object would be modified when calling #computeTaskLags() and thus both are related to each other?


114 nit/typo: `ApplicationState#streamsClientStates()` returns `StreamsClientState` not `StreamsClient`.


115 `StreamsAssignorRetryableException`: not sure if I fully understand the purpose of this exception.


116 "No actual changes to functionality": allowing users to plug in a custom TaskAssignor sounds like adding new functionality. Can we rephrase this?



117: What happens if the returned assignment is "invalid" -- for example, a task might not have been assigned, or is assigned to two nodes? Or a standby is assigned to the same node as its active? Or a `StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if this list of potential issues is complete or not...)
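
The failure modes listed in 117 can be sketched as explicit checks. This is only an illustration under stated assumptions: task IDs and client IDs are plain strings, `AssignedTask` and `validate()` are hypothetical names, and the KIP may define different checks or a different reaction to a failed check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AssignmentValidationSketch {

    enum TaskType { ACTIVE, STANDBY }

    // Hypothetical stand-in for an assigned task.
    record AssignedTask(String taskId, TaskType type) {}

    // Returns a description of every problem found; an empty list means all checks passed.
    static List<String> validate(Set<String> allTasks,
                                 Set<String> knownClients,
                                 Map<String, Set<AssignedTask>> assignment) {
        List<String> problems = new ArrayList<>();
        Map<String, String> activeOwner = new HashMap<>();

        // Pass 1: unknown clients, and tasks that are active on more than one client.
        for (Map.Entry<String, Set<AssignedTask>> entry : assignment.entrySet()) {
            String client = entry.getKey();
            if (!knownClients.contains(client)) {
                problems.add("unknown client " + client);
            }
            for (AssignedTask task : entry.getValue()) {
                if (task.type() == TaskType.ACTIVE) {
                    String other = activeOwner.putIfAbsent(task.taskId(), client);
                    if (other != null) {
                        problems.add("task " + task.taskId() + " is active on " + other + " and " + client);
                    }
                }
            }
        }

        // Pass 2: tasks that were never assigned as active.
        for (String taskId : allTasks) {
            if (!activeOwner.containsKey(taskId)) {
                problems.add("task " + taskId + " has no active owner");
            }
        }

        // Pass 3: standbys placed on the same client as their active task.
        for (Map.Entry<String, Set<AssignedTask>> entry : assignment.entrySet()) {
            for (AssignedTask task : entry.getValue()) {
                if (task.type() == TaskType.STANDBY
                        && entry.getKey().equals(activeOwner.get(task.taskId()))) {
                    problems.add("standby for " + task.taskId() + " shares client "
                            + entry.getKey() + " with its active task");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, Set<AssignedTask>> assignment = Map.of(
                "client-A", Set.of(new AssignedTask("0_0", TaskType.ACTIVE),
                                   new AssignedTask("0_0", TaskType.STANDBY)));
        validate(Set.of("0_0", "0_1"), Set.of("client-A"), assignment)
                .forEach(System.out::println);
    }
}
```

Collecting all problems rather than failing on the first one is a design choice of this sketch; it would let the caller decide whether to throw, fall back to a default assignor, or log and continue.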



-Matthias



On 4/18/24 2:05 AM, Bruno Cadonna wrote:
Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* with KafkaStreams* or StreamsClient*? I prefer KafkaStreams* since that class represents the Kafka Streams client. I am also fine with KafkaStreamsClient*. I really would like to avoid introducing a new term in Kafka Streams for which we already have an equivalent term even if it is used on the brokers since that is a different level of abstraction. Additionally, I have never been a big fan of the term "instance".

(4)
I think the question is if we need to retrieve assignment metadata by task for a Kafka client or if it is enough to iterate over the assigned tasks. Could you explain why we cannot add additional metadata to the class AssignedTask? The interface KafkaStreamsAssignment (a.k.a. NodeAssignment ;-) ) could be something like

public interface NodeAssignment {
     ProcessID processId();

     Instant followupRebalanceDeadline();

     Set<AssignedTask> assignment();

     enum AssignedTaskType {
         STATELESS,
         STATEFUL,
         STANDBY
     }

     static class AssignedTask {
         AssignedTaskType type();
         TaskId id();

         // ... other metadata needed in future
     }
}
If we need to retrieve assigned task by task ID, maybe it is better to add methods like assignedFor(TaskId) and not to expose the Map.

(5)
I am in favor of ApplicationState but I am also fine with ApplicationMetadata if you insist.

(6)
Is

void finalAssignment(GroupAssignment assignment, GroupSubscription subscription);

kind of a callback? If yes, would it make sense to call it onAssignmentComputed()?


(7)
What do you think of changing the TaskAssignmentUtils signatures to

public static TaskAssignment default*Assignment(final ApplicationState applicationState, final TaskAssignment taskAssignment, ...) {...}

to avoid mutating the assignment in place?
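
Points (4) and (7) above can be sketched together: an assignment object that answers lookups by task ID without exposing its map, and a utility that returns a new assignment instead of mutating its input. Everything here is a hypothetical simplification. `TaskId`, `withStandby()`, and the rule of one entry per task ID are assumptions made for the sketch, not part of the KIP.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class ImmutableAssignmentSketch {

    enum AssignedTaskType { STATELESS, STATEFUL, STANDBY }

    // Hypothetical value types; the real TaskId and AssignedTask may look different.
    record TaskId(int subtopology, int partition) {}
    record AssignedTask(TaskId id, AssignedTaskType type) {}

    static final class NodeAssignment {
        // Kept private so callers cannot depend on, or mutate, the map.
        private final Map<TaskId, AssignedTask> tasksById;

        // Assumes at most one entry per task ID on a given client.
        NodeAssignment(Collection<AssignedTask> tasks) {
            Map<TaskId, AssignedTask> byId = new HashMap<>();
            for (AssignedTask task : tasks) {
                byId.put(task.id(), task);
            }
            this.tasksById = Map.copyOf(byId);
        }

        Set<AssignedTask> assignment() {
            return Set.copyOf(tasksById.values());
        }

        // Point (4): lookup by task ID without exposing the map.
        Optional<AssignedTask> assignedFor(TaskId id) {
            return Optional.ofNullable(tasksById.get(id));
        }
    }

    // Point (7): returns a new assignment and leaves the input untouched.
    static NodeAssignment withStandby(NodeAssignment current, TaskId id) {
        if (current.assignedFor(id).isPresent()) {
            throw new IllegalArgumentException("task already assigned to this client: " + id);
        }
        Set<AssignedTask> tasks = new HashSet<>(current.assignment());
        tasks.add(new AssignedTask(id, AssignedTaskType.STANDBY));
        return new NodeAssignment(tasks);
    }

    public static void main(String[] args) {
        TaskId active = new TaskId(0, 0);
        TaskId standby = new TaskId(0, 1);
        NodeAssignment before =
                new NodeAssignment(List.of(new AssignedTask(active, AssignedTaskType.STATEFUL)));
        NodeAssignment after = withStandby(before, standby);
        System.out.println(before.assignedFor(standby).isPresent());
        System.out.println(after.assignedFor(standby).isPresent());
    }
}
```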


Best,
Bruno

On 4/17/24 7:50 PM, Sophie Blee-Goldman wrote:
Thanks Bruno! I can provide a bit of context behind some of these
decisions but I just want to say up front that I agree with every single one
of your points, though I am going to push back a bit on the first one.

[1] The idea here is to help avoid some confusion around the overloaded
term "client", which can mean either "an instance of Kafka Streams" or
"a consumer/producer client". The problem is that the former applies to
the entire Streams process and therefore should be interpreted as "all
of the StreamThreads on an instance" whereas the latter is typically used
interchangeably to mean the consumer client in the consumer group,
which implies a scope of just a single StreamThread on an instance.
The "Node" name here was an attempt to clear this up, since differentiating
between instance and thread level is critical to understanding and properly
implementing the custom assignor.

I do see what you mean about there not being a concept of Node in the
Kafka Streams codebase, and that we usually do use "instance" when we
need to differentiate between consumer client/one StreamThread and
Kafka Streams client/all StreamThreads. As I'm typing this I'm convincing
myself even more that we shouldn't just use "Client" without further
distinction, but I'm not sure "Node" has to be the answer either.

Could we replace "Node" with "KafkaStreamsClient" or is that too wordy?
I honestly do still like Node personally, and don't see what's wrong with
introducing a new term since the "node" terminology is used heavily
on the broker side and it means effectively the same thing in theory.
But if we can't compromise between "Node" and "Client" then maybe
we can settle on "Instance"? (Does feel a bit wordy too...maybe "Process"?)

[2] Good catch(es). Makes sense to me

[3] Totally agree, a single enum makes way more sense

[4] Here again I can provide some background -- this is actually following
a pattern that we used when refactoring the old PartitionAssignor into
the new (at the time) Consumer PartitionAssignor interface. The idea was
to wrap the return type to protect the assign method in case we ever wanted
to add something to what was returned, such as metadata for the entire
group. This way we could avoid a massively disruptive deprecation-and-
migration cycle for everyone who implements a custom assignor.
That said, I just checked the GroupAssignment class we added for this
in the ConsumerPartitionAssignor interface, and to this day we've never
added anything other than the map of consumer client to assignment.

So maybe that was overly cautious. I'd be ok with flattening this map out.
I guess the question is just, can we imagine any case in which we might
want the custom assignor to return additional metadata? To be honest
I think this might be more likely than with the plain consumer client case,
but again, I'm totally fine with just flattening it to a plain map return
type.

[5] I guess not. I think ApplicationMetadata was added during the initial
KIP discussion so that's probably why it doesn't follow the same naming
pattern. Personally I'm fine either way (I do think ApplicationMetadata
sounds a bit better but that's not a good enough reason :P)

Thanks Bruno!

On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi,

sorry, I am late to the party.

I have a couple of comments:

(1)
I would prefer Client* instead of Node* in the names. In Kafka Streams
we do not really have the concept of node but we have the concept of
client (admittedly, we sometimes also use instance). I would like to
avoid introducing a new term to basically describe the Streams client.
I know that we already have a ClientState but that would be in a
different package.

(2)
Did you consider using Instant instead of long as the return type of
followupRebalanceDeadline()? Instant is a bit more flexible and readable
than a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in
interface NodeAssignment.

(3)
Did you consider using an enum instead of class AssignedTask? As far as I
understand not all combinations are possible. A stateless standby task
does not exist. An enum with values STATELESS, STATEFUL, STANDBY would
be clearer. Or even better instead of two methods in AssignedTask that
return a boolean you could have one method -- say type() -- that returns
the enum.

(4)
Does the return type of assignment need to be a map from task ID to
AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks
with AssignedTask containing the task ID?

(5)
Is there a semantic difference between *State and *Metadata? I was
wondering whether ApplicationMetadata could also be ApplicationState for
the sake of consistency.
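
On point (2) above, a small sketch of the difference between the two return types. The method names are hypothetical and only contrast a `long` deadline, assumed here to be epoch milliseconds, with an `Instant`.

```java
import java.time.Duration;
import java.time.Instant;

class FollowupDeadlineSketch {

    // long variant: the unit (epoch milliseconds here) is only a convention
    // that callers have to know about.
    static long deadlineAsLong(long nowMs, long delayMs) {
        return nowMs + delayMs;
    }

    // Instant variant: the type itself says this is a point in time,
    // and the delay carries its own unit.
    static Instant deadlineAsInstant(Instant now, Duration delay) {
        return now.plus(delay);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-04-17T14:08:00Z");
        Instant deadline = deadlineAsInstant(now, Duration.ofMinutes(10));
        System.out.println(deadline);
        // Both variants describe the same point in time.
        System.out.println(deadline.toEpochMilli() == deadlineAsLong(now.toEpochMilli(), 600_000L));
    }
}
```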

Best,
Bruno


On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
Cool, looks good to me!

Seems like there is no further feedback, so maybe we can start to call for
a vote?

However, since as noted we are setting aside time to discuss this during
the sync next Thursday, we can also wait until after that meeting to
officially kick off the vote.

On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai <desai.p.ro...@gmail.com>
wrote:

Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor is
clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally. I
think it's worth adding an interface that lets the plugin observe the final
assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.

On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai <desai.p.ro...@gmail.com>
wrote:

Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:

1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation behind
it, does our controller also want to change how the partitions->tasks
mapping is formed?
You're right that this is out of place. I've removed this method as it's
not needed by the task assignor.

2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
ApplicationMetadata class. If we consider them as some default util
functions, how about moving those into their own static util methods to
separate them from the ApplicationMetadata “fact objects”?
Agreed. Updated in the latest revision of the KIP. These have been moved
to TaskAssignorUtils.

3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users use
whatever structs / their own classes even, if they like. WDYT?
Agreed. Updated in the latest version of the KIP.

1. For the API, thoughts on changing the method signature to return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just have a
default implementation class that people can extend if they don't want to
implement every method.
Based on some other discussion, I actually decided to get rid of the
plugin interface, and instead use config to specify individual plugin
behaviour. So the method you're referring to is no longer part of the
proposal.

3. Speaking of ApplicationMetadata, the javadoc says it's read only but
there are methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.
I've moved those methods to a util class. They're really utility methods
the assignor might want to call to do some default or optimized assignment
for some cases like rack-awareness.

4. We should consider wrapping UUID in a ProcessID class so that we control
the interface (there are a few places where UUID is directly used).
I like it. Updated the proposal.

5. What does NodeState#newAssignmentForNode() do? I thought the point was
for the plugin to make the assignment? Is that the result of the default
logic?
It doesn't need to be part of the interface. I've removed it.
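
The UUID-wrapping idea from point 4 above could look roughly like the following. This is a minimal sketch; the shape of `ProcessID` here (constructor, `random()`, no public accessor for the UUID) is an assumption, not the class from the proposal.

```java
import java.util.Objects;
import java.util.UUID;

class ProcessIdSketch {

    // Hypothetical wrapper: callers see ProcessID, never the UUID itself,
    // so the underlying representation can change later.
    static final class ProcessID {
        private final UUID id;

        ProcessID(UUID id) {
            this.id = Objects.requireNonNull(id, "id");
        }

        static ProcessID random() {
            return new ProcessID(UUID.randomUUID());
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof ProcessID && id.equals(((ProcessID) other).id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }

        @Override
        public String toString() {
            return "ProcessID{" + id + "}";
        }
    }

    public static void main(String[] args) {
        UUID raw = UUID.randomUUID();
        // Two wrappers around the same UUID compare equal.
        System.out.println(new ProcessID(raw).equals(new ProcessID(raw)));
    }
}
```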

re 2/6:

I generally agree with these points, but I'd rather hash that out in a PR
than in the KIP review, as it'll be clearer what gets used how. It seems to
me (committers please correct me if I'm wrong) that as long as we're on the
same page about what information the interfaces are returning, that's ok at
this level of discussion.

On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai <desai.p.ro...@gmail.com>
wrote:

Hello All,

I'd like to start a discussion on KIP-924 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
) which proposes an interface to allow users to plug into the streams
partition assignor. The motivation section in the KIP goes into some more
detail on why we think this is a useful addition. Thanks in advance for
your feedback!

Best Regards,

Rohan





