Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
Any objections?

-----

-Thanks, fixed the indentation of headers under "User APIs" and "Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return type
being a Map<ProcessId, KafkaStreamsAssignment> rather than a TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map<ProcessId,
KafkaStreamsAssignment>. The idea is you can take the output of any of
these and pass it in to another to generate or optimize another piece of
the overall assignment. For example, if you want to perform the rack-aware
optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it seem
like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> Matthias:
>
> Thanks for the naming suggestions for the error codes. I was
> definitely not happy with my original naming but couldn't think of anything
> better.  I like your proposals though, will update the KIP names. I'll also
> add a "NONE" option as well -- much better than just passing in null for no
> error.
>
> > OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
>> same active task
>
>  Would also be an error if assigned to two consumers of the same client...
>> Needs to be rephrased.
>
>
> Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
> responsible for the assignment of tasks to consumers within a KafkaStreams.
> It would be a bug in the StreamsPartitionAssignor if it received a valid
> assignment from the TaskAssignor with only one copy of a task assigned to a
> single KAfkaStreams client, and then somehow ended up assigning that task
> to multiple consumers on the KafkaStreams client. It wouldn't be the
> TaskAssignor's fault so imo it would not make sense to include this case in
> the OVERLAPPING_CLIENT error (or as it's now called, ACTIVE_TASK_
> ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that caused
> the StreamsPartitionAssignor to assign a task to multiple consumers, it
> presumably wouldn't even notice since it's a bug -- if it did notice, it
> can just fix the issue. The error codes are about communicating unfixable
> issues due to the TaskAssignor itself returning an invalid assignment. The
> phrasing is intentional, and (imo) correct as it is.
>
> I do see your point about how the StreamsPartitionAssignor should
> handle/react to invalid assignments though. I'm fine with just throwing a
> StreamsException and crashing after we invoke the #onAssignmentComputed
> callback to notify the user of the error.
>
> On Wed, May 1, 2024 at 9:46 AM Guozhang Wang <guozhang.wang...@gmail.com>
> wrote:
>
>> Jumping back to the party here :)
>>
>> 107: I agree with the rationale behind this, and
>> `numProcessingThreads` looks good to me as it covers both the current
>> and future scenarios.
>>
>> 117: I agree with Lucas and Bruno, and would add:
>>   * 117e: unknown taskID: fail
>>   * 117f: inconsistent task types (e.g. a known taskID was indicated
>> stateless from ApplicationState, but the returned AssignedTask states
>> stateful): fail
>>   * 117g: some ProcessID was not included in the returned Set: pass,
>> and interprets it as no tasks assigned to it.
>>
>> And I'm open for any creative error codes folks would come up with :)
>>
>> > If any of these errors are detected, the StreamsPartitionAssignor will
>> immediately "fail" the rebalance and retry it by scheduling an immediate
>> followup rebalance.
>>
>> I'm also a bit concerned here, as such endless retry loops have
>> happened in the past in my memory. Given that we would likely see most
>> of the user implementations be deterministic, I'm also leaning towards
>> failing the app immediately and let the crowd educates us if there are
>> some very interesting scenarios out there that are not on our radar to
>> re-consider this, rather than getting hard to debug cases in the dark.
>>
>> -----
>>
>> And here are just some nits about the KIP writings itself:
>>
>> * I think some bullet points under `User APIs` and `Read-only APIs`
>> should have a lower level indention? It caught me for a sec until I
>> realized there are just two categories.
>>
>> * In TaskAssignmentUtils , why not let those util functions return
>> `TaskAssignment` (to me it feels more consistent with the user APIs),
>> but instead return a Map<ProcessID, KafkaStreamsAssignment>?
>>
>>
>> Guozhang
>>
>> On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >
>> > I like the idea of error codes. Not sure if the name are ideal?
>> > UNKNOWN_PROCESS_ID makes sense, but the other two seems a little bit
>> > difficult to understand?
>> >
>> > Should we be very descriptive (and also try to avoid coupling it to the
>> > threading model -- important for the first error code):
>> >   - ACTIVE_TASK_ ASSIGNED_MULTIPLE_TIMES
>> >   - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE
>> >
>> > I think we also need to add NONE as option or make the error parameter
>> > an `Optional`?
>> >
>> >
>> > > OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
>> same active task
>> >
>> > Would also be an error if assigned to two consumers of the same
>> > client... Needs to be rephrased.
>> >
>> >
>> >
>> > > If any of these errors are detected, the StreamsPartitionAssignor
>> will immediately "fail" the rebalance and retry it by scheduling an
>> immediate followup rebalance.
>> >
>> > Does this make sense? If we assume that the task-assignment is
>> > deterministic, we would end up with an infinite retry loop? Also,
>> > assuming that an client leave the group, we cannot assign some task any
>> > longer... I would rather throw a StreamsException and let the client
>> crash.
>> >
>> >
>> >
>> > -Matthias
>> >
>> > On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:
>> > > One last thing: I added an error code enum to be returned from the
>> > > #onAssignmentComputed method in case of an invalid assignment. I
>> created
>> > > one code for each of the invalid cases we described above. The
>> downside is
>> > > that this means we'll have to go through a deprecation cycle if we
>> want to
>> > > loosen up the restrictions on any of the enforced cases. The upside
>> is that
>> > > we can very clearly mark what is an invalid assignment and this will
>> > > (hopefully) assist users who are new to customizing assignments by
>> clearly
>> > > denoting the requirements, and returning a clear error if they are not
>> > > followed.
>> > >
>> > > Of course the StreamsPartitionAssignor will also do a "fallback &
>> retry" in
>> > > this case by returning the same assignment to the consumers and
>> scheduling
>> > > a followup rebalance. I've added all of this to the TaskAssignor  and
>> > > #onAssignmentComputed javadocs, and added a section under "Public
>> Changes"
>> > > as well.
>> > >
>> > > Please let me know if there are any concerns, or if you have
>> suggestions
>> > > for how else we can handle an invalid assignment
>> > >
>> > > On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman <
>> sop...@responsive.dev>
>> > > wrote:
>> > >
>> > >> Thanks guys! I agree with what Lucas said about 117c, we can always
>> loosen
>> > >> a restriction later and I don't want to do anything now that might
>> get in
>> > >> the way of the new threading models.
>> > >>
>> > >> With that I think we're all in agreement on 117. I'll update the KIP
>> to
>> > >> include what we've discussed
>> > >>
>> > >> (and will fix the remaining #finalAssignment mention as well, thanks
>> > >> Bruno. Glad to have such good proof readers! :P)
>> > >>
>> > >> On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna <cado...@apache.org>
>> wrote:
>> > >>
>> > >>> Hi again,
>> > >>>
>> > >>> I forgot to ask whether you could add the agreement about handling
>> > >>> invalid assignment to the KIP.
>> > >>>
>> > >>> Best,
>> > >>> Bruno
>> > >>>
>> > >>> On 4/30/24 2:00 PM, Bruno Cadonna wrote:
>> > >>>> Hi all,
>> > >>>>
>> > >>>> I think we are converging!
>> > >>>>
>> > >>>> 117
>> > >>>> a) fail: Since it is an invalid consumer assignment
>> > >>>> b) pass: I agree that not assigning a task might be reasonable in
>> some
>> > >>>> situations
>> > >>>> c) fail: For the reasons Lucas pointed out. I am missing a good use
>> > >>> case
>> > >>>> here.
>> > >>>> d) fail: It is invalid
>> > >>>>
>> > >>>>
>> > >>>> Somewhere in the KIP you still use finalAssignment() instead of the
>> > >>>> wonderful method name onAssignmentComputed() ;-)
>> > >>>> "... interface also includes a method named finalAssignment which
>> is
>> > >>>> called with the final computed GroupAssignment ..."
>> > >>>>
>> > >>>>
>> > >>>> Best,
>> > >>>> Bruno
>> > >>>>
>> > >>>>
>> > >>>> On 4/30/24 1:04 PM, Lucas Brutschy wrote:
>> > >>>>> Hi,
>> > >>>>>
>> > >>>>> Looks like a great KIP to me!
>> > >>>>>
>> > >>>>> I'm late, so I'm only going to comment on the last open point
>> 117. I'm
>> > >>>>> against any fallbacks like "use the default assignor if the custom
>> > >>>>> assignment is invalid", as it's just going to hide bugs. For the 4
>> > >>>>> cases mentioned by Sophie:
>> > >>>>>
>> > >>>>> 117a) I'd fail immediately here, as it's an implementation bug,
>> and
>> > >>>>> should not lead to a valid consumer group assignment.
>> > >>>>> 117b) Agreed. This is a useful assignment and should be allowed.
>> > >>>>> 117c) This is the tricky case. However, I'm leaning towards not
>> > >>>>> allowing this, unless we have a concrete use case. This will
>> block us
>> > >>>>> from potentially using a single consumer for active and standby
>> tasks
>> > >>>>> in the future. It's easier to drop the restriction later if we
>> have a
>> > >>>>> concrete use case.
>> > >>>>> 117d) Definitely fail immediately, as you said.
>> > >>>>>
>> > >>>>> Cheers,
>> > >>>>> Lucas
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
>> > >>>>> <sop...@responsive.dev> wrote:
>> > >>>>>>
>> > >>>>>> Yeah I think that sums it up well. Either you computed a
>> *possible*
>> > >>>>>> assignment,
>> > >>>>>> or you returned something that makes it literally impossible for
>> the
>> > >>>>>> StreamsPartitionAssignor to decipher/translate into an actual
>> group
>> > >>>>>> assignment, in which case it should just fail
>> > >>>>>>
>> > >>>>>> That's more or less it for the open questions that have been
>> raised
>> > >>>>>> so far,
>> > >>>>>> so I just want to remind folks that there's already a voting
>> thread
>> > >>> for
>> > >>>>>> this. I cast my vote a few minutes ago so it should resurface in
>> > >>>>>> everyone's
>> > >>>>>> inbox :)
>> > >>>>>>
>> > >>>>>> On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai <
>> desai.p.ro...@gmail.com
>> > >>>>
>> > >>>>>> wrote:
>> > >>>>>>
>> > >>>>>>> 117: as Sophie laid out, there are two cases here right:
>> > >>>>>>> 1. cases that are considered invalid by the existing assignors
>> but
>> > >>> are
>> > >>>>>>> still valid assignments in the sense that they can be used to
>> > >>>>>>> generate a
>> > >>>>>>> valid consumer group assignment (from the perspective of the
>> > >>>>>>> consumer group
>> > >>>>>>> protocol). An assignment that excludes a task is one such
>> example,
>> > >>> and
>> > >>>>>>> Sophie pointed out a good use case for it. I also think it makes
>> > >>>>>>> sense to
>> > >>>>>>> allow these. It's hard to predict how a user might want to use
>> the
>> > >>>>>>> custom
>> > >>>>>>> assignor, and its reasonable to expect them to use it with care
>> and
>> > >>> not
>> > >>>>>>> hand-hold them.
>> > >>>>>>> 2. cases that are not valid because it is impossible to compute
>> a
>> > >>> valid
>> > >>>>>>> consumer group assignment from them. In this case it seems
>> totally
>> > >>>>>>> reasonable to just throw a fatal exception that gets passed to
>> the
>> > >>>>>>> uncaught
>> > >>>>>>> exception handler. If this case happens then there is some bug
>> in the
>> > >>>>>>> user's assignor and its totally reasonable to fail the
>> application
>> > >>>>>>> in that
>> > >>>>>>> case. We _could_ try to be more graceful and default to one of
>> the
>> > >>>>>>> existing
>> > >>>>>>> assignors. But it's usually better to fail hard and fast when
>> there
>> > >>>>>>> is some
>> > >>>>>>> illegal state detected imo.
>> > >>>>>>>
>> > >>>>>>> On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai <
>> desai.p.ro...@gmail.com
>> > >>>>
>> > >>>>>>> wrote:
>> > >>>>>>>
>> > >>>>>>>> Bruno, I've incorporated your feedback into the KIP document.
>> > >>>>>>>>
>> > >>>>>>>> On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai <
>> > >>> desai.p.ro...@gmail.com>
>> > >>>>>>>> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> Thanks for the feedback Bruno! For the most part I think it
>> makes
>> > >>>>>>>>> sense,
>> > >>>>>>>>> but leaving a couple follow-up thoughts/questions:
>> > >>>>>>>>>
>> > >>>>>>>>> re 4: I think Sophie's point was slightly different - that we
>> > >>>>>>>>> might want
>> > >>>>>>>>> to wrap the return type for `assign` in a class so that its
>> easily
>> > >>>>>>>>> extensible. This makes sense to me. Whether we do that or
>> not, we
>> > >>> can
>> > >>>>>>> have
>> > >>>>>>>>> the return type be a Set instead of a Map as well.
>> > >>>>>>>>>
>> > >>>>>>>>> re 6: Yes, it's a callback that's called with the final
>> > >>> assignment. I
>> > >>>>>>>>> like your suggested name.
>> > >>>>>>>>>
>> > >>>>>>>>> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai <
>> > >>> desai.p.ro...@gmail.com>
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> Thanks for the feedback Sophie!
>> > >>>>>>>>>>
>> > >>>>>>>>>> re1: Totally agree. The fact that it's related to the
>> partition
>> > >>>>>>> assignor
>> > >>>>>>>>>> is clear from just `task.assignor`. I'll update.
>> > >>>>>>>>>> re3: This is a good point, and something I would find useful
>> > >>>>>>> personally.
>> > >>>>>>>>>> I think its worth adding an interface that lets the plugin
>> > >>>>>>>>>> observe the
>> > >>>>>>>>>> final assignment. I'll add that.
>> > >>>>>>>>>> re4: I like the new `NodeAssignment` type. I'll update the
>> KIP
>> > >>> with
>> > >>>>>>> that.
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai
>> > >>>>>>>>>> <desai.p.ro...@gmail.com>
>> > >>>>>>>>>> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Thanks for the feedback so far! I think pretty much all of
>> it is
>> > >>>>>>>>>>> reasonable. I'll reply to it inline:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> 1. All the API logic is granular at the Task level, except
>> the
>> > >>>>>>>>>>> previousOwnerForPartition func. I’m not clear what’s the
>> > >>> motivation
>> > >>>>>>>>>>> behind it, does our controller also want to change how the
>> > >>>>>>>>>>> partitions->tasks mapping is formed?
>> > >>>>>>>>>>> You're right that this is out of place. I've removed this
>> method
>> > >>> as
>> > >>>>>>>>>>> it's not needed by the task assignor.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> 2. Just on the API layering itself: it feels a bit weird to
>> > >>>>>>>>>>>> have the
>> > >>>>>>>>>>> three built-in functions (defaultStandbyTaskAssignment etc)
>> > >>>>>>>>>>> sitting in
>> > >>>>>>>>>>> the ApplicationMetadata class. If we consider them as some
>> > >>> default
>> > >>>>>>> util
>> > >>>>>>>>>>> functions, how about introducing moving those into their own
>> > >>> static
>> > >>>>>>> util
>> > >>>>>>>>>>> methods to separate from the ApplicationMetadata “fact
>> objects” ?
>> > >>>>>>>>>>> Agreed. Updated in the latest revision of the kip. These
>> have
>> > >>> been
>> > >>>>>>>>>>> moved to TaskAssignorUtils
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> 3. I personally prefer `NodeAssignment` to be a read-only
>> object
>> > >>>>>>>>>>> containing the decisions made by the assignor, including the
>> > >>>>>>>>>>> requestFollowupRebalance flag. For manipulating the
>> half-baked
>> > >>>>>>>>>>> results
>> > >>>>>>>>>>> inside the assignor itself, maybe we can just be flexible
>> to let
>> > >>>>>>> users use
>> > >>>>>>>>>>> whatever struts / their own classes even, if they like.
>> WDYT?
>> > >>>>>>>>>>> Agreed. Updated in the latest version of the kip.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> 1. For the API, thoughts on changing the method signature
>> to
>> > >>>>>>>>>>>> return
>> > >>>>>>> a
>> > >>>>>>>>>>> (non-Optional) TaskAssignor? Then we can either have the
>> default
>> > >>>>>>>>>>> implementation return new HighAvailabilityTaskAssignor or
>> just
>> > >>>>>>>>>>> have a
>> > >>>>>>>>>>> default implementation class that people can extend if they
>> don't
>> > >>>>>>> want to
>> > >>>>>>>>>>> implement every method.
>> > >>>>>>>>>>> Based on some other discussion, I actually decided to get
>> rid of
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>> plugin interface, and instead use config to specify
>> individual
>> > >>>>>>>>>>> plugin
>> > >>>>>>>>>>> behaviour. So the method you're referring to is no longer
>> part
>> > >>>>>>>>>>> of the
>> > >>>>>>>>>>> proposal.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> 3. Speaking of ApplicationMetadata, the javadoc says it's
>> read
>> > >>>>>>>>>>>> only
>> > >>>>>>>>>>> but
>> > >>>>>>>>>>> theres methods that return void on it? It's not totally
>> clear to
>> > >>> me
>> > >>>>>>> how
>> > >>>>>>>>>>> that interface is supposed to be used by the assignor. It'd
>> be
>> > >>> nice
>> > >>>>>>> if we
>> > >>>>>>>>>>> could flip that interface such that it becomes part of the
>> output
>> > >>>>>>> instead
>> > >>>>>>>>>>> of an input to the plugin.
>> > >>>>>>>>>>> I've moved those methods to a util class. They're really
>> utility
>> > >>>>>>>>>>> methods the assignor might want to call to do some default
>> or
>> > >>>>>>> optimized
>> > >>>>>>>>>>> assignment for some cases like rack-awareness.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> 4. We should consider wrapping UUID in a ProcessID class so
>> > >>>>>>>>>>>> that we
>> > >>>>>>>>>>> control
>> > >>>>>>>>>>> the interface (there are a few places where UUID is directly
>> > >>> used).
>> > >>>>>>>>>>> I like it. Updated the proposal.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> 5. What does NodeState#newAssignmentForNode() do? I
>> thought the
>> > >>>>>>>>>>> point was
>> > >>>>>>>>>>> for the plugin to make the assignment? Is that the result
>> of the
>> > >>>>>>>>>>> default logic?
>> > >>>>>>>>>>> It doesn't need to be part of the interface. I've removed
>> it.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> re 2/6:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> I generally agree with these points, but I'd rather hash
>> that
>> > >>>>>>>>>>> out in a
>> > >>>>>>>>>>> PR than in the KIP review, as it'll be clearer what gets
>> used
>> > >>>>>>>>>>> how. It
>> > >>>>>>> seems
>> > >>>>>>>>>>> to me (committers please correct me if I'm wrong) that as
>> long as
>> > >>>>>>> we're on
>> > >>>>>>>>>>> the same page about what information the interfaces are
>> > >>> returning,
>> > >>>>>>> that's
>> > >>>>>>>>>>> ok at this level of discussion.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai
>> > >>>>>>>>>>> <desai.p.ro...@gmail.com>
>> > >>>>>>>>>>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> Hello All,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> I'd like to start a discussion on KIP-924 (
>> > >>>>>>>>>>>>
>> > >>>>>>>
>> > >>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
>> > >>>>>>> )
>> > >>>>>>>>>>>> which proposes an interface to allow users to plug into the
>> > >>>>>>>>>>>> streams
>> > >>>>>>>>>>>> partition assignor. The motivation section in the KIP goes
>> into
>> > >>>>>>>>>>>> some
>> > >>>>>>> more
>> > >>>>>>>>>>>> detail on why we think this is a useful addition. Thanks in
>> > >>>>>>>>>>>> advance
>> > >>>>>>> for
>> > >>>>>>>>>>>> your feedback!
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Best Regards,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Rohan
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>
>> > >>>
>> > >>
>> > >
>>
>

Reply via email to