Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-15 Thread Sophie Blee-Goldman
Thanks Bruno!

First, I need to make one quick fix to what I said in the previous email:
the new rackId() getter will be added to KafkaStreamsState, not
KafkaStreamsAssignment. (The KIP is correct, but what I put in the email
was not.)

U1. I would actually prefer to keep the constructors as is, for reasons I
realize I forgot to mention. Let me know if this makes sense to you or you
would still prefer to break up the constructors anyways:

The KafkaStreamsAssignment class has two required parameters and one
optional one. The required params are of course the processId and
assignment, so imo it would not make sense to break these up across two
different constructors, since both have to be supplied. The
followupRebalanceDeadline, on the other hand, is purely optional, which is
why that one is in a separate, non-static constructor.

Re: for vs of, unfortunately "for" is a reserved keyword in Java, so it
can't be used as a method name. I'm open to other naming suggestions
though. I actually personally prefer the more direct naming style, eg
something like #ofProcessIdAndAssignment (or #forProcessIdAndAssignment if
you'd prefer?) But we tend to trend towards the shorter fluent naming style
and others have mentioned a preference for names like "#of" in the past.
I'm happy with any name (any name that's allowed in Java, that is :P)
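
To make the shape concrete, here is a minimal sketch of a static factory
that takes both required parameters, plus a fluent method for the optional
deadline. It is illustrative only and not the merged class: ProcessId is
modeled as a bare UUID wrapper, tasks are plain strings rather than
AssignedTask objects, and returning a new copy from withFollowupRebalance
is an assumption.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for the real ProcessId type.
final class ProcessId {
    private final UUID id;

    ProcessId(final UUID id) {
        this.id = id;
    }

    UUID id() {
        return id;
    }
}

// Illustrative only: tasks are plain strings here, not AssignedTask objects.
final class KafkaStreamsAssignmentSketch {
    private final ProcessId processId;
    private final Set<String> assignment;
    private final Optional<Instant> followupRebalanceDeadline;

    private KafkaStreamsAssignmentSketch(final ProcessId processId,
                                         final Set<String> assignment,
                                         final Optional<Instant> followupRebalanceDeadline) {
        this.processId = processId;
        this.assignment = Collections.unmodifiableSet(new HashSet<>(assignment));
        this.followupRebalanceDeadline = followupRebalanceDeadline;
    }

    // Both required parameters are supplied in a single static factory.
    static KafkaStreamsAssignmentSketch of(final ProcessId processId,
                                           final Set<String> assignment) {
        return new KafkaStreamsAssignmentSketch(processId, assignment, Optional.empty());
    }

    // The optional deadline is layered on afterwards; returning a copy is an assumption.
    KafkaStreamsAssignmentSketch withFollowupRebalance(final Instant rebalanceDeadline) {
        return new KafkaStreamsAssignmentSketch(
            processId, assignment, Optional.of(rebalanceDeadline));
    }

    ProcessId processId() {
        return processId;
    }

    Set<String> assignment() {
        return assignment;
    }

    Optional<Instant> followupRebalanceDeadline() {
        return followupRebalanceDeadline;
    }
}
```

A caller that needs no follow-up rebalance would stop at of(...); one that
does would chain withFollowupRebalance(...) onto the result.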

U3: This is a very good point, and in fact the existence of TaskMetadata
was why I went with TaskInfo here. I'm not super happy with it but at least
this TaskInfo is pretty well encapsulated in the assignment code and
generally won't be mixed up with the TaskMetadata/Task/etc part of the
code. If anyone would like to submit a KIP to clean all this up at some
point, I would definitely be supportive!

On Wed, May 15, 2024 at 2:44 AM Bruno Cadonna  wrote:

> Hi,
>
> Thank you for the updates!
>
> I have a couple of comments.
>
> U1
> Yeah, that makes sense to me. However, could we break out the assignment
> part to make it more readable? Something like:
>
>
> KafkaStreamsAssignment.for(processId).withAssignment(assignment).withFollowRebalance(rebalanceDeadline)
>
> nits:
> U1.a I slightly prefer "for" compared to "of", but I leave the decision
> to others.
> U1.b An alternative to "withAssignment()" could be simply "with()".
>
>
> U3
> Yeah, I like the TaskInfo approach. However, task metadata interfaces
> start to proliferate a bit too much in our code base. We have
> TaskMetadata, TaskInfo, and finally Task that provide similar methods. I
> think we should try to consolidate those interfaces. Does not need to
> happen in this KIP, but we should consider it.
>
> Best,
> Bruno
>
>
> On 5/15/24 6:01 AM, Sophie Blee-Goldman wrote:
> > Hey all,
> >
> > I have a few updates to mention that have come up during the implementation
> > phase. The KIP should reflect the latest proposal and what has been merged
> > so far, but I'll list everything again here so that you don't have to sift
> > through the entire KIP:
> >
> > U1: The KafkaStreamsAssignment interface was converted to a class, and two
> > public constructors were added in keeping with the fluent API used
> > elsewhere in Streams:
> >
> > public class KafkaStreamsAssignment {
> >
> >   public static KafkaStreamsAssignment of(final ProcessId processId,
> >                                           final Set assignment);
> >
> >   public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline);
> >
> > }
> >
> >
> > U2: Any lag-related APIs in the KafkaStreamsState interface will throw an
> > UnsupportedOperationException if the user opted out of computing the task
> > lags when getting the KafkaStreamsState
> >
> > U3: While refactoring the RackAwareTaskAssignor, we realized the current
> > proposal was missing the requisite rack id information. We will need to add
> > both the per-client rackId to the KafkaStreamsState, as well as the
> > per-task rack ids of all replicas hosting the topic partitions for that
> > task. The former is straightforward and leads to this new method on the
> > KafkaStreamsState interface:
> >
> > interface KafkaStreamsAssignment {
> >
> >   Optional rackId();
> >
> > }
> >
> >
> > For the latter issue, we need to add the per-partition rack ids to the
> > ApplicationState interface, but the exact API is a bit less straightforward
> > since we don't currently have any concept of a partition in the proposed
> > API, instead dealing only with tasks.
> >
> > option 1: https://github.com/apache/kafka/pull/15960
> > The easiest way to address this would be to add a single method to the
> > ApplicationState interface returning a map from TaskId to the rack ids for
> > its partitions. To avoid a nasty multi-layered nested data structure, we'd
> > also introduce a simple container for the partition to rack ids map, with
> > separate maps for input topics vs changelogs (since the
> > RackAwareTaskAssignor needs the ability to differentiate these, and so
> > would the new rack-aware assignment utility methods). See the short example
> > 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-15 Thread Bruno Cadonna

Hi,

Thank you for the updates!

I have a couple of comments.

U1
Yeah, that makes sense to me. However, could we break out the assignment 
part to make it more readable? Something like:


KafkaStreamsAssignment.for(processId).withAssignment(assignment).withFollowRebalance(rebalanceDeadline)

nits:
U1.a I slightly prefer "for" compared to "of", but I leave the decision 
to others.

U1.b An alternative to "withAssignment()" could be simply "with()".


U3
Yeah, I like the TaskInfo approach. However, task metadata interfaces 
start to proliferate a bit too much in our code base. We have 
TaskMetadata, TaskInfo, and finally Task that provide similar methods. I 
think we should try to consolidate those interfaces. Does not need to 
happen in this KIP, but we should consider it.


Best,
Bruno


On 5/15/24 6:01 AM, Sophie Blee-Goldman wrote:

Hey all,

I have a few updates to mention that have come up during the implementation
phase. The KIP should reflect the latest proposal and what has been merged
so far, but I'll list everything again here so that you don't have to sift
through the entire KIP:

U1: The KafkaStreamsAssignment interface was converted to a class, and two
public constructors were added in keeping with the fluent API used
elsewhere in Streams:



public class KafkaStreamsAssignment {

  public static KafkaStreamsAssignment of(final ProcessId processId,
                                          final Set assignment);

  public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline);

}


U2: Any lag-related APIs in the KafkaStreamsState interface will throw an
UnsupportedOperationException if the user opted out of computing the task
lags when getting the KafkaStreamsState

U3: While refactoring the RackAwareTaskAssignor, we realized the current
proposal was missing the requisite rack id information. We will need to add
both the per-client rackId to the KafkaStreamsState, as well as the
per-task rack ids of all replicas hosting the topic partitions for that
task. The former is straightforward and leads to this new method on the
KafkaStreamsState interface:

interface KafkaStreamsAssignment {

  Optional rackId();

}


For the latter issue, we need to add the per-partition rack ids to the
ApplicationState interface, but the exact API is a bit less straightforward
since we don't currently have any concept of a partition in the proposed
API, instead dealing only with tasks.

option 1: https://github.com/apache/kafka/pull/15960
The easiest way to address this would be to add a single method to the
ApplicationState interface returning a map from TaskId to the rack ids for
its partitions. To avoid a nasty multi-layered nested data structure, we'd
also introduce a simple container for the partition to rack ids map, with
separate maps for input topics vs changelogs (since the
RackAwareTaskAssignor needs the ability to differentiate these, and so
would the new rack-aware assignment utility methods). See the short example
PR linked to above for the complete API being proposed in this option.

option 2: https://github.com/apache/kafka/pull/15959
While it is clear that tasks are the right level of abstraction for the
TaskAssignor on the whole, it could be argued that the topic partition
information might be valuable to a more sophisticated assignor. So another
option would be to go all-in and create a new metadata class for each task
that exposes essential and useful information: eg the set of input
partitions and changelog partitions belonging to each task and the mapping
of partition to rackIds, and perhaps also whether it is stateful and the
names of any state stores for that TaskId. This would also allow us to
simplify the ApplicationState interface to return just a single set of
tasks with all metadata encapsulated in the task, rather than having to
offer a separate API for stateful vs stateless tasks to differentiate the
two. See the example PR for the full proposal and changes to the existing
API.

I personally am slightly in favor of option #2 (pull/15959) as I believe including
general task metadata may be useful and this API would be easy to evolve if
we wanted to add anything else in a future KIP. The current KIP was updated
using this option, although nothing related to the rack ids has been merged
yet. We're happy to defer to anyone with a strong preference for either of
these options, or a new suggestion of their own.

As always, let us know if you have any questions or concerns or feedback of
any kind.

Thanks!


On Mon, May 6, 2024 at 1:33 PM Sophie Blee-Goldman 
wrote:


Thanks guys. Updated the error codes in both the code and the explanation
under "Public Changes". To sum up, here are the error codes listed in the
KIP:

enum AssignmentError {
 NONE,
 ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
 ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
 INVALID_STANDBY_TASK,
 UNKNOWN_PROCESS_ID,
 UNKNOWN_TASK_ID
}

Anything missing?


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-14 Thread Sophie Blee-Goldman
Hey all,

I have a few updates to mention that have come up during the implementation
phase. The KIP should reflect the latest proposal and what has been merged
so far, but I'll list everything again here so that you don't have to sift
through the entire KIP:

U1: The KafkaStreamsAssignment interface was converted to a class, and two
public constructors were added in keeping with the fluent API used
elsewhere in Streams:


public class KafkaStreamsAssignment {

  public static KafkaStreamsAssignment of(final ProcessId processId,
                                          final Set assignment);

  public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline);

}


U2: Any lag-related APIs in the KafkaStreamsState interface will throw an
UnsupportedOperationException if the user opted out of computing the task
lags when getting the KafkaStreamsState
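
As a rough illustration of the U2 behavior, the sketch below keeps the lag
map optional and fails fast when it was never computed. The class name, the
lagFor method, and the use of plain strings for task ids are all
assumptions made for this example, not the interface from the KIP.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: task ids are plain strings and lags are offsets as longs.
final class KafkaStreamsStateSketch {
    // Empty when the caller opted out of computing task lags.
    private final Optional<Map<String, Long>> taskLags;

    KafkaStreamsStateSketch(final Optional<Map<String, Long>> taskLags) {
        this.taskLags = taskLags;
    }

    // Throws UnsupportedOperationException if lags were not computed for this state.
    long lagFor(final String taskId) {
        final Map<String, Long> lags = taskLags.orElseThrow(
            () -> new UnsupportedOperationException(
                "task lags were not computed for this state"));
        final Long lag = lags.get(taskId);
        if (lag == null) {
            throw new IllegalArgumentException("no lag known for task " + taskId);
        }
        return lag;
    }
}
```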

U3: While refactoring the RackAwareTaskAssignor, we realized the current
proposal was missing the requisite rack id information. We will need to add
both the per-client rackId to the KafkaStreamsState, as well as the
per-task rack ids of all replicas hosting the topic partitions for that
task. The former is straightforward and leads to this new method on the
KafkaStreamsState interface:

interface KafkaStreamsAssignment {

  Optional rackId();

}


For the latter issue, we need to add the per-partition rack ids to the
ApplicationState interface, but the exact API is a bit less straightforward
since we don't currently have any concept of a partition in the proposed
API, instead dealing only with tasks.

option 1: https://github.com/apache/kafka/pull/15960
The easiest way to address this would be to add a single method to the
ApplicationState interface returning a map from TaskId to the rack ids for
its partitions. To avoid a nasty multi-layered nested data structure, we'd
also introduce a simple container for the partition to rack ids map, with
separate maps for input topics vs changelogs (since the
RackAwareTaskAssignor needs the ability to differentiate these, and so
would the new rack-aware assignment utility methods). See the short example
PR linked to above for the complete API being proposed in this option.

option 2: https://github.com/apache/kafka/pull/15959
While it is clear that tasks are the right level of abstraction for the
TaskAssignor on the whole, it could be argued that the topic partition
information might be valuable to a more sophisticated assignor. So another
option would be to go all-in and create a new metadata class for each task
that exposes essential and useful information: eg the set of input
partitions and changelog partitions belonging to each task and the mapping
of partition to rackIds, and perhaps also whether it is stateful and the
names of any state stores for that TaskId. This would also allow us to
simplify the ApplicationState interface to return just a single set of
tasks with all metadata encapsulated in the task, rather than having to
offer a separate API for stateful vs stateless tasks to differentiate the
two. See the example PR for the full proposal and changes to the existing
API.
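
For a feel of what option 2 could look like, here is a small sketch in
which all metadata lives on the task and the stateful/stateless split is
derived from a single set of tasks. Every name below is hypothetical, and
partitions and task ids are plain strings for brevity; the linked PR is the
authoritative proposal.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical per-task metadata; partitions are "topic-partition" strings.
final class TaskInfoSketch {
    final String taskId;
    final Set<String> stateStoreNames;
    final Set<String> inputPartitions;
    final Set<String> changelogPartitions;
    final Map<String, Set<String>> partitionToRackIds;

    TaskInfoSketch(final String taskId,
                   final Set<String> stateStoreNames,
                   final Set<String> inputPartitions,
                   final Set<String> changelogPartitions,
                   final Map<String, Set<String>> partitionToRackIds) {
        this.taskId = taskId;
        this.stateStoreNames = stateStoreNames;
        this.inputPartitions = inputPartitions;
        this.changelogPartitions = changelogPartitions;
        this.partitionToRackIds = partitionToRackIds;
    }

    // Assumption: a task counts as stateful when it has at least one state store.
    boolean isStateful() {
        return !stateStoreNames.isEmpty();
    }
}

final class ApplicationStateSketch {
    // With per-task metadata, one set of tasks is enough; callers filter as needed.
    static Set<String> statefulTaskIds(final Set<TaskInfoSketch> allTasks) {
        final Set<String> result = new LinkedHashSet<>();
        for (final TaskInfoSketch task : allTasks) {
            if (task.isStateful()) {
                result.add(task.taskId);
            }
        }
        return result;
    }
}
```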

I personally am slightly in favor of option #2 (pull/15959) as I believe including
general task metadata may be useful and this API would be easy to evolve if
we wanted to add anything else in a future KIP. The current KIP was updated
using this option, although nothing related to the rack ids has been merged
yet. We're happy to defer to anyone with a strong preference for either of
these options, or a new suggestion of their own.

As always, let us know if you have any questions or concerns or feedback of
any kind.

Thanks!


On Mon, May 6, 2024 at 1:33 PM Sophie Blee-Goldman 
wrote:

> Thanks guys. Updated the error codes in both the code and the explanation
> under "Public Changes". To sum up, here are the error codes listed in the
> KIP:
>
> enum AssignmentError {
> NONE,
> ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
> ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
> INVALID_STANDBY_TASK,
> UNKNOWN_PROCESS_ID,
> UNKNOWN_TASK_ID
> }
>
> Anything missing?
>
> (also updated all the code block headings, thanks for noticing that Bruno)
>
> On Fri, May 3, 2024 at 9:33 AM Matthias J. Sax  wrote:
>
>> 117f: Good point by Bruno. We should check for this, and could have an
>> additional `INVALID_STANDBY_TASK` error code?
>>
>>
>> -Matthias
>>
>> On 5/3/24 5:52 AM, Guozhang Wang wrote:
>> > Hi Sophie,
>> >
>> > Re: As for the return type of the TaskAssignmentUtils, I think that
>> > makes sense. LGTM.
>> >
>> > On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna 
>> wrote:
>> >>
>> >> Hi Sophie,
>> >>
>> >> 117f:
>> >> I think, removing the STATEFUL and STATELESS types is not enough to
>> >> avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
>> >> the information whether a task is stateless or stateful into the task
>> >> assignor. However, the task assignor can return 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-06 Thread Sophie Blee-Goldman
Thanks guys. Updated the error codes in both the code and the explanation
under "Public Changes". To sum up, here are the error codes listed in the
KIP:

enum AssignmentError {
NONE,
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
INVALID_STANDBY_TASK,
UNKNOWN_PROCESS_ID,
UNKNOWN_TASK_ID
}

Anything missing?

(also updated all the code block headings, thanks for noticing that Bruno)
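
To illustrate how these codes might map onto checks, here is a hedged
sketch of a validator over a simplified assignment. The enum values are the
ones listed above; everything else (ClientTasks, the validate signature,
string ids, and the order in which the checks run) is made up for the
example and is not how the StreamsPartitionAssignor is implemented.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

enum AssignmentError {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
    INVALID_STANDBY_TASK,
    UNKNOWN_PROCESS_ID,
    UNKNOWN_TASK_ID
}

// Hypothetical per-client assignment; task ids are plain strings.
final class ClientTasks {
    final Set<String> active;
    final Set<String> standby;

    ClientTasks(final Set<String> active, final Set<String> standby) {
        this.active = active;
        this.standby = standby;
    }
}

final class AssignmentValidatorSketch {
    // Returns the first problem found, or NONE if the assignment passes all checks.
    static AssignmentError validate(final Map<String, ClientTasks> assignment,
                                    final Set<String> knownProcessIds,
                                    final Set<String> knownTaskIds,
                                    final Set<String> statefulTaskIds) {
        final Set<String> seenActive = new HashSet<>();
        for (final Map.Entry<String, ClientTasks> entry : assignment.entrySet()) {
            if (!knownProcessIds.contains(entry.getKey())) {
                return AssignmentError.UNKNOWN_PROCESS_ID;
            }
            final ClientTasks tasks = entry.getValue();
            for (final String task : tasks.active) {
                if (!knownTaskIds.contains(task)) {
                    return AssignmentError.UNKNOWN_TASK_ID;
                }
                // Set#add returns false if another client already holds this active task.
                if (!seenActive.add(task)) {
                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
            }
            for (final String task : tasks.standby) {
                if (!knownTaskIds.contains(task)) {
                    return AssignmentError.UNKNOWN_TASK_ID;
                }
                // A standby only makes sense for a stateful task.
                if (!statefulTaskIds.contains(task)) {
                    return AssignmentError.INVALID_STANDBY_TASK;
                }
                if (tasks.active.contains(task)) {
                    return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
                }
            }
        }
        return AssignmentError.NONE;
    }
}
```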

On Fri, May 3, 2024 at 9:33 AM Matthias J. Sax  wrote:

> 117f: Good point by Bruno. We should check for this, and could have an
> additional `INVALID_STANDBY_TASK` error code?
>
>
> -Matthias
>
> On 5/3/24 5:52 AM, Guozhang Wang wrote:
> > Hi Sophie,
> >
> > Re: As for the return type of the TaskAssignmentUtils, I think that
> > makes sense. LGTM.
> >
> > On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna  wrote:
> >>
> >> Hi Sophie,
> >>
> >> 117f:
> >> I think, removing the STATEFUL and STATELESS types is not enough to
> >> avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
> >> the information whether a task is stateless or stateful into the task
> >> assignor. However, the task assignor can return a standby task for a
> >> stateless task which is inconsistent.
> >>
> >> Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.
> >>
> >> nit:
> >> The titles of some code blocks in the KIP are not consistent with their
> >> content, e.g., KafkaStreamsState <-> NodeState
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >> On 5/3/24 2:43 AM, Matthias J. Sax wrote:
> >>> Thanks Sophie. My bad. You are of course right about `TaskAssignment`
> >>> and the StreamsPartitionAssignor's responsibility to map tasks of an
> >>> instance to consumers. When I wrote my reply, I forgot about this detail.
> >>>
> >>> Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?
> >>>
> >>> Otherwise LGTM.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:
> >>>> Guozhang:
> >>>>
> >>>> 117. All three additions make sense to me. However, while thinking about
> >>>> how users would actually produce an assignment, I realized that it seems
> >>>> silly to make it their responsibility to distinguish between a stateless
> >>>> and stateful task when they return the assignment. The
> >>>> StreamsPartitionAssignor already knows which tasks are stateful vs
> >>>> stateless, so there's no need to add this extra step for users to figure it
> >>>> out themselves, and potentially make a mistake.
> >>>>
> >>>> 117f: So, rather than add a new error type for "inconsistent task types",
> >>>> I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
> >>>> and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
> >>>> Any objections?
> >>>>
> >>>> -
> >>>>
> >>>> -Thanks, fixed the indentation of headers under "User APIs" and
> >>>> "Read-Only APIs"
> >>>>
> >>>> -As for the return type of the TaskAssignmentUtils methods, I don't
> >>>> personally feel too strongly about this, but the reason for the return type
> >>>> being a Map<ProcessId, KafkaStreamsAssignment> rather than a TaskAssignment
> >>>> is because they are meant to be used iteratively/to create a part of the
> >>>> full assignment, and not necessarily a full assignment for each. Notice
> >>>> that they all have an input parameter of the same type:
> >>>> Map<ProcessId, KafkaStreamsAssignment>. The idea is you can take the output
> >>>> of any of these and pass it in to another to generate or optimize another
> >>>> piece of the overall assignment. For example, if you want to perform the
> >>>> rack-aware optimization on both active and standby tasks, you would need to
> >>>> call #optimizeRackAwareActiveTasks and then forward the output to
> >>>> #optimizeRackAwareStandbyTasks to get the final assignment. If we return a
> >>>> TaskAssignment, it will usually need to be unwrapped right away. Perhaps
> >>>> more importantly, I worry that returning a TaskAssignment will make it seem
> >>>> like each of these utility methods return a "full" and final assignment
> >>>> that can just be returned as-is from the TaskAssignor's #assign method.
> >>>> Whereas they are each just a single step in the full assignment process,
> >>>> and not the final product. Does that make sense?
> >>>>
> >>>> On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman wrote:
> >>>>
> >>>>> Matthias:
> >>>>>
> >>>>> Thanks for the naming suggestions for the error codes. I was
> >>>>> definitely not happy with my original naming but couldn't think of anything
> >>>>> better.  I like your proposals though, will update the KIP names. I'll also
> >>>>> add a "NONE" option as well -- much better than just passing in null for no
> >>>>> error.
> >>>>>
> >>>>>> OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
> >>>>>> same active task
> >>>>>
> >Would also be an error 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Matthias J. Sax
117f: Good point by Bruno. We should check for this, and could have an 
additional `INVALID_STANDBY_TASK` error code?



-Matthias

On 5/3/24 5:52 AM, Guozhang Wang wrote:

Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna  wrote:


Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough to
avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
the information whether a task is stateless or stateful into the task
assignor. However, the task assignor can return a standby task for a
stateless task which is inconsistent.

Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.

nit:
The titles of some code blocks in the KIP are not consistent with their
content, e.g., KafkaStreamsState <-> NodeState


Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:

Thanks Sophie. My bad. You are of course right about `TaskAssignment`
and the StreamsPartitionAssignor's responsibility to map tasks of an
instance to consumers. When I wrote my reply, I forgot about this detail.

Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and
"Read-Only APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return type
being a Map<ProcessId, KafkaStreamsAssignment> rather than a TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type:
Map<ProcessId, KafkaStreamsAssignment>. The idea is you can take the output
of any of these and pass it in to another to generate or optimize another
piece of the overall assignment. For example, if you want to perform the
rack-aware optimization on both active and standby tasks, you would need to
call #optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it seem
like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?
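
The composition argument above can be sketched in a few lines: when every
step takes and returns the same map type, steps chain without any
unwrapping. The step below is a trivial placeholder and not a real
optimizer, and the map of client id to task labels is a hypothetical
simplification of the actual assignment type.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class AssignmentStepsSketch {
    // Placeholder step: returns a copy of the partial assignment with one more task
    // placed on the given client. Real utilities would optimize, not just add.
    static Function<Map<String, Set<String>>, Map<String, Set<String>>> addTask(
            final String client, final String task) {
        return soFar -> {
            final Map<String, Set<String>> next = new HashMap<>();
            soFar.forEach((id, tasks) -> next.put(id, new HashSet<>(tasks)));
            next.computeIfAbsent(client, id -> new HashSet<>()).add(task);
            return next;
        };
    }

    // Because input and output types match, the output of one step feeds the next.
    static Map<String, Set<String>> buildExample() {
        return addTask("client-A", "active 0_0")
            .andThen(addTask("client-B", "standby 0_0"))
            .apply(new HashMap<>());
    }
}
```

Only the result of the last step in such a chain would be wrapped into the
final return value of the assignor.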

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman

wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't think of anything
better.  I like your proposals though, will update the KIP names. I'll also
add a "NONE" option as well -- much better than just passing in null for no
error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task


   Would also be an error if assigned to two consumers of the same
client...

Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
responsible for the assignment of tasks to consumers within a KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task assigned to a
single KafkaStreams client, and then somehow ended up assigning that task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this case in
the OVERLAPPING_CLIENT error (or as it's now called,
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that
caused the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating unfixable
issues due to the TaskAssignor itself returning an invalid assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor should
handle/react to invalid assignments though. I'm fine with just

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Guozhang Wang
Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna  wrote:
>
> Hi Sophie,
>
> 117f:
> I think, removing the STATEFUL and STATELESS types is not enough to
> avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
> the information whether a task is stateless or stateful into the task
> assignor. However, the task assignor can return a standby task for a
> stateless task which is inconsistent.
>
> Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.
>
> nit:
> The titles of some code blocks in the KIP are not consistent with their
> content, e.g., KafkaStreamsState <-> NodeState
>
>
> Best,
> Bruno
>
> On 5/3/24 2:43 AM, Matthias J. Sax wrote:
> > Thanks Sophie. My bad. You are of course right about `TaskAssignment`
> > and the StreamsPartitionAssignor's responsibility to map tasks of an
> > instance to consumers. When I wrote my reply, I forgot about this detail.
> >
> > Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?
> >
> > Otherwise LGTM.
> >
> >
> > -Matthias
> >
> > On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:
> >> Guozhang:
> >>
> >> 117. All three additions make sense to me. However, while thinking about
> >> how users would actually produce an assignment, I realized that it seems
> >> silly to make it their responsibility to distinguish between a stateless
> >> and stateful task when they return the assignment. The
> >> StreamsPartitionAssignor already knows which tasks are stateful vs
> >> stateless, so there's no need to add this extra step for users to figure it
> >> out themselves, and potentially make a mistake.
> >>
> >> 117f: So, rather than add a new error type for "inconsistent task types",
> >> I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
> >> and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
> >> Any objections?
> >>
> >> -
> >>
> >> -Thanks, fixed the indentation of headers under "User APIs" and
> >> "Read-Only APIs"
> >>
> >> -As for the return type of the TaskAssignmentUtils methods, I don't
> >> personally feel too strongly about this, but the reason for the return type
> >> being a Map<ProcessId, KafkaStreamsAssignment> rather than a TaskAssignment
> >> is because they are meant to be used iteratively/to create a part of the
> >> full assignment, and not necessarily a full assignment for each. Notice
> >> that they all have an input parameter of the same type:
> >> Map<ProcessId, KafkaStreamsAssignment>. The idea is you can take the output
> >> of any of these and pass it in to another to generate or optimize another
> >> piece of the overall assignment. For example, if you want to perform the
> >> rack-aware optimization on both active and standby tasks, you would need to
> >> call #optimizeRackAwareActiveTasks and then forward the output to
> >> #optimizeRackAwareStandbyTasks to get the final assignment. If we return a
> >> TaskAssignment, it will usually need to be unwrapped right away. Perhaps
> >> more importantly, I worry that returning a TaskAssignment will make it seem
> >> like each of these utility methods return a "full" and final assignment
> >> that can just be returned as-is from the TaskAssignor's #assign method.
> >> Whereas they are each just a single step in the full assignment process,
> >> and not the final product. Does that make sense?
> >>
> >> On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman
> >> 
> >> wrote:
> >>
> >>> Matthias:
> >>>
> >>> Thanks for the naming suggestions for the error codes. I was
> >>> definitely not happy with my original naming but couldn't think of anything
> >>> better.  I like your proposals though, will update the KIP names. I'll also
> >>> add a "NONE" option as well -- much better than just passing in null for no
> >>> error.
> >>>
> >>>> OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
> >>>> same active task
> >>>
> >>>   Would also be an error if assigned to two consumers of the same
> >>> client...
> >>>> Needs to be rephrased.
> >>>
> >>>
> >>> Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
> >>> responsible for the assignment of tasks to consumers within a KafkaStreams.
> >>> It would be a bug in the StreamsPartitionAssignor if it received a valid
> >>> assignment from the TaskAssignor with only one copy of a task assigned to a
> >>> single KafkaStreams client, and then somehow ended up assigning that task
> >>> to multiple consumers on the KafkaStreams client. It wouldn't be the
> >>> TaskAssignor's fault so imo it would not make sense to include this case in
> >>> the OVERLAPPING_CLIENT error (or as it's now called,
> >>> ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that
> >>> caused the StreamsPartitionAssignor to assign a task to multiple consumers, it
> >>> presumably wouldn't even notice

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Bruno Cadonna

Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough to 
avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes 
the information whether a task is stateless or stateful into the task 
assignor. However, the task assignor can return a standby task for a 
stateless task which is inconsistent.


Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.

nit:
The titles of some code blocks in the KIP are not consistent with their 
content, e.g., KafkaStreamsState <-> NodeState



Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:
Thanks Sophie. My bad. You are of course right about `TaskAssignment`
and the StreamsPartitionAssignor's responsibility to map tasks of an
instance to consumers. When I wrote my reply, I forgot about this detail.


Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and
"Read-Only APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return type
being a Map<ProcessId, KafkaStreamsAssignment> rather than a TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type:
Map<ProcessId, KafkaStreamsAssignment>. The idea is you can take the output
of any of these and pass it in to another to generate or optimize another
piece of the overall assignment. For example, if you want to perform the
rack-aware optimization on both active and standby tasks, you would need to
call #optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it seem
like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman
wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't think of anything
better.  I like your proposals though, will update the KIP names. I'll also
add a "NONE" option as well -- much better than just passing in null for no
error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task


  Would also be an error if assigned to two consumers of the same client...
Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
responsible for the assignment of tasks to consumers within a KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task assigned to a
single KafkaStreams client, and then somehow ended up assigning that task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this case in
the OVERLAPPING_CLIENT error (or as it's now called,
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that caused
the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating unfixable
issues due to the TaskAssignor itself returning an invalid assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor should
handle/react to invalid assignments though. I'm fine with just throwing a
StreamsException and crashing after we invoke the #onAssignmentComputed
callback to notify the user of the error.

On Wed, May 1, 2024 at 9:46 AM Guozhang Wang
wrote:


Jumping back to the party here :)

107: I agree with the rationale behind this, and

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Matthias J. Sax
Thanks Sophie. My bad. You are of course right about `TaskAssignment` 
and the StreamsPartitionAssignor's responsibility to map tasks of an 
instance to consumers. When I wrote my reply, I forgot about this detail.


Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and "Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return type
being a Map rather than a TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map. The idea is you can take the output of any of
these and pass it in to another to generate or optimize another piece of
the overall assignment. For example, if you want to perform the rack-aware
optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it seem
like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman 
wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't think of anything
better.  I like your proposals though, will update the KIP names. I'll also
add a "NONE" option as well -- much better than just passing in null for no
error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task


  Would also be an error if assigned to two consumers of the same client...

Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
responsible for the assignment of tasks to consumers within a KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task assigned to a
single KafkaStreams client, and then somehow ended up assigning that task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this case in
the OVERLAPPING_CLIENT error (or as it's now called,
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that caused
the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating unfixable
issues due to the TaskAssignor itself returning an invalid assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor should
handle/react to invalid assignments though. I'm fine with just throwing a
StreamsException and crashing after we invoke the #onAssignmentComputed
callback to notify the user of the error.

On Wed, May 1, 2024 at 9:46 AM Guozhang Wang 
wrote:


Jumping back to the party here :)

107: I agree with the rationale behind this, and
`numProcessingThreads` looks good to me as it covers both the current
and future scenarios.

117: I agree with Lucas and Bruno, and would add:
   * 117e: unknown taskID: fail
   * 117f: inconsistent task types (e.g. a known taskID was indicated
stateless from ApplicationState, but the returned AssignedTask states
stateful): fail
   * 117g: some ProcessID was not included in the returned Set: pass,
and interprets it as no tasks assigned to it.

And I'm open for any creative error codes folks would come up with :)


If any of these errors are detected, the StreamsPartitionAssignor will
immediately "fail" the rebalance and retry it by scheduling an 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Sophie Blee-Goldman
Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and "Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return type
being a Map rather than a TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map. The idea is you can take the output of any of
these and pass it in to another to generate or optimize another piece of
the overall assignment. For example, if you want to perform the rack-aware
optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it seem
like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?
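
To illustrate the "map in, map out" chaining described above, here is a minimal sketch. It assumes nothing about the real TaskAssignmentUtils signatures: `ClientTasks`, `assignActive`, `assignStandby` and `emptyAssignment` are hypothetical names, process IDs and task IDs are plain strings, and the placement logic is a toy (round-robin), not the rack-aware optimization. The only point is that each step takes and returns the same map type, so the output of one step can be fed to the next, and no single step produces the final assignment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.Set;
import java.util.TreeSet;

final class ChainedAssignmentSteps {
    /** Hypothetical per-client assignment; not the KIP's assignment class. */
    static final class ClientTasks {
        final Set<String> active = new TreeSet<>();
        final Set<String> standby = new TreeSet<>();
    }

    /** Builds a map with an empty ClientTasks entry for every client. */
    static Map<String, ClientTasks> emptyAssignment(List<String> clients) {
        Map<String, ClientTasks> out = new TreeMap<>();
        for (String client : clients) {
            out.put(client, new ClientTasks());
        }
        return out;
    }

    /** Step 1: fills in only the active tasks, round-robin over the clients. */
    static Map<String, ClientTasks> assignActive(Map<String, ClientTasks> in, List<String> tasks) {
        Map<String, ClientTasks> out = copy(in);
        List<String> clients = new ArrayList<>(out.keySet());
        if (clients.isEmpty()) {
            return out;
        }
        for (int i = 0; i < tasks.size(); i++) {
            out.get(clients.get(i % clients.size())).active.add(tasks.get(i));
        }
        return out;
    }

    /** Step 2: fills in only standbys, each on the first client without the active copy. */
    static Map<String, ClientTasks> assignStandby(Map<String, ClientTasks> in, List<String> tasks) {
        Map<String, ClientTasks> out = copy(in);
        for (String task : tasks) {
            for (ClientTasks client : out.values()) {
                if (!client.active.contains(task)) {
                    client.standby.add(task);
                    break;
                }
            }
        }
        return out;
    }

    /** Copies the map so that each step leaves its input untouched. */
    private static Map<String, ClientTasks> copy(Map<String, ClientTasks> in) {
        Map<String, ClientTasks> out = new TreeMap<>();
        for (Map.Entry<String, ClientTasks> entry : in.entrySet()) {
            ClientTasks tasks = new ClientTasks();
            tasks.active.addAll(entry.getValue().active);
            tasks.standby.addAll(entry.getValue().standby);
            out.put(entry.getKey(), tasks);
        }
        return out;
    }
}
```

A caller would run `assignStandby(assignActive(emptyAssignment(clients), activeTasks), standbyTasks)` and only then wrap the resulting map into whatever the #assign method has to return.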

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman 
wrote:

> Matthias:
>
> Thanks for the naming suggestions for the error codes. I was
> definitely not happy with my original naming but couldn't think of anything
> better.  I like your proposals though, will update the KIP names. I'll also
> add a "NONE" option as well -- much better than just passing in null for no
> error.
>
>> OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
>> same active task
>
>> Would also be an error if assigned to two consumers of the same client...
>> Needs to be rephrased.
>
>
> Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
> responsible for the assignment of tasks to consumers within a KafkaStreams.
> It would be a bug in the StreamsPartitionAssignor if it received a valid
> assignment from the TaskAssignor with only one copy of a task assigned to a
> single KafkaStreams client, and then somehow ended up assigning that task
> to multiple consumers on the KafkaStreams client. It wouldn't be the
> TaskAssignor's fault so imo it would not make sense to include this case in
> the OVERLAPPING_CLIENT error (or as it's now called,
> ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that caused
> the StreamsPartitionAssignor to assign a task to multiple consumers, it
> presumably wouldn't even notice since it's a bug -- if it did notice, it
> can just fix the issue. The error codes are about communicating unfixable
> issues due to the TaskAssignor itself returning an invalid assignment. The
> phrasing is intentional, and (imo) correct as it is.
>
> I do see your point about how the StreamsPartitionAssignor should
> handle/react to invalid assignments though. I'm fine with just throwing a
> StreamsException and crashing after we invoke the #onAssignmentComputed
> callback to notify the user of the error.
>
> On Wed, May 1, 2024 at 9:46 AM Guozhang Wang 
> wrote:
>
>> Jumping back to the party here :)
>>
>> 107: I agree with the rationale behind this, and
>> `numProcessingThreads` looks good to me as it covers both the current
>> and future scenarios.
>>
>> 117: I agree with Lucas and Bruno, and would add:
>>   * 117e: unknown taskID: fail
>>   * 117f: inconsistent task types (e.g. a known taskID was indicated
>> stateless from ApplicationState, but the returned AssignedTask states
>> stateful): fail
>>   * 117g: some ProcessID was not included in the returned Set: pass,
>> and interprets it as no tasks assigned to it.
>>
>> And I'm open for any creative error codes folks would come up with :)
>>
>> > If any of these errors are detected, the StreamsPartitionAssignor will
>> > immediately "fail" the rebalance and retry it by scheduling an immediate
>> > followup rebalance.
>>
>> I'm also a bit concerned here, as such endless retry loops have
>> happened in the past in my memory. Given that we would likely see most
>> of the user implementations be deterministic, I'm also leaning towards
>> 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Sophie Blee-Goldman
Matthias:

Thanks for the naming suggestions for the error codes. I was definitely not
happy with my original naming but couldn't think of anything better.  I
like your proposals though, will update the KIP names. I'll also add a
"NONE" option as well -- much better than just passing in null for no error.

> OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the same
> active task

> Would also be an error if assigned to two consumers of the same client...
> Needs to be rephrased.


Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
responsible for the assignment of tasks to consumers within a KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task assigned to a
single KafkaStreams client, and then somehow ended up assigning that task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this case in
the OVERLAPPING_CLIENT error (or as it's now called,
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that caused
the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating unfixable
issues due to the TaskAssignor itself returning an invalid assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor should
handle/react to invalid assignments though. I'm fine with just throwing a
StreamsException and crashing after we invoke the #onAssignmentComputed
callback to notify the user of the error.
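
As a rough sketch of that ordering (validate, notify through the callback, then crash), assuming simplified stand-ins throughout: the enum below carries only two of the error codes named in this thread, the assignment is a plain map from process ID to active task IDs, `validate` and `finish` are hypothetical helper names, and `IllegalStateException` stands in for StreamsException so the example has no Kafka dependency.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

final class InvalidAssignmentHandling {
    // Only the codes needed for this sketch; the names are taken from this thread.
    enum AssignmentError { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES }

    /** Flags an active task that appears under more than one client. */
    static AssignmentError validate(Map<String, Set<String>> activeTasksByClient) {
        Set<String> seen = new HashSet<>();
        for (Set<String> tasks : activeTasksByClient.values()) {
            for (String task : tasks) {
                if (!seen.add(task)) {
                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
            }
        }
        return AssignmentError.NONE;
    }

    /** Notifies the callback first, then fails hard instead of retrying. */
    static void finish(Map<String, Set<String>> activeTasksByClient,
                       Consumer<AssignmentError> onAssignmentComputed) {
        AssignmentError error = validate(activeTasksByClient);
        onAssignmentComputed.accept(error); // the user sees the error code before the crash
        if (error != AssignmentError.NONE) {
            // Stand-in for throwing a StreamsException.
            throw new IllegalStateException("Invalid task assignment: " + error);
        }
    }
}
```

Because a deterministic assignor would return the same invalid result on every attempt, throwing here avoids the endless retry loop that a followup rebalance could cause.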

On Wed, May 1, 2024 at 9:46 AM Guozhang Wang 
wrote:

> Jumping back to the party here :)
>
> 107: I agree with the rationale behind this, and
> `numProcessingThreads` looks good to me as it covers both the current
> and future scenarios.
>
> 117: I agree with Lucas and Bruno, and would add:
>   * 117e: unknown taskID: fail
>   * 117f: inconsistent task types (e.g. a known taskID was indicated
> stateless from ApplicationState, but the returned AssignedTask states
> stateful): fail
>   * 117g: some ProcessID was not included in the returned Set: pass,
> and interprets it as no tasks assigned to it.
>
> And I'm open for any creative error codes folks would come up with :)
>
> > If any of these errors are detected, the StreamsPartitionAssignor will
> > immediately "fail" the rebalance and retry it by scheduling an immediate
> > followup rebalance.
>
> I'm also a bit concerned here, as such endless retry loops have
> happened in the past in my memory. Given that we would likely see most
> of the user implementations be deterministic, I'm also leaning towards
> failing the app immediately and let the crowd educates us if there are
> some very interesting scenarios out there that are not on our radar to
> re-consider this, rather than getting hard to debug cases in the dark.
>
> -
>
> And here are just some nits about the KIP writings itself:
>
> * I think some bullet points under `User APIs` and `Read-only APIs`
> should have a lower level indention? It caught me for a sec until I
> realized there are just two categories.
>
> * In TaskAssignmentUtils , why not let those util functions return
> `TaskAssignment` (to me it feels more consistent with the user APIs),
> but instead return a Map?
>
>
> Guozhang
>
> On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax  wrote:
> >
> > I like the idea of error codes. Not sure if the name are ideal?
> > UNKNOWN_PROCESS_ID makes sense, but the other two seems a little bit
> > difficult to understand?
> >
> > Should we be very descriptive (and also try to avoid coupling it to the
> > threading model -- important for the first error code):
> >   - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
> >   - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)
> >
> > I think we also need to add NONE as option or make the error parameter
> > an `Optional`?
> >
> >
> > > OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
> > > same active task
> >
> > Would also be an error if assigned to two consumers of the same
> > client... Needs to be rephrased.
> >
> >
> >
> > > If any of these errors are detected, the StreamsPartitionAssignor will
> > > immediately "fail" the rebalance and retry it by scheduling an immediate
> > > followup rebalance.
> >
> > Does this make sense? If we assume that the task-assignment is
> > deterministic, we would end up with an infinite retry loop? Also,
> > assuming that an client leave the group, we cannot assign some task any
> > longer... I would rather throw a StreamsException and let the client
> > crash.
> >
> >
> >
> > -Matthias
> >
> > On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:
> > > One last thing: I added an error code enum to be returned from the
> > > #onAssignmentComputed method in case of an invalid assignment. I
> 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-01 Thread Guozhang Wang
Jumping back to the party here :)

107: I agree with the rationale behind this, and
`numProcessingThreads` looks good to me as it covers both the current
and future scenarios.

117: I agree with Lucas and Bruno, and would add:
  * 117e: unknown taskID: fail
  * 117f: inconsistent task types (e.g. a known taskID was indicated
stateless from ApplicationState, but the returned AssignedTask states
stateful): fail
  * 117g: some ProcessID was not included in the returned Set: pass,
and interpret it as no tasks assigned to it.
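
A minimal sketch of how 117e and 117g could be handled together, under stated assumptions: process IDs and task IDs are plain strings, `normalize` is a hypothetical helper, and `IllegalArgumentException` stands in for whatever failure mechanism is chosen. Rejecting an unknown process ID the same way is an extra assumption of this sketch, not one of the three cases listed above.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class ReturnedAssignmentCheck {
    /**
     * Returns a complete map with one entry per known process ID.
     * 117e: a task ID that is not known fails the whole assignment.
     * 117g: a process ID missing from the returned map passes and gets no tasks.
     */
    static Map<String, Set<String>> normalize(Set<String> knownProcessIds,
                                              Set<String> knownTaskIds,
                                              Map<String, Set<String>> returned) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (String processId : knownProcessIds) {
            result.put(processId, new TreeSet<>()); // default: no tasks assigned
        }
        for (Map.Entry<String, Set<String>> entry : returned.entrySet()) {
            if (!knownProcessIds.contains(entry.getKey())) {
                // Assumption of this sketch: unknown process IDs are rejected too.
                throw new IllegalArgumentException("unknown process ID: " + entry.getKey());
            }
            for (String taskId : entry.getValue()) {
                if (!knownTaskIds.contains(taskId)) {
                    throw new IllegalArgumentException("unknown task ID: " + taskId);
                }
            }
            result.get(entry.getKey()).addAll(entry.getValue());
        }
        return result;
    }
}
```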

And I'm open to any creative error codes folks come up with :)

> If any of these errors are detected, the StreamsPartitionAssignor will 
> immediately "fail" the rebalance and retry it by scheduling an immediate 
> followup rebalance.

I'm also a bit concerned here, as I remember such endless retry loops
happening in the past. Given that we would likely see most
of the user implementations be deterministic, I'm also leaning towards
failing the app immediately and letting the crowd educate us if there are
some very interesting scenarios out there that are not on our radar, so we
can reconsider this, rather than getting hard-to-debug cases in the dark.

-

And here are just some nits about the KIP writing itself:

* I think some bullet points under `User APIs` and `Read-only APIs`
should have a lower-level indentation? It caught me for a sec until I
realized there are just two categories.

* In TaskAssignmentUtils, why not let those util functions return
`TaskAssignment` (to me it feels more consistent with the user APIs)
instead of a Map?


Guozhang

On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax  wrote:
>
> I like the idea of error codes. Not sure if the name are ideal?
> UNKNOWN_PROCESS_ID makes sense, but the other two seems a little bit
> difficult to understand?
>
> Should we be very descriptive (and also try to avoid coupling it to the
> threading model -- important for the first error code):
>   - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
>   - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)
>
> I think we also need to add NONE as option or make the error parameter
> an `Optional`?
>
>
> > OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the same 
> > active task
>
> Would also be an error if assigned to two consumers of the same
> client... Needs to be rephrased.
>
>
>
> > If any of these errors are detected, the StreamsPartitionAssignor will 
> > immediately "fail" the rebalance and retry it by scheduling an immediate 
> > followup rebalance.
>
> Does this make sense? If we assume that the task-assignment is
> deterministic, we would end up with an infinite retry loop? Also,
> assuming that an client leave the group, we cannot assign some task any
> longer... I would rather throw a StreamsException and let the client crash.
>
>
>
> -Matthias
>
> On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:
> > One last thing: I added an error code enum to be returned from the
> > #onAssignmentComputed method in case of an invalid assignment. I created
> > one code for each of the invalid cases we described above. The downside is
> > that this means we'll have to go through a deprecation cycle if we want to
> > loosen up the restrictions on any of the enforced cases. The upside is that
> > we can very clearly mark what is an invalid assignment and this will
> > (hopefully) assist users who are new to customizing assignments by clearly
> > denoting the requirements, and returning a clear error if they are not
> > followed.
> >
> > Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
> > this case by returning the same assignment to the consumers and scheduling
> > a followup rebalance. I've added all of this to the TaskAssignor  and
> > #onAssignmentComputed javadocs, and added a section under "Public Changes"
> > as well.
> >
> > Please let me know if there are any concerns, or if you have suggestions
> > for how else we can handle an invalid assignment
> >
> > On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman 
> > wrote:
> >
> >> Thanks guys! I agree with what Lucas said about 117c, we can always loosen
> >> a restriction later and I don't want to do anything now that might get in
> >> the way of the new threading models.
> >>
> >> With that I think we're all in agreement on 117. I'll update the KIP to
> >> include what we've discussed
> >>
> >> (and will fix the remaining #finalAssignment mention as well, thanks
> >> Bruno. Glad to have such good proof readers! :P)
> >>
> >> On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna  wrote:
> >>
> >>> Hi again,
> >>>
> >>> I forgot to ask whether you could add the agreement about handling
> >>> invalid assignment to the KIP.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 4/30/24 2:00 PM, Bruno Cadonna wrote:
> >>>> Hi all,
> >>>>
> >>>> I think we are converging!
> >>>>
> >>>> 117
> >>>> a) fail: Since it is an invalid consumer assignment
> >>>> b) pass: I agree that not assigning a task might 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Matthias J. Sax
I like the idea of error codes. Not sure if the names are ideal? 
UNKNOWN_PROCESS_ID makes sense, but the other two seem a little bit 
difficult to understand?


Should we be very descriptive (and also try to avoid coupling it to the 
threading model -- important for the first error code):

 - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
 - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)

I think we also need to add NONE as option or make the error parameter 
an `Optional`?




OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the same 
active task


Would also be an error if assigned to two consumers of the same 
client... Needs to be rephrased.





If any of these errors are detected, the StreamsPartitionAssignor will immediately 
"fail" the rebalance and retry it by scheduling an immediate followup rebalance.


Does this make sense? If we assume that the task assignment is 
deterministic, we would end up with an infinite retry loop? Also, 
assuming that a client leaves the group, we cannot assign some tasks any 
longer... I would rather throw a StreamsException and let the client crash.




-Matthias

On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:

One last thing: I added an error code enum to be returned from the
#onAssignmentComputed method in case of an invalid assignment. I created
one code for each of the invalid cases we described above. The downside is
that this means we'll have to go through a deprecation cycle if we want to
loosen up the restrictions on any of the enforced cases. The upside is that
we can very clearly mark what is an invalid assignment and this will
(hopefully) assist users who are new to customizing assignments by clearly
denoting the requirements, and returning a clear error if they are not
followed.

Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
this case by returning the same assignment to the consumers and scheduling
a followup rebalance. I've added all of this to the TaskAssignor  and
#onAssignmentComputed javadocs, and added a section under "Public Changes"
as well.

Please let me know if there are any concerns, or if you have suggestions
for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman 
wrote:


Thanks guys! I agree with what Lucas said about 117c, we can always loosen
a restriction later and I don't want to do anything now that might get in
the way of the new threading models.

With that I think we're all in agreement on 117. I'll update the KIP to
include what we've discussed

(and will fix the remaining #finalAssignment mention as well, thanks
Bruno. Glad to have such good proof readers! :P)

On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna  wrote:


Hi again,

I forgot to ask whether you could add the agreement about handling
invalid assignment to the KIP.

Best,
Bruno

On 4/30/24 2:00 PM, Bruno Cadonna wrote:

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in some
situations
c) fail: For the reasons Lucas pointed out. I am missing a good use case
here.
d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment() instead of the
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named finalAssignment which is
called with the final computed GroupAssignment ..."


Best,
Bruno


On 4/30/24 1:04 PM, Lucas Brutschy wrote:

Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open point 117. I'm
against any fallbacks like "use the default assignor if the custom
assignment is invalid", as it's just going to hide bugs. For the 4
cases mentioned by Sophie:

117a) I'd fail immediately here, as it's an implementation bug, and
should not lead to a valid consumer group assignment.
117b) Agreed. This is a useful assignment and should be allowed.
117c) This is the tricky case. However, I'm leaning towards not
allowing this, unless we have a concrete use case. This will block us
from potentially using a single consumer for active and standby tasks
in the future. It's easier to drop the restriction later if we have a
concrete use case.
117d) Definitely fail immediately, as you said.

Cheers,
Lucas



On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
 wrote:


Yeah I think that sums it up well. Either you computed a *possible* assignment,
or you returned something that makes it literally impossible for the
StreamsPartitionAssignor to decipher/translate into an actual group
assignment, in which case it should just fail

That's more or less it for the open questions that have been raised so far,
so I just want to remind folks that there's already a voting thread for
this. I cast my vote a few minutes ago so it should resurface in everyone's
inbox :)

On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai
wrote:


117: as Sophie laid out, there are two cases here right:
1. cases that are considered 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Sophie Blee-Goldman
One last thing: I added an error code enum to be returned from the
#onAssignmentComputed method in case of an invalid assignment. I created
one code for each of the invalid cases we described above. The downside is
that this means we'll have to go through a deprecation cycle if we want to
loosen up the restrictions on any of the enforced cases. The upside is that
we can very clearly mark what is an invalid assignment and this will
(hopefully) assist users who are new to customizing assignments by clearly
denoting the requirements, and returning a clear error if they are not
followed.

Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
this case by returning the same assignment to the consumers and scheduling
a followup rebalance. I've added all of this to the TaskAssignor  and
#onAssignmentComputed javadocs, and added a section under "Public Changes"
as well.

Please let me know if there are any concerns, or if you have suggestions
for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman 
wrote:

> Thanks guys! I agree with what Lucas said about 117c, we can always loosen
> a restriction later and I don't want to do anything now that might get in
> the way of the new threading models.
>
> With that I think we're all in agreement on 117. I'll update the KIP to
> include what we've discussed
>
> (and will fix the remaining #finalAssignment mention as well, thanks
> Bruno. Glad to have such good proof readers! :P)
>
> On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna  wrote:
>
>> Hi again,
>>
>> I forgot to ask whether you could add the agreement about handling
>> invalid assignment to the KIP.
>>
>> Best,
>> Bruno
>>
>> On 4/30/24 2:00 PM, Bruno Cadonna wrote:
>> > Hi all,
>> >
>> > I think we are converging!
>> >
>> > 117
>> > a) fail: Since it is an invalid consumer assignment
>> > b) pass: I agree that not assigning a task might be reasonable in some
>> > situations
>> > c) fail: For the reasons Lucas pointed out. I am missing a good use case
>> > here.
>> > d) fail: It is invalid
>> >
>> >
>> > Somewhere in the KIP you still use finalAssignment() instead of the
>> > wonderful method name onAssignmentComputed() ;-)
>> > "... interface also includes a method named finalAssignment which is
>> > called with the final computed GroupAssignment ..."
>> >
>> >
>> > Best,
>> > Bruno
>> >
>> >
>> > On 4/30/24 1:04 PM, Lucas Brutschy wrote:
>> >> Hi,
>> >>
>> >> Looks like a great KIP to me!
>> >>
>> >> I'm late, so I'm only going to comment on the last open point 117. I'm
>> >> against any fallbacks like "use the default assignor if the custom
>> >> assignment is invalid", as it's just going to hide bugs. For the 4
>> >> cases mentioned by Sophie:
>> >>
>> >> 117a) I'd fail immediately here, as it's an implementation bug, and
>> >> should not lead to a valid consumer group assignment.
>> >> 117b) Agreed. This is a useful assignment and should be allowed.
>> >> 117c) This is the tricky case. However, I'm leaning towards not
>> >> allowing this, unless we have a concrete use case. This will block us
>> >> from potentially using a single consumer for active and standby tasks
>> >> in the future. It's easier to drop the restriction later if we have a
>> >> concrete use case.
>> >> 117d) Definitely fail immediately, as you said.
>> >>
>> >> Cheers,
>> >> Lucas
>> >>
>> >>
>> >>
>> >> On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
>> >>  wrote:
>> >>>
>> >>> Yeah I think that sums it up well. Either you computed a *possible*
>> >>> assignment,
>> >>> or you returned something that makes it literally impossible for the
>> >>> StreamsPartitionAssignor to decipher/translate into an actual group
>> >>> assignment, in which case it should just fail
>> >>>
>> >>> That's more or less it for the open questions that have been raised
>> >>> so far,
>> >>> so I just want to remind folks that there's already a voting thread
>> >>> for this. I cast my vote a few minutes ago so it should resurface in
>> >>> everyone's inbox :)
>> >>>

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Sophie Blee-Goldman
Thanks guys! I agree with what Lucas said about 117c, we can always loosen
a restriction later and I don't want to do anything now that might get in
the way of the new threading models.

With that I think we're all in agreement on 117. I'll update the KIP to
include what we've discussed

(and will fix the remaining #finalAssignment mention as well, thanks Bruno.
Glad to have such good proofreaders! :P)

On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna  wrote:

> Hi again,
>
> I forgot to ask whether you could add the agreement about handling
> invalid assignment to the KIP.
>
> Best,
> Bruno
>
> On 4/30/24 2:00 PM, Bruno Cadonna wrote:
> > Hi all,
> >
> > I think we are converging!
> >
> > 117
> > a) fail: Since it is an invalid consumer assignment
> > b) pass: I agree that not assigning a task might be reasonable in some
> > situations
> > c) fail: For the reasons Lucas pointed out. I am missing a good use case
> > here.
> > d) fail: It is invalid
> >
> >
> > Somewhere in the KIP you still use finalAssignment() instead of the
> > wonderful method name onAssignmentComputed() ;-)
> > "... interface also includes a method named finalAssignment which is
> > called with the final computed GroupAssignment ..."
> >
> >
> > Best,
> > Bruno
> >
> >
> > On 4/30/24 1:04 PM, Lucas Brutschy wrote:
> >> Hi,
> >>
> >> Looks like a great KIP to me!
> >>
> >> I'm late, so I'm only going to comment on the last open point 117. I'm
> >> against any fallbacks like "use the default assignor if the custom
> >> assignment is invalid", as it's just going to hide bugs. For the 4
> >> cases mentioned by Sophie:
> >>
> >> 117a) I'd fail immediately here, as it's an implementation bug, and
> >> should not lead to a valid consumer group assignment.
> >> 117b) Agreed. This is a useful assignment and should be allowed.
> >> 117c) This is the tricky case. However, I'm leaning towards not
> >> allowing this, unless we have a concrete use case. This will block us
> >> from potentially using a single consumer for active and standby tasks
> >> in the future. It's easier to drop the restriction later if we have a
> >> concrete use case.
> >> 117d) Definitely fail immediately, as you said.
> >>
> >> Cheers,
> >> Lucas
> >>
> >>
> >>
> >> On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
> >>  wrote:
> >>>
> >>> Yeah I think that sums it up well. Either you computed a *possible*
> >>> assignment,
> >>> or you returned something that makes it literally impossible for the
> >>> StreamsPartitionAssignor to decipher/translate into an actual group
> >>> assignment, in which case it should just fail
> >>>
> >>> That's more or less it for the open questions that have been raised
> >>> so far,
> >>> so I just want to remind folks that there's already a voting thread for
> >>> this. I cast my vote a few minutes ago so it should resurface in
> >>> everyone's
> >>> inbox :)
> >>>

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Bruno Cadonna

Hi again,

I forgot to ask whether you could add the agreement about handling
invalid assignments to the KIP.


Best,
Bruno

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Bruno Cadonna

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in some 
situations
c) fail: For the reasons Lucas pointed out. I am missing a good use case
here.
d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment() instead of the 
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named finalAssignment which is 
called with the final computed GroupAssignment ..."



Best,
Bruno


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Lucas Brutschy
Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open point 117. I'm
against any fallbacks like "use the default assignor if the custom
assignment is invalid", as it's just going to hide bugs. For the 4
cases mentioned by Sophie:

117a) I'd fail immediately here, as it's an implementation bug, and
should not lead to a valid consumer group assignment.
117b) Agreed. This is a useful assignment and should be allowed.
117c) This is the tricky case. However, I'm leaning towards not
allowing this, unless we have a concrete use case. Allowing it now could
block us from potentially using a single consumer for active and standby
tasks in the future, and it's easier to drop the restriction later if a
concrete use case comes up.
117d) Definitely fail immediately, as you said.

Cheers,
Lucas



On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
 wrote:
>
> Yeah I think that sums it up well. Either you computed a *possible* 
> assignment,
> or you returned something that makes it literally impossible for the
> StreamsPartitionAssignor to decipher/translate into an actual group
> assignment, in which case it should just fail
>
> That's more or less it for the open questions that have been raised so far,
> so I just want to remind folks that there's already a voting thread for
> this. I cast my vote a few minutes ago so it should resurface in everyone's
> inbox :)
>
> On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai 
> wrote:
>
> > 117: as Sophie laid out, there are two cases here right:
> > 1. cases that are considered invalid by the existing assignors but are
> > still valid assignments in the sense that they can be used to generate a
> > valid consumer group assignment (from the perspective of the consumer group
> > protocol). An assignment that excludes a task is one such example, and
> > Sophie pointed out a good use case for it. I also think it makes sense to
> > allow these. It's hard to predict how a user might want to use the custom
> > assignor, and its reasonable to expect them to use it with care and not
> > hand-hold them.
> > 2. cases that are not valid because it is impossible to compute a valid
> > consumer group assignment from them. In this case it seems totally
> > reasonable to just throw a fatal exception that gets passed to the uncaught
> > exception handler. If this case happens then there is some bug in the
> > user's assignor and its totally reasonable to fail the application in that
> > case. We _could_ try to be more graceful and default to one of the existing
> > assignors. But it's usually better to fail hard and fast when there is some
> > illegal state detected imo.
> >
> > On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai 
> > wrote:
> >
> > > Bruno, I've incorporated your feedback into the KIP document.
> > >
> > > On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai 
> > > wrote:
> > >
> > >> Thanks for the feedback Bruno! For the most part I think it makes sense,
> > >> but leaving a couple follow-up thoughts/questions:
> > >>
> > >> re 4: I think Sophie's point was slightly different - that we might want
> > >> to wrap the return type for `assign` in a class so that its easily
> > >> extensible. This makes sense to me. Whether we do that or not, we can
> > have
> > >> the return type be a Set instead of a Map as well.
> > >>
> > >> re 6: Yes, it's a callback that's called with the final assignment. I
> > >> like your suggested name.
> > >>
> > >> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai 
> > >> wrote:
> > >>
> > >>> Thanks for the feedback Sophie!
> > >>>
> > >>> re1: Totally agree. The fact that it's related to the partition
> > assignor
> > >>> is clear from just `task.assignor`. I'll update.
> > >>> re3: This is a good point, and something I would find useful
> > personally.
> > >>> I think its worth adding an interface that lets the plugin observe the
> > >>> final assignment. I'll add that.
> > >>> re4: I like the new `NodeAssignment` type. I'll update the KIP with
> > that.
> > >>>
> > >>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
> > >>> wrote:
> > >>>

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-29 Thread Sophie Blee-Goldman
Yeah I think that sums it up well. Either you computed a *possible* assignment,
or you returned something that makes it literally impossible for the
StreamsPartitionAssignor to decipher/translate into an actual group
assignment, in which case it should just fail

That's more or less it for the open questions that have been raised so far,
so I just want to remind folks that there's already a voting thread for
this. I cast my vote a few minutes ago so it should resurface in everyone's
inbox :)

On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai 
wrote:

> 117: as Sophie laid out, there are two cases here right:
> 1. cases that are considered invalid by the existing assignors but are
> still valid assignments in the sense that they can be used to generate a
> valid consumer group assignment (from the perspective of the consumer group
> protocol). An assignment that excludes a task is one such example, and
> Sophie pointed out a good use case for it. I also think it makes sense to
> allow these. It's hard to predict how a user might want to use the custom
> assignor, and its reasonable to expect them to use it with care and not
> hand-hold them.
> 2. cases that are not valid because it is impossible to compute a valid
> consumer group assignment from them. In this case it seems totally
> reasonable to just throw a fatal exception that gets passed to the uncaught
> exception handler. If this case happens then there is some bug in the
> user's assignor and its totally reasonable to fail the application in that
> case. We _could_ try to be more graceful and default to one of the existing
> assignors. But it's usually better to fail hard and fast when there is some
> illegal state detected imo.
>
> On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai 
> wrote:
>
> > Bruno, I've incorporated your feedback into the KIP document.
> >
> > On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai 
> > wrote:
> >
> >> Thanks for the feedback Bruno! For the most part I think it makes sense,
> >> but leaving a couple follow-up thoughts/questions:
> >>
> >> re 4: I think Sophie's point was slightly different - that we might want
> >> to wrap the return type for `assign` in a class so that its easily
> >> extensible. This makes sense to me. Whether we do that or not, we can
> have
> >> the return type be a Set instead of a Map as well.
> >>
> >> re 6: Yes, it's a callback that's called with the final assignment. I
> >> like your suggested name.
> >>
> >> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai 
> >> wrote:
> >>
> >>> Thanks for the feedback Sophie!
> >>>
> >>> re1: Totally agree. The fact that it's related to the partition
> assignor
> >>> is clear from just `task.assignor`. I'll update.
> >>> re3: This is a good point, and something I would find useful
> personally.
> >>> I think its worth adding an interface that lets the plugin observe the
> >>> final assignment. I'll add that.
> >>> re4: I like the new `NodeAssignment` type. I'll update the KIP with
> that.
> >>>
> >>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
> >>> wrote:
> >>>

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-26 Thread Rohan Desai
117: as Sophie laid out, there are two cases here right:
1. cases that are considered invalid by the existing assignors but are
still valid assignments in the sense that they can be used to generate a
valid consumer group assignment (from the perspective of the consumer group
protocol). An assignment that excludes a task is one such example, and
Sophie pointed out a good use case for it. I also think it makes sense to
allow these. It's hard to predict how a user might want to use the custom
assignor, and it's reasonable to expect them to use it with care rather than
hand-holding them.
2. cases that are not valid because it is impossible to compute a valid
consumer group assignment from them. In this case it seems totally
reasonable to just throw a fatal exception that gets passed to the uncaught
exception handler. If this case happens then there is some bug in the
user's assignor and it's totally reasonable to fail the application in that
case. We _could_ try to be more graceful and default to one of the existing
assignors. But it's usually better to fail hard and fast when there is some
illegal state detected imo.
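
To make the two cases concrete, here is a minimal sketch in Java. It is not
the KIP's API: the class, the method and the String task/client ids are all
made up for illustration, and "an unknown task id" or "the same active task
handed to two clients" are only assumed stand-ins for an assignment that
can't be turned into a valid consumer group assignment. Leaving a task out is
accepted and simply reported back; the impossible case throws, on the
assumption that the exception would then reach the uncaught exception handler.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: these names are hypothetical and not part of KIP-924.
final class AssignmentValidationSketch {

    /**
     * Checks a custom assignment given as clientId -> active task ids.
     *
     * @return the tasks that were left unassigned (case 1: unusual but allowed)
     * @throws IllegalStateException if a task is unknown or assigned more than
     *         once (assumed examples of case 2: fail hard and fast)
     */
    static Set<String> validate(Set<String> allTasks,
                                Map<String, List<String>> activeTasksByClient) {
        Map<String, String> ownerByTask = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : activeTasksByClient.entrySet()) {
            String client = entry.getKey();
            for (String task : entry.getValue()) {
                if (!allTasks.contains(task)) {
                    throw new IllegalStateException(
                        "Unknown task " + task + " assigned to " + client);
                }
                String previousOwner = ownerByTask.put(task, client);
                if (previousOwner != null) {
                    throw new IllegalStateException(
                        "Task " + task + " assigned more than once ("
                            + previousOwner + ", " + client + ")");
                }
            }
        }
        // No fallback to a built-in assignor: unassigned tasks are reported, not repaired.
        Set<String> unassigned = new TreeSet<>(allTasks);
        unassigned.removeAll(ownerByTask.keySet());
        return unassigned;
    }
}
```

The point of the design is that nothing is silently corrected: the caller
either gets back the list of tasks the assignor chose to skip, or the
application fails with a message that points at the bug in the assignor.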

On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai  wrote:

> Bruno, I've incorporated your feedback into the KIP document.
>
> On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai 
> wrote:
>
>> Thanks for the feedback Bruno! For the most part I think it makes sense,
>> but leaving a couple follow-up thoughts/questions:
>>
>> re 4: I think Sophie's point was slightly different - that we might want
>> to wrap the return type for `assign` in a class so that its easily
>> extensible. This makes sense to me. Whether we do that or not, we can have
>> the return type be a Set instead of a Map as well.
>>
>> re 6: Yes, it's a callback that's called with the final assignment. I
>> like your suggested name.
>>
>> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai 
>> wrote:
>>
>>> Thanks for the feedback Sophie!
>>>
>>> re1: Totally agree. The fact that it's related to the partition assignor
>>> is clear from just `task.assignor`. I'll update.
>>> re3: This is a good point, and something I would find useful personally.
>>> I think its worth adding an interface that lets the plugin observe the
>>> final assignment. I'll add that.
>>> re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
>>>
>>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
>>> wrote:
>>>
>>>> Thanks for the feedback so far! I think pretty much all of it is
>>>> reasonable. I'll reply to it inline:
>>>>
>>>> > 1. All the API logic is granular at the Task level, except the
>>>> > previousOwnerForPartition func. I’m not clear what’s the motivation
>>>> > behind it, does our controller also want to change how the
>>>> > partitions->tasks mapping is formed?
>>>> You're right that this is out of place. I've removed this method as
>>>> it's not needed by the task assignor.
>>>>
>>>> > 2. Just on the API layering itself: it feels a bit weird to have the
>>>> > three built-in functions (defaultStandbyTaskAssignment etc) sitting in
>>>> > the ApplicationMetadata class. If we consider them as some default util
>>>> > functions, how about moving those into their own static util
>>>> > methods to separate from the ApplicationMetadata “fact objects”?
>>>> Agreed. Updated in the latest revision of the KIP. These have been
>>>> moved to TaskAssignorUtils
>>>>
>>>> > 3. I personally prefer `NodeAssignment` to be a read-only object
>>>> > containing the decisions made by the assignor, including the
>>>> > requestFollowupRebalance flag. For manipulating the half-baked results
>>>> > inside the assignor itself, maybe we can just be flexible to let users use
>>>> > whatever structs / their own classes even, if they like. WDYT?
>>>> Agreed. Updated in the latest version of the KIP.
>>>>
>>>> > 1. For the API, thoughts on changing the method signature to return a
>>>> > (non-Optional) TaskAssignor? Then we can either have the default
>>>> > implementation return new HighAvailabilityTaskAssignor or just have a
>>>> > default implementation class that people can extend if they don't want to
>>>> > implement every method.
>>>> Based on some other discussion, I actually decided to get rid of the
>>>> plugin interface, and instead use config to specify individual plugin
>>>> behaviour. So the method you're referring to is no longer part of the
>>>> proposal.
>>>>
>>>> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only
>>>> > but there are methods that return void on it? It's not totally clear to
>>>> > me how that interface is supposed to be used by the assignor. It'd be
>>>> > nice if we could flip that interface such that it becomes part of the
>>>> > output instead of an input to the plugin.
>>>> I've moved those methods to a util class. They're really utility
>>>> methods the assignor might want to call to do some default or optimized
>>>> assignment for some cases like rack-awareness.
>>>>
>>>> > 4. We should consider wrapping UUID in a ProcessID class so that we
>>>> > control

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-25 Thread Matthias J. Sax

Thanks for the details!


107: I like `numProcessingThreads()` proposal.


108(a): it was not really a concern, but it was rather a question if we 
could/should simplify it, so it's easier to implement a custom task 
assignor. But if we believe that it's an integral component, I am fine 
with leaving the responsibility to request follow-up rebalances to the user.


108(b): Similar to 108(a). But I am ok to give control to the user too.


115: SGTM.



-Matthias

On 4/25/24 3:45 PM, Sophie Blee-Goldman wrote:


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-25 Thread Sophie Blee-Goldman
104. Fair enough -- also happy to defer to Rohan on this (or Bruno if he
feels super strongly)

107. That's a good point. Ultimately the task load should reflect the
processing capacity, and that's something that will exist in both the new
and old threading model. I like #processingCapacity for the name. And for
the javadocs, I think we can just say something generic enough that it will
be accurate in both the old and new model and won't need to be updated
(since, as you mentioned before, we always forget to update random
javadocs).

As for the NOTE, however, I do feel that is necessary. At the very least,
it's necessary if we rename the method to something with the word
"capacity", since that tends to imply a "maximum limit" rather than a
"minimum load". In fact I think the original method was named capacity and
that's why I added this NOTE to the javadoc in the first place, to avoid
confusion. I can see why it doesn't make sense if the name doesn't have the
word "capacity" in it though.

But I do think #processingCapacity feels appropriate. I guess a reasonable
alternative would just be #numProcessingThreads, which is both descriptive
enough to not need the NOTE in the javadocs and also accurately describes
both the old one-consumer-per-StreamThread model and the new one (it's just
a matter of what "processing thread" means will change). So how about we
just call it #numProcessingThreads and in the javadocs we can say it's like
a capacity that should correspond to the relative "weight" of assigned
tasks relative to other KafkaStreams clients? WDYT?
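
To make the "relative weight" reading concrete, here is a minimal sketch of how an assignor might turn per-client processing thread counts into target task counts. The class and method names (CapacityWeights, targetTaskCounts) and the String client ids are assumptions made for illustration, not part of the KIP.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the KIP: splits a number of tasks across
// clients in proportion to each client's number of processing threads.
final class CapacityWeights {

    static Map<String, Integer> targetTaskCounts(Map<String, Integer> threadsPerClient, int numTasks) {
        int totalThreads = 0;
        for (int threads : threadsPerClient.values()) {
            if (threads < 0) {
                throw new IllegalArgumentException("thread counts must not be negative");
            }
            totalThreads += threads;
        }
        if (totalThreads == 0) {
            throw new IllegalArgumentException("at least one processing thread is required");
        }

        Map<String, Integer> targets = new LinkedHashMap<>();
        int assigned = 0;
        // First pass: every client gets the floor of its proportional share.
        for (Map.Entry<String, Integer> entry : threadsPerClient.entrySet()) {
            int share = (int) ((long) numTasks * entry.getValue() / totalThreads);
            targets.put(entry.getKey(), share);
            assigned += share;
        }
        // Second pass: hand out the rounding remainder one task at a time,
        // in iteration order, skipping clients that have no threads.
        int remaining = numTasks - assigned;
        while (remaining > 0) {
            for (Map.Entry<String, Integer> entry : targets.entrySet()) {
                if (remaining == 0) {
                    break;
                }
                if (threadsPerClient.get(entry.getKey()) > 0) {
                    entry.setValue(entry.getValue() + 1);
                    remaining--;
                }
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Integer> threads = new LinkedHashMap<>();
        threads.put("small", 1);
        threads.put("large", 3);
        System.out.println(targetTaskCounts(threads, 8));
    }
}
```

The point of the sketch is only that the number is used as a ratio between clients, not as a hard upper bound on how many tasks a client may own.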

108. Yes, I 100% believe that a custom assignor should be able to schedule
a followup rebalance:

108(1) For one thing, imo the HATaskAssignor shouldn't be a "special case"
but should effectively look just like any other custom assignor. And it
definitely needs the ability to schedule followups. How would the
StreamsPartitionAssignor figure out whether to schedule a followup
rebalance for the HAAssignor? From the StreamsPartitionAssignor POV it just
sees "active" and "standby" tasks -- it doesn't know whether some of those
standby tasks are actually "warmup tasks". And anyways, I think users
should be able to request a followup rebalance. What is the concern over
giving custom assignors control over this? Imo if people abuse this and
shoot themselves in the foot by scheduling followup rebalances for no
reason, that's on them. As always -- "simple things should be easy,
difficult things should be possible"

108(2) This is a fair question, although I still believe we should give the
custom assignor full control over the scheduled followup rebalance
timeline. Again, because more advanced things should be possible -- and
it's not like having this API return an Instant makes things more
complicated for people who want to do simple things, users are free to
ignore this completely and returning an Instant doesn't feel more difficult
than returning an enum. So why restrict this?

To take a more specific example: let's say users have a complicated set of
metrics they use to determine task placement. Sometimes these metrics are
unavailable, in which case they want to schedule a followup rebalance but
may want to implement a backoff/retry rather than simply scheduling an
immediate followup. I know for a fact that immediate followup rebalances
triggered by Kafka Streams can actually cause issues in some cases (see
KAFKA-14382 and
KAFKA-14419 -- same
root cause but note that the second issue is still unresolved to this day,
and I know someone besides the issue reporter who has repeatedly been
affected by it). Giving users the ability to back off and schedule smarter
"immediate" followups if/when they run into issues seems like a sufficient
motivation to me. For another: perhaps these complex metrics are advanced
enough to be able to predict when a given task will be "warmed up" (to take
the HAAssignor example) or otherwise be able to compute an exact time at
which to cut over to a new task assignment. In this case it would be
necessary to have full flexibility over when the followup was triggered.
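
As a rough illustration of the backoff/retry case, the sketch below computes a followup deadline as an Instant using capped exponential backoff. Everything in it (FollowupBackoff, nextDeadline, the parameters) is a hypothetical helper a custom assignor might write for itself; only java.time is assumed, nothing here is Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: picks the time of the next followup rebalance based on
// how many consecutive assignment attempts have failed so far.
final class FollowupBackoff {

    static Instant nextDeadline(Instant now, int failedAttempts, Duration base, Duration max) {
        // Clamp the exponent so the shift below cannot overflow.
        int exponent = Math.min(Math.max(failedAttempts, 0), 20);
        Duration delay = base.multipliedBy(1L << exponent);
        // Never wait longer than the configured maximum.
        if (delay.compareTo(max) > 0) {
            delay = max;
        }
        return now.plus(delay);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        for (int attempt = 0; attempt < 5; attempt++) {
            Instant deadline = nextDeadline(now, attempt, Duration.ofSeconds(1), Duration.ofSeconds(10));
            System.out.println("attempt " + attempt + " -> followup in "
                + Duration.between(now, deadline).getSeconds() + "s");
        }
    }
}
```

A boolean or an enum could not express the growing delay, which is the flexibility argued for above.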

115. I see what you mean here. I was originally thinking that way, but was
worried that users might "accidentally" catch whatever exception we throw
if the task lag computation fails and not know how to handle it. But I
suppose we can just say in the javadocs that you should not catch it,
or should rethrow it if you do, to allow Streams to recover and re-attempt. I agree we
should have Streams just handle this transparently for users and not
require them to rebuild the assignment on their own. I'll add this to the
javadocs -- and I don't think we need to introduce a new exception type
even. We have the "TaskAssignmentException" already which behaves similarly
now -- ie if there's an error during assignment, we throw this and return
the same assignment back and schedule a followup.
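
The control flow described here can be sketched roughly as follows: the framework calls the custom assignor, and if the assignor (or a utility it calls) throws, the framework returns the previous assignment and flags a followup. All names are hypothetical stand-ins; in particular AssignmentFailedException is a local placeholder defined in the snippet, not the real TaskAssignmentException, and the maps of String ids only approximate the real assignment types.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of the framework-side fallback; none of these types are
// the actual Kafka Streams classes.
final class FallbackOnFailure {

    // Local stand-in for an "assignment failed, please retry" exception.
    static final class AssignmentFailedException extends RuntimeException {
        AssignmentFailedException(String message) {
            super(message);
        }
    }

    static final class Result {
        final Map<String, Set<String>> tasksByClient;
        final boolean followupRequested;

        Result(Map<String, Set<String>> tasksByClient, boolean followupRequested) {
            this.tasksByClient = tasksByClient;
            this.followupRequested = followupRequested;
        }
    }

    // Runs the user's assignor; on failure, reuses the previous assignment and
    // asks for a followup rebalance so the assignment can be retried.
    static Result assignOrFallback(Supplier<Map<String, Set<String>>> customAssignor,
                                   Map<String, Set<String>> previousAssignment) {
        try {
            return new Result(customAssignor.get(), false);
        } catch (AssignmentFailedException e) {
            return new Result(previousAssignment, true);
        }
    }

    public static void main(String[] args) {
        Map<String, Set<String>> previous =
            Collections.singletonMap("client-1", Collections.singleton("0_0"));
        Result result = assignOrFallback(() -> {
            throw new AssignmentFailedException("task lag computation timed out");
        }, previous);
        System.out.println(result.tasksByClient + " followup=" + result.followupRequested);
    }
}
```

With this shape the user code never needs to catch anything: the exception simply passes through it.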

So 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-24 Thread Matthias J. Sax
104: I also don't feel super strongly about it. Not sure if 
`onAssignment()` might overload the name in a confusing way? In the end, 
when the method is called, we don't assign anything? -- Guess, I am fine 
with whatever Rohan picks as a name from the suggestions we have so far.



107: Did not think about how to do it yet. Just raised the question to 
see if I am even heading in the right direction or not... I did not 
propose to remove the method; it's clear that we need it.


Thinking about it a little more, what we actually want to convey is a 
certain "processing capacity" an instance has? Thus, 
`numConsumerClients()` might not reflect this in the future? Should we 
just generically call it `processingCapacity()` or similar and for now 
explain in the JavaDocs that it maps to number of (currently running) 
`StreamThread` (currently running, because users can dynamically 
add/remove them...). We can later update the JavaDocs when we have 
"processing threads" and point to number of processing threads? Maybe 
Lucas/Bruno can provide more input on what/how we plan the future 
threading model and configs.


Nit: not sure if we need the "NOTE" section at all? If we think we want 
it, maybe remove from the KIP and we can discuss in more detail on the 
PR (think it could be improved). Don't think the JavaDocs on the KIP must 
be 100% accurate to what we put into the code later.



108: I guess my question is two-fold. (1) Does user-code need to decide 
to schedule a probing rebalance to begin with? Or could the 
non-customizable part of `StreamsPartitionAssignor` decide it? (2) If 
custom code really needs to make this decision, why would it not just 
return a boolean? It seems unnecessary to compute a deadline, given that 
the probing rebalance interval is a config? -- Or maybe I am missing 
something? If it's about regular probing rebalance vs immediate 
rebalance vs no follow up, maybe an enum would do the trick?
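
For comparison, the enum idea could look roughly like the sketch below, which also shows how each enum value maps onto a deadline. The enum name, its constants, and the deadline method are assumptions for illustration only; the probing interval is passed in as a plain Duration rather than read from any real config.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical enum covering the three cases mentioned above.
enum FollowupRebalance {
    NONE,
    PROBING,
    IMMEDIATE;

    // Translates the coarse-grained choice into a concrete point in time.
    Optional<Instant> deadline(Instant now, Duration probingInterval) {
        switch (this) {
            case IMMEDIATE:
                return Optional.of(now);
            case PROBING:
                return Optional.of(now.plus(probingInterval));
            default:
                return Optional.empty();
        }
    }
}
```

Every enum value corresponds to some deadline (or none), so a deadline-based API can express the enum, while the reverse does not hold.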



115: Thanks for the explanation. This does make sense. I am wondering if 
we need the new utility method though? Would it not be possible to 
encapsulate all this inside the non-customizable code? The method 
`kafkaStreamsStates(boolean computeTaskLags)` will be provided by us and 
called by the user code. Thus, if we cannot compute the lag, we could 
still throw an exception -- the user code does not need to know anything 
about it, and is not supposed to catch this exception. Hence, it should 
bubble up and get back to our code from 
`TaskAssignor#assign(ApplicationState applicationState)` which is called 
by us, and we can catch our own exception here, and do what we do 
currently: we return the old assignment, and request an immediate follow 
up rebalance? For this case, the user code does not need to know 
anything about it, and does not need to do anything special, and it 
would become a provided built-in feature, which seems desirable?



117: not sure myself... Let's see what others think. I'll think about it 
a little bit more and follow up again later. It's a tricky one.



-Matthias


On 4/24/24 5:08 PM, Sophie Blee-Goldman wrote:


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-24 Thread Sophie Blee-Goldman
Now to respond to Matthias:

FYI, I'm following the numbering scheme from your email but added stars to
mark responses that have further questions or feedback and/or aren't yet
addressed in the KIP and need to be followed up on. You can more or less
just skip over the ones without stars to save time

100: I think this is leftover from a previous approach we considered.
Removed this line

101: I agree. Perhaps we hadn't fully committed to this decision when the
KIP was first written ;P Added a "Consumer Assignments" section under
"Public Changes" to address this more carefully and explicitly

102: fixed

103: fixed

104: I do agree with Bruno on the structure of this callback name, ie
that it should start with "on", but any of the suggestions with that sound
good to me. I really don't feel too strongly but just to throw in a bit of
extra context, there is an analogous callback on the
ConsumerPartitionAssignor that is called #onAssignment. So personally I
would slightly prefer to just call it #onAssignment. However I'm happy to
go with whatever the consensus is -- @Bruno/@Matthias WDYT?

105: done (fyi I did leave the config doc string which is technically a
private variable but is part of the public contract)

106: good point about the numKafkaStreamsClients and toString methods --
removed those

107: I guess this reflects how long it's been since the KIP was first
written :P But that's a fair point -- and yes, we should write this in a
forward looking manner. However, we can't build it in a way that's so
forward-looking that it doesn't work for the current version. Are you
proposing to remove this API altogether, or just rename it to
#numConsumerClients (or something like that) and update the javadocs
accordingly?  Assuming the latter, I totally agree, and have made the
change. But we definitely can't just remove it altogether (it may even be
relevant in the new threading model if we eventually allow configuring the
number of consumer clients independently of the processing threads -- but
that's a different conversation. The important thing is that this KIP be
compatible with the old/current threading model, in which case we need this
API).
Anyways, please take a look at the new javadocs and method name and lmk if
that makes sense to you

108: The #followupRebalanceDeadline allows the custom assignor to
request followup rebalances, for example in order to probe for restoration
progress or other conditions for task assignment. This is fundamental to
the HighAvailabilityTaskAssignor (ie the default assignor) and may be
useful to custom assignors with similar approaches. So it's definitely
necessary -- are you asking why we have it at all, or why it's an API on
the KafkaStreamsAssignment class and not, say, the TaskAssignment class?

109: Makes sense to me -- done

110: ack -- updated all mentions of "node" to "KafkaStreams client"

111: ack -- updated name from "consumer" to "consumerClientId"

112: that's fair -- I do think it's valuable to have "#allTasks" since some
assignors may not care about the stateful vs stateless distinction, but
it's weird to have #statefulTasks without #statelessTasks. Let's just have
all three. Added to the KIP
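
A minimal sketch of how the three views relate to each other, with the stateful and stateless sets derived from the full set. TaskViews and its nested Task class are hypothetical stand-ins, not the KIP's ApplicationState or TaskInfo types.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration: statefulTasks() and statelessTasks() are disjoint
// subsets that together make up allTasks().
final class TaskViews {

    static final class Task {
        final String id;
        final boolean stateful;

        Task(String id, boolean stateful) {
            this.id = id;
            this.stateful = stateful;
        }
    }

    private final Set<Task> allTasks;

    TaskViews(Set<Task> allTasks) {
        this.allTasks = new LinkedHashSet<>(allTasks);
    }

    Set<Task> allTasks() {
        return allTasks;
    }

    Set<Task> statefulTasks() {
        return filter(true);
    }

    Set<Task> statelessTasks() {
        return filter(false);
    }

    // Keeps only the tasks whose stateful flag matches the argument.
    private Set<Task> filter(boolean stateful) {
        return allTasks.stream()
            .filter(task -> task.stateful == stateful)
            .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```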

113: this makes sense to me. Updated computeTaskLags to be an input
parameter instead of a mutating API. Also noted that it can throw a
TimeoutException in this case (this is relevant for point 115 below)

114: fixed

115: Reasonable question -- I think the way it's described right now is
a bit awkward, and there's a better way to approach the issue. Ultimately
the "issue" is how we can handle failures in things like the task lag
computation, which notably makes a remote call to the brokers and can/has
been known to fail at times. Right now if this API fails, the
StreamsPartitionAssignor will just return the same assignment as the
previous one and trigger an immediate followup rebalance. This exception
was meant to be a "utility" that can be thrown to indicate to the
StreamsPartitionAssignor to just return the old assignment and trigger an
immediate followup.
That said, this exception does feel like an awkward way to do it,
especially since the TaskAssignor can already do all of this via native
APIs: it can request a followup rebalance (and better yet, determine for
itself what a reasonable retry/backoff interval would be). It can also just
return the same assignment -- the only issue is that implementing this is
kind of annoying. So I would propose that instead of throwing a
RetryableException, we should just add an additional utility method to the
TaskAssignmentUtils that does this "fallback" assignment and returns the
same tasks to their previous clients.
Added TaskAssignmentUtils#identityAssignment for this (though I'm happy to
take other name suggestions)

That's a long paragraph, sorry -- but hopefully it makes sense?

116: fair point. Updated the wording -- hopefully it makes more sense now.

117: This is an interesting question. I think it's 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-24 Thread Sophie Blee-Goldman
Responding to Bruno first:

(1) I actually think "KafkaStreams" is exactly right here -- for the reason
you said, ultimately this is describing a literal instance of the
"KafkaStreams" class. Glad we hashed this out! (I saw that Rohan went with
StreamsClient but I also prefer KafkaStreams)

(4) Rohan is right about what I was saying -- but I'm now realizing that I
completely misinterpreted what your concern was. Sorry for the long-winded
and ultimately irrelevant answer. I'm completely fine with having the
return type be a simple Set with additional info such as TaskId in the
AssignedTask class (and I see Rohan already made this change so we're all
good)

(5) I don't insist either way :) ApplicationState works for me

On Fri, Apr 19, 2024 at 9:37 PM Matthias J. Sax  wrote:


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Matthias J. Sax
One more thing. It might be good to clearly call out which interfaces a 
user would implement, vs the other ones Kafka Streams implements and 
TaskAssignor only uses.


My understanding is, that users would implement `TaskAssignor`, 
`TaskAssignment`, and `StreamsClientAssignment`.


For `AssignedTask` it seems that users would actually only need to 
instantiate them. Should we add a public constructor?


Also wondering if we should add an empty default implementation for 
`onAssignmentComputed()` as it seems not to be strictly necessary to use 
this method?



-Matthias

On 4/19/24 7:30 PM, Matthias J. Sax wrote:


On 4/18/24 2:05 AM, Bruno Cadonna wrote:

Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Matthias J. Sax

Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same 
partition.assignor prefix". What does this mean?



101 (nit) The KIP says: "Note that the thread-level assignment will 
remain an un-configurable internal implementation detail of the 
partition assignor (see "Rejected Alternatives" for further thoughts and 
reasoning)." -- When I was reading this the first time, I did not 
understand it, and it did only become clear later (eg while reading the 
discussion thread). I think it would be good to be a little bit more 
explicit, because this is not just some minor thing, but a core design 
decision (which I, btw, support).



102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with 
'public' :)



104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems 
to be a little bit clumsy? I kinda like the original `finalAssignment()` 
-- I would also be happy with `onFinalAssignment` to address Bruno's 
line of thinking (which I think is a good call out). (Btw: 
`finalAssignment` is still used in the text on the KIP and should also 
be updated.)



105: Please remove all `private` variables. We should only show public 
stuff on the KIP. Everything else is an implementation detail.



106: `TaskAssignment#numStreamsClients()` -- why do we need this method? 
Seems calling `assignment()` gives us a collection and we can just call 
size() on it to get the same value? -- Also, why do we explicitly call 
out the override of `toString()`; seems unnecessary?



107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the 
number of StreamThreads on this client, which is equal to the number of 
main consumers and represents its overall capacity." -- Given our 
planned thread refactoring, this might not hold true for long (and I 
am sure we will forget to update the JavaDocs later). Talking to Lucas, 
the plan is to cut down `StreamThread` to host the consumer (and there 
will be only one, and it won't be configurable any longer), and we would 
introduce a number of configurable "processing threads". Can/should we 
build this API in a forward looking manner?



108: Why do we need 
`StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how 
this would be useful?



109 `StreamsClientState#consumers`: should we rename this to 
`#consumerClientIds()`?



110 (nit) `StreamsClientState#previousl[Active|Standby]Tasks`: JavaDoc 
says 'owned by consumers on this node' -- Should we just say `owned by 
the Streams client`?



111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer` 
parameter -- not clear what this is -- I guess it's a consumer's 
client.id? If yes, should we rename the parameter `consumerClientId`?



112 `ApplicationState`: what is the reason to have `allTasks()` and 
`statefulTasks()` -- why not have `statelessTasks()` and 
`statefulTasks()` instead? Or all three?



113 `ApplicationState#computeTaskLags()`: I understand the intent/reason 
why we have this one, but it seems to be somewhat difficult to use 
correctly, as it triggers an internal side-effect... Would it be 
possible to replace this method with a `boolean computeTaskLag` 
parameter passed into #streamClientState() instead, which might 
make it less error prone to use, as it seems the returned 
`StreamsClient` object would be modified when calling #computeTaskLags() 
and thus both are related to each other?



114 nit/typo: `ApplicationState#streamsClientStates()` returns 
`StreamsClientState` not `StreamsClient`.



115 `StreamsAssignorRetryableException`: not sure if I fully understand 
the purpose of this exception.



116 "No actual changes to functionality": allowing users to plug in a custom 
TaskAssignor sounds like adding new functionality. Can we rephrase this?




117: What happens if the returned assignment is "invalid" -- for 
example, a task might not have been assigned, or is assigned to two 
nodes? Or a standby is assigned to the same node as its active? Or a 
`StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if 
this list of potential issues is complete or not...)
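
The failure cases listed in 117 can be written down as explicit checks. The following is only a sketch under stated assumptions: `AssignmentValidator` and `ClientAssignment` are hypothetical stand-ins, process and task IDs are plain strings, and nothing here is taken from the KIP's actual interfaces or from how the Streams partition assignor handles invalid output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssignmentValidator {

    /** Hypothetical stand-in for one client's assignment; not the KIP's real type. */
    public static final class ClientAssignment {
        final String processId;
        final Set<String> activeTasks;
        final Set<String> standbyTasks;

        public ClientAssignment(String processId, Set<String> activeTasks, Set<String> standbyTasks) {
            this.processId = processId;
            this.activeTasks = activeTasks;
            this.standbyTasks = standbyTasks;
        }
    }

    /** Returns a list of problems found; an empty list means every check passed. */
    public static List<String> validate(Set<String> knownProcessIds,
                                        Set<String> allTasks,
                                        List<ClientAssignment> assignments) {
        List<String> errors = new ArrayList<>();
        Map<String, String> activeOwner = new HashMap<>();

        for (ClientAssignment a : assignments) {
            // Case: the assignment refers to a process ID the group does not know.
            if (!knownProcessIds.contains(a.processId)) {
                errors.add("unknown process id: " + a.processId);
            }
            for (String task : a.activeTasks) {
                // Case: the same task is active on two clients.
                String previous = activeOwner.put(task, a.processId);
                if (previous != null) {
                    errors.add("task " + task + " is active on both " + previous + " and " + a.processId);
                }
                // Case: a standby is placed on the same client as its active.
                if (a.standbyTasks.contains(task)) {
                    errors.add("task " + task + " is both active and standby on " + a.processId);
                }
            }
        }
        // Case: a task was never assigned as active anywhere.
        for (String task : allTasks) {
            if (!activeOwner.containsKey(task)) {
                errors.add("task " + task + " has no active owner");
            }
        }
        return errors;
    }
}
```

Whether such problems should raise an error, trigger a fallback assignor, or be repaired silently is exactly the open question raised in 117; the sketch only detects them.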




-Matthias



On 4/18/24 2:05 AM, Bruno Cadonna wrote:

Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* with KafkaStreams* or StreamsClient*? I 
prefer KafkaStreams* since that class represents the Kafka Streams 
client. I am also fine with KafkaStreamsClient*. I really would like to 
avoid introducing a new term in Kafka Streams for which we already have 
an equivalent term even if it is used on the brokers since that is a 
different level of abstraction. Additionally, I have never been a big 
fan of the term "instance".


(4)
I think the question is if we need to retrieve assignment metadata by 
task for a Kafka client or if it is enough to iterate over the assigned 
tasks. Could you explain why we cannot add 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Rohan Desai
Bruno, I've incorporated your feedback into the KIP document.

On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai  wrote:

> Thanks for the feedback Bruno! For the most part I think it makes sense,
> but leaving a couple follow-up thoughts/questions:
>
> re 4: I think Sophie's point was slightly different - that we might want
> to wrap the return type for `assign` in a class so that it's easily
> extensible. This makes sense to me. Whether we do that or not, we can have
> the return type be a Set instead of a Map as well.
>
> re 6: Yes, it's a callback that's called with the final assignment. I like
> your suggested name.
>
> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai 
> wrote:
>
>> Thanks for the feedback Sophie!
>>
>> re1: Totally agree. The fact that it's related to the partition assignor
>> is clear from just `task.assignor`. I'll update.
>> re3: This is a good point, and something I would find useful personally.
>> I think it's worth adding an interface that lets the plugin observe the
>> final assignment. I'll add that.
>> re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
>>
>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
>> wrote:
>>
>>> Thanks for the feedback so far! I think pretty much all of it is
>>> reasonable. I'll reply to it inline:
>>>
>>> > 1. All the API logic is granular at the Task level, except the
>>> previousOwnerForPartition func. I’m not clear what’s the motivation
>>> behind it, does our controller also want to change how the
>>> partitions->tasks mapping is formed?
>>> You're right that this is out of place. I've removed this method as it's
>>> not needed by the task assignor.
>>>
>>> > 2. Just on the API layering itself: it feels a bit weird to have the
>>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
>>> the ApplicationMetadata class. If we consider them as some default util
>>> functions, how about introducing moving those into their own static util
>>> methods to separate from the ApplicationMetadata “fact objects” ?
>>> Agreed. Updated in the latest revision of the kip. These have been moved
>>> to TaskAssignorUtils
>>>
>>> > 3. I personally prefer `NodeAssignment` to be a read-only object
>>> containing the decisions made by the assignor, including the
>>> requestFollowupRebalance flag. For manipulating the half-baked results
>>> inside the assignor itself, maybe we can just be flexible to let users use
>>> whatever struts / their own classes even, if they like. WDYT?
>>> Agreed. Updated in the latest version of the kip.
>>>
>>> > 1. For the API, thoughts on changing the method signature to return a
>>> (non-Optional) TaskAssignor? Then we can either have the default
>>> implementation return new HighAvailabilityTaskAssignor or just have a
>>> default implementation class that people can extend if they don't want to
>>> implement every method.
>>> Based on some other discussion, I actually decided to get rid of the
>>> plugin interface, and instead use config to specify individual plugin
>>> behaviour. So the method you're referring to is no longer part of the
>>> proposal.
>>>
>>> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only
>>> but
>>> theres methods that return void on it? It's not totally clear to me how
>>> that interface is supposed to be used by the assignor. It'd be nice if we
>>> could flip that interface such that it becomes part of the output instead
>>> of an input to the plugin.
>>> I've moved those methods to a util class. They're really utility methods
>>> the assignor might want to call to do some default or optimized assignment
>>> for some cases like rack-awareness.
>>>
>>> > 4. We should consider wrapping UUID in a ProcessID class so that we
>>> control
>>> the interface (there are a few places where UUID is directly used).
>>> I like it. Updated the proposal.
>>>
>>> > 5. What does NodeState#newAssignmentForNode() do? I thought the point
>>> was
>>> for the plugin to make the assignment? Is that the result of the default
>>> logic?
>>> It doesn't need to be part of the interface. I've removed it.
>>>
>>> > re 2/6:
>>>
>>> I generally agree with these points, but I'd rather hash that out in a
>>> PR than in the KIP review, as it'll be clearer what gets used how. It seems
>>> to me (committers please correct me if I'm wrong) that as long as we're on
>>> the same page about what information the interfaces are returning, that's
>>> ok at this level of discussion.
>>>
>>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai 
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I'd like to start a discussion on KIP-924 (
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
>>>> which proposes an interface to allow users to plug into the streams
>>>> partition assignor. The motivation section in the KIP goes into some more
>>>> detail on why we think this is a useful addition. Thanks in advance for
>>>> your feedback!
>>>>
>>>> Best Regards,
>>>>
>>>> Rohan

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Rohan Desai
Thanks for the feedback Bruno! For the most part I think it makes sense,
but leaving a couple follow-up thoughts/questions:

re 4: I think Sophie's point was slightly different - that we might want to
wrap the return type for `assign` in a class so that it's easily extensible.
This makes sense to me. Whether we do that or not, we can have the return
type be a Set instead of a Map as well.

re 6: Yes, it's a callback that's called with the final assignment. I like
your suggested name.

On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai  wrote:

> Thanks for the feedback Sophie!
>
> re1: Totally agree. The fact that it's related to the partition assignor
> is clear from just `task.assignor`. I'll update.
> re3: This is a good point, and something I would find useful personally. I
> think it's worth adding an interface that lets the plugin observe the final
> assignment. I'll add that.
> re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
>
> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
> wrote:
>
>> Thanks for the feedback so far! I think pretty much all of it is
>> reasonable. I'll reply to it inline:
>>
>> > 1. All the API logic is granular at the Task level, except the
>> previousOwnerForPartition func. I’m not clear what’s the motivation
>> behind it, does our controller also want to change how the
>> partitions->tasks mapping is formed?
>> You're right that this is out of place. I've removed this method as it's
>> not needed by the task assignor.
>>
>> > 2. Just on the API layering itself: it feels a bit weird to have the
>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
>> the ApplicationMetadata class. If we consider them as some default util
>> functions, how about introducing moving those into their own static util
>> methods to separate from the ApplicationMetadata “fact objects” ?
>> Agreed. Updated in the latest revision of the kip. These have been moved
>> to TaskAssignorUtils
>>
>> > 3. I personally prefer `NodeAssignment` to be a read-only object
>> containing the decisions made by the assignor, including the
>> requestFollowupRebalance flag. For manipulating the half-baked results
>> inside the assignor itself, maybe we can just be flexible to let users use
>> whatever struts / their own classes even, if they like. WDYT?
>> Agreed. Updated in the latest version of the kip.
>>
>> > 1. For the API, thoughts on changing the method signature to return a
>> (non-Optional) TaskAssignor? Then we can either have the default
>> implementation return new HighAvailabilityTaskAssignor or just have a
>> default implementation class that people can extend if they don't want to
>> implement every method.
>> Based on some other discussion, I actually decided to get rid of the
>> plugin interface, and instead use config to specify individual plugin
>> behaviour. So the method you're referring to is no longer part of the
>> proposal.
>>
>> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
>> theres methods that return void on it? It's not totally clear to me how
>> that interface is supposed to be used by the assignor. It'd be nice if we
>> could flip that interface such that it becomes part of the output instead
>> of an input to the plugin.
>> I've moved those methods to a util class. They're really utility methods
>> the assignor might want to call to do some default or optimized assignment
>> for some cases like rack-awareness.
>>
>> > 4. We should consider wrapping UUID in a ProcessID class so that we
>> control
>> the interface (there are a few places where UUID is directly used).
>> I like it. Updated the proposal.
>>
>> > 5. What does NodeState#newAssignmentForNode() do? I thought the point
>> was
>> for the plugin to make the assignment? Is that the result of the default
>> logic?
>> It doesn't need to be part of the interface. I've removed it.
>>
>> > re 2/6:
>>
>> I generally agree with these points, but I'd rather hash that out in a PR
>> than in the KIP review, as it'll be clearer what gets used how. It seems to
>> me (committers please correct me if I'm wrong) that as long as we're on the
>> same page about what information the interfaces are returning, that's ok at
>> this level of discussion.
>>
>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai 
>> wrote:
>>
>>> Hello All,
>>>
>>> I'd like to start a discussion on KIP-924 (
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
>>> which proposes an interface to allow users to plug into the streams
>>> partition assignor. The motivation section in the KIP goes into some more
>>> detail on why we think this is a useful addition. Thanks in advance for
>>> your feedback!
>>>
>>> Best Regards,
>>>
>>> Rohan
>>>
>>>


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-18 Thread Bruno Cadonna

Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* with KafkaStreams* or StreamsClient*? I 
prefer KafkaStreams* since that class represents the Kafka Streams 
client. I am also fine with KafkaStreamsClient*. I really would like to 
avoid introducing a new term in Kafka Streams for which we already have 
an equivalent term even if it is used on the brokers since that is a 
different level of abstraction. Additionally, I have never been a big 
fan of the term "instance".


(4)
I think the question is if we need to retrieve assignment metadata by 
task for a Kafka client or if it is enough to iterate over the assigned 
tasks. Could you explain why we cannot add additional metadata to the 
class AssignedTask?
The interface KafkaStreamsAssignment (a.k.a. NodeAssignment ;-) ) could 
be something like


public interface NodeAssignment {
    ProcessID processId();

    Instant followupRebalanceDeadline();

    Set<AssignedTask> assignment();

    enum AssignedTaskType {
        STATELESS,
        STATEFUL,
        STANDBY
    }

    static class AssignedTask {
        AssignedTaskType type();
        TaskId id();

        // ... other metadata needed in future
    }
}
If we need to retrieve assigned task by task ID, maybe it is better to 
add methods like assignedFor(TaskId) and not to expose the Map.
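
One way to read the suggestion above: the assignment keeps a map as a private index and exposes only iteration plus a lookup method. This is a minimal sketch, not the KIP's API; `NodeAssignmentSketch` is a made-up name, the task ID is a plain `String` instead of `TaskId`, and `assignedFor` returning an `Optional` is an assumption.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class NodeAssignmentSketch {

    public enum AssignedTaskType { STATELESS, STATEFUL, STANDBY }

    /** Stand-in for AssignedTask; the task ID is a plain String here for brevity. */
    public static final class AssignedTask {
        private final String id;
        private final AssignedTaskType type;

        public AssignedTask(String id, AssignedTaskType type) {
            this.id = id;
            this.type = type;
        }

        public String id() { return id; }
        public AssignedTaskType type() { return type; }
    }

    // The map is an internal index only; callers never see it.
    private final Map<String, AssignedTask> tasksById = new LinkedHashMap<>();

    public NodeAssignmentSketch(Iterable<AssignedTask> tasks) {
        for (AssignedTask task : tasks) {
            tasksById.put(task.id(), task);
        }
    }

    /** All assigned tasks, returned as a read-only copy. */
    public Set<AssignedTask> assignment() {
        return Collections.unmodifiableSet(new LinkedHashSet<>(tasksById.values()));
    }

    /** Lookup by task ID; empty if the task is not assigned to this client. */
    public Optional<AssignedTask> assignedFor(String taskId) {
        return Optional.ofNullable(tasksById.get(taskId));
    }
}
```

Because the map never leaves the class, the internal representation can change later without affecting callers.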


(5)
I am in favor of ApplicationState but I am also fine with 
ApplicationMetadata if you insist.


(6)
Is

void finalAssignment(GroupAssignment assignment, GroupSubscription 
subscription);


kind of a callback? If yes, would it make sense to call it 
onAssignmentComputed()?



(7)
What do you think of changing the TaskAssignmentUtils signatures to

public static TaskAssignment default*Assignment(final ApplicationState 
applicationState, final TaskAssignment taskAssignment, ...) {...}


to avoid mutating the assignment in place?
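
To illustrate the difference, here is a small sketch of a utility that returns a new assignment instead of modifying its argument. The representation (a map from process ID to a set of standby task IDs) and the names `NonMutatingUtilSketch` and `withStandby` are assumptions made for the example and are not part of TaskAssignmentUtils.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class NonMutatingUtilSketch {

    /**
     * Returns a new assignment equal to the input plus the given standby task
     * on the given client. The input map and its sets are left untouched.
     */
    public static Map<String, Set<String>> withStandby(Map<String, Set<String>> standbysByClient,
                                                       String processId,
                                                       String taskId) {
        // Deep-copy the input so that later changes cannot leak back into it.
        Map<String, Set<String>> copy = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : standbysByClient.entrySet()) {
            copy.put(e.getKey(), new HashSet<>(e.getValue()));
        }
        copy.computeIfAbsent(processId, k -> new HashSet<>()).add(taskId);
        return Collections.unmodifiableMap(copy);
    }
}
```

The caller keeps the original assignment and can compare, discard, or chain results, at the cost of one copy per call.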


Best,
Bruno

On 4/17/24 7:50 PM, Sophie Blee-Goldman wrote:

Thanks Bruno! I can provide a bit of context behind some of these
decisions but I just want to say up front that I agree with every single one
of your points, though I am going to push back a bit on the first one.

[1] The idea here is to help avoid some confusion around the overloaded
term "client", which can mean either "an instance of Kafka Streams" or
"a consumer/producer client". The problem is that the former applies to
the entire Streams process and therefore should be interpreted as "all
of the StreamThreads on an instance" whereas the latter is typically used
interchangeably to mean the consumer client in the consumer group,
which implies a scope of just a single StreamThread on an instance.
The "Node" name here was an attempt to clear this up, since differentiating
between instance and thread level is critical to understanding and properly
implementing the custom assignor.

I do see what you mean about there not being a concept of Node in the
Kafka Streams codebase, and that we usually do use "instance" when we
need to differentiate between consumer client/one StreamThread and
Kafka Streams client/all StreamThreads. As I'm typing this I'm convincing
myself even more that we shouldn't just use "Client" without further
distinction, but I'm not sure "Node" has to be the answer either.

Could we replace "Node" with "KafkaStreamsClient" or is that too wordy?
I honestly do still like Node personally, and don't see what's wrong with
introducing a new term since the "node" terminology is used heavily
on the broker side and it means effectively the same thing in theory.
But if we can't compromise between "Node" and "Client" then maybe
we can settle on "Instance"? (Does feel a bit wordy too...maybe "Process"?)

[2] Good catch(es). Makes sense to me

[3] Totally agree, a single enum makes way more sense

[4] Here again I can provide some background -- this is actually following
a pattern that we used when refactoring the old PartitionAssignor into
the new (at the time) Consumer PartitionAssignor interface. The idea was
to wrap the return type to protect the assign method in case we ever wanted
to add something to what was returned, such as metadata for the entire
group. This way we could avoid a massively disruptive deprecation-and-
migration cycle for everyone who implements a custom assignor.
That said, I just checked the GroupAssignment class we added for this
in the ConsumerPartitionAssignor interface, and to this day we've never
added anything other than the map of consumer client to assignment.

So maybe that was overly cautious. I'd be ok with flattening this map out.
I guess the question is just, can we imagine any case in which we might
want the custom assignor to return additional metadata? To be honest
I think this might be more likely than with the plain consumer client case,
but again, I'm totally fine with just flattening it to a plain map return
type

[5] I guess not. I think ApplicationMetadata was added during the initial
KIP discussion so that's probably why it doesn't follow the same naming
pattern. 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-17 Thread Sophie Blee-Goldman
Thanks Bruno! I can provide a bit of context behind some of these
decisions but I just want to say up front that I agree with every single one
of your points, though I am going to push back a bit on the first one.

[1] The idea here is to help avoid some confusion around the overloaded
term "client", which can mean either "an instance of Kafka Streams" or
"a consumer/producer client". The problem is that the former applies to
the entire Streams process and therefore should be interpreted as "all
of the StreamThreads on an instance" whereas the latter is typically used
interchangeably to mean the consumer client in the consumer group,
which implies a scope of just a single StreamThread on an instance.
The "Node" name here was an attempt to clear this up, since differentiating
between instance and thread level is critical to understanding and properly
implementing the custom assignor.

I do see what you mean about there not being a concept of Node in the
Kafka Streams codebase, and that we usually do use "instance" when we
need to differentiate between consumer client/one StreamThread and
Kafka Streams client/all StreamThreads. As I'm typing this I'm convincing
myself even more that we shouldn't just use "Client" without further
distinction, but I'm not sure "Node" has to be the answer either.

Could we replace "Node" with "KafkaStreamsClient" or is that too wordy?
I honestly do still like Node personally, and don't see what's wrong with
introducing a new term since the "node" terminology is used heavily
on the broker side and it means effectively the same thing in theory.
But if we can't compromise between "Node" and "Client" then maybe
we can settle on "Instance"? (Does feel a bit wordy too...maybe "Process"?)

[2] Good catch(es). Makes sense to me

[3] Totally agree, a single enum makes way more sense

[4] Here again I can provide some background -- this is actually following
a pattern that we used when refactoring the old PartitionAssignor into
the new (at the time) Consumer PartitionAssignor interface. The idea was
to wrap the return type to protect the assign method in case we ever wanted
to add something to what was returned, such as metadata for the entire
group. This way we could avoid a massively disruptive deprecation-and-
migration cycle for everyone who implements a custom assignor.
That said, I just checked the GroupAssignment class we added for this
in the ConsumerPartitionAssignor interface, and to this day we've never
added anything other than the map of consumer client to assignment.

So maybe that was overly cautious. I'd be ok with flattening this map out.
I guess the question is just, can we imagine any case in which we might
want the custom assignor to return additional metadata? To be honest
I think this might be more likely than with the plain consumer client case,
but again, I'm totally fine with just flattening it to a plain map return
type
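
The trade-off described in [4] can be sketched as follows. All names here (`WrappedReturnSketch`, `Assignor`, `groupNote`) are hypothetical and the types are simplified to strings; the point is only that the `assign` signature does not change when the wrapper gains a field.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class WrappedReturnSketch {

    /** Wrapper around the per-client result; new getters can be added without touching assign(). */
    public static final class TaskAssignment {
        private final Map<String, Set<String>> tasksByClient;
        private final String groupNote; // hypothetical group-level metadata added in a later version

        /** Original constructor: existing assignors keep compiling after groupNote is introduced. */
        public TaskAssignment(Map<String, Set<String>> tasksByClient) {
            this(tasksByClient, null);
        }

        public TaskAssignment(Map<String, Set<String>> tasksByClient, String groupNote) {
            this.tasksByClient = Collections.unmodifiableMap(tasksByClient);
            this.groupNote = groupNote;
        }

        public Map<String, Set<String>> tasksByClient() { return tasksByClient; }

        public Optional<String> groupNote() { return Optional.ofNullable(groupNote); }
    }

    /** The signature stays the same no matter what TaskAssignment grows to carry. */
    public interface Assignor {
        TaskAssignment assign(Set<String> clients, Set<String> tasks);
    }
}
```

With a plain map as the return type, adding group-level metadata would instead require a new method signature and a deprecation cycle for every custom assignor.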

[5] I guess not. I think ApplicationMetadata was added during the initial
KIP discussion so that's probably why it doesn't follow the same naming
pattern. Personally I'm fine either way (I do think ApplicationMetadata
sounds a bit better but that's not a good enough reason :P)

Thanks Bruno!

On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna  wrote:

> Hi,
>
> sorry, I am late to the party.
>
> I have a couple of comments:
>
> (1)
> I would prefer Client* instead of Node* in the names. In Kafka Streams
> we do not really have the concept of node but we have the concept of
> client (admittedly, we sometimes also use instance). I would like to
> avoid introducing a new term to basically describe the Streams client.
> I know that we already have a ClientState but that would be in a
> different package.
>
> (2)
> Did you consider using Instant instead of long as return type of
> followupRebalanceDeadline()? Instant is a bit more flexible and readable
> than a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in
> interface NodeAssignment.
>
> (3)
> Did you consider using an enum instead of class AssignedTask? As far as
> I understand not all combinations are possible. A stateless standby task
> does not exist. An enum with values STATELESS, STATEFUL, STANDBY would
> be clearer. Or even better instead of two methods in AssignedTask that
> return a boolean you could have one method -- say type() -- that returns
> the enum.
>
> (4)
> Does the return type of assignment need to be a map from task ID to
> AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks
> with AssignedTask containing the task ID?
>
> (5)
> Is there a semantic difference between *State and *Metadata? I was
> wondering whether ApplicationMetadata could also be ApplicationState for
> the sake of consistency.
>
> Best,
> Bruno
>
>
> On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
> > Cool, looks good to me!
> >
> > Seems like there is no further feedback, so maybe we can start to call
> for
> > a vote?
> >
> > However, since as noted we are setting aside time 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-17 Thread Bruno Cadonna

Hi,

sorry, I am late to the party.

I have a couple of comments:

(1)
I would prefer Client* instead of Node* in the names. In Kafka Streams 
we do not really have the concept of node but we have the concept of 
client (admittedly, we sometimes also use instance). I would like to 
avoid introducing a new term to basically describe the Streams client.
I know that we already have a ClientState but that would be in a 
different package.


(2)
Did you consider using Instant instead of long as return type of 
followupRebalanceDeadline()? Instant is a bit more flexible and readable 
than a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in 
interface NodeAssignment.
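
For comparison, a short sketch of the two return types using only `java.time`. The class and method names are made up for illustration, and treating the long as epoch milliseconds is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

public class DeadlineSketch {

    /** Deadline as a raw long: the unit (assumed epoch milliseconds) lives only in the documentation. */
    public static long deadlineMillis(long nowMillis, long delayMillis) {
        return nowMillis + delayMillis;
    }

    /** Deadline as an Instant: the type carries the unit and the arithmetic reads directly. */
    public static Instant deadline(Instant now, Duration delay) {
        return now.plus(delay);
    }

    /** True once the current time has reached or passed the deadline. */
    public static boolean isDue(Instant deadline, Instant now) {
        return !now.isBefore(deadline);
    }
}
```

Both forms carry the same information; the Instant version removes the need to agree on a unit or epoch in the JavaDoc.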


(3)
Did you consider using an enum instead of class AssignedTask? As far as 
I understand not all combinations are possible. A stateless standby task 
does not exist. An enum with values STATELESS, STATEFUL, STANDBY would 
be clearer. Or even better instead of two methods in AssignedTask that 
return a boolean you could have one method -- say type() -- that returns 
the enum.


(4)
Does the return type of assignment need to be a map from task ID to 
AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks 
with AssignedTask containing the task ID?


(5)
Is there a semantic difference between *State and *Metadata? I was 
wondering whether ApplicationMetadata could also be ApplicationState for 
the sake of consistency.


Best,
Bruno


On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:

Cool, looks good to me!

Seems like there is no further feedback, so maybe we can start to call for
a vote?

However, since as noted we are setting aside time to discuss this during
the sync next Thursday, we can also wait until after that meeting to
officially kick off the vote.


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-05 Thread Sophie Blee-Goldman
Cool, looks good to me!

Seems like there is no further feedback, so maybe we can start to call for
a vote?

However, since as noted we are setting aside time to discuss this during
the sync next Thursday, we can also wait until after that meeting to
officially kick off the vote.

On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai  wrote:

> Thanks for the feedback Sophie!
>
> re1: Totally agree. The fact that it's related to the partition assignor is
> clear from just `task.assignor`. I'll update.
> re3: This is a good point, and something I would find useful personally. I
> think it's worth adding an interface that lets the plugin observe the final
> assignment. I'll add that.
> re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
>
> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
> wrote:
>
> > Thanks for the feedback so far! I think pretty much all of it is
> > reasonable. I'll reply to it inline:
> >
> > > 1. All the API logic is granular at the Task level, except the
> > previousOwnerForPartition func. I’m not clear what’s the motivation
> behind
> > it, does our controller also want to change how the partitions->tasks
> > mapping is formed?
> > You're right that this is out of place. I've removed this method as it's
> > not needed by the task assignor.
> >
> > > 2. Just on the API layering itself: it feels a bit weird to have the
> > three built-in functions (defaultStandbyTaskAssignment etc) sitting in
> the
> > ApplicationMetadata class. If we consider them as some default util
> > functions, how about introducing moving those into their own static util
> > methods to separate from the ApplicationMetadata “fact objects” ?
> > Agreed. Updated in the latest revision of the kip. These have been moved
> > to TaskAssignorUtils
> >
> > > 3. I personally prefer `NodeAssignment` to be a read-only object
> > containing the decisions made by the assignor, including the
> > requestFollowupRebalance flag. For manipulating the half-baked results
> > inside the assignor itself, maybe we can just be flexible to let users
> use
> > whatever struts / their own classes even, if they like. WDYT?
> > Agreed. Updated in the latest version of the kip.
> >
> > > 1. For the API, thoughts on changing the method signature to return a
> > (non-Optional) TaskAssignor? Then we can either have the default
> > implementation return new HighAvailabilityTaskAssignor or just have a
> > default implementation class that people can extend if they don't want to
> > implement every method.
> > Based on some other discussion, I actually decided to get rid of the
> > plugin interface, and instead use config to specify individual plugin
> > behaviour. So the method you're referring to is no longer part of the
> > proposal.
> >
> > > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
> > theres methods that return void on it? It's not totally clear to me how
> > that interface is supposed to be used by the assignor. It'd be nice if we
> > could flip that interface such that it becomes part of the output instead
> > of an input to the plugin.
> > I've moved those methods to a util class. They're really utility methods
> > the assignor might want to call to do some default or optimized
> assignment
> > for some cases like rack-awareness.
> >
> > > 4. We should consider wrapping UUID in a ProcessID class so that we
> > control
> > the interface (there are a few places where UUID is directly used).
> > I like it. Updated the proposal.
> >
> > > 5. What does NodeState#newAssignmentForNode() do? I thought the point
> > was
> > for the plugin to make the assignment? Is that the result of the default
> > logic?
> > It doesn't need to be part of the interface. I've removed it.
> >
> > > re 2/6:
> >
> > I generally agree with these points, but I'd rather hash that out in a PR
> > than in the KIP review, as it'll be clearer what gets used how. It seems
> to
> > me (committers please correct me if I'm wrong) that as long as we're on
> the
> > same page about what information the interfaces are returning, that's ok
> at
> > this level of discussion.
> >
> > On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai 
> > wrote:
> >
> >> Hello All,
> >>
> >> I'd like to start a discussion on KIP-924 (
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> >> which proposes an interface to allow users to plug into the streams
> >> partition assignor. The motivation section in the KIP goes into some
> more
> >> detail on why we think this is a useful addition. Thanks in advance for
> >> your feedback!
> >>
> >> Best Regards,
> >>
> >> Rohan
> >>
> >>
>


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-05 Thread Rohan Desai
Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor is
clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally. I
think it's worth adding an interface that lets the plugin observe the final
assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.

On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai  wrote:

> Thanks for the feedback so far! I think pretty much all of it is
> reasonable. I'll reply to it inline:
>
> > 1. All the API logic is granular at the Task level, except the
> previousOwnerForPartition func. I’m not clear what’s the motivation behind
> it, does our controller also want to change how the partitions->tasks
> mapping is formed?
> You're right that this is out of place. I've removed this method as it's
> not needed by the task assignor.
>
> > 2. Just on the API layering itself: it feels a bit weird to have the
> three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
> ApplicationMetadata class. If we consider them as some default util
> functions, how about moving those into their own static util
> methods to separate from the ApplicationMetadata “fact objects” ?
> Agreed. Updated in the latest revision of the kip. These have been moved
> to TaskAssignorUtils
>
> > 3. I personally prefer `NodeAssignment` to be a read-only object
> containing the decisions made by the assignor, including the
> requestFollowupRebalance flag. For manipulating the half-baked results
> inside the assignor itself, maybe we can just be flexible to let users use
> whatever structs / their own classes even, if they like. WDYT?
> Agreed. Updated in the latest version of the kip.
>
> > 1. For the API, thoughts on changing the method signature to return a
> (non-Optional) TaskAssignor? Then we can either have the default
> implementation return new HighAvailabilityTaskAssignor or just have a
> default implementation class that people can extend if they don't want to
> implement every method.
> Based on some other discussion, I actually decided to get rid of the
> plugin interface, and instead use config to specify individual plugin
> behaviour. So the method you're referring to is no longer part of the
> proposal.
>
> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
> there's methods that return void on it? It's not totally clear to me how
> that interface is supposed to be used by the assignor. It'd be nice if we
> could flip that interface such that it becomes part of the output instead
> of an input to the plugin.
> I've moved those methods to a util class. They're really utility methods
> the assignor might want to call to do some default or optimized assignment
> for some cases like rack-awareness.
>
> > 4. We should consider wrapping UUID in a ProcessID class so that we
> control
> the interface (there are a few places where UUID is directly used).
> I like it. Updated the proposal.
>
> > 5. What does NodeState#newAssignmentForNode() do? I thought the point
> was
> for the plugin to make the assignment? Is that the result of the default
> logic?
> It doesn't need to be part of the interface. I've removed it.
>
> > re 2/6:
>
> I generally agree with these points, but I'd rather hash that out in a PR
> than in the KIP review, as it'll be clearer what gets used how. It seems to
> me (committers please correct me if I'm wrong) that as long as we're on the
> same page about what information the interfaces are returning, that's ok at
> this level of discussion.
>
> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai 
> wrote:
>
>> Hello All,
>>
>> I'd like to start a discussion on KIP-924 (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
>> which proposes an interface to allow users to plug into the streams
>> partition assignor. The motivation section in the KIP goes into some more
>> detail on why we think this is a useful addition. Thanks in advance for
>> your feedback!
>>
>> Best Regards,
>>
>> Rohan
>>
>>


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-04 Thread Sophie Blee-Goldman
Before I go into my own feedback, which I know not everyone has the time to
read, I just want to announce that we plan to go over this KIP during the
next Streams project sync on April 11. If you are interested in joining the
discussion and need an invitation, just reach out to me in a separate
thread. All are welcome.

Now on to the KIP itself...

Made a pass over the latest KIP and discussion, and had just a few (mostly
minor) points. I'll start with the trivial:

1. nit: the config name is a bit awkward with the double "assignor" in the
name. In my opinion we should remove the "partition.assignor" part of the
name altogether.

2. Just to clarify a point about the `#previousOwnerForPartition` method,
that was originally needed not because users might mix up the
task->partition mapping, but in order to ensure an assignment that
satisfies the cooperative rebalancing protocol of not assigning any
partition to a thread if it had a (different) previous owner this
generation. Since there's no thread-level assignment in this KIP, and
perhaps more importantly I believe the StreamsPartitionAssignor should
still be responsible for guaranteeing cooperative rebalancing semantics
rather than forcing this on the user's TaskAssignor, I completely agree
that we don't need the `#previousOwnerForPartition` API any more.

3. On that note, while I agree that we should leave the thread-level
assignment within a node up to the StreamsPartitionAssignor, and leave it
out of this KIP, one thing that might be valuable is the ability to read
the final thread-level assignment, for example so users could implement
metrics or other monitoring of assignment. Right now the only way to know
which partition/task is assigned to which thread is to look at the logs,
which is hardly optimal. Have you considered including some kind of API for
the StreamsPartitionAssignor to inform the TaskAssignor of the eventual
thread-level assignment that it decides on?
FWIW I am personally fine with leaving this out of the current KIP, as the
current proposal would make it easy to do this as a quick follow-up KIP. It
also would become less useful if/when we decouple the consumers from the
processing threads and do away with thread-level assignment altogether,
though it might be a while before we completely switch over, so it could
still be useful to know the thread-level assignment for a while beyond 3.8.
Just wondering if you've thought about it

4. Lastly, regarding Almog's point 2/6, I do think it would be reasonable
to consolidate the top-level APIs in NodeAssignment in favor of a single
API. For example, something like this:

public interface NodeAssignment {
  ProcessID processId();

  long followupRebalanceDeadline();

  Map<TaskId, AssignedTask> assignment();

  static class AssignedTask {
    boolean isActive;
    boolean isStateful;
  }
}

This would make things easier on the users, as they could build up this map
one task at a time, while also helping to enforce the requirement that a
Node not contain the active and standby version of the same task. The
StreamsPartitionAssignor can then loop through the map and separate the
assigned tasks into the sub-maps that it uses to perform the thread-level
assignment from there. Assuming this is what you had in mind for #2 Almog,
I'm in favor
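
To make the single-map idea a bit more concrete, here is a rough sketch of
how the assignor side could split such a map back into active and standby
sets. This is illustrative only: the AssignmentSplitter helper, the
AssignedTask constructor, and the use of plain String task ids in place of
TaskId are all assumptions for the example, not part of the KIP.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the AssignedTask sketched above (not the KIP's final type).
final class AssignedTask {
    final boolean isActive;
    final boolean isStateful;

    AssignedTask(boolean isActive, boolean isStateful) {
        this.isActive = isActive;
        this.isStateful = isStateful;
    }
}

// Hypothetical helper: splits one task->AssignedTask map into active or standby ids.
final class AssignmentSplitter {
    private AssignmentSplitter() {}

    // Task ids are plain Strings here for brevity; the KIP would key on TaskId.
    static Set<String> tasksWhere(Map<String, AssignedTask> assignment, boolean active) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, AssignedTask> entry : assignment.entrySet()) {
            if (entry.getValue().isActive == active) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

Because the map is keyed by task, putting the same task twice just replaces
the earlier entry, so a node can never end up holding both the active and
the standby copy of one task.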

However, I'm less in favor of combining APIs in NodeState, ie point #6 in
your/Almog's reply. If the goal is to keep things simple for users and make
it so that someone who only wanted to modify the existing assignment logic,
rather than re-build it from the ground up, could do so without a huge
hassle, then keeping all of these individual APIs would be necessary.
There's also a performance element to this, as (IIRC) most of those "extra"
APIs such as sorted tasks by lag are simply exposing pre-computed metadata
that gets attached to each node while the StreamsPartitionAssignor is
processing the rebalance inputs. Moving those to a static utility class
would mean having to recompute everything, as they are getters rather than
utilities, and due to the assignment complexity we have to be performance
sensitive. It will probably make more sense when you see it implemented.



On Sat, Nov 18, 2023 at 4:56 PM Guozhang Wang 
wrote:

> Hi Rohan,
>
> I took another look at the updated wiki page and do not have any major
> questions. Regarding returning a plugin object vs. configuring a
> plugin object, I do not have a strong opinion except that the latter
> seems more consistent with existing patterns. Just curious, any other
> motivations to go with the latter from you?
>
>
> Guozhang
>
> On Thu, Nov 9, 2023 at 11:19 PM Rohan Desai 
> wrote:
> >
> > Thanks for the feedback so far! I think pretty much all of it is
> > reasonable. I'll reply to it inline:
> >
> > > 1. All the API logic is granular at the Task level, except the
> > previousOwnerForPartition func. I’m not clear what’s the motivation
> behind
> > it, does our controller also want to change how the partitions->tasks
> > 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-18 Thread Guozhang Wang
Hi Rohan,

I took another look at the updated wiki page and do not have any major
questions. Regarding returning a plugin object vs. configuring a
plugin object, I do not have a strong opinion except that the latter
seems more consistent with existing patterns. Just curious, any other
motivations to go with the latter from you?


Guozhang

On Thu, Nov 9, 2023 at 11:19 PM Rohan Desai  wrote:
>
> Thanks for the feedback so far! I think pretty much all of it is
> reasonable. I'll reply to it inline:
>
> > 1. All the API logic is granular at the Task level, except the
> previousOwnerForPartition func. I’m not clear what’s the motivation behind
> it, does our controller also want to change how the partitions->tasks
> mapping is formed?
> You're right that this is out of place. I've removed this method as it's
> not needed by the task assignor.
>
> > 2. Just on the API layering itself: it feels a bit weird to have the
> three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
> ApplicationMetadata class. If we consider them as some default util
> functions, how about moving those into their own static util
> methods to separate from the ApplicationMetadata “fact objects” ?
> Agreed. Updated in the latest revision of the kip. These have been moved to
> TaskAssignorUtils
>
> > 3. I personally prefer `NodeAssignment` to be a read-only object
> containing the decisions made by the assignor, including the
> requestFollowupRebalance flag. For manipulating the half-baked results
> inside the assignor itself, maybe we can just be flexible to let users use
> whatever structs / their own classes even, if they like. WDYT?
> Agreed. Updated in the latest version of the kip.
>
> > 1. For the API, thoughts on changing the method signature to return a
> (non-Optional) TaskAssignor? Then we can either have the default
> implementation return new HighAvailabilityTaskAssignor or just have a
> default implementation class that people can extend if they don't want to
> implement every method.
> Based on some other discussion, I actually decided to get rid of the plugin
> interface, and instead use config to specify individual plugin behaviour.
> So the method you're referring to is no longer part of the proposal.
>
> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
> there's methods that return void on it? It's not totally clear to me how
> that interface is supposed to be used by the assignor. It'd be nice if we
> could flip that interface such that it becomes part of the output instead
> of an input to the plugin.
> I've moved those methods to a util class. They're really utility methods
> the assignor might want to call to do some default or optimized assignment
> for some cases like rack-awareness.
>
> > 4. We should consider wrapping UUID in a ProcessID class so that we
> control
> the interface (there are a few places where UUID is directly used).
> I like it. Updated the proposal.
>
> > 5. What does NodeState#newAssignmentForNode() do? I thought the point was
> for the plugin to make the assignment? Is that the result of the default
> logic?
> It doesn't need to be part of the interface. I've removed it.
>
> > re 2/6:
>
> I generally agree with these points, but I'd rather hash that out in a PR
> than in the KIP review, as it'll be clearer what gets used how. It seems to
> me (committers please correct me if I'm wrong) that as long as we're on the
> same page about what information the interfaces are returning, that's ok at
> this level of discussion.
>
> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai  wrote:
>
> > Hello All,
> >
> > I'd like to start a discussion on KIP-924 (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> > which proposes an interface to allow users to plug into the streams
> > partition assignor. The motivation section in the KIP goes into some more
> > detail on why we think this is a useful addition. Thanks in advance for
> > your feedback!
> >
> > Best Regards,
> >
> > Rohan
> >
> >


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-09 Thread Rohan Desai
Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:

> 1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation behind
it, does our controller also want to change how the partitions->tasks
mapping is formed?
You're right that this is out of place. I've removed this method as it's
not needed by the task assignor.

> 2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
ApplicationMetadata class. If we consider them as some default util
functions, how about moving those into their own static util
methods to separate from the ApplicationMetadata “fact objects” ?
Agreed. Updated in the latest revision of the kip. These have been moved to
TaskAssignorUtils

> 3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users use
whatever structs / their own classes even, if they like. WDYT?
Agreed. Updated in the latest version of the kip.

> 1. For the API, thoughts on changing the method signature to return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just have a
default implementation class that people can extend if they don't want to
implement every method.
Based on some other discussion, I actually decided to get rid of the plugin
interface, and instead use config to specify individual plugin behaviour.
So the method you're referring to is no longer part of the proposal.

> 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
there's methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.
I've moved those methods to a util class. They're really utility methods
the assignor might want to call to do some default or optimized assignment
for some cases like rack-awareness.

> 4. We should consider wrapping UUID in a ProcessID class so that we
control
the interface (there are a few places where UUID is directly used).
I like it. Updated the proposal.
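
For reference, a minimal sketch of what such a wrapper could look like. The
class shape and method names below are assumptions for illustration, not
the definition in the KIP:

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical wrapper sketch; names and methods are assumptions, not the KIP's API.
final class ProcessId implements Comparable<ProcessId> {
    private final UUID id;

    ProcessId(UUID id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    // Exposes the underlying UUID for callers that still need it.
    UUID id() {
        return id;
    }

    @Override
    public int compareTo(ProcessId other) {
        return id.compareTo(other.id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ProcessId && id.equals(((ProcessId) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "ProcessId{" + id + "}";
    }
}
```

The point is just that callers depend on our type rather than on
java.util.UUID directly, so the representation can change later without
touching the public interfaces.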

> 5. What does NodeState#newAssignmentForNode() do? I thought the point was
for the plugin to make the assignment? Is that the result of the default
logic?
It doesn't need to be part of the interface. I've removed it.

> re 2/6:

I generally agree with these points, but I'd rather hash that out in a PR
than in the KIP review, as it'll be clearer what gets used how. It seems to
me (committers please correct me if I'm wrong) that as long as we're on the
same page about what information the interfaces are returning, that's ok at
this level of discussion.

On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai  wrote:

> Hello All,
>
> I'd like to start a discussion on KIP-924 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> which proposes an interface to allow users to plug into the streams
> partition assignor. The motivation section in the KIP goes into some more
> detail on why we think this is a useful addition. Thanks in advance for
> your feedback!
>
> Best Regards,
>
> Rohan
>
>


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-09 Thread Almog Gavra
Hello Rohan,

Thanks for the KIP! Thoughts below (seems I have similar comments to
Guozhang, but I had already written this before reading his reply haha!).
They're basically all minor suggestions for improvements, I wouldn't
consider any of them blocking.

1. For the API, thoughts on changing the method signature to return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just have a
default implementation class that people can extend if they don't want to
implement every method.

2. Generally the KIP could benefit from reducing the method overloads. For
example, we should consider generifying the NodeAssignment class so that
all of the reader methods are consolidated in a single Set<AssignedTask>
assignment() - an AssignedTask can have details about whether it's
active/standby/stateful/stateless (this can be reused to reduce the surface
area of ApplicationMetadata allTasks()/statefulTasks()). We can also
probably collapse the three void return type methods in ApplicationMetadata
into a single method.

3. Speaking of ApplicationMetadata, the javadoc says it's read only but
there's methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.

4. We should consider wrapping UUID in a ProcessID class so that we control
the interface (there are a few places where UUID is directly used).

5. What does NodeState#newAssignmentForNode() do? I thought the point was
for the plugin to make the assignment? Is that the result of the default
logic?

6. I wonder if all the NodeState lag information can be wrapped in a single
class that dumps all the most granular information then lets users compute
rollups on whatever they want instead of precomputing things like "sorted
tasks by lag" or "total lag across all stores".
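
As a rough illustration of point 6, the sketch below wraps only the most
granular data (lag per task) and derives the rollups on demand. The
TaskLags class, its method names, and the String task ids are hypothetical
and only meant to show the shape of the idea:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder for per-task lag; rollups are computed by the caller on demand.
final class TaskLags {
    private final Map<String, Long> lagByTask;

    TaskLags(Map<String, Long> lagByTask) {
        // Defensive copy so later changes to the input map are not visible here.
        this.lagByTask = Collections.unmodifiableMap(new HashMap<>(lagByTask));
    }

    // The granular view: lag for each task id.
    Map<String, Long> lagByTask() {
        return lagByTask;
    }

    // Rollup: total lag across all tasks.
    long totalLag() {
        long total = 0L;
        for (long lag : lagByTask.values()) {
            total += lag;
        }
        return total;
    }

    // Rollup: task ids ordered by ascending lag, ties broken by task id.
    List<String> tasksSortedByLag() {
        List<String> tasks = new ArrayList<>(lagByTask.keySet());
        tasks.sort(Comparator.comparingLong((String t) -> lagByTask.get(t))
                .thenComparing(Comparator.<String>naturalOrder()));
        return tasks;
    }
}
```

The trade-off is that each rollup is recomputed per call instead of being
handed over precomputed.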

Looking forward to this one! It opens the door to a lot of great things.

Cheers,
Almog

On Thu, Nov 9, 2023 at 12:22 PM Guozhang Wang 
wrote:

> Hello Rohan,
>
> Thanks for the KIP! Overall it looks very nice. Just some quick thoughts:
>
> 1. All the API logic is granular at the Task level, except the
> previousOwnerForPartition func. I’m not clear what’s the motivation
> behind it, does our controller also want to change how the
> partitions->tasks mapping is formed?
>
> 2. Just on the API layering itself: it feels a bit weird to have the
> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
> the ApplicationMetadata class. If we consider them as some default
> util functions, how about moving those into their own
> static util methods to separate from the  ApplicationMetadata “fact
> objects” ?
>
> 3. I personally prefer `NodeAssignment` to be a read-only object
> containing the decisions made by the assignor, including the
> requestFollowupRebalance flag. For manipulating the half-baked results
> inside the assignor itself, maybe we can just be flexible to let users
> use whatever structs / their own classes even, if they like. WDYT?
>
> Thanks,
> Guozhang
>
> On Tue, Nov 7, 2023 at 12:04 PM Rohan Desai 
> wrote:
> >
> > Hello All,
> >
> > I'd like to start a discussion on KIP-924 (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> > which proposes an interface to allow users to plug into the streams
> > partition assignor. The motivation section in the KIP goes into some more
> > detail on why we think this is a useful addition. Thanks in advance for
> > your feedback!
> >
> > Best Regards,
> >
> > Rohan
>


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-09 Thread Guozhang Wang
Hello Rohan,

Thanks for the KIP! Overall it looks very nice. Just some quick thoughts:

1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation
behind it, does our controller also want to change how the
partitions->tasks mapping is formed?

2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in
the ApplicationMetadata class. If we consider them as some default
util functions, how about moving those into their own
static util methods to separate from the ApplicationMetadata “fact
objects”?

3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users
use whatever structs / their own classes even, if they like. WDYT?

Thanks,
Guozhang

On Tue, Nov 7, 2023 at 12:04 PM Rohan Desai  wrote:
>
> Hello All,
>
> I'd like to start a discussion on KIP-924 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> which proposes an interface to allow users to plug into the streams
> partition assignor. The motivation section in the KIP goes into some more
> detail on why we think this is a useful addition. Thanks in advance for
> your feedback!
>
> Best Regards,
>
> Rohan