Hello David,

On Fri, Sep 23, 2022 at 2:00 AM David Jacot <dja...@confluent.io.invalid>
wrote:

> Hey,
>
> > Just to clarify I was asking about the `version` of the assignor (i.e. up
> to what version that the client would support), and I do agree we would not
> need metadata. What I have in mind is that, for some specific built-in
> broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> newer version we would have a hierarchical rack ID string format, like
> "tier1-tier2" etc, but if some client has not upgraded their rack ID
> would still be in old format. In this case, the broker then needs to choose
> the old versioned assignor. I'm probably making something up here for rack
> aware assignors, but I'm wondering if in general such an "auto-downgrade"
> behavior would be needed still for broker-side assignor, and if yes would
> "version" still be useful.
>
> Got it. That's an interesting thought. I think that the issue is that
> the client will never tell you which version of the server-side
> assignor should be used. Do you think that the coordinator would
> downgrade the version if the assignment fails with a higher version? I
> tend to believe that this should be handled within the assignor
> itself. In the example that you mentioned, the assignor would have to
> handle all the cases. I am not really convinced that we need this at
> the moment.
>
> The version from the client side would not be indicating the broker which
version to use, but rather which version the client would "work best with".
Such a "version" field would not be settible by the users, since they will
be hard-codedly bumped when the Kafka byte code version bumped.
Back to the rack aware assignor example, if the older versioned client does
not have a hierarchical rack ID, however if the assignment returned to them
is assuming a hierarchical rack structure, it may not reflect the best
workload balance among those new and old versioned clients. That means,
when receiving the members subscriptions at the server side, if the
versions from all these members are different, the broker's assignor may
need to consider using the lower version logic to do the assignment. So yes
the assignor would indeed have to handle all such cases, but it needs to do
so such that if there are clients who would not work with certain new
logic, it would then handle such cases automatically by e.g. still using an
older versioned logic.



> > Okay, my understanding is that the calling ordering of these callbacks
> would be like the following:
>
> Yes, your examples look right.
>
> > I'm wondering if we would still call onAssignment just once, that encodes
> all the assignment for this rebalance, including all the partitions that
> should be assigned to the member but not yet assigned since they have not
> been revoked by others. In that case the call ordering would be:
>
> Interesting. Is there a case for Streams where having the full
> assignment is beneficial? For instance, I can think of the following
> case. When a standby task is promoted to an active task, the metadata
> would not contain the standby task anymore and the assignment may not
> have the partition yet. In this case, Streams would stop the standby
> tasks but not have the active task yet if my understanding of Streams
> is correct. So knowing the full assignment could be helpful here.
>
> If we want to do this, we could structure the assignment given to the
> member as follow: version, error, metadata, assigned partitions,
> pending partitions, where the pending partitions would be the one
> assigned to this member but not yet available. What do you think?
>
> Regarding onAssignment being called only once, I am not sure to fully
> grasp the benefit yet. Does the assignor really care about this? In
> the end, the epoch does not really matter for the assignor because it
> has to converge its state to the desired state anyway.
>
> Here's my rationale (maybe rephased a bit :P ): the implementers of
rebalance listener and assignor are two groups of people, and most users
fall into the former group, while only very few people fall into the later
group. For rebalance listener implementers, they just want to know when a
partition is actually revoked or assigned to the consumer and reacts to it,
for this purpose, `onPartitionsRevoked` and `onPartitionsAssigned` would be
triggered interleavingly upon `poll` calls across rebalances. The usual
logic for such rebalance listeners are metrics reporting, committing
offsets (if they do not use Kafka for that), etc. They would not care which
calls are from which rebalances --- in the past with eager rebalance, it
maybe that each rebalance is associated with exactly a
`onPartitionsRevoked` first and then a `onPartitionsAssigned`, but it would
no longer the cases now.

The implementers of the assignor though, would care about "how the
assignment was made", that includes from which rebalance a certain
revoke/assign decision was made, based on what metadata such assignment is
made, etc. And that's the whole point of the `onAssignment` function since
otherwise they can just rely on the listeners. They usually implementation
logic of this callback is to e.g. bookkeep the assignment decision driving
factors a.k.a. the metadata, global information that needs to be propagated
to all members, etc. Take Streams as an example, the active processing
tasks go along with the assigned partitions, and we can always just
incrementally create / close them upon each rebalance listener triggers,
when certain partitions are revoked or assigned together; standby tasks
however are encoded with the metadata, and we can only know which standby
tasks should we get / drop based on the `onAssignment` function, and in
fact the creation of such tasks as a result of the metadata bookkeeping
does not need to wait until all the partitions that are yet-assigned have
been completely assigned to the member. Such information may not always be
updatable in an incremental manner as the partitions-revoked /
partitions-assigned. In such a case, it's better to just trigger this
function "once per decision made" i.e. once per rebalance generation.


Guozhang



