One last thing: I added an error code enum to be returned from the
#onAssignmentComputed method in case of an invalid assignment. I created
one code for each of the invalid cases we described above. The downside is
that this means we'll have to go through a deprecation cycle if we want to
loosen up the restrictions on any of the enforced cases. The upside is that
we can very clearly mark what is an invalid assignment and this will
(hopefully) assist users who are new to customizing assignments by clearly
denoting the requirements, and returning a clear error if they are not
followed.

Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
this case by returning the same assignment to the consumers and scheduling
a followup rebalance. I've added all of this to the TaskAssignor  and
#onAssignmentComputed javadocs, and added a section under "Public Changes"
as well.

Please let me know if there are any concerns, or if you have suggestions
for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> Thanks guys! I agree with what Lucas said about 117c, we can always loosen
> a restriction later and I don't want to do anything now that might get in
> the way of the new threading models.
>
> With that I think we're all in agreement on 117. I'll update the KIP to
> include what we've discussed
>
> (and will fix the remaining #finalAssignment mention as well, thanks
> Bruno. Glad to have such good proof readers! :P)
>
> On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna <cado...@apache.org> wrote:
>
>> Hi again,
>>
>> I forgot to ask whether you could add the agreement about handling
>> invalid assignment to the KIP.
>>
>> Best,
>> Bruno
>>
>> On 4/30/24 2:00 PM, Bruno Cadonna wrote:
>> > Hi all,
>> >
>> > I think we are converging!
>> >
>> > 117
>> > a) fail: Since it is an invalid consumer assignment
>> > b) pass: I agree that not assigning a task might be reasonable in some
>> > situations
>> > c) fail: For the reasons Lucas pointed out. I am missing a good use
>> case
>> > here.
>> > d) fail: It is invalid
>> >
>> >
>> > Somewhere in the KIP you still use finalAssignment() instead of the
>> > wonderful method name onAssignmentComputed() ;-)
>> > "... interface also includes a method named finalAssignment which is
>> > called with the final computed GroupAssignment ..."
>> >
>> >
>> > Best,
>> > Bruno
>> >
>> >
>> > On 4/30/24 1:04 PM, Lucas Brutschy wrote:
>> >> Hi,
>> >>
>> >> Looks like a great KIP to me!
>> >>
>> >> I'm late, so I'm only going to comment on the last open point 117. I'm
>> >> against any fallbacks like "use the default assignor if the custom
>> >> assignment is invalid", as it's just going to hide bugs. For the 4
>> >> cases mentioned by Sophie:
>> >>
>> >> 117a) I'd fail immediately here, as it's an implementation bug, and
>> >> should not lead to a valid consumer group assignment.
>> >> 117b) Agreed. This is a useful assignment and should be allowed.
>> >> 117c) This is the tricky case. However, I'm leaning towards not
>> >> allowing this, unless we have a concrete use case. This will block us
>> >> from potentially using a single consumer for active and standby tasks
>> >> in the future. It's easier to drop the restriction later if we have a
>> >> concrete use case.
>> >> 117d) Definitely fail immediately, as you said.
>> >>
>> >> Cheers,
>> >> Lucas
>> >>
>> >>
>> >>
>> >> On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
>> >> <sop...@responsive.dev> wrote:
>> >>>
>> >>> Yeah I think that sums it up well. Either you computed a *possible*
>> >>> assignment,
>> >>> or you returned something that makes it literally impossible for the
>> >>> StreamsPartitionAssignor to decipher/translate into an actual group
>> >>> assignment, in which case it should just fail
>> >>>
>> >>> That's more or less it for the open questions that have been raised
>> >>> so far,
>> >>> so I just want to remind folks that there's already a voting thread
>> for
>> >>> this. I cast my vote a few minutes ago so it should resurface in
>> >>> everyone's
>> >>> inbox :)
>> >>>
>> >>> On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai <desai.p.ro...@gmail.com
>> >
>> >>> wrote:
>> >>>
>> >>>> 117: as Sophie laid out, there are two cases here right:
>> >>>> 1. cases that are considered invalid by the existing assignors but
>> are
>> >>>> still valid assignments in the sense that they can be used to
>> >>>> generate a
>> >>>> valid consumer group assignment (from the perspective of the
>> >>>> consumer group
>> >>>> protocol). An assignment that excludes a task is one such example,
>> and
>> >>>> Sophie pointed out a good use case for it. I also think it makes
>> >>>> sense to
>> >>>> allow these. It's hard to predict how a user might want to use the
>> >>>> custom
>> >>>> assignor, and its reasonable to expect them to use it with care and
>> not
>> >>>> hand-hold them.
>> >>>> 2. cases that are not valid because it is impossible to compute a
>> valid
>> >>>> consumer group assignment from them. In this case it seems totally
>> >>>> reasonable to just throw a fatal exception that gets passed to the
>> >>>> uncaught
>> >>>> exception handler. If this case happens then there is some bug in the
>> >>>> user's assignor and its totally reasonable to fail the application
>> >>>> in that
>> >>>> case. We _could_ try to be more graceful and default to one of the
>> >>>> existing
>> >>>> assignors. But it's usually better to fail hard and fast when there
>> >>>> is some
>> >>>> illegal state detected imo.
>> >>>>
>> >>>> On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai <desai.p.ro...@gmail.com
>> >
>> >>>> wrote:
>> >>>>
>> >>>>> Bruno, I've incorporated your feedback into the KIP document.
>> >>>>>
>> >>>>> On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai <
>> desai.p.ro...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Thanks for the feedback Bruno! For the most part I think it makes
>> >>>>>> sense,
>> >>>>>> but leaving a couple follow-up thoughts/questions:
>> >>>>>>
>> >>>>>> re 4: I think Sophie's point was slightly different - that we
>> >>>>>> might want
>> >>>>>> to wrap the return type for `assign` in a class so that its easily
>> >>>>>> extensible. This makes sense to me. Whether we do that or not, we
>> can
>> >>>> have
>> >>>>>> the return type be a Set instead of a Map as well.
>> >>>>>>
>> >>>>>> re 6: Yes, it's a callback that's called with the final
>> assignment. I
>> >>>>>> like your suggested name.
>> >>>>>>
>> >>>>>> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai <
>> desai.p.ro...@gmail.com>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Thanks for the feedback Sophie!
>> >>>>>>>
>> >>>>>>> re1: Totally agree. The fact that it's related to the partition
>> >>>> assignor
>> >>>>>>> is clear from just `task.assignor`. I'll update.
>> >>>>>>> re3: This is a good point, and something I would find useful
>> >>>> personally.
>> >>>>>>> I think its worth adding an interface that lets the plugin
>> >>>>>>> observe the
>> >>>>>>> final assignment. I'll add that.
>> >>>>>>> re4: I like the new `NodeAssignment` type. I'll update the KIP
>> with
>> >>>> that.
>> >>>>>>>
>> >>>>>>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai
>> >>>>>>> <desai.p.ro...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks for the feedback so far! I think pretty much all of it is
>> >>>>>>>> reasonable. I'll reply to it inline:
>> >>>>>>>>
>> >>>>>>>>> 1. All the API logic is granular at the Task level, except the
>> >>>>>>>> previousOwnerForPartition func. I’m not clear what’s the
>> motivation
>> >>>>>>>> behind it, does our controller also want to change how the
>> >>>>>>>> partitions->tasks mapping is formed?
>> >>>>>>>> You're right that this is out of place. I've removed this method
>> as
>> >>>>>>>> it's not needed by the task assignor.
>> >>>>>>>>
>> >>>>>>>>> 2. Just on the API layering itself: it feels a bit weird to
>> >>>>>>>>> have the
>> >>>>>>>> three built-in functions (defaultStandbyTaskAssignment etc)
>> >>>>>>>> sitting in
>> >>>>>>>> the ApplicationMetadata class. If we consider them as some
>> default
>> >>>> util
>> >>>>>>>> functions, how about introducing moving those into their own
>> static
>> >>>> util
>> >>>>>>>> methods to separate from the ApplicationMetadata “fact objects” ?
>> >>>>>>>> Agreed. Updated in the latest revision of the kip. These have
>> been
>> >>>>>>>> moved to TaskAssignorUtils
>> >>>>>>>>
>> >>>>>>>>> 3. I personally prefer `NodeAssignment` to be a read-only object
>> >>>>>>>> containing the decisions made by the assignor, including the
>> >>>>>>>> requestFollowupRebalance flag. For manipulating the half-baked
>> >>>>>>>> results
>> >>>>>>>> inside the assignor itself, maybe we can just be flexible to let
>> >>>> users use
>> >>>>>>>> whatever struts / their own classes even, if they like. WDYT?
>> >>>>>>>> Agreed. Updated in the latest version of the kip.
>> >>>>>>>>
>> >>>>>>>>> 1. For the API, thoughts on changing the method signature to
>> >>>>>>>>> return
>> >>>> a
>> >>>>>>>> (non-Optional) TaskAssignor? Then we can either have the default
>> >>>>>>>> implementation return new HighAvailabilityTaskAssignor or just
>> >>>>>>>> have a
>> >>>>>>>> default implementation class that people can extend if they don't
>> >>>> want to
>> >>>>>>>> implement every method.
>> >>>>>>>> Based on some other discussion, I actually decided to get rid of
>> >>>>>>>> the
>> >>>>>>>> plugin interface, and instead use config to specify individual
>> >>>>>>>> plugin
>> >>>>>>>> behaviour. So the method you're referring to is no longer part
>> >>>>>>>> of the
>> >>>>>>>> proposal.
>> >>>>>>>>
>> >>>>>>>>> 3. Speaking of ApplicationMetadata, the javadoc says it's read
>> >>>>>>>>> only
>> >>>>>>>> but
>> >>>>>>>> theres methods that return void on it? It's not totally clear to
>> me
>> >>>> how
>> >>>>>>>> that interface is supposed to be used by the assignor. It'd be
>> nice
>> >>>> if we
>> >>>>>>>> could flip that interface such that it becomes part of the output
>> >>>> instead
>> >>>>>>>> of an input to the plugin.
>> >>>>>>>> I've moved those methods to a util class. They're really utility
>> >>>>>>>> methods the assignor might want to call to do some default or
>> >>>> optimized
>> >>>>>>>> assignment for some cases like rack-awareness.
>> >>>>>>>>
>> >>>>>>>>> 4. We should consider wrapping UUID in a ProcessID class so
>> >>>>>>>>> that we
>> >>>>>>>> control
>> >>>>>>>> the interface (there are a few places where UUID is directly
>> used).
>> >>>>>>>> I like it. Updated the proposal.
>> >>>>>>>>
>> >>>>>>>>> 5. What does NodeState#newAssignmentForNode() do? I thought the
>> >>>>>>>> point was
>> >>>>>>>> for the plugin to make the assignment? Is that the result of the
>> >>>>>>>> default logic?
>> >>>>>>>> It doesn't need to be part of the interface. I've removed it.
>> >>>>>>>>
>> >>>>>>>>> re 2/6:
>> >>>>>>>>
>> >>>>>>>> I generally agree with these points, but I'd rather hash that
>> >>>>>>>> out in a
>> >>>>>>>> PR than in the KIP review, as it'll be clearer what gets used
>> >>>>>>>> how. It
>> >>>> seems
>> >>>>>>>> to me (committers please correct me if I'm wrong) that as long as
>> >>>> we're on
>> >>>>>>>> the same page about what information the interfaces are
>> returning,
>> >>>> that's
>> >>>>>>>> ok at this level of discussion.
>> >>>>>>>>
>> >>>>>>>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai
>> >>>>>>>> <desai.p.ro...@gmail.com>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hello All,
>> >>>>>>>>>
>> >>>>>>>>> I'd like to start a discussion on KIP-924 (
>> >>>>>>>>>
>> >>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
>> >>>> )
>> >>>>>>>>> which proposes an interface to allow users to plug into the
>> >>>>>>>>> streams
>> >>>>>>>>> partition assignor. The motivation section in the KIP goes into
>> >>>>>>>>> some
>> >>>> more
>> >>>>>>>>> detail on why we think this is a useful addition. Thanks in
>> >>>>>>>>> advance
>> >>>> for
>> >>>>>>>>> your feedback!
>> >>>>>>>>>
>> >>>>>>>>> Best Regards,
>> >>>>>>>>>
>> >>>>>>>>> Rohan
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>
>>
>

Reply via email to