Thanks for the details!

107: I like the `numProcessingThreads()` proposal.


108(a): it was not really a concern, but it was rather a question if we could/should simplify it, so it's easier to implement a custom task assignor. But if we believe that it's an integral component, I am fine with leaving the responsibility to request follow-up rebalances to the user.

108(b): Similar to 108(a). But I am ok to give control to the user too.


115: SGTM.



-Matthias

On 4/25/24 3:45 PM, Sophie Blee-Goldman wrote:
104. Fair enough -- also happy to defer to Rohan on this (or Bruno if he
feels super strongly)

107. That's a good point. Ultimately the task load should reflect the
processing capacity, and that's something that will exist in both the new
and old threading model. I like #processingCapacity for the name. And for
the javadocs, I think we can just say something generic enough that it will
be accurate in both the old and new model and won't need to be updated
(since, as you mentioned before, we always forget to update random
javadocs).

As for the NOTE, however, I do feel that is necessary. At the very least,
it's necessary if we rename the method to something with the word
"capacity", since that tends to imply a "maximum limit" rather than a
"minimum load". In fact I think the original method was named capacity and
that's why I added this NOTE to the javadoc in the first place, to avoid
confusion. I can see why it doesn't make sense if the name doesn't have the
word "capacity" in it though.

But I do think #processingCapacity feels appropriate. I guess a reasonable
alternative would just be #numProcessingThreads, which is both descriptive
enough to not need the NOTE in the javadocs and also accurately describes
both the old one-consumer-per-StreamThread model and the new one (it's just
a matter of what "processing thread" means will change). So how about we
just call it #numProcessingThreads and in the javadocs we can say it's like
a capacity that should correspond to the relative "weight" of assigned
tasks relative to other KafkaStreams clients? WDYT?
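
To make the "relative weight" idea concrete, here is a minimal sketch. It is not the KIP's API: `ProportionalTaskSplit` and `split` are hypothetical names, and clients are plain strings. It shares out a task count in proportion to each client's processing-thread count.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka Streams or the KIP.
final class ProportionalTaskSplit {

    // threadsPerClient: client name -> number of processing threads (the "weight").
    // Returns client name -> number of tasks; the counts sum to numTasks.
    static Map<String, Integer> split(Map<String, Integer> threadsPerClient, int numTasks) {
        if (numTasks < 0) {
            throw new IllegalArgumentException("numTasks must not be negative");
        }
        int totalThreads = 0;
        for (int threads : threadsPerClient.values()) {
            if (threads < 0) {
                throw new IllegalArgumentException("thread count must not be negative");
            }
            totalThreads += threads;
        }
        if (totalThreads == 0) {
            throw new IllegalArgumentException("need at least one processing thread");
        }

        // First pass: every client gets the floor of its proportional share.
        Map<String, Integer> tasksPerClient = new LinkedHashMap<>();
        int assigned = 0;
        for (Map.Entry<String, Integer> entry : threadsPerClient.entrySet()) {
            int share = (int) ((long) numTasks * entry.getValue() / totalThreads);
            tasksPerClient.put(entry.getKey(), share);
            assigned += share;
        }

        // Second pass: hand out the leftover tasks one at a time, in iteration
        // order, skipping clients that have no processing threads.
        int remainder = numTasks - assigned;
        for (Map.Entry<String, Integer> entry : threadsPerClient.entrySet()) {
            if (remainder == 0) {
                break;
            }
            if (entry.getValue() > 0) {
                tasksPerClient.merge(entry.getKey(), 1, Integer::sum);
                remainder--;
            }
        }
        return tasksPerClient;
    }
}
```

A real assignor would also weigh stateful vs stateless tasks and previous ownership; this only shows how a thread count can act as a weight rather than as a hard limit.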

108. Yes, I 100% believe that a custom assignor should be able to schedule
a followup rebalance:

108(1) For one thing, imo the HATaskAssignor shouldn't be a "special case"
but should effectively look just like any other custom assignor. And it
definitely needs the ability to schedule followups. How would the
StreamsPartitionAssignor figure out whether to schedule a followup
rebalance for the HAAssignor? From the StreamsPartitionAssignor POV it just
sees "active" and "standby" tasks -- It doesn't know whether some of those
standby tasks are actually "warmup tasks". And anyways, I think users
should be able to request a followup rebalance. What is the concern over
giving custom assignors control over this? Imo if people abuse this and
shoot themselves in the foot by scheduling followup rebalances for no
reason, that's on them. As always -- "simple things should be easy,
difficult things should be possible"

108(2) This is a fair question, although I still believe we should give the
custom assignor full control over the scheduled followup rebalance
timeline. Again, because more advanced things should be possible -- and
it's not like having this API return an Instant makes things more
complicated for people who want to do simple things, users are free to
ignore this completely and returning an Instant doesn't feel more difficult
than returning an enum. So why restrict this?

To take a more specific example: let's say users have a complicated set of
metrics they use to determine task placement. Sometimes these metrics are
unavailable, in which case they want to schedule a followup rebalance but
may want to implement a backoff/retry rather than simply scheduling an
immediate followup. I know for a fact that immediate followup rebalances
triggered by Kafka Streams can actually cause issues in some cases (see
KAFKA-14382 <https://issues.apache.org/jira/browse/KAFKA-14382> and
KAFKA-14419 <https://issues.apache.org/jira/browse/KAFKA-14419> -- same
root cause but note that the second issue is still unresolved to this day,
and I know someone besides the issue reporter who has repeatedly been
affected by it). Giving users the ability to back off and schedule smarter
"immediate" followups if/when they run into issues seems like a sufficient
motivation to me. For another: perhaps these complex metrics are advanced
enough to be able to predict when a given task will be "warmed up" (to take
the HAAssignor example) or otherwise be able to compute an exact time at
which to cut over to a new task assignment. In this case it would be
necessary to have full flexibility over when the followup was triggered.
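
As a rough illustration of the backoff/retry case, the sketch below computes a followup deadline as an Instant using capped exponential backoff. `FollowupBackoff` and `nextDeadline` are hypothetical names, and how the attempt count is tracked between rebalances is left out.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of Kafka Streams or the KIP.
final class FollowupBackoff {
    private final long initialDelayMs;
    private final long maxDelayMs;

    FollowupBackoff(Duration initialDelay, Duration maxDelay) {
        this.initialDelayMs = initialDelay.toMillis();
        this.maxDelayMs = maxDelay.toMillis();
        if (initialDelayMs <= 0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("require 0 < initialDelay <= maxDelay");
        }
    }

    // failedAttempts = 0 yields the initial delay; each further failed attempt
    // doubles the delay until it reaches the cap.
    Instant nextDeadline(Instant now, int failedAttempts) {
        long delayMs = initialDelayMs;
        for (int i = 0; i < failedAttempts && delayMs < maxDelayMs; i++) {
            // Compare against half the cap first so the doubling cannot overflow.
            delayMs = delayMs > maxDelayMs / 2 ? maxDelayMs : delayMs * 2;
        }
        return now.plusMillis(delayMs);
    }
}
```

An assignor that wants the simple behaviour can ignore all of this and return a fixed deadline; the Instant return type only matters to those who want the extra control.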

115. I see what you mean here. I was originally thinking that way, but was
worried that users might "accidentally" catch whatever exception we throw
if the task lag computation fails and not know how to handle it. But I
suppose we can just say in the javadocs that you should not catch it, or
should rethrow it if you do, to allow Streams to recover and re-attempt. I agree we
should have Streams just handle this transparently for users and not
require them to rebuild the assignment on their own. I'll add this to the
javadocs -- and I don't think we need to introduce a new exception type
even. We have the "TaskAssignmentException" already which behaves similarly
now -- ie if there's an error during assignment, we throw this and return
the same assignment back and schedule a followup.

So how about we (a) put in the TaskAssignor#assign javadocs that if a
TaskAssignmentException is thrown, Streams will return the same assignment
and automatically schedule an immediate followup rebalance, and (b) note in
the #kafkaStreamsStates(boolean computeTaskLags) javadocs that this can
throw a TaskAssignmentException, which should be rethrown in order to retry
the rebalance. I think this is basically what you're saying here so I'll go
ahead and update the KIP with this but lmk if there's anything else
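
The flow described in (a) and (b) could look roughly like the sketch below. Everything here is a stand-in: the nested `TaskAssignmentException` only mimics the real exception so the example compiles on its own, and assignments are reduced to a map of client name to task ids.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of the caller side; not the StreamsPartitionAssignor code.
final class AssignmentFallbackSketch {

    // Stand-in for the real TaskAssignmentException so the sketch is self-contained.
    static final class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(String message) {
            super(message);
        }
    }

    // The assignment to send out, plus whether an immediate followup is requested.
    static final class Result {
        final Map<String, Set<String>> tasksByClient;
        final boolean immediateFollowup;

        Result(Map<String, Set<String>> tasksByClient, boolean immediateFollowup) {
            this.tasksByClient = tasksByClient;
            this.immediateFollowup = immediateFollowup;
        }
    }

    // Runs the user-supplied assignor. If it lets a TaskAssignmentException
    // bubble up, fall back to the previous assignment and ask for a followup.
    static Result assignWithFallback(
            Map<String, Set<String>> previous,
            Function<Map<String, Set<String>>, Map<String, Set<String>>> userAssignor) {
        try {
            return new Result(userAssignor.apply(previous), false);
        } catch (TaskAssignmentException e) {
            return new Result(previous, true);
        }
    }
}
```

The user code never needs to handle the exception; it only has to avoid swallowing it.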

But coming back to what I said in 108(2) -- I do think it would be valuable
for users to be able to determine their own followup rebalance schedule,
even for "immediate" followup rebalances (again, as a way to mitigate
issues like KAFKA-14419 <https://issues.apache.org/jira/browse/KAFKA-14419>).
Let's say there's an error in someone's own assignor code, or again, it's
trying to make a decision based on some custom metrics that are
unavailable. In this case I feel the utility method would still be helpful,
even if we would handle it transparently for errors in things like the task
lag computation. It's a utility method -- there for people who want to use
it, everyone else can ignore it (and most users probably will). But for
those who want ultimate control (mwahaha) it's a nice-to-have so they don't
have to do *everything* from scratch. WDYT?

117. That's fair. I agree this is probably the most tricky of the open
questions, and I'm happy to defer. Although I do strongly believe that we
shouldn't call certain kinds of assignments "invalid" (such as a task not
being assigned to anyone). For the clearly-invalid assignment cases, I'd
err on the side of not holding users' hands too much for now, but again:
would be happy to defer if anyone has another suggestion and/or strong
opinion.

On Wed, Apr 24, 2024 at 10:13 PM Matthias J. Sax <mj...@apache.org> wrote:

104: I also don't feel super strong about it. Not sure if
`onAssignment()` might overload the name in a confusing way? In the end,
when the method is called, we don't assign anything? -- Guess, I am fine
with whatever Rohan picks as a name from the suggestions we have so far.


107: Did not think about how to do it yet. Just raised the question to
see if I am even heading into the right direction or not... I did not
propose to remove the method; it's clear that we need it.

Thinking about it a little more, what we actually want to convey is a
certain "processing capacity" an instance has? Thus,
`numConsumerClients()` might not reflect this in the future? Should we
just generically call it `processingCapacity()` or similar and for now
explain in the JavaDocs that it maps to number of (currently running)
`StreamsThread` (currently running, because users can dynamically
add/remove them...). We can later update the JavaDocs when we have
"processing threads" and point to number of processing threads? Maybe
Lucas/Bruno can provide more input on what/how we plan the future
threading model and configs.

Nit: not sure if we need the "NOTE" section at all? If we think we want
it, maybe remove from the KIP and we can discuss in more detail on the
PR (think it could be improved). Don't think the JavaDocs on the KIP must
be 100% accurate to what we put into the code later.


108: I guess my question is two-fold. (1) Does user-code need to decide
to schedule a probing rebalance to begin with? Or could the
non-customizable part of `StreamsPartitionAssignor` decide it? (2) If
custom code really needs to make this decision, why would it not just
return a boolean? It seems unnecessary to compute a deadline, given that
the probing rebalance interval is a config? -- Or maybe I am missing
something? If it's about regular probing rebalance vs immediate
rebalance vs no follow up, maybe an enum would do the trick?
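
For comparison, the three-way enum idea can be expressed on top of a deadline-based return value. This is only a sketch: `FollowupKinds`, `Followup` and `toDeadline` are hypothetical names, and the probing interval is passed in rather than read from config.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical sketch for illustration only; not part of the KIP.
final class FollowupKinds {

    enum Followup { NONE, PROBING, IMMEDIATE }

    // Maps each enum value to an optional deadline:
    // NONE -> no followup, PROBING -> now + interval, IMMEDIATE -> now.
    static Optional<Instant> toDeadline(Followup kind, Instant now, Duration probingInterval) {
        switch (kind) {
            case IMMEDIATE:
                return Optional.of(now);
            case PROBING:
                return Optional.of(now.plus(probingInterval));
            default:
                return Optional.empty();
        }
    }
}
```

The mapping only goes one way: any enum value has a deadline equivalent, but an arbitrary deadline has no enum equivalent.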


115: Thanks for the explanation. This does make sense. I am wondering if
we need the new utility method though? Would it not be possible to
encapsulate all this inside the non-customizable code? The method
`kafkaStreamsStates(boolean computeTaskLags)` will be provided by us and
called by the user code. Thus, if we cannot compute the lag, we could
still throw an exception -- the user code does not need to know anything
about it, and is not supposed to catch this exception. Hence, it should
bubble up and get back to our code from
`TaskAssignor#assign(ApplicationState applicationState)` which is called
by us, and we can catch our own exception here, and do what we do
currently: we return the old assignment, and request an immediate follow
up rebalance? For this case, the user code does not need to know
anything about it, and does not need to do anything special, and it
would become a provided built-in feature, which seems desirable?


117: not sure myself... Let's see what others think. I'll think about it
a little bit more and follow up again later. It's a tricky one.


-Matthias


On 4/24/24 5:08 PM, Sophie Blee-Goldman wrote:
Now to respond to Matthias:

FYI, I'm following the numbering scheme from your email but added **** to
mark responses that have further questions or feedback and/or aren't yet
addressed in the KIP and need to be followed up on. You can more or less
just skip over the ones without stars to save time.

100: I think this is leftover from a previous approach we considered.
Removed this line

101: I agree. Perhaps we hadn't fully committed to this decision when the
KIP was first written ;P Added a "Consumer Assignments" section under
"Public Changes" to address this more carefully and explicitly

102: fixed

103: fixed

****104: I do agree with Bruno on the structure of this callback name, ie
that it should start with "on", but any of the suggestions with that sound
good to me. I really don't feel too strongly but just to throw in a bit of
extra context, there is an analogous callback on the
ConsumerPartitionAssignor that is called #onAssignment. So personally I
would slightly prefer to just call it #onAssignment. However I'm happy to
go with whatever the consensus is -- @Bruno/@Matthias WDYT?

105: done (fyi I did leave the config doc string which is technically a
private variable but is part of the public contract)

106: good point about the numKafkaStreamsClients and toString methods --
removed those

****107: I guess this reflects how long it's been since the KIP was first
written :P But that's a fair point -- and yes, we should write this in a
forward looking manner. However, we can't build it in a way that's so
forward-looking that it doesn't work for the current version. Are you
proposing to remove this API altogether, or just rename it to
#numConsumerClients (or something like that) and update the javadocs
accordingly? Assuming the latter, I totally agree, and have made the
change. But we definitely can't just remove it altogether (it may even be
relevant in the new threading model if we eventually allow configuring the
number of consumer clients independently of the processing threads -- but
that's a different conversation. The important thing is that this KIP be
compatible with the old/current threading model, in which case we need this
API).
Anyways, please take a look at the new javadocs and method name and lmk if
that makes sense to you

****108: The #followupRebalanceDeadline allows the custom assignor to
request followup rebalances, for example in order to probe for restoration
progress or other conditions for task assignment. This is fundamental to
the HighAvailabilityTaskAssignor (ie the default assignor) and may be
useful to custom assignors with similar approaches. So it's definitely
necessary -- are you asking why we have it at all, or why it's an API on
the KafkaStreamsAssignment class and not, say, the TaskAssignment class?

109: Makes sense to me -- done

110: ack -- updated all mentions of "node" to "KafkaStreams client"

111: ack -- updated name from "consumer" to "consumerClientId"

112: that's fair -- I do think it's valuable to have "#allTasks" since some
assignors may not care about the stateful vs stateless distinction, but
it's weird to have #statefulTasks without #statelessTasks. Let's just have
all three. Added to the KIP

113: this makes sense to me. Updated computeTaskLags to be an input
parameter instead of a mutating API. Also noted that it can throw a
TimeoutException in this case (this is relevant for point 115 below)

114: fixed

****115: Reasonable question -- I think the way it's described right now is
a bit awkward, and there's a better way to approach the issue. Ultimately
the "issue" is how we can handle failures in things like the task lag
computation, which notably makes a remote call to the brokers and can/has
been known to fail at times. Right now if this API fails, the
StreamsPartitionAssignor will just return the same assignment as the
previous one and trigger an immediate followup rebalance. This exception
was meant to be a "utility" that can be thrown to indicate to the
StreamsPartitionAssignor to just return the old assignment and trigger an
immediate followup.
That said, this exception does feel like an awkward way to do it,
especially since the TaskAssignor can already do all of this via native
APIs: it can request a followup rebalance (and better yet, determine for
itself what a reasonable retry/backoff interval would be). It can also just
return the same assignment -- the only issue is that implementing this is
kind of annoying. So I would propose that instead of throwing a
RetryableException, we should just add an additional utility method to the
TaskAssignmentUtils that does this "fallback" assignment and returns the
same tasks to their previous clients.
Added TaskAssignmentUtils#identityAssignment for this (though I'm happy
to
take other name suggestions)
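
A "fallback" that returns the same tasks to their previous clients could be sketched as below. The shapes are assumptions for illustration: clients and tasks are plain strings, `Role` is a made-up enum, and the method only shares its name with the proposed utility.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch for illustration only; not the TaskAssignmentUtils code.
final class IdentityAssignmentSketch {

    enum Role { ACTIVE, STANDBY }

    // Builds client -> (task id -> role) from the previously owned tasks.
    // Assumes a task is not both active and standby on the same client;
    // if it were, the standby entry would overwrite the active one here.
    static Map<String, Map<String, Role>> identityAssignment(
            Map<String, Set<String>> previousActive,
            Map<String, Set<String>> previousStandby) {
        Map<String, Map<String, Role>> result = new TreeMap<>();
        copyInto(result, previousActive, Role.ACTIVE);
        copyInto(result, previousStandby, Role.STANDBY);
        return result;
    }

    private static void copyInto(Map<String, Map<String, Role>> result,
                                 Map<String, Set<String>> tasksByClient,
                                 Role role) {
        for (Map.Entry<String, Set<String>> entry : tasksByClient.entrySet()) {
            Map<String, Role> tasks = result.computeIfAbsent(entry.getKey(), k -> new TreeMap<>());
            for (String taskId : entry.getValue()) {
                tasks.put(taskId, role);
            }
        }
    }
}
```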

That's a long paragraph, sorry -- but hopefully it makes sense?

116: fair point. Updated the wording -- hopefully it makes more sense now.

****117: This is an interesting question. I think it's worth looking at
each of the cases you listed individually (and thinking about other cases
we might want to allow/disallow):

117a. Task assigned to two different clients: invalid assignment (but
should we enforce this?)
Imo we should look to the analogous ConsumerPartitionAssignor for this
question, which does not do any verification of the basic "each partition
is assigned to exactly one consumer" requirement. In the end this is an
advanced feature, and we should trust users to correctly implement and test
their custom assignors. So I would personally opt not to do any
post-assignment verification -- although I'm flexible on this if anyone
else feels strongly.

117b. Task not assigned to any client: valid assignment
Imo this should be allowed and could be used to implement some interesting
features. For example let's say you wanted to introduce a "pause" or
checkpoint-like feature, wherein Streams will stop processing new records
from the input topics but will continue processing everything downstream
until all intermediary records are flushed, ie the repartition topics are
drained. One way to do this would be to trigger a rebalance and assign only
downstream subtopologies, so that tasks which read from the input topics
aren't processed (until it's "restarted").

117c. Active and standby on the same KafkaStreams client: valid(?)
assignment
I'm not quite convinced either way on this, but think there could be edge
cases for which advanced users might want to have an active and standby
version of an in-memory store on the same client. Obviously this doesn't
make sense if you scale in/out by adding/removing entire KafkaStreams
clients, but those who scale up/down by adding/removing single
StreamThreads would benefit from relaxing this constraint.

117d. Unknown ProcessId: invalid assignment (should be enforced)
This one definitely sounds the most like a clear contract violation and, in
fact, would make it impossible for the StreamsPartitionAssignor to
correctly process the returned KafkaStreamsAssignment into a set of
individual consumer assignments. So we almost certainly need to throw an
exception here/call out an error here. Maybe we can use the #onAssignment
callback to return an error code as well? WDYT?
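
To illustrate cases 117a, 117b and 117d side by side, here is a minimal sketch of a post-assignment check. It is not a proposal for the actual verification logic: process ids and task ids are plain strings, and `AssignmentValidationSketch` is a hypothetical name. It reports unknown process ids and doubly-assigned active tasks, and deliberately says nothing about unassigned tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch for illustration only; not part of Kafka Streams or the KIP.
final class AssignmentValidationSketch {

    // Returns a list of human-readable problems; an empty list means no problem found.
    static List<String> validate(Set<String> knownProcessIds,
                                 Map<String, Set<String>> activeTasksByProcessId) {
        List<String> errors = new ArrayList<>();
        Map<String, String> ownerByTask = new HashMap<>();
        // Sorted iteration keeps the error order deterministic.
        for (Map.Entry<String, Set<String>> entry : new TreeMap<>(activeTasksByProcessId).entrySet()) {
            String processId = entry.getKey();
            if (!knownProcessIds.contains(processId)) {
                // 117d: the assignment refers to a client that does not exist.
                errors.add("unknown ProcessId: " + processId);
            }
            for (String task : new TreeSet<>(entry.getValue())) {
                String previousOwner = ownerByTask.putIfAbsent(task, processId);
                if (previousOwner != null) {
                    // 117a: the same active task was handed to two clients.
                    errors.add("active task " + task + " assigned to both "
                            + previousOwner + " and " + processId);
                }
            }
        }
        // 117b: tasks missing from the assignment are intentionally not reported.
        return errors;
    }
}
```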

118. Agreed, right now it's a bit hard to tell which APIs are for the user
to access, and which ones are for them to implement. Updated the KIP to
make this more clear by breaking up the classes into "User APIs" and
"Read-only APIs" (with a short description of each category).

119. Good point, added a public constructor for AssignedTask

120. Also good point, gave the #onAssignment callback a default no-op
implementation


On Wed, Apr 24, 2024 at 2:32 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:

Responding to Bruno first:

(1) I actually think "KafkaStreams" is exactly right here -- for the
reason you said, ultimately this is describing a literal instance of the
"KafkaStreams" class. Glad we hashed this out! (I saw that Rohan went with
StreamsClient but I also prefer KafkaStreams)

(4) Rohan is right about what I was saying -- but I'm now realizing that
I completely misinterpreted what your concern was. Sorry for the
long-winded and ultimately irrelevant answer. I'm completely fine with
having the return type be a simple Set with additional info such as TaskId
in the AssignedTask class (and I see Rohan already made this change so
we're all good)

(5) I don't insist either way :) ApplicationState works for me

On Fri, Apr 19, 2024 at 9:37 PM Matthias J. Sax <mj...@apache.org>
wrote:

One more thing. It might be good to clearly call out which interfaces a
user would implement, vs the other ones Kafka Streams implements and
TaskAssignor only uses.

My understanding is, that users would implement `TaskAssignor`,
`TaskAssignment`, and `StreamsClientAssignment`.

For `AssignedTask` it seems that users would actually only need to
instantiate them. Should we add a public constructor?

Also wondering if we should add an empty default implementation for
`onAssignmentComputed()` as it seems not to be strictly necessary to use
this method?


-Matthias

On 4/19/24 7:30 PM, Matthias J. Sax wrote:
Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same
partition.assignor  prefix". What does this mean?


101 (nit) The KIP says: "Note that the thread-level assignment will
remain an un-configurable internal implementation detail of the
partition assignor (see "Rejected Alternatives" for further thoughts
and
reasoning)." -- When I was reading this the first time, I did not
understand it, and it did only become clear later (eg while reading
the
discussion thread). I think it would be good to be a little bit more
explicit, because this is not just some minor thing, but a core design
decision (which I, btw, support).


102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with
'public' :)


104: Method name `TaskAssignor#onAssignmentComputed()` -> the name
seems
to be a little bit clumsy? I kinda like the original
`finalAssignment()`
-- I would also be happy with `onFinalAssignment` to address Bruno's
line of thinking (which I think is a good call out). (Btw:
`finalAssignment` is still used in the text on the KIP and should also
be updated.)


105: Please remove all `private` variables. We should only show public
stuff on the KIP. Everything else is an implementation detail.


106: `TaskAssignment#numStreamsClients()` -- why do we need this method?
Seems calling `assignment()` gives us a collection and we can just call
size() on it to get the same value? -- Also, why do we explicitly call
out the override of `toString()`; seems unnecessary?


107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the
number of StreamThreads on this client, which is equal to the number of
main consumers and represents its overall capacity." -- Given our
planned thread refactoring, this might not hold true for long (and I
am sure we will forget to update the JavaDocs later). Talking to Lucas,
the plan is to cut down `StreamThread` to host the consumer (and there
will be only one, and it won't be configurable any longer), and we would
introduce a number of configurable "processing threads". Can/should we
build this API in a forward looking manner?


108: Why do we need
`StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how
this would be useful?


109 `StreamsClientState#consumers`: should we rename this to
`#consumerClientIds()`?


110 (nit) `StreamsClientState#previous[Active|Standby]Tasks`: JavaDoc
says 'owned by consumers on this node' -- Should we just say `owned by
the Streams client`?


111 `StreamsClientState#prevTasksByLag()`: it takes a `String
consumer`
parameter -- not clear what this is -- I guess it's a consumer's
client.id? If yes, should we rename the parameter `consumerClientId`?


112 `ApplicationState`: what is the reason to have `allTasks()` and
`statefulTasks()` -- why not have `statelessTasks()` and
`statefulTasks()` instead? Or all three?


113 `ApplicationState#computeTaskLags()`: I understand the intent/reason
why we have this one, but it seems to be somewhat difficult to use
correctly, as it triggers an internal side-effect... Would it be
possible to replace this method in favor of passing in a `boolean
computeTaskLag` parameter into #streamClientState() instead, which might
make it less error prone to use, as it seems the returned
`StreamsClient` object would be modified when calling #computeTaskLags()
and thus both are related to each other?


114 nit/typo: `ApplicationState#streamsClientStates()` returns
`StreamsClientState` not `StreamsClient`.


115 `StreamsAssignorRetryableException`: not sure if I fully understand
the purpose of this exception.


116 "No actual changes to functionality": allowing to plug in a custom
TaskAssignor sounds like adding new functionality. Can we rephrase this?



117: What happens if the returned assignment is "invalid" -- for
example, a task might not have been assigned, or is assigned to two
nodes? Or a standby is assigned to the same node as its active? Or a
`StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if
this list of potential issues is complete or not...)



-Matthias



On 4/18/24 2:05 AM, Bruno Cadonna wrote:
Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* with KafkaStreams* or StreamsClient*? I
prefer KafkaStreams* since that class represents the Kafka Streams
client. I am also fine with KafkaStreamsClient*. I really would like
to avoid introducing a new term in Kafka Streams for which we already
have an equivalent term even if it is used on the brokers since that
is a different level of abstraction. Additionally, I have never been a
big fan of the term "instance".

(4)
I think the question is if we need to retrieve assignment metadata by
task for a Kafka client or if it is enough to iterate over the
assigned tasks. Could you explain why we cannot add additional
metadata to the class AssignedTask?
The interface KafkaStreamsAssignment (a.k.a. NodeAssignment ;-) )
could be something like

public interface NodeAssignment {
    ProcessID processId();

    Instant followupRebalanceDeadline();

    Set<AssignedTask> assignment();

    enum AssignedTaskType {
        STATELESS,
        STATEFUL,
        STANDBY
    }

    static class AssignedTask {
        AssignedTaskType type();
        TaskId id();

        // ... other metadata needed in future
    }
}
If we need to retrieve assigned task by task ID, maybe it is better
to
add methods like assignedFor(TaskId) and not to expose the Map.
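
A lookup method over the flat set might look like the sketch below. It is an illustration under assumptions, not the KIP's classes: task ids are plain strings, `ClientAssignment` is a made-up name, and each task id is assumed to appear at most once per client.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch for illustration only; names and shapes are assumptions.
final class AssignedTaskLookupSketch {

    enum AssignedTaskType { STATELESS, STATEFUL, STANDBY }

    static final class AssignedTask {
        private final String id;
        private final AssignedTaskType type;

        AssignedTask(String id, AssignedTaskType type) {
            this.id = id;
            this.type = type;
        }

        String id() {
            return id;
        }

        AssignedTaskType type() {
            return type;
        }
    }

    static final class ClientAssignment {
        private final Set<AssignedTask> assignment;

        ClientAssignment(Set<AssignedTask> assignment) {
            // Defensive copy so callers cannot mutate the assignment afterwards.
            this.assignment = Collections.unmodifiableSet(new LinkedHashSet<>(assignment));
        }

        Set<AssignedTask> assignment() {
            return assignment;
        }

        // Linear scan by task id; the internal collection type stays hidden, so
        // it could later become a map without changing this signature.
        Optional<AssignedTask> assignedFor(String taskId) {
            return assignment.stream()
                    .filter(task -> task.id().equals(taskId))
                    .findFirst();
        }
    }
}
```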

(5)
I am in favor of ApplicationState but I am also fine with
ApplicationMetadata if you insist.

(6)
Is

void finalAssignment(GroupAssignment assignment, GroupSubscription
subscription);

kind of a callback? If yes, would it make sense to call it
onAssignmentComputed()?


(7)
What do you think of changing the TaskAssignmentUtils signatures to

public static TaskAssignment default*Assignment(final ApplicationState applicationState,
                                                final TaskAssignment taskAssignment, ...) {...}

to avoid mutating the assignment in place?


Best,
Bruno

On 4/17/24 7:50 PM, Sophie Blee-Goldman wrote:
Thanks Bruno! I can provide a bit of context behind some of these
decisions but I just want to say up front that I agree with every
single one
of your points, though I am going to push back a bit on the first
one.

[1] The idea here is to help avoid some confusion around the overloaded
term "client", which can mean either "an instance of Kafka Streams" or
"a consumer/producer client". The problem is that the former applies to
the entire Streams process and therefore should be interpreted as "all
of the StreamThreads on an instance" whereas the latter is typically used
interchangeably to mean the consumer client in the consumer group,
which implies a scope of just a single StreamThread on an instance.
The "Node" name here was an attempt to clear this up, since differentiating
between instance and thread level is critical to understanding and properly
implementing the custom assignor.

I do see what you mean about there not being a concept of Node in
the
Kafka Streams codebase, and that we usually do use "instance" when
we
need to differentiate between consumer client/one StreamThread and
Kafka Streams client/all StreamThreads. As I'm typing this I'm
convincing
myself even more that we shouldn't just use "Client" without further
distinction, but I'm not sure "Node" has to be the answer either.

Could we replace "Node" with "KafkaStreamsClient" or is that too
wordy?
I honestly do still like Node personally, and don't see what's wrong
with
introducing a new term since the "node" terminology is used heavily
on the broker side and it means effectively the same thing in
theory.
But if we can't compromise between "Node" and "Client" then maybe
we can settle on "Instance"? (Does feel a bit wordy too...maybe
"Process"?)

[2] Good catch(es). Makes sense to me

[3] Totally agree, a single enum makes way more sense

[4] Here again I can provide some background -- this is actually
following
a pattern that we used when refactoring the old PartitionAssignor
into
the new (at the time) Consumer PartitionAssignor interface. The idea
was
to wrap the return type to protect the assign method in case we ever
wanted
to add something to what was returned, such as metadata for the
entire
group. This way we could avoid a massively disruptive
deprecation-and-
migration cycle for everyone who implements a custom assignor.
That said, I just checked the GroupAssignment class we added for this
in the ConsumerPartitionAssignor interface, and to this day we've never
added anything other than the map of consumer client to assignment.

So maybe that was overly cautious. I'd be ok with flattening this
map
out.
I guess the question is just, can we imagine any case in which we
might
want the custom assignor to return additional metadata? To be honest
I think this might be more likely than with the plain consumer
client
case,
but again, I'm totally fine with just flattening it to a plain map
return
type

[5] I guess not. I think ApplicationMetadata was added during the
initial
KIP discussion so that's probably why it doesn't follow the same
naming
pattern. Personally I'm fine either way (I do think
ApplicationMetadata
sounds a bit better but that's not a good enough reason :P)

Thanks Bruno!

On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna <cado...@apache.org>
wrote:

Hi,

sorry, I am late to the party.

I have a couple of comments:

(1)
I would prefer Client* instead of Node* in the names. In Kafka
Streams
we do not really have the concept of node but we have the concept
of
client (admittedly, we sometimes also use instance). I would like
to
avoid introducing a new term to basically describe the Streams
client.
I know that we already have a ClientState but that would be in a
different package.

(2)
Did you consider using Instant instead of long as the return type of
followupRebalanceDeadline()? Instant is a bit more flexible and readable
than a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in
interface NodeAssignment.

(3)
Did you consider using an enum instead of class AssignedTask? As far as
I understand not all combinations are possible. A stateless standby task
does not exist. An enum with values STATELESS, STATEFUL, STANDBY would
be clearer. Or even better instead of two methods in AssignedTask that
return a boolean you could have one method -- say type() -- that returns
the enum.

(4)
Does the return type of assignment need to be a map from task ID to
AssignedTask? Wouldn't it be enough to be a collection of
AssignedTasks
with AssignedTask containing the task ID?

(5)
Is there a semantic difference between *State and *Metadata? I was
wondering whether ApplicationMetadata could also be ApplicationState for
the sake of consistency.

Best,
Bruno


On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
Cool, looks good to me!

Seems like there is no further feedback, so maybe we can start to
call
for
a vote?

However, since as noted we are setting aside time to discuss this
during
the sync next Thursday, we can also wait until after that meeting
to
officially kick off the vote.

On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai <
desai.p.ro...@gmail.com>
wrote:

Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor is
clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally. I
think it's worth adding an interface that lets the plugin observe the final
assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.

On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai <
desai.p.ro...@gmail.com>
wrote:

Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:

1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the
motivation
behind
it, does our controller also want to change how the
partitions->tasks
mapping is formed?
You're right that this is out of place. I've removed this method
as
it's
not needed by the task assignor.

2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
ApplicationMetadata class. If we consider them as some default util
functions, how about moving those into their own static util
methods to separate from the ApplicationMetadata “fact objects”?
Agreed. Updated in the latest revision of the kip. These have been moved
to TaskAssignorUtils

3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users use
whatever structs / their own classes even, if they like. WDYT?
Agreed. Updated in the latest version of the kip.

1. For the API, thoughts on changing the method signature to
return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just
have a
default implementation class that people can extend if they
don't
want
to
implement every method.
Based on some other discussion, I actually decided to get rid of
the
plugin interface, and instead use config to specify individual
plugin
behaviour. So the method you're referring to is no longer part
of
the
proposal.

3. Speaking of ApplicationMetadata, the javadoc says it's read only but
there are methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.
I've moved those methods to a util class. They're really utility methods
the assignor might want to call to do some default or optimized assignment
for some cases like rack-awareness.

4. We should consider wrapping UUID in a ProcessID class so
that
we
control
the interface (there are a few places where UUID is directly
used).
I like it. Updated the proposal.
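
A wrapper of this kind is typically a small immutable value class. The sketch below shows the general shape only; `ProcessIdSketch` is a hypothetical name and is not the class added by the KIP.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch for illustration only; not the KIP's ProcessId class.
final class ProcessIdSketch {
    private final UUID id;

    ProcessIdSketch(UUID id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    UUID id() {
        return id;
    }

    // Value semantics: two wrappers are equal when their UUIDs are equal.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ProcessIdSketch)) {
            return false;
        }
        return id.equals(((ProcessIdSketch) other).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "ProcessId{" + id + "}";
    }
}
```

Owning the type means the underlying representation can change later without touching every signature that currently takes a UUID.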

5. What does NodeState#newAssignmentForNode() do? I thought the
point
was
for the plugin to make the assignment? Is that the result of the
default
logic?
It doesn't need to be part of the interface. I've removed it.

re 2/6:

I generally agree with these points, but I'd rather hash that
out
in a
PR
than in the KIP review, as it'll be clearer what gets used how.
It
seems
to
me (committers please correct me if I'm wrong) that as long as
we're on
the
same page about what information the interfaces are returning,
that's
ok
at
this level of discussion.

On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai
<desai.p.ro...@gmail.com>
wrote:

Hello All,

I'd like to start a discussion on KIP-924 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
)
which proposes an interface to allow users to plug into the
streams
partition assignor. The motivation section in the KIP goes into
some
more
detail on why we think this is a useful addition. Thanks in
advance
for
your feedback!

Best Regards,

Rohan