> Best,
> David
>
> On Thu, Sep 22, 2022 at 6:01 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hi David, thanks for all the detailed explanations. I think they all make
> > sense. Just want to have a couple follow-ups here:
> >
> > > I don't really see the benefits here because server side assignors
> > don't have metadata at all. They only assign topic-partitions. They
> > are not supposed to generate metadata nor to receive metadata from the
> > members.
> >
> > Just to clarify I was asking about the `version` of the assignor (i.e. up
> > to what version that the client would support), and I do agree we would
> not
> > need metadata. What I have in mind is that, for some specific built-in
> > broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> > newer version we would have a hierarchical rack ID string format, like
> > "tier1-tier2" etc, but if some client has not upgraded their rack ID
> > would still be in old format. In this case, the broker then needs to
> choose
> > the old versioned assignor. I'm probably making something up here for
> rack
> > aware assignors, but I'm wondering if in general such an "auto-downgrade"
> > behavior would be needed still for broker-side assignor, and if yes would
> > "version" still be useful.
> >
> > > Yeah, that's right. Within a rebalance, `onAssignment` is called once
> > when the member transitions to a new epoch. This one contains the full
> > metadata provided by the client side assignor. Then, `onAssignment`
> > can be called max N times where N is the number of partitions pending
> > revocation by other members. Let me try to clarify this in the KIP.
> >
> > Okay, my understanding is that the calling ordering of these callbacks
> > would be like the following:
> >
> > ----------------------------------------
> > onPartitionsRevoked();   // just once, since we do not really need
> > to revoke incrementally.
> >
> > onAssignment();    // the first call, with epoch incremented
> > onPartitionsAssigned();   // paired with the onAssignment
> >
> > onAssignment();              // the first onAssignment would bump up the
> > epoch, and the metadata reflected.
> > onPartitionsAssigned();   // each time we get an additional assignment,
> we
> > call onAssignment and then paired with an onPartitionsAssigned
> > ...
> > onAssignment();
> > onPartitionsAssigned();   // on each of the onAssignment calls, the
> encoded
> > metadata would not change, only the incrementally added partitions be
> > reflected
> >
> > Is that the case?
> >
> > I'm wondering if we would still call onAssignment just once, that encodes
> > all the assignment for this rebalance, including all the partitions that
> > should be assigned to the member but not yet assigned since they have not
> > been revoked by others. In that case the call ordering would be:
> >
> > ----------------------------------------
> > onPartitionsRevoked();   // just once
> > onAssignment();    // just once, with epoch incremented, and metadata
> > encoded changed, the "assignment" field also reflect the final target
> > assignment
> > onPartitionsAssigned();   // multiple times, which represent
> incrementally
> > added partitions
> > ...
> > onPartitionsAssigned();
> >
> > The motivation from this is that, most users would only implement the
> > rebalance callback listeners and hence we'd definitely need to make sure
> > the semantics of that does not change much, and the time
> > `onPartitionsAssigned` indicate the time when the partitions are actually
> > assigned to it; while for assignors, the `onAssignment` is used to
> indicate
> > what decision is made regarding for this member, i.e. when the partitions
> > are decided to be given to it, but not necessarily meant that it has been
> > given, since that time should be determined by the time of
> > `onPartitionsAssigned`. The benefits though, would be that assignor
> > implementers would not need to reason which `onAssignment` would be the
> > last one for this epoch.
> >
> >
> > Guozhang
> >
> >
> >
> >
> > Guozhang
> >
> > On Thu, Sep 22, 2022 at 2:20 AM David Jacot <dja...@confluent.io.invalid
> >
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > > 1) The client-side "PartitionAssignor#Assignment" has an Error byte
> > > field,
> > > > while the broker-side "PartitionAssignor#Assignment" does not. And
> the
> > > > rationale seems to be assuming that we should always be able to do
> the
> > > > assignment at the broker-side assignor without errors. Personally I
> think
> > > > it's still potentially beneficial to add the Error field even for
> > > > broker-side assignors, e.g. for some edge cases where some subscribed
> > > > topics are not recognized with the current broker's metadata. What
> do you
> > > > think?
> > >
> > > Yeah, that seems reasonable. However, I wonder if it would be better
> > > to use an exception on the server side. This is what we usually do for
> > > server side plugins. On the client side, we use a field because the
> > > errors are not defined in advance.
> > >
> > > Your comment also makes me think about what we should do when the
> > > server side assignor fails. I suppose that we have to keep the current
> > > assignment until a new event occurs. For instance, in your example,
> > > the coordinator would have to trigger a rebalance when unrecognized
> > > topics are available. This would be part of the metadata monitoring.
> > >
> > > > 2) The client-side "GroupMember" has three additional fields
> > > > reason/version/metadata compared with the broker-side GroupMember. I
> > > agree
> > > > that broker-side assignor would not need reason/metadata since they
> are
> > > > blackbox strings/bytes to the assignor, but what about version? E.g.
> is
> > > it
> > > > possible that we evolve our broker-side built-in assignor but the old
> > > > versioned clients would not be able to work with the new version, in
> > > which
> > > > case we need to let the broker being aware of this and upgrade its
> > > behavior
> > > > to cooperate with the clients?
> > >
> > > I don't really see the benefits here because server side assignors
> > > don't have metadata at all. They only assign topic-partitions. They
> > > are not supposed to generate metadata nor to receive metadata from the
> > > members.
> > >
> > > > 3) Also related to 2) above, for the client-side "GroupMember",
> instead
> > > of
> > > > including these three fields, what about just adding the "Metadata"
> field
> > > > class which has these three fields? Also, there are two "Metadata"
> > > > currently in the APIs, the first is a class that encodes
> > > > reason/version/metadata, and the second is just the encoded metadata
> > > bytes.
> > > > I'm wondering what about just naming the first as memberMetadata,
> which
> > > > then has a bytebuffer field Metadata, or instead naming the second
> > > > bytebuffer field as metadataBytes?
> > >
> > > That's a good point. Let me try to rationalize this interface based on
> > > your suggestions.
> > >
> > > Best,
> > > David
> > >
> > > On Tue, Sep 13, 2022 at 9:21 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> > > >
> > > > Hello David,
> > > >
> > > > Just had a few more nit questions about the public APIs:
> > > >
> > > > 1) The client-side "PartitionAssignor#Assignment" has an Error byte
> > > field,
> > > > while the broker-side "PartitionAssignor#Assignment" does not. And
> the
> > > > rationale seems to be assuming that we should always be able to do
> the
> > > > assignment at the broker-side assignor without errors. Personally I
> think
> > > > it's still potentially beneficial to add the Error field even for
> > > > broker-side assignors, e.g. for some edge cases where some subscribed
> > > > topics are not recognized with the current broker's metadata. What
> do you
> > > > think?
> > > >
> > > > 2) The client-side "GroupMember" has three additional fields
> > > > reason/version/metadata compared with the broker-side GroupMember. I
> > > agree
> > > > that broker-side assignor would not need reason/metadata since they
> are
> > > > blackbox strings/bytes to the assignor, but what about version? E.g.
> is
> > > it
> > > > possible that we evolve our broker-side built-in assignor but the old
> > > > versioned clients would not be able to work with the new version, in
> > > which
> > > > case we need to let the broker being aware of this and upgrade its
> > > behavior
> > > > to cooperate with the clients?
> > > >
> > > > 3) Also related to 2) above, for the client-side "GroupMember",
> instead
> > > of
> > > > including these three fields, what about just adding the "Metadata"
> field
> > > > class which has these three fields? Also, there are two "Metadata"
> > > > currently in the APIs, the first is a class that encodes
> > > > reason/version/metadata, and the second is just the encoded metadata
> > > bytes.
> > > > I'm wondering what about just naming the first as memberMetadata,
> which
> > > > then has a bytebuffer field Metadata, or instead naming the second
> > > > bytebuffer field as metadataBytes?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Sep 13, 2022 at 12:08 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello David,
> > > > >
> > > > > Thanks for bringing this question up. I think the main benefits as
> you
> > > > > listed is 2) above if it stays; just to clarify, we would only be
> able
> > > to
> > > > > save one round trip if the rebalance is still triggered by the
> broker;
> > > if
> > > > > the rebalance is triggered by the client then the num.round.trips
> are
> > > the
> > > > > same:
> > > > >
> > > > > 1) With GroupPrepareAssignment:
> > > > >
> > > > > T0: client decides to do a new assignment, suppose it has already
> sent
> > > a
> > > > > HB and hence has to wait for it to return first since only one
> request
> > > /
> > > > > response can be inflight with the coordinator's socket.
> > > > > T1: client receives the HB response, and then sends the
> > > > > GroupPrepareAssignment request.
> > > > > T2: the GroupPrepareAssignment response is returned.
> > > > > T3: it calculates the new assignment, and sends a
> > > GroupInstallAssignment
> > > > > request.
> > > > >
> > > > > In total, two round trips.
> > > > >
> > > > > 2) Without GroupPrepareAssignment:
> > > > >
> > > > > T0: client decides to do a new assignment, suppose it has already
> sent
> > > a
> > > > > HB and hence has to wait for it to return first since only one
> request
> > > /
> > > > > response can be inflight with the coordinator's socket.
> > > > > T1: client receives the HB response, and then sends the new HB
> request
> > > > > with the flag indicating a new rebalance needed..
> > > > > T2: the HB response with the optional member metadata map is
> returned.
> > > > > T3: it calculates the new assignment, and sends a
> > > GroupInstallAssignment
> > > > > request.
> > > > >
> > > > > In total, two round trips as well.
> > > > >
> > > > > -----------------------------
> > > > >
> > > > > So to complete the full picture here, we'd need to modify both HB
> > > request
> > > > > and response so that the client can also indicate a new rebalance
> via
> > > the
> > > > > HB request as well, right?
> > > > >
> > > > > Assuming all above is true, I think it's okay to merge the
> > > > > GroupPrepareAssignment into HB given that we can make the
> additional
> > > fields
> > > > > encoding the full member (subscription) metadata and topic
> metadata as
> > > > > optional fields.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Sep 12, 2022 at 5:22 AM David Jacot
> > > <dja...@confluent.io.invalid>
> > > > > wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> During an offline conversation, someone asked why we need the
> > > > >> ConsumerGroupPrepareAssignment API and suggested that we could
> instead
> > > > >> provide the group state in the heartbeat response. This has a few
> > > > >> advantages: 1) it does not require using a special error code to
> > > > >> signal that a new assignment is required as the signal would be
> the
> > > > >> provided group state; 2) it removes one round trip when a client
> side
> > > > >> assignor is used. The downside is that it makes the heartbeat
> > > > >> response's definition quite large. I recall that I went with the
> > > > >> current approach due to this.
> > > > >>
> > > > >> Providing the group state in the heartbeat response is appealing.
> What
> > > > >> do you guys think?
> > > > >>
> > > > >> Best,
> > > > >> David
> > > > >>
> > > > >> On Mon, Sep 12, 2022 at 2:17 PM David Jacot <dja...@confluent.io>
> > > wrote:
> > > > >> >
> > > > >> > Hi Guozhang,
> > > > >> >
> > > > >> > 1. I have added a reference to the relevant chapter instead of
> > > > >> > repeating the whole thing. Does that work for you?
> > > > >> >
> > > > >> > 2. The "Rebalance Triggers" section you are referring to is
> about
> > > when
> > > > >> > a rebalance should be triggered for the non-upgraded members
> using
> > > the
> > > > >> > old protocol. The section mentions that a rebalance must be
> > > triggered
> > > > >> > when a new assignment is installed. This implies that the group
> > > epoch
> > > > >> > was updated either by a native member or a non-upgraded member.
> For
> > > > >> > the latter, the JoinGroup request would be the trigger. I have
> > > added a
> > > > >> > reference to the relevant chapter in the "JoinGroup Handling"
> > > section
> > > > >> > as well. Does that make sense?
> > > > >> >
> > > > >> > Thanks,
> > > > >> > David
> > > > >> >
> > > > >> > On Fri, Sep 9, 2022 at 10:35 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > >> wrote:
> > > > >> > >
> > > > >> > > Hello David,
> > > > >> > >
> > > > >> > > Alright I think that's sufficient. Just to make that clear in
> the
> > > doc,
> > > > >> > > could we update:
> > > > >> > >
> > > > >> > > 1) the heartbeat request handling section, stating when
> > > coordinator
> > > > >> will
> > > > >> > > trigger rebalance based on the HB's member metadata / reason?
> > > > >> > > 2) the "Rebalance Triggers" section to include what we
> described
> > > in
> > > > >> "Group
> > > > >> > > Epoch - Trigger a rebalance" section as well?
> > > > >> > >
> > > > >> > >
> > > > >> > > Guozhang
> > > > >> > >
> > > > >> > > On Fri, Sep 9, 2022 at 1:28 AM David Jacot
> > > > >> <dja...@confluent.io.invalid>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Hi Guozhang,
> > > > >> > > >
> > > > >> > > > I thought that the assignor will always be consulted when
> the
> > > next
> > > > >> > > > heartbeat request is constructed. In other words,
> > > > >> > > > `PartitionAssignor#metadata` will be called for every
> heartbeat.
> > > > >> This
> > > > >> > > > gives the opportunity for the assignor to enforce a
> rebalance by
> > > > >> > > > setting the reason to a non-zero value or by changing the
> > > bytes. Do
> > > > >> > > > you think that this is not sufficient? Are you concerned by
> the
> > > > >> delay?
> > > > >> > > >
> > > > >> > > > Best,
> > > > >> > > > David
> > > > >> > > >
> > > > >> > > > On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang <
> > > wangg...@gmail.com>
> > > > >> wrote:
> > > > >> > > > >
> > > > >> > > > > Hello David,
> > > > >> > > > >
> > > > >> > > > > One of Jun's comments make me thinking:
> > > > >> > > > >
> > > > >> > > > > ```
> > > > >> > > > > In this case, a new assignment is triggered by the client
> side
> > > > >> > > > > assignor. When constructing the HB, the consumer will
> always
> > > > >> consult
> > > > >> > > > > the client side assignor and propagate the information to
> the
> > > > >> group
> > > > >> > > > > coordinator. In other words, we don't expect users to call
> > > > >> > > > > Consumer#enforceRebalance anymore.
> > > > >> > > > > ```
> > > > >> > > > >
> > > > >> > > > > As I looked at the current PartitionAssignor's interface,
> we
> > > > >> actually do
> > > > >> > > > > not have a way yet to instruct how to construct the next
> HB
> > > > >> request, e.g.
> > > > >> > > > > when the assignor wants to enforce a new rebalance with a
> new
> > > > >> assignment,
> > > > >> > > > > we'd need some customizable APIs inside the
> PartitionAssignor
> > > to
> > > > >> indicate
> > > > >> > > > > the next HB telling broker about so. WDYT about adding
> such an
> > > > >> API on the
> > > > >> > > > > PartitionAssignor?
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > Guozhang
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > On Tue, Sep 6, 2022 at 6:09 AM David Jacot
> > > > >> <dja...@confluent.io.invalid>
> > > > >> > > > > wrote:
> > > > >> > > > >
> > > > >> > > > > > Hi Jun,
> > > > >> > > > > >
> > > > >> > > > > > I have updated the KIP to include your feedback. I have
> also
> > > > >> tried to
> > > > >> > > > > > clarify the parts which were not cleared.
> > > > >> > > > > >
> > > > >> > > > > > Best,
> > > > >> > > > > > David
> > > > >> > > > > >
> > > > >> > > > > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot <
> > > dja...@confluent.io
> > > > >> >
> > > > >> > > > wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > Hi Jun,
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks for your feedback. Let me start by answering
> your
> > > > >> questions
> > > > >> > > > > > > inline and I will update the KIP next week.
> > > > >> > > > > > >
> > > > >> > > > > > > > Thanks for the KIP. Overall, the main benefits of
> the
> > > KIP
> > > > >> seem to
> > > > >> > > > be
> > > > >> > > > > > fewer
> > > > >> > > > > > > > RPCs during rebalance and more efficient support of
> > > > >> wildcard. A few
> > > > >> > > > > > > > comments below.
> > > > >> > > > > > >
> > > > >> > > > > > > I would also add that the KIP removes the global sync
> > > barrier
> > > > >> in the
> > > > >> > > > > > > protocol which is essential to improve group
> stability and
> > > > >> > > > > > > scalability, and the KIP also simplifies the client by
> > > moving
> > > > >> most of
> > > > >> > > > > > > the logic to the server side.
> > > > >> > > > > > >
> > > > >> > > > > > > > 30. ConsumerGroupHeartbeatRequest
> > > > >> > > > > > > > 30.1 ServerAssignor is a singleton. Do we plan to
> > > support
> > > > >> rolling
> > > > >> > > > > > changing
> > > > >> > > > > > > > of the partition assignor in the consumers?
> > > > >> > > > > > >
> > > > >> > > > > > > Definitely. The group coordinator will use the
> assignor
> > > used
> > > > >> by a
> > > > >> > > > > > > majority of the members. This allows the group to move
> > > from
> > > > >> one
> > > > >> > > > > > > assignor to another by a roll. This is explained in
> the
> > > > >> Assignor
> > > > >> > > > > > > Selection chapter.
> > > > >> > > > > > >
> > > > >> > > > > > > > 30.2 For each field, could you explain whether it's
> > > > >> required in
> > > > >> > > > every
> > > > >> > > > > > > > request or the scenarios when it needs to be
> filled? For
> > > > >> example,
> > > > >> > > > it's
> > > > >> > > > > > not
> > > > >> > > > > > > > clear to me when TopicPartitions needs to be filled.
> > > > >> > > > > > >
> > > > >> > > > > > > The client is expected to set those fields in case of
> a
> > > > >> connection
> > > > >> > > > > > > issue (e.g. timeout) or when the fields have changed
> since
> > > > >> the last
> > > > >> > > > > > > HB. The server populates those fields as long as the
> > > member
> > > > >> is not
> > > > >> > > > > > > fully reconciled - the member should acknowledge that
> it
> > > has
> > > > >> the
> > > > >> > > > > > > expected epoch and assignment. I will clarify this in
> the
> > > KIP.
> > > > >> > > > > > >
> > > > >> > > > > > > > 31. In the current consumer protocol, the rack
> affinity
> > > > >> between the
> > > > >> > > > > > client
> > > > >> > > > > > > > and the broker is only considered during fetching,
> but
> > > not
> > > > >> during
> > > > >> > > > > > assigning
> > > > >> > > > > > > > partitions to consumers. Sometimes, once the
> assignment
> > > is
> > > > >> made,
> > > > >> > > > there
> > > > >> > > > > > is
> > > > >> > > > > > > > no opportunity for read affinity because no
> replicas of
> > > > >> assigned
> > > > >> > > > > > partitions
> > > > >> > > > > > > > are close to the member. I am wondering if we
> should use
> > > > >> this
> > > > >> > > > > > opportunity
> > > > >> > > > > > > > to address this by including rack in GroupMember.
> > > > >> > > > > > >
> > > > >> > > > > > > That's an interesting idea. I don't see any issue with
> > > adding
> > > > >> the
> > > > >> > > > rack
> > > > >> > > > > > > to the members. I will do so.
> > > > >> > > > > > >
> > > > >> > > > > > > > 32. On the metric side, often, it's useful to know
> how
> > > busy
> > > > >> a group
> > > > >> > > > > > > > coordinator is. By moving the event loop model, it
> seems
> > > > >> that we
> > > > >> > > > could
> > > > >> > > > > > add
> > > > >> > > > > > > > a metric that tracks the fraction of the time the
> event
> > > > >> loop is
> > > > >> > > > doing
> > > > >> > > > > > the
> > > > >> > > > > > > > actual work.
> > > > >> > > > > > >
> > > > >> > > > > > > That's a great idea. I will add it. Thanks.
> > > > >> > > > > > >
> > > > >> > > > > > > > 33. Could we add a section on coordinator failover
> > > > >> handling? For
> > > > >> > > > > > example,
> > > > >> > > > > > > > does it need to trigger the check if any group with
> the
> > > > >> wildcard
> > > > >> > > > > > > > subscription now has a new matching topic?
> > > > >> > > > > > >
> > > > >> > > > > > > Sure. When the new group coordinator takes over, it
> has
> > > to:
> > > > >> > > > > > > * Setup the session timeouts.
> > > > >> > > > > > > * Trigger a new assignment if a client side assignor
> is
> > > used.
> > > > >> We
> > > > >> > > > don't
> > > > >> > > > > > > store the information about the member selected to
> run the
> > > > >> assignment
> > > > >> > > > > > > so we have to start a new one.
> > > > >> > > > > > > * Update the topics metadata, verify the wildcard
> > > > >> subscriptions, and
> > > > >> > > > > > > trigger a rebalance if needed.
> > > > >> > > > > > >
> > > > >> > > > > > > > 34. ConsumerGroupMetadataValue,
> > > > >> > > > ConsumerGroupPartitionMetadataValue,
> > > > >> > > > > > > > ConsumerGroupMemberMetadataValue: Could we document
> what
> > > > >> the epoch
> > > > >> > > > > > field
> > > > >> > > > > > > > reflects? For example, does the epoch in
> > > > >> ConsumerGroupMetadataValue
> > > > >> > > > > > reflect
> > > > >> > > > > > > > the latest group epoch? What about the one in
> > > > >> > > > > > > > ConsumerGroupPartitionMetadataValue and
> > > > >> > > > > > ConsumerGroupMemberMetadataValue?
> > > > >> > > > > > >
> > > > >> > > > > > > Sure. I will clarify that but it is always the latest
> > > group
> > > > >> epoch.
> > > > >> > > > > > > When the group state is updated, the group epoch is
> > > bumped so
> > > > >> we use
> > > > >> > > > > > > that one for all the change records related to the
> update.
> > > > >> > > > > > >
> > > > >> > > > > > > > 35. "the group coordinator will ensure that the
> > > following
> > > > >> > > > invariants
> > > > >> > > > > > are
> > > > >> > > > > > > > met: ... All members exists." It's possible for a
> member
> > > > >> not to
> > > > >> > > > get any
> > > > >> > > > > > > > assigned partitions, right?
> > > > >> > > > > > >
> > > > >> > > > > > > That's right. Here I meant that the members provided
> by
> > > the
> > > > >> assignor
> > > > >> > > > > > > in the assignment must exist in the group. The
> assignor
> > > can
> > > > >> not make
> > > > >> > > > > > > up new member ids.
> > > > >> > > > > > >
> > > > >> > > > > > > > 36. "He can rejoins the group with a member epoch
> > > equals to
> > > > >> 0":
> > > > >> > > > When
> > > > >> > > > > > would
> > > > >> > > > > > > > a consumer rejoin and what member id would be used?
> > > > >> > > > > > >
> > > > >> > > > > > > A member is expected to abandon all its partitions and
> > > > >> rejoins when
> > > > >> > > > it
> > > > >> > > > > > > receives the FENCED_MEMBER_EPOCH error. In this case,
> the
> > > > >> group
> > > > >> > > > > > > coordinator will have removed the member from the
> group.
> > > The
> > > > >> member
> > > > >> > > > > > > can rejoin the group with the same member id but with
> 0 as
> > > > >> epoch. Let
> > > > >> > > > > > > me see if I can clarify this in the KIP.
> > > > >> > > > > > >
> > > > >> > > > > > > > 37. "Instead, power users will have the ability to
> > > trigger a
> > > > >> > > > > > reassignment
> > > > >> > > > > > > > by either providing a non-zero reason or by
> updating the
> > > > >> assignor
> > > > >> > > > > > > > metadata." Hmm, this seems to be conflicting with
> the
> > > > >> deprecation
> > > > >> > > > of
> > > > >> > > > > > > > Consumer#enforeRebalance.
> > > > >> > > > > > >
> > > > >> > > > > > > In this case, a new assignment is triggered by the
> client
> > > side
> > > > >> > > > > > > assignor. When constructing the HB, the consumer will
> > > always
> > > > >> consult
> > > > >> > > > > > > the client side assignor and propagate the
> information to
> > > the
> > > > >> group
> > > > >> > > > > > > coordinator. In other words, we don't expect users to
> call
> > > > >> > > > > > > Consumer#enforceRebalance anymore.
> > > > >> > > > > > >
> > > > >> > > > > > > > 38. The reassignment examples are nice. But the
> section
> > > > >> seems to
> > > > >> > > > have
> > > > >> > > > > > > > multiple typos.
> > > > >> > > > > > > > 38.1 When the group transitions to epoch 2, B
> > > immediately
> > > > >> gets into
> > > > >> > > > > > > > "epoch=1, partitions=[foo-2]", which seems
> incorrect.
> > > > >> > > > > > > > 38.2 When the group transitions to epoch 3, C seems
> to
> > > get
> > > > >> into
> > > > >> > > > > > epoch=3,
> > > > >> > > > > > > > partitions=[foo-1] too early.
> > > > >> > > > > > > > 38.3 After A transitions to epoch 3, C still has A -
> > > > >> epoch=2,
> > > > >> > > > > > > > partitions=[foo-0].
> > > > >> > > > > > >
> > > > >> > > > > > > Sorry for that! I will revise them.
> > > > >> > > > > > >
> > > > >> > > > > > > > 39. Rolling upgrade of consumers: Do we support the
> > > upgrade
> > > > >> from
> > > > >> > > > any
> > > > >> > > > > > old
> > > > >> > > > > > > > version to new one?
> > > > >> > > > > > >
> > > > >> > > > > > > We will support upgrading from the consumer protocol
> > > version
> > > > >> 3,
> > > > >> > > > > > > introduced in KIP-792. KIP-792 is not implemented yet
> so
> > > the
> > > > >> earliest
> > > > >> > > > > > > version is unknown at the moment. This is explained
> in the
> > > > >> migration
> > > > >> > > > > > > plan chapter.
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks again for your feedback, Jun. I will update
> the KIP
> > > > >> based on
> > > > >> > > > it
> > > > >> > > > > > > next week.
> > > > >> > > > > > >
> > > > >> > > > > > > Best,
> > > > >> > > > > > > David
> > > > >> > > > > > >
> > > > >> > > > > > > On Thu, Sep 1, 2022 at 9:07 PM Jun Rao
> > > > >> <j...@confluent.io.invalid>
> > > > >> > > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > Hi, David,
> > > > >> > > > > > > >
> > > > >> > > > > > > > Thanks for the KIP. Overall, the main benefits of
> the
> > > KIP
> > > > >> seem to
> > > > >> > > > be
> > > > >> > > > > > fewer
> > > > >> > > > > > > > RPCs during rebalance and more efficient support of
> > > > >> wildcard. A few
> > > > >> > > > > > > > comments below.
> > > > >> > > > > > > >
> > > > >> > > > > > > > 30. ConsumerGroupHeartbeatRequest
> > > > >> > > > > > > > 30.1 ServerAssignor is a singleton. Do we plan to
> > > support
> > > > >> rolling
> > > > >> > > > > > changing
> > > > >> > > > > > > > of the partition assignor in the consumers?
> > > > >> > > > > > > > 30.2 For each field, could you explain whether it's
> > > > >> required in
> > > > >> > > > every
> > > > >> > > > > > > > request or the scenarios when it needs to be
> filled? For
> > > > >> example,
> > > > >> > > > it's
> > > > >> > > > > > not
> > > > >> > > > > > > > clear to me when TopicPartitions needs to be filled.
> > > > >> > > > > > > >
> > > > >> > > > > > > > 31. In the current consumer protocol, the rack
> affinity
> > > > >> between the
> > > > >> > > > > > client
> > > > >> > > > > > > > and the broker is only considered during fetching,
> but
> > > not
> > > > >> during
> > > > >> > > > > > assigning
> > > > >> > > > > > > > partitions to consumers. Sometimes, once the
> assignment
> > > is
> > > > >> made,
> > > > >> > > > there
> > > > >> > > > > > is
> > > > >> > > > > > > > no opportunity for read affinity because no
> replicas of
> > > > >> assigned
> > > > >> > > > > > partitions
> > > > >> > > > > > > > are close to the member. I am wondering if we
> should use
> > > > >> this
> > > > >> > > > > > opportunity
> > > > >> > > > > > > > to address this by including rack in GroupMember.
> > > > >> > > > > > > >
> > > > >> > > > > > > > 32. On the metric side, often, it's useful to know
> how
> > > busy
> > > > >> a group
> > > > >> > > > > > > > coordinator is. By moving the event loop model, it
> seems
> > > > >> that we
> > > > >> > > > could
> > > > >> > > > > > add
> > > > >> > > > > > > > a metric that tracks the fraction of the time the
> event
> > > > >> loop is
> > > > >> > > > doing
> > > > >> > > > > > the
> > > > >> > > > > > > > actual work.
> > > > >> > > > > > > >
> > > > >> > > > > > > > 33. Could we add a section on coordinator failover
> > > > >> handling? For
> > > > >> > > > > > example,
> > > > >> > > > > > > > does it need to trigger the check if any group with
> the
> > > > >> wildcard
> > > > >> > > > > > > > subscription now has a new matching topic?
> > > > >> > > > > > > >
> > > > >> > > > > > > > 34. ConsumerGroupMetadataValue,
> > > > >> > > > ConsumerGroupPartitionMetadataValue,
> > > > >> > > > > > > > ConsumerGroupMemberMetadataValue: Could we document
> what
> > > > >> the epoch
> > > > >> > > > > > field
> > > > >> > > > > > > > reflects? For example, does the epoch in
> > > > >> ConsumerGroupMetadataValue
> > > > >> > > > > > reflect
> > > > >> > > > > > > > the latest group epoch? What about the one in
> > > > >> > > > > > > > ConsumerGroupPartitionMetadataValue and
> > > > >> > > > > > ConsumerGroupMemberMetadataValue?
> > > > >> > > > > > > >
> > > > >> > > > > > > > 35. "the group coordinator will ensure that the
> > > following
> > > > >> > > > invariants
> > > > >> > > > > > are
> > > > >> > > > > > > > met: ... All members exists." It's possible for a
> member
> > > > >> not to
> > > > >> > > > get any
> > > > >> > > > > > > > assigned partitions, right?
> > > > >> > > > > > > >
> > > > >> > > > > > > > 36. "He can rejoins the group with a member epoch
> > > equals to
> > > > >> 0":
> > > > >> > > > When
> > > > >> > > > > > would
> > > > >> > > > > > > > a consumer rejoin and what member id would be used?
> > > > >> > > > > > > >
> > > > >> > > > > > > > 37. "Instead, power users will have the ability to
> > > trigger a
> > > > >> > > > > > reassignment
> > > > >> > > > > > > > by either providing a non-zero reason or by
> updating the
> > > > >> assignor
> > > > >> > > > > > > > metadata." Hmm, this seems to be conflicting with
> the
> > > > >> deprecation
> > > > >> > > > of
> > > > >> > > > > > > > Consumer#enforeRebalance.
> > > > >> > > > > > > >
> > > > >> > > > > > > > 38. The reassignment examples are nice. But the
> section
> > > > >> seems to
> > > > >> > > > have
> > > > >> > > > > > > > multiple typos.
> > > > >> > > > > > > > 38.1 When the group transitions to epoch 2, B
> > > immediately
> > > > >> gets into
> > > > >> > > > > > > > "epoch=1, partitions=[foo-2]", which seems
> incorrect.
> > > > >> > > > > > > > 38.2 When the group transitions to epoch 3, C seems
> to
> > > get
> > > > >> into
> > > > >> > > > > > epoch=3,
> > > > >> > > > > > > > partitions=[foo-1] too early.
> > > > >> > > > > > > > 38.3 After A transitions to epoch 3, C still has A -
> > > > >> epoch=2,
> > > > >> > > > > > > > partitions=[foo-0].
> > > > >> > > > > > > >
> > > > >> > > > > > > > 39. Rolling upgrade of consumers: Do we support the
> > > upgrade
> > > > >> from
> > > > >> > > > any
> > > > >> > > > > > old
> > > > >> > > > > > > > version to new one?
> > > > >> > > > > > > >
> > > > >> > > > > > > > Thanks,
> > > > >> > > > > > > >
> > > > >> > > > > > > > Jun
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Mon, Aug 29, 2022 at 9:20 AM David Jacot
> > > > >> > > > > > <dja...@confluent.io.invalid>
> > > > >> > > > > > > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > Hi all,
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > The KIP states that we will re-implement the
> > > coordinator
> > > > >> in
> > > > >> > > > Java. I
> > > > >> > > > > > > > > discussed this offline with a few folks and folks
> are
> > > > >> concerned
> > > > >> > > > that
> > > > >> > > > > > > > > we could introduce many regressions in the old
> > > protocol
> > > > >> if we do
> > > > >> > > > so.
> > > > >> > > > > > > > > Therefore, I am going to remove this statement
> from
> > > the
> > > > >> KIP. It
> > > > >> > > > is an
> > > > >> > > > > > > > > implementation detail after all so it does not
> have
> > > to be
> > > > >> > > > decided at
> > > > >> > > > > > > > > this stage. We will likely start by trying to
> > > refactor the
> > > > >> > > > current
> > > > >> > > > > > > > > implementation as a first step.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Cheers,
> > > > >> > > > > > > > > David
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > On Mon, Aug 29, 2022 at 3:52 PM David Jacot <
> > > > >> dja...@confluent.io
> > > > >> > > > >
> > > > >> > > > > > wrote:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Hi Luke,
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > 1.1. I think the state machine are: "Empty,
> > > assigning,
> > > > >> > > > > > reconciling,
> > > > >> > > > > > > > > stable,
> > > > >> > > > > > > > > > > dead" mentioned in Consumer Group States
> section,
> > > > >> right?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > This sentence does not refer to those group
> states
> > > but
> > > > >> rather
> > > > >> > > > to a
> > > > >> > > > > > > > > > state machine replication (SMR). This refers to
> the
> > > > >> entire
> > > > >> > > > state of
> > > > >> > > > > > > > > > group coordinator which is replicated via the
> log
> > > > >> layer. I will
> > > > >> > > > > > > > > > clarify this in the KIP.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > 1.2. What do you mean "each state machine is
> > > modelled
> > > > >> as an
> > > > >> > > > event
> > > > >> > > > > > > > > loop"?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > The idea is to follow a model similar to the new
> > > quorum
> > > > >> > > > > > controller. We
> > > > >> > > > > > > > > > will have N threads to process events. Each
> > > > >> __consumer_offsets
> > > > >> > > > > > > > > > partition is assigned to a unique thread and
> all the
> > > > >> events
> > > > >> > > > (e.g.
> > > > >> > > > > > > > > > requests, callbacks, etc.) are processed by this
> > > > >> thread. This
> > > > >> > > > > > simplify
> > > > >> > > > > > > > > > concurrency and will enable us to do simulation
> > > testing
> > > > >> for the
> > > > >> > > > > > group
> > > > >> > > > > > > > > > coordinator.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > 1.3. Why do we need a state machine per
> > > > >> *__consumer_offsets*
> > > > >> > > > > > > > > partitions?
> > > > >> > > > > > > > > > > Not a state machine "per consumer group"
> owned by
> > > a
> > > > >> group
> > > > >> > > > > > coordinator?
> > > > >> > > > > > > > > For
> > > > >> > > > > > > > > > > example, if one group coordinator owns 2
> consumer
> > > > >> groups, and
> > > > >> > > > > > both
> > > > >> > > > > > > > > exist in
> > > > >> > > > > > > > > > > *__consumer_offsets-0*, will we have 1 state
> > > machine
> > > > >> for it,
> > > > >> > > > or
> > > > >> > > > > > 2?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > See 1.1. The confusion comes from there, I
> think.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > 1.4. I know the "*group.coordinator.threads"
> *is
> > > the
> > > > >> number
> > > > >> > > > of
> > > > >> > > > > > threads
> > > > >> > > > > > > > > used
> > > > >> > > > > > > > > > > to run the state machines. But I'm wondering
> if
> > > the
> > > > >> purpose
> > > > >> > > > of
> > > > >> > > > > > the
> > > > >> > > > > > > > > threads
> > > > >> > > > > > > > > > > is only to keep the state of each consumer
> group
> > > (or
> > > > >> > > > > > > > > *__consumer_offsets*
> > > > >> > > > > > > > > > > partitions?), and no heavy computation, why
> > > should we
> > > > >> need
> > > > >> > > > > > > > > multi-threads
> > > > >> > > > > > > > > > > here?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > See 1.2. The idea is to have an ability to
> shard the
> > > > >> > > > processing as
> > > > >> > > > > > the
> > > > >> > > > > > > > > > computation could be heavy.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > 2.1. The consumer session timeout, why does
> the
> > > > >> default
> > > > >> > > > session
> > > > >> > > > > > > > > timeout not
> > > > >> > > > > > > > > > > locate between min (45s) and max(60s)? I
> thought
> > > the
> > > > >> min/max
> > > > >> > > > > > session
> > > > >> > > > > > > > > > > timeout is to define lower/upper bound of it,
> no?
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > group.consumer.session.timeout.ms int 30s The
> > > > >> timeout to
> > > > >> > > > detect
> > > > >> > > > > > client
> > > > >> > > > > > > > > > > failures when using the consumer group
> protocol.
> > > > >> > > > > > > > > > > group.consumer.min.session.timeout.ms int
> 45s The
> > > > >> minimum
> > > > >> > > > > > session
> > > > >> > > > > > > > > timeout.
> > > > >> > > > > > > > > > > group.consumer.max.session.timeout.ms int
> 60s The
> > > > >> maximum
> > > > >> > > > > > session
> > > > >> > > > > > > > > timeout.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > This is indeed a mistake. The default session
> > > timeout
> > > > >> should
> > > > >> > > > be 45s
> > > > >> > > > > > > > > > (the current default).
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > 2.2. The default server side assignor are
> [range,
> > > > >> uniform],
> > > > >> > > > > > which means
> > > > >> > > > > > > > > > > we'll default to "range" assignor. I'd like to
> > > know
> > > > >> why not
> > > > >> > > > > > uniform
> > > > >> > > > > > > > > one? I
> > > > >> > > > > > > > > > > thought usually users will choose uniform
> assignor
> > > > >> (former
> > > > >> > > > sticky
> > > > >> > > > > > > > > assinor)
> > > > >> > > > > > > > > > > for better evenly distribution. Any other
> reason
> > > we
> > > > >> choose
> > > > >> > > > range
> > > > >> > > > > > > > > assignor
> > > > >> > > > > > > > > > > as default?
> > > > >> > > > > > > > > > > group.consumer.assignors List range, uniform
> The
> > > > >> server side
> > > > >> > > > > > assignors.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > The order on the server side has no influence
> > > because
> > > > >> the
> > > > >> > > > client
> > > > >> > > > > > must
> > > > >> > > > > > > > > > chose the selector that he wants to use. There
> is no
> > > > >> default
> > > > >> > > > in the
> > > > >> > > > > > > > > > current proposal. If the assignor is not
> specified
> > > by
> > > > >> the
> > > > >> > > > client,
> > > > >> > > > > > the
> > > > >> > > > > > > > > > request is rejected. The default client value
> for
> > > > >> > > > > > > > > > `group.remote.assignor` is `uniform` though.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Thanks for your very good comments, Luke. I hope
> > > that my
> > > > >> > > > answers
> > > > >> > > > > > help
> > > > >> > > > > > > > > > to clarify things. I will update the KIP as well
> > > based
> > > > >> on your
> > > > >> > > > > > > > > > feedback.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Cheers,
> > > > >> > > > > > > > > > David
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > On Mon, Aug 22, 2022 at 9:29 AM Luke Chen <
> > > > >> show...@gmail.com>
> > > > >> > > > > > wrote:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Hi David,
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Thanks for the update.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Some more questions:
> > > > >> > > > > > > > > > > 1. In Group Coordinator section, you
> mentioned:
> > > > >> > > > > > > > > > > > The new group coordinator will have a state
> > > machine
> > > > >> per
> > > > >> > > > > > > > > > > *__consumer_offsets* partitions, where each
> state
> > > > >> machine is
> > > > >> > > > > > modelled
> > > > >> > > > > > > > > as an
> > > > >> > > > > > > > > > > event loop. Those state machines will be
> executed
> > > in
> > > > >> > > > > > > > > > > *group.coordinator.threads* threads.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > 1.1. I think the state machine are: "Empty,
> > > assigning,
> > > > >> > > > > > reconciling,
> > > > >> > > > > > > > > stable,
> > > > >> > > > > > > > > > > dead" mentioned in Consumer Group States
> section,
> > > > >> right?
> > > > >> > > > > > > > > > > 1.2. What do you mean "each state machine is
> > > modelled
> > > > >> as an
> > > > >> > > > event
> > > > >> > > > > > > > > loop"?
> > > > >> > > > > > > > > > > 1.3. Why do we need a state machine per
> > > > >> *__consumer_offsets*
> > > > >> > > > > > > > > partitions?
> > > > >> > > > > > > > > > > Not a state machine "per consumer group"
> owned by
> > > a
> > > > >> group
> > > > >> > > > > > coordinator?
> > > > >> > > > > > > > > For
> > > > >> > > > > > > > > > > example, if one group coordinator owns 2
> consumer
> > > > >> groups, and
> > > > >> > > > > > both
> > > > >> > > > > > > > > exist in
> > > > >> > > > > > > > > > > *__consumer_offsets-0*, will we have 1 state
> > > machine
> > > > >> for it,
> > > > >> > > > or
> > > > >> > > > > > 2?
> > > > >> > > > > > > > > > > 1.4. I know the "*group.coordinator.threads"
> *is
> > > the
> > > > >> number
> > > > >> > > > of
> > > > >> > > > > > threads
> > > > >> > > > > > > > > used
> > > > >> > > > > > > > > > > to run the state machines. But I'm wondering
> if
> > > the
> > > > >> purpose
> > > > >> > > > of
> > > > >> > > > > > the
> > > > >> > > > > > > > > threads
> > > > >> > > > > > > > > > > is only to keep the state of each consumer
> group
> > > (or
> > > > >> > > > > > > > > *__consumer_offsets*
> > > > >> > > > > > > > > > > partitions?), and no heavy computation, why
> > > should we
> > > > >> need
> > > > >> > > > > > > > > multi-threads
> > > > >> > > > > > > > > > > here?
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > 2. For the default value in the new configs:
> > > > >> > > > > > > > > > > 2.1. The consumer session timeout, why does
> the
> > > > >> default
> > > > >> > > > session
> > > > >> > > > > > > > > timeout not
> > > > >> > > > > > > > > > > locate between min (45s) and max(60s)? I
> thought
> > > the
> > > > >> min/max
> > > > >> > > > > > session
> > > > >> > > > > > > > > > > timeout is to define lower/upper bound of it,
> no?
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > group.consumer.session.timeout.ms int 30s The
> > > > >> timeout to
> > > > >> > > > detect
> > > > >> > > > > > client
> > > > >> > > > > > > > > > > failures when using the consumer group
> protocol.
> > > > >> > > > > > > > > > > group.consumer.min.session.timeout.ms int
> 45s The
> > > > >> minimum
> > > > >> > > > > > session
> > > > >> > > > > > > > > timeout.
> > > > >> > > > > > > > > > > group.consumer.max.session.timeout.ms int
> 60s The
> > > > >> maximum
> > > > >> > > > > > session
> > > > >> > > > > > > > > timeout.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > 2.2. The default server side assignor are
> [range,
> > > > >> uniform],
> > > > >> > > > > > which means
> > > > >> > > > > > > > > > > we'll default to "range" assignor. I'd like to
> > > know
> > > > >> why not
> > > > >> > > > > > uniform
> > > > >> > > > > > > > > one? I
> > > > >> > > > > > > > > > > thought usually users will choose uniform
> assignor
> > > > >> (former
> > > > >> > > > sticky
> > > > >> > > > > > > > > assinor)
> > > > >> > > > > > > > > > > for better evenly distribution. Any other
> reason
> > > we
> > > > >> choose
> > > > >> > > > range
> > > > >> > > > > > > > > assignor
> > > > >> > > > > > > > > > > as default?
> > > > >> > > > > > > > > > > group.consumer.assignors List range, uniform
> The
> > > > >> server side
> > > > >> > > > > > assignors.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Thank you.
> > > > >> > > > > > > > > > > Luke
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > On Mon, Aug 22, 2022 at 2:10 PM Luke Chen <
> > > > >> show...@gmail.com
> > > > >> > > > >
> > > > >> > > > > > wrote:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > > Hi Sagar,
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > I have some thoughts about Kafka Connect
> > > > >> integrating with
> > > > >> > > > > > KIP-848,
> > > > >> > > > > > > > > but I
> > > > >> > > > > > > > > > > > think we should have a separate discussion
> > > thread
> > > > >> for the
> > > > >> > > > Kafka
> > > > >> > > > > > > > > Connect
> > > > >> > > > > > > > > > > > KIP: Integrating Kafka Connect With New
> Consumer
> > > > >> Rebalance
> > > > >> > > > > > Protocol
> > > > >> > > > > > > > > [1],
> > > > >> > > > > > > > > > > > and let this discussion thread focus on
> consumer
> > > > >> rebalance
> > > > >> > > > > > protocol,
> > > > >> > > > > > > > > WDYT?
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > [1]
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > >
> > > > >> > > >
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Thank you.
> > > > >> > > > > > > > > > > > Luke
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > On Fri, Aug 12, 2022 at 9:31 PM Sagar <
> > > > >> > > > > > sagarmeansoc...@gmail.com>
> > > > >> > > > > > > > > wrote:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >> Thank you Guozhang/David for the feedback.
> > > Looks
> > > > >> like
> > > > >> > > > there's
> > > > >> > > > > > > > > agreement on
> > > > >> > > > > > > > > > > >> using separate APIs for Connect. I would
> > > revisit
> > > > >> the doc
> > > > >> > > > and
> > > > >> > > > > > see
> > > > >> > > > > > > > > what
> > > > >> > > > > > > > > > > >> changes are to be made.
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > > > > >> Thanks!
> > > > >> > > > > > > > > > > >> Sagar.
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > > > > >> On Tue, Aug 9, 2022 at 7:11 PM David Jacot
> > > > >> > > > > > > > > <dja...@confluent.io.invalid>
> > > > >> > > > > > > > > > > >> wrote:
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > > > > >> > Hi Sagar,
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > Thanks for the feedback and the document.
> > > That's
> > > > >> really
> > > > >> > > > > > helpful. I
> > > > >> > > > > > > > > > > >> > will take a look at it.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > Overall, it seems to me that both Connect
> > > and the
> > > > >> > > > Consumer
> > > > >> > > > > > could
> > > > >> > > > > > > > > share
> > > > >> > > > > > > > > > > >> > the same underlying "engine". The main
> > > > >> difference is
> > > > >> > > > that
> > > > >> > > > > > the
> > > > >> > > > > > > > > Consumer
> > > > >> > > > > > > > > > > >> > assigns topic-partitions to members
> whereas
> > > > >> Connect
> > > > >> > > > assigns
> > > > >> > > > > > tasks
> > > > >> > > > > > > > > to
> > > > >> > > > > > > > > > > >> > workers. I see two ways to move forward:
> > > > >> > > > > > > > > > > >> > 1) We extend the new proposed APIs to
> support
> > > > >> different
> > > > >> > > > > > resource
> > > > >> > > > > > > > > types
> > > > >> > > > > > > > > > > >> > (e.g. partitions, tasks, etc.); or
> > > > >> > > > > > > > > > > >> > 2) We use new dedicated APIs for
> Connect. The
> > > > >> dedicated
> > > > >> > > > APIs
> > > > >> > > > > > > > > would be
> > > > >> > > > > > > > > > > >> > similar to the new ones but different on
> the
> > > > >> > > > > > content/resources and
> > > > >> > > > > > > > > > > >> > they would rely on the same engine on the
> > > > >> coordinator
> > > > >> > > > side.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > I personally lean towards 2) because I am
> > > not a
> > > > >> fan of
> > > > >> > > > > > > > > overcharging
> > > > >> > > > > > > > > > > >> > APIs to serve different purposes. That
> being
> > > > >> said, I am
> > > > >> > > > not
> > > > >> > > > > > > > > opposed to
> > > > >> > > > > > > > > > > >> > 1) if we can find an elegant way to do
> it.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > I think that we can continue to discuss
> it
> > > here
> > > > >> for now
> > > > >> > > > in
> > > > >> > > > > > order
> > > > >> > > > > > > > > to
> > > > >> > > > > > > > > > > >> > ensure that this KIP is compatible with
> what
> > > we
> > > > >> will do
> > > > >> > > > for
> > > > >> > > > > > > > > Connect in
> > > > >> > > > > > > > > > > >> > the future.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > Best,
> > > > >> > > > > > > > > > > >> > David
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > On Mon, Aug 8, 2022 at 2:41 PM David
> Jacot <
> > > > >> > > > > > dja...@confluent.io>
> > > > >> > > > > > > > > wrote:
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Hi all,
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > I am back from vacation. I will go
> through
> > > and
> > > > >> address
> > > > >> > > > > > your
> > > > >> > > > > > > > > comments
> > > > >> > > > > > > > > > > >> > > in the coming days. Thanks for your
> > > feedback.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Cheers,
> > > > >> > > > > > > > > > > >> > > David
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > On Wed, Aug 3, 2022 at 10:05 PM Gregory
> > > Harris
> > > > >> <
> > > > >> > > > > > > > > gharris1...@gmail.com
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > wrote:
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > Hey All!
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > Thanks for the KIP, it's wonderful
> to see
> > > > >> > > > cooperative
> > > > >> > > > > > > > > rebalancing
> > > > >> > > > > > > > > > > >> > making it
> > > > >> > > > > > > > > > > >> > > > down the stack!
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > I had a few questions:
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > 1. The 'Rejected Alternatives'
> section
> > > > >> describes how
> > > > >> > > > > > member
> > > > >> > > > > > > > > epoch
> > > > >> > > > > > > > > > > >> > should
> > > > >> > > > > > > > > > > >> > > > advance in step with the group epoch
> and
> > > > >> assignment
> > > > >> > > > > > epoch
> > > > >> > > > > > > > > values. I
> > > > >> > > > > > > > > > > >> > think
> > > > >> > > > > > > > > > > >> > > > that this is a good idea for the
> reasons
> > > > >> described
> > > > >> > > > in
> > > > >> > > > > > the
> > > > >> > > > > > > > > KIP. When
> > > > >> > > > > > > > > > > >> the
> > > > >> > > > > > > > > > > >> > > > protocol is incrementally assigning
> > > > >> partitions to a
> > > > >> > > > > > worker,
> > > > >> > > > > > > > > what
> > > > >> > > > > > > > > > > >> member
> > > > >> > > > > > > > > > > >> > > > epoch does each incremental
> assignment
> > > use?
> > > > >> Are
> > > > >> > > > member
> > > > >> > > > > > epochs
> > > > >> > > > > > > > > > > >> re-used,
> > > > >> > > > > > > > > > > >> > and
> > > > >> > > > > > > > > > > >> > > > a single member epoch can correspond
> to
> > > > >> multiple
> > > > >> > > > > > different
> > > > >> > > > > > > > > > > >> > (monotonically
> > > > >> > > > > > > > > > > >> > > > larger) assignments?
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > 2. Is the Assignor's 'Reason' field
> > > opaque
> > > > >> to the
> > > > >> > > > group
> > > > >> > > > > > > > > > > >> coordinator? If
> > > > >> > > > > > > > > > > >> > > > not, should custom client-side
> assignor
> > > > >> > > > implementations
> > > > >> > > > > > > > > interact
> > > > >> > > > > > > > > > > >> with
> > > > >> > > > > > > > > > > >> > the
> > > > >> > > > > > > > > > > >> > > > Reason field, and how is its common
> > > meaning
> > > > >> agreed
> > > > >> > > > > > upon? If
> > > > >> > > > > > > > > so, what
> > > > >> > > > > > > > > > > >> > is the
> > > > >> > > > > > > > > > > >> > > > benefit of a distinct Reason field
> over
> > > > >> including
> > > > >> > > > such
> > > > >> > > > > > > > > functionality
> > > > >> > > > > > > > > > > >> > in the
> > > > >> > > > > > > > > > > >> > > > opaque metadata?
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > 3. The following is included in the
> KIP:
> > > > >> "Thanks to
> > > > >> > > > > > this, the
> > > > >> > > > > > > > > input
> > > > >> > > > > > > > > > > >> of
> > > > >> > > > > > > > > > > >> > the
> > > > >> > > > > > > > > > > >> > > > client side assignor is entirely
> driven
> > > by
> > > > >> the group
> > > > >> > > > > > > > > coordinator.
> > > > >> > > > > > > > > > > >> The
> > > > >> > > > > > > > > > > >> > > > consumer is no longer responsible for
> > > > >> maintaining
> > > > >> > > > any
> > > > >> > > > > > state
> > > > >> > > > > > > > > besides
> > > > >> > > > > > > > > > > >> its
> > > > >> > > > > > > > > > > >> > > > assigned partitions." Does this mean
> > > that the
> > > > >> > > > > > client-side
> > > > >> > > > > > > > > assignor
> > > > >> > > > > > > > > > > >> MAY
> > > > >> > > > > > > > > > > >> > > > incorporate additional non-Metadata
> state
> > > > >> (such as
> > > > >> > > > > > partition
> > > > >> > > > > > > > > > > >> > throughput,
> > > > >> > > > > > > > > > > >> > > > cpu/memory metrics, config topics,
> etc),
> > > or
> > > > >> that
> > > > >> > > > > > additional
> > > > >> > > > > > > > > > > >> > non-Metadata
> > > > >> > > > > > > > > > > >> > > > state SHOULD NOT be used?
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > 4. I see that there are separate
> classes
> > > > >> > > > > > > > > > > >> > > > for
> > > > >> > > > > > org.apache.kafka.server.group.consumer.PartitionAssignor
> > > > >> > > > > > > > > > > >> > > > and
> > > > >> > > > org.apache.kafka.clients.consumer.PartitionAssignor
> > > > >> > > > > > that
> > > > >> > > > > > > > > seem to
> > > > >> > > > > > > > > > > >> > > > overlap significantly. Is it
> possible for
> > > > >> these two
> > > > >> > > > > > > > > implementations
> > > > >> > > > > > > > > > > >> to
> > > > >> > > > > > > > > > > >> > be
> > > > >> > > > > > > > > > > >> > > > unified? This would serve to promote
> > > feature
> > > > >> parity
> > > > >> > > > of
> > > > >> > > > > > > > > server-side
> > > > >> > > > > > > > > > > >> and
> > > > >> > > > > > > > > > > >> > > > client-side assignors, and would also
> > > > >> facilitate
> > > > >> > > > > > operational
> > > > >> > > > > > > > > > > >> > flexibility in
> > > > >> > > > > > > > > > > >> > > > certain situations. For example, if a
> > > > >> server-side
> > > > >> > > > > > assignor
> > > > >> > > > > > > > > has some
> > > > >> > > > > > > > > > > >> > poor
> > > > >> > > > > > > > > > > >> > > > behavior and needs a patch,
> deploying the
> > > > >> patched
> > > > >> > > > > > assignor to
> > > > >> > > > > > > > > the
> > > > >> > > > > > > > > > > >> > client
> > > > >> > > > > > > > > > > >> > > > and switching one consumer group to a
> > > > >> client-side
> > > > >> > > > > > assignor
> > > > >> > > > > > > > > may be
> > > > >> > > > > > > > > > > >> > faster
> > > > >> > > > > > > > > > > >> > > > and less risky than patching all of
> the
> > > > >> brokers.
> > > > >> > > > With
> > > > >> > > > > > the
> > > > >> > > > > > > > > currently
> > > > >> > > > > > > > > > > >> > > > proposed distinct APIs, a non-trivial
> > > > >> > > > reimplementation
> > > > >> > > > > > would
> > > > >> > > > > > > > > have
> > > > >> > > > > > > > > > > >> to be
> > > > >> > > > > > > > > > > >> > > > assembled, and if the two APIs have
> > > diverged
> > > > >> > > > > > significantly,
> > > > >> > > > > > > > > then it
> > > > >> > > > > > > > > > > >> is
> > > > >> > > > > > > > > > > >> > > > possible that a reimplementation
> would
> > > not be
> > > > >> > > > possible.
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > --
> > > > >> > > > > > > > > > > >> > > > Greg Harris
> > > > >> > > > > > > > > > > >> > > > gharris1...@gmail.com
> > > > >> > > > > > > > > > > >> > > > github.com/gharris1727
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > On Wed, Aug 3, 2022 at 8:39 AM Sagar
> <
> > > > >> > > > > > > > > sagarmeansoc...@gmail.com>
> > > > >> > > > > > > > > > > >> > wrote:
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > > Hi Guozhang/David,
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > > I created a confluence page to
> discuss
> > > how
> > > > >> Connect
> > > > >> > > > > > would
> > > > >> > > > > > > > > need to
> > > > >> > > > > > > > > > > >> > change
> > > > >> > > > > > > > > > > >> > > > > based on the new rebalance
> protocol.
> > > > >> Here's the
> > > > >> > > > page:
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > >
> > > > >> > > > > >
> > > > >> > > >
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > > It's also pretty longish and I have
> > > tried
> > > > >> to keep
> > > > >> > > > a
> > > > >> > > > > > format
> > > > >> > > > > > > > > > > >> similar to
> > > > >> > > > > > > > > > > >> > > > > KIP-848. Let me know what you
> think.
> > > Also,
> > > > >> do you
> > > > >> > > > > > think this
> > > > >> > > > > > > > > > > >> should
> > > > >> > > > > > > > > > > >> > be
> > > > >> > > > > > > > > > > >> > > > > moved to a separate discussion
> thread
> > > or
> > > > >> is this
> > > > >> > > > one
> > > > >> > > > > > fine?
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > > Thanks!
> > > > >> > > > > > > > > > > >> > > > > Sagar.
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > > On Tue, Jul 26, 2022 at 7:37 AM
> Sagar <
> > > > >> > > > > > > > > sagarmeansoc...@gmail.com>
> > > > >> > > > > > > > > > > >> > wrote:
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > > > Hello Guozhang,
> > > > >> > > > > > > > > > > >> > > > > >
> > > > >> > > > > > > > > > > >> > > > > > Thank you so much for the doc on
> > > Kafka
> > > > >> Streams.
> > > > >> > > > > > Sure, I
> > > > >> > > > > > > > > would do
> > > > >> > > > > > > > > > > >> > the
> > > > >> > > > > > > > > > > >> > > > > > analysis and come up with such a
> > > > >> document.
> > > > >> > > > > > > > > > > >> > > > > >
> > > > >> > > > > > > > > > > >> > > > > > Thanks!
> > > > >> > > > > > > > > > > >> > > > > > Sagar.
> > > > >> > > > > > > > > > > >> > > > > >
> > > > >> > > > > > > > > > > >> > > > > > On Tue, Jul 26, 2022 at 4:47 AM
> > > Guozhang
> > > > >> Wang <
> > > > >> > > > > > > > > > > >> wangg...@gmail.com>
> > > > >> > > > > > > > > > > >> > > > > wrote:
> > > > >> > > > > > > > > > > >> > > > > >
> > > > >> > > > > > > > > > > >> > > > > >> Hello Sagar,
> > > > >> > > > > > > > > > > >> > > > > >>
> > > > >> > > > > > > > > > > >> > > > > >> It would be great if you could
> come
> > > > >> back with
> > > > >> > > > some
> > > > >> > > > > > > > > analysis on
> > > > >> > > > > > > > > > > >> > how to
> > > > >> > > > > > > > > > > >> > > > > >> implement the Connect side
> > > integration
> > > > >> with
> > > > >> > > > the new
> > > > >> > > > > > > > > protocol;
> > > > >> > > > > > > > > > > >> so
> > > > >> > > > > > > > > > > >> > far
> > > > >> > > > > > > > > > > >> > > > > >> besides leveraging on the new
> > > "protocol
> > > > >> type"
> > > > >> > > > we
> > > > >> > > > > > did not
>


-- 
-- Guozhang

Reply via email to