Hey Dong, thanks for the follow-up here!

1) It is not very clear to the user what is the difference between
member.name and client.id as both seems to be used to identify the
consumer. I am wondering if it would be more intuitive to name it
group.member.name (preferred choice since it matches the current group.id
config name) or rebalance.member.name to explicitly show that the id is
solely used for rebalance.
Great question. I feel `member.name` is enough to explain itself, it seems not 
very
helpful to make the config name longer. Comparing `name` with `id` gives user 
the
impression that they have the control over it with customized rule than library 
decided.

2) In the interface change section it is said that GroupMaxSessionTimeoutMs
will be changed to 30 minutes. It seems to suggest that we will change the
default value of this config. It does not seem necessary to increase the
time of consumer failure detection when user doesn't use static membership.
Also, say static membership is enabled, then this default config change
will cause a partition to be unavailable for consumption for 30 minutes if
there is hard consumer failure, which seems to be worse experience than
having unnecessary rebalance (when this timeout is small), particularly for
new users of Kafka. Could you explain more why we should make this change?
We are not changing the default session timeout value. We are just changing the
cap we are enforcing on the session timeout max value. So this change is not 
affecting
what kind of membership end user is using, and loosing the cap is giving end 
user
more flexibility on trade-off between liveness and stability.

3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER
into one error? It seems that these two errors are currently handled by the
consumer in the same way. And we don't also don't expect MEMBER_ID_MISMATCH
to happen. Thus it is not clear what is the benefit of having two errors.
I agree that we should remove DUPLICATE_STATIC_MEMBER error because with the 
KIP-394<https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member+id+for+initial+join+group+request>
we will automatically fence all join requests with UNKNOWN_MEMBER_ID.

4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains
member name which is already in the consumer group, however the member id
was missing". After a consumer is restarted, it will send a
JoinGroupRequest with an existing memberName (as the coordinator has not
expired this member from the memory) and memberId
= JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted
across consumer restart in the consumer side). Does it mean that
JoinGroupRequest from a newly restarted consumer will always be rejected
until the sessionTimeoutMs has passed?
Same answer as question 3). This part of the logic shall be removed from the 
proposal.

5) It seems that we always add two methods to the interface
org.apache.kafka.clients.admin.AdminClient.java, one with options and the
other without option. Could this be specified in the interface change
section?
Sounds good! Added both methods.

6) Do we plan to have off-the-shelf command line tool for SRE to trigger
rebalance? If so, we probably want to specify the command line tool
interface similar to
https://nam05.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-113%253A%2BSupport%2Breplicas%2Bmovement%2Bbetween%2Blog%2Bdirectories%23KIP-113%3ASupportreplicasmovementbetweenlogdirectories-Scripts&amp;data=02%7C01%7C%7Cdde139857e7a4a3a83dd08d651d9c93e%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636786393153281080&amp;sdata=%2BNasMFlJf9rEJc9iDfndcyxA4%2BGWieS1azSKbtdGRW4%3D&amp;reserved=0
.
Added the script.

7) Would it be simpler to replace name "forceStaticRebalance" with
"invokeConsumerRebalance"? It is not very clear what is the extra meaning
of world "force" as compared to "trigger" or "invoke". And it seems simpler
to allows this API to trigger rebalance regardless of whether consumer is
configured with memberName.
Sounds good. Right now I feel for both static and dynamic membership it is
more manageable to introduce the consumer rebalance method through admin
client API.

8) It is not very clear how the newly added AdminClient API trigger
rebalance. For example, does it send request? Can this be explained in the
KIP?

Sure, I will add more details to the API.


Thanks again for the helpful suggestions!


Best,
Boyang

________________________________
From: Dong Lin <lindon...@gmail.com>
Sent: Saturday, November 24, 2018 2:54 PM
To: dev
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hey Boyang,

Thanks for the update! Here are some followup comments:

1) It is not very clear to the user what is the difference between
member.name and client.id as both seems to be used to identify the
consumer. I am wondering if it would be more intuitive to name it
group.member.name (preferred choice since it matches the current group.id
config name) or rebalance.member.name to explicitly show that the id is
solely used for rebalance.

2) In the interface change section it is said that GroupMaxSessionTimeoutMs
will be changed to 30 minutes. It seems to suggest that we will change the
default value of this config. It does not seem necessary to increase the
time of consumer failure detection when user doesn't use static membership.
Also, say static membership is enabled, then this default config change
will cause a partition to be unavailable for consumption for 30 minutes if
there is hard consumer failure, which seems to be worse experience than
having unnecessary rebalance (when this timeout is small), particularly for
new users of Kafka. Could you explain more why we should make this change?

3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER
into one error? It seems that these two errors are currently handled by the
consumer in the same way. And we don't also don't expect MEMBER_ID_MISMATCH
to happen. Thus it is not clear what is the benefit of having two errors.

4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains
member name which is already in the consumer group, however the member id
was missing". After a consumer is restarted, it will send a
JoinGroupRequest with an existing memberName (as the coordinator has not
expired this member from the memory) and memberId
= JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted
across consumer restart in the consumer side). Does it mean that
JoinGroupRequest from a newly restarted consumer will always be rejected
until the sessionTimeoutMs has passed?

5) It seems that we always add two methods to the interface
org.apache.kafka.clients.admin.AdminClient.java, one with options and the
other without option. Could this be specified in the interface change
section?

6) Do we plan to have off-the-shelf command line tool for SRE to trigger
rebalance? If so, we probably want to specify the command line tool
interface similar to
https://nam05.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-113%253A%2BSupport%2Breplicas%2Bmovement%2Bbetween%2Blog%2Bdirectories%23KIP-113%3ASupportreplicasmovementbetweenlogdirectories-Scripts&amp;data=02%7C01%7C%7Cdde139857e7a4a3a83dd08d651d9c93e%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636786393153281080&amp;sdata=%2BNasMFlJf9rEJc9iDfndcyxA4%2BGWieS1azSKbtdGRW4%3D&amp;reserved=0
.

7) Would it be simpler to replace name "forceStaticRebalance" with
"invokeConsumerRebalance"? It is not very clear what is the extra meaning
of world "force" as compared to "trigger" or "invoke". And it seems simpler
to allows this API to trigger rebalance regardless of whether consumer is
configured with memberName.

8) It is not very clear how the newly added AdminClient API trigger
rebalance. For example, does it send request? Can this be explained in the
KIP?

Thanks,
Dong


On Thu, Nov 22, 2018 at 6:37 AM Boyang Chen <bche...@outlook.com> wrote:

> Hey Mayuresh,
>
>
> thanks for your feedbacks! I will try do another checklist here.
>
>
> > By this you mean, even if the application has not called
> > KafkaConsumer.poll() within session timeout, it will not be sending the
> > LeaveGroup request, right?
>
> Yep it's true, we will prevent client from sending leave group request
> when they are set with `member.name`.
>
>
> > When is the member.name removed from this map?
> Good question, we will only kick off member due to session timeout within
> static membership. Let me update the KIP to clearly assert that.
>
> > How is this case (missing member id) handled on the client side? What is
> the application that
> > is using the KafkaConsumer suppose to do in this scenario?
> I have extended the two exceptions within join group response V4.
> Basically I define both corresponding actions to be immediate failing
> client application, because so far it is unknown what kind of client issue
> could trigger them. After the first version, we will keep enhance the error
> handling logic!
>
> > This would mean that it might take more time to detect unowned topic
> > partitions and may cause delay for applications that perform data
> mirroring
> > tasks. I discussed this with our sre and we have a suggestion to make
> here
> > as listed below separately.
> The goal of extending session timeout cap is for users with good client
> side monitoring tools that could auto-heal the dead consumers very fast. So
> it is optional (and personal) to extend session timeout to a reasonable
> number with different client scenarios.
>
> > you meant remove unjoined members of the group, right ?
> Yep, there is a typo. Thanks for catching this!
>
> > What do you mean by " Internally we would optimize this logic by having
> > rebalance timeout only in charge of stopping prepare rebalance stage,
> > without removing non-responsive members immediately." There would not be
> a
> > full rebalance if the lagging consumer sent a JoinGroup request later,
> > right ? If yes, can you highlight this in the KIP ?
> No, there won't be. We want to limit the rebalance timeout functionality
> to only use as a timer to
> end prepare rebalance stage. This way, late joining static members will
> not trigger further rebalance
> as long as they are within session timeout. I added your highlight to the
> KIP!
>
> > The KIP talks about scale up scenario but its not quite clear how we
> > handle it. Are we adding a separate "expansion.timeout" or we adding
> status
> > "learner" ?. Can you shed more light on how this is handled in the KIP,
> if
> > its handled?
> Updated the KIP: we shall not cover scale up case in 345, because we
> believe client side could
> better handle this logic.
>
> > I think Jason had brought this up earlier about having a way to say how
> > many members/consumer hosts are you choosing to be in the consumer group.
> > If we can do this, then in case of mirroring applications we can do this
> :
> > Lets say we have a mirroring application that consumes from Kafka cluster
> > A and produces to Kafka cluster B.
> > Depending on the data and the Kafka cluster configuration, Kafka service
> > providers can set a mirroring group saying that it will take, for example
> > 300 consumer hosts/members to achieve the desired throughput and latency
> > for mirroring and can have additional 10 consumer hosts as spare in the
> > same group.
> > So when the first 300 members/consumers to join the group will start
> > mirroring the data from Kafka cluster A to Kafka cluster B.
> > The remaining 10 consumer members can sit idle.
> > The moment one of the consumer (for example: consumer number 54) from the
> > first 300 members go out of the group (crossed session timeout), it (the
> > groupCoordinator) can just assign the topicPartitions from the consumer
> > member 54 to one of the spare hosts.
> > Once the consumer member 54 comes back up, it can start as being a part
> of
> > the spare pool.
> > This enables us to have lower session timeouts and low latency mirroring,
> > in cases where the service providers are OK with having spare hosts.
> > This would mean that we would tolerate n consumer members leaving and
> > rejoining the group and still provide low latency as long as n <= number
> of
> > spare consumers.
> > If there are no spare host available, we can get back to the idea as
> > described in the KIP.
> Great idea! In fact on top of static membership we could later introduce
> APIs to set hard-coded
> client ids to the group and replace the dead host, or as you proposed to
> define spare host as
> what I understood as hot backup. I will put both Jason and your
> suggestions into a separate section
> called "Future works". Note that this spare host idea may be also solvable
> through rebalance protocol
> IMO.
>
> Thank you again for the great feedback!
>
> Boyang
> ________________________________
> From: Boyang Chen <bche...@outlook.com>
> Sent: Thursday, November 22, 2018 3:39 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hey Dong, sorry for missing your message. I couldn't find your email on my
> thread, so I will just do a checklist here!
>
>
> 1) The motivation currently explicitly states that the goal is to improve
>
> performance for heavy state application. It seems that the motivation can
>
> be stronger with the following use-case. Currently for MirrorMaker cluster
>
> with e.g. 100 MirrorMaker processes, it will take a long time to rolling
>
> bounce the entire MirrorMaker cluster. Each MirrorMaker process restart
>
> will trigger a rebalance which currently pause the consumption of the all
>
> partitions of the MirrorMaker cluster. With the change stated in this
>
> patch, as long as a MirrorMaker can restart within the specified timeout
>
> (e.g. 2 minutes), then we only need constant number of rebalance (e.g. for
>
> leader restart) for the entire rolling bounce, which will significantly
>
> improves the availability of the MirrorMaker pipeline. In my opinion, the
>
> main benefit of the KIP is to avoid unnecessary rebalance if the consumer
>
> process can be restarted within soon, which helps performance even if
>
> overhead of state shuffling for a given process is small.
>
> I just rephrased this part and added it to the KIP. Thanks for making the
> motivation more solid!
>
> 2) In order to simplify the KIP reading, can you follow the writeup style
> of other KIP (e.g. KIP-98) and list the interface change such as new
> configs (e.g. registration timeout), new request/response, new AdminClient
> API and new error code (e.g. DUPLICATE_STATIC_MEMBER)? Currently some of
> these are specified in the Proposed Change section which makes it a bit
> inconvenient to understand the new interface that will be exposed to user.
> Explanation of the current two-phase rebalance protocol probably can be
> moved out of public interface section.
> This is a great suggestion! I just consolidated all the public API
> changes, and the whole KIP
> looks much more organized!
>
> 3) There are currently two version of JoinGroupRequest in the KIP and only
> one of them has field memberId. This seems confusing.
> Yep, I already found this issue and fixed it.
>
> 4) It is mentioned in the KIP that "An admin API to force rebalance could
> be helpful here, but we will make a call once we finished the major
> implementation". So this seems to be still an open question in the current
> design. We probably want to agree on this before voting for the KIP.
> We have finalized the idea that this API is needed.
>
> 5) The KIP currently adds new config MEMBER_NAME for consumer. Can you
> specify the name of the config key and the default config value? Possible
> default values include empty string or null (similar to transaction.id<
> https://nam05.safelinks.protection.outlook.com/?url=https%3A%2F%2Feur04.safelinks.protection.outlook.com%2F%3Furl%3Dhttp%253A%252F%252Ftransaction.id%26data%3D02%257C01%257C%257Cb48d52bf63324bd91a5208d64f43247d%257C84df9e7fe9f640afb435aaaaaaaaaaaa%257C1%257C0%257C636783547118328245%26sdata%3Db2d8sQWM8niJreqST7%252BJLcxfEyBmj7cJp4Lm5cYT57s%253D%26reserved%3D0&amp;data=02%7C01%7C%7Cdde139857e7a4a3a83dd08d651d9c93e%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636786393153281080&amp;sdata=kkCtxHpsAZpbJZpUc52dtv6ac8UJOx6CQlts3CPjDh8%3D&amp;reserved=0>
> in
> producer config).
> I have defined the `member.name` in "New configuration" section.
>
> 6) Regarding the use of the topic "static_member_map" to persist member
> name map, currently if consumer coordinator broker goes offline, rebalance
> is triggered and consumers will try connect to the new coordinator. If
> these consumers can connect to the new coordinator within
> max.poll.interval.ms<
> https://nam05.safelinks.protection.outlook.com/?url=https%3A%2F%2Feur04.safelinks.protection.outlook.com%2F%3Furl%3Dhttp%253A%252F%252Fmax.poll.interval.ms%26data%3D02%257C01%257C%257Cb48d52bf63324bd91a5208d64f43247d%257C84df9e7fe9f640afb435aaaaaaaaaaaa%257C1%257C0%257C636783547118328245%26sdata%3DJWiSn5gQO5VNrmBov0KBdHpyVb4CiA0pFOAtLAlFqqY%253D%26reserved%3D0&amp;data=02%7C01%7C%7Cdde139857e7a4a3a83dd08d651d9c93e%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636786393153281080&amp;sdata=By6s977ocGSm%2FF5dSvUVtyPM%2B2OUt0XzFMWRHWaoVVk%3D&amp;reserved=0>
> which by default is 5 minutes, given that broker can
> use a deterministic algorithm to determine the partition -> member_name
> mapping, each consumer should get assigned the same set of partitions
> without requiring state shuffling. So it is not clear whether we have a
> strong use-case for this new logic. Can you help clarify what is the
> benefit of using topic "static_member_map" to persist member name map?
> I have discussed with Guozhang offline, and I believe reusing the current
> `_consumer_offsets`
> topic is a better and unified solution.
>
> 7) Regarding the introduction of the expensionTimeoutMs config, it is
> mentioned that "we are using expansion timeout to replace rebalance
> timeout, which is configured by max.poll.intervals from client side, and
> using registration timeout to replace session timeout". Currently the
> default max.poll.interval.ms<
> https://nam05.safelinks.protection.outlook.com/?url=https%3A%2F%2Feur04.safelinks.protection.outlook.com%2F%3Furl%3Dhttp%253A%252F%252Fmax.poll.interval.ms%26data%3D02%257C01%257C%257Cb48d52bf63324bd91a5208d64f43247d%257C84df9e7fe9f640afb435aaaaaaaaaaaa%257C1%257C0%257C636783547118328245%26sdata%3DJWiSn5gQO5VNrmBov0KBdHpyVb4CiA0pFOAtLAlFqqY%253D%26reserved%3D0&amp;data=02%7C01%7C%7Cdde139857e7a4a3a83dd08d651d9c93e%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636786393153281080&amp;sdata=By6s977ocGSm%2FF5dSvUVtyPM%2B2OUt0XzFMWRHWaoVVk%3D&amp;reserved=0>
> is configured to be 5 minutes and there will
> be only one rebalance if all new consumers can join within 5 minutes. So it
> is not clear whether we have a strong use-case for this new config. Can you
> explain what is the benefit of introducing this new config?
> Previously our goal is to use expansion timeout as a workaround for
> triggering multiple
> rebalances when scaling up members are not joining at the same time. It is
> decided to
> be addressed by client side protocol change, so we will not introduce
> expansion timeout.
>
> 8) It is mentioned that "To distinguish between previous version of
> protocol, we will also increase the join group request version to v4 when
> MEMBER_NAME is set" and "If the broker version is not the latest (< v4),
> the join group request shall be downgraded to v3 without setting the member
> Id". It is probably simpler to just say that this feature is enabled if
> JoinGroupRequest V4 is supported on both client and broker and MEMBER_NAME
> is configured with non-empty string.
> Yep, addressed this!
>
> 9) It is mentioned that broker may return NO_STATIC_MEMBER_INFO_SET error
> in OffsetCommitResponse for "commit requests under static membership". Can
> you clarify how broker determines whether the commit request is under
> static membership?
>
> We have agreed that commit request shouldn't be affected by the new
> membership, thus
> removing it here. Thanks for catching this!
>
> Let me know if you have further suggestions or concerns. Thank you for
> your valuable feedback
> to help me design the KIP better! (And I will try to address your
> feedbacks in next round Mayuresh ??)
>
> Best,
> Boyang
> ________________________________
> From: Mayuresh Gharat <gharatmayures...@gmail.com>
> Sent: Wednesday, November 21, 2018 7:50 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hi Boyang,
>
> Thanks for updating the KIP. This is a step good direction for stateful
> applications and also mirroring applications whose latency is affected due
> to the rebalance issues that we have today.
>
> I had a few questions on the current version of the KIP :
> For the effectiveness of the KIP, consumer with member.name set will *not
> send leave group request* when they go offline
>
> > By this you mean, even if the application has not called
> > KafkaConsumer.poll() within session timeout, it will not be sending the
> > LeaveGroup request, right?
> >
>
> Broker will maintain an in-memory mapping of {member.name ? member.id} to
> track member uniqueness.
>
> > When is the member.name removed from this map?
> >
>
> Member.id must be set if the *member.name <
> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fmember.name&amp;data=02%7C01%7C%7Cdde139857e7a4a3a83dd08d651d9c93e%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636786393153281080&amp;sdata=aSTzJlF4GPFGKhi5K7MPZozMn67718GWpqEYxFay%2BZs%3D&amp;reserved=0>
> *is already
> within the map. Otherwise reply MISSING_MEMBER_ID
>
> > How is this case handled on the client side? What is the application that
> > is using the KafkaConsumer suppose to do in this scenario?
> >
>
> Session timeout is the timeout we will trigger rebalance when a member goes
> offline for too long (not sending heartbeat request). To make static
> membership effective, we should increase the default max session timeout to
> 30 min so that end user could config it freely.
>
> > This would mean that it might take more time to detect unowned topic
> > partitions and may cause delay for applications that perform data
> mirroring
> > tasks. I discussed this with our sre and we have a suggestion to make
> here
> > as listed below separately.
> >
>
> Currently there is a config called *rebalance timeout* which is configured
> by consumer *max.poll.intervals*. The reason we set it to poll interval is
> because consumer could only send request within the call of poll() and we
> want to wait sufficient time for the join group request. When reaching
> rebalance timeout, the group will move towards completingRebalance stage
> and remove unjoined groups
>
> > you meant remove unjoined members of the group, right ?
> >
>
> Currently there is a config called *rebalance timeout* which is configured
> by consumer *max.poll.intervals*. The reason we set it to poll interval is
> because consumer could only send request within the call of poll() and we
> want to wait sufficient time for the join group request. When reaching
> rebalance timeout, the group will move towards completingRebalance stage
> and remove unjoined groups. This is actually conflicting with the design of
> static membership, because those temporarily unavailable members will
> potentially reattempt the join group and trigger extra rebalances.
> Internally we would optimize this logic by having rebalance timeout only in
> charge of stopping prepare rebalance stage, without removing non-responsive
> members immediately.
>
> > What do you mean by " Internally we would optimize this logic by having
> > rebalance timeout only in charge of stopping prepare rebalance stage,
> > without removing non-responsive members immediately." There would not be
> a
> > full rebalance if the lagging consumer sent a JoinGroup request later,
> > right ? If yes, can you highlight this in the KIP ?
> >
>
> Scale Up
>
> > The KIP talks about scale up scenario but its not quite clear how we
> > handle it. Are we adding a separate "expansion.timeout" or we adding
> status
> > "learner" ?. Can you shed more light on how this is handled in the KIP,
> if
> > its handled?
> >
>
>
> *Discussion*
> Larger session timeouts causing latency rise for getting data for un-owned
> topic partitions :
>
> > I think Jason had brought this up earlier about having a way to say how
> > many members/consumer hosts are you choosing to be in the consumer group.
> > If we can do this, then in case of mirroring applications we can do this
> :
> > Lets say we have a mirroring application that consumes from Kafka cluster
> > A and produces to Kafka cluster B.
> > Depending on the data and the Kafka cluster configuration, Kafka service
> > providers can set a mirroring group saying that it will take, for example
> > 300 consumer hosts/members to achieve the desired throughput and latency
> > for mirroring and can have additional 10 consumer hosts as spare in the
> > same group.
> > So when the first 300 members/consumers to join the group will start
> > mirroring the data from Kafka cluster A to Kafka cluster B.
> > The remaining 10 consumer members can sit idle.
> > The moment one of the consumer (for example: consumer number 54) from the
> > first 300 members go out of the group (crossed session timeout), it (the
> > groupCoordinator) can just assign the topicPartitions from the consumer
> > member 54 to one of the spare hosts.
> > Once the consumer member 54 comes back up, it can start as being a part
> of
> > the spare pool.
> > This enables us to have lower session timeouts and low latency mirroring,
> > in cases where the service providers are OK with having spare hosts.
> > This would mean that we would tolerate n consumer members leaving and
> > rejoining the group and still provide low latency as long as n <= number
> of
> > spare consumers.
> > If there are no spare host available, we can get back to the idea as
> > described in the KIP.
> >
>
> Thanks,
>
> Mayuresh
>
>
>
>
>
> On Tue, Nov 20, 2018 at 10:18 AM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Hi Boyang.
> >
> > Thanks for preparing this KIP! It is making good progress and will be a
> > great improvement for stateful Kafka applications.
> >
> > Apologies for my late reply, I was away for a while. Lots of great
> comments
> > so far, so I'll probably second most of them in what I suggest below at
> > this point.
> >
> > When I first read the KIP, I wanted to start at the end with something
> that
> > wasn't highlighted a lot. That was the topic related to handling
> duplicate
> > members. I see now that the initial suggestion of handling this situation
> > during offset commit has been removed, and I agree with that. Issues
> > related to membership seem to be handled better when the member joins the
> > group rather than when it tries to commit offsets. This also simplifies
> how
> > many request types need to change in order to incorporate the new member
> > name field.
> >
> > I also agree with what Jason and Guozhang have said regarding timeouts.
> > Although semantically, it's easier to think of every operation having its
> > own timeout, operationally this can become a burden. Thus, consolidation
> > seems preferable here. The definition of embedded protocols on top of the
> > base group membership protocol for rebalancing gives enough flexibility
> to
> > address such needs in each client component separately.
> >
> > Finally, some minor comments:
> > In a few places the new/proposed changes are referred to as "current".
> > Which is a bit confusing considering that there is a protocol in place
> > already, and by "current" someone might understand the existing one. I'd
> > recommend using new/proposed or equivalent when referring to changes
> > introduced with KIP-345 and current/existing or equivalent when referring
> > to existing behavior.
> >
> > There's the following sentence in the "Public Interfaces" section:
> > "Since for many stateful consumer/stream applications, the state
> shuffling
> > is more painful than short time partial unavailability."
> > However, my understanding is that the changes proposed with KIP-345 will
> > not exploit any partial availability. A suggestion for dealing with
> > temporary imbalances has been made in "Incremental Cooperative
> Rebalancing"
> > which can work well with KIP-345, but here I don't see proposed changes
> > that suggest that some resources (e.g. partitions) will keep being used
> > while others will not be utilized. Thus, you might want to adjust this
> > sentence. Correct me if I'm missing something related to that.
> >
> > In the rejected alternatives, under point 2) I read "we can copy the
> member
> > id to the config files". I believe it means to say "member name" unless
> I'm
> > missing something about reusing member ids. Also below I read: "By
> allowing
> > consumers to optionally specifying a member id" which probably implies
> > "member name" again. In a sense this section highlights a potential
> > confusion between member name and member id. I wonder if we could come up
> > with a better term for the new field. StaticTag, StaticLabel, or even
> > StaticName are some suggestions that could potentially help with
> confusion
> > between MemberId and MemberName and what corresponds to what. But I
> > wouldn't like to disrupt the discussion with naming conventions too much
> at
> > this point. I just mention it here as a thought.
> >
> > Looking forward to see the final details of this KIP. Great work so far!
> >
> > Konstantine
> >
> >
> > On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen <bche...@outlook.com> wrote:
> >
> > > Thanks Guozhang for the great summary here, and I have been following
> up
> > > the action items here.
> > >
> > >
> > >   1.  I already updated the KIP to remove the expansion timeout and
> > > registration timeout. Great to see them being addressed in client side!
> > >   2.  I double checked the design and I believe that it is ok to have
> > both
> > > static member and dynamic member co-exist in the same group. So the
> > upgrade
> > > shouldn't be destructive and we are removing the two membership
> protocol
> > > switching APIs.
> > >   3.  I only have question about this one. I'm still reading the
> > KafkaApis
> > > code here. Should I just use the same authorization logic for
> > > ForceStaticRebalanceRequest as JoinGroupRequest?
> > >   4.  I'm very excited to see this work with K8! Like you suggested,
> this
> > > feature could be better addressed in a separate KIP because it is
> pretty
> > > independent. I could start drafting the KIP once the current proposal
> is
> > > approved.
> > >   5.  I believe that we don't need fencing in offset commit request,
> > since
> > > duplicate member.name issue could be handled by join group request. We
> > > shall reject join group with known member name but no member id (which
> > > means we already have an active member using this identity).
> > >   6.  I agree to remove that internal config once we move forward with
> > > static membership. And I already removed the entire section from the
> KIP.
> > >
> > > Let me know if you have other concerns.
> > >
> > > Best,
> > > Boyang
> > > ________________________________
> > > From: Guozhang Wang <wangg...@gmail.com>
> > > Sent: Tuesday, November 20, 2018 4:21 PM
> > > To: dev
> > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > specifying member id
> > >
> > > Hello Boyang,
> > >
> > > Thanks a lot for the KIP! It is a great write-up and I appreciate your
> > > patience answering to the feedbacks from the community. I'd like to add
> > my
> > > 2cents here:
> > >
> > > 1. By introducing another two timeout configs, registration_timeout and
> > > expansion_timeout, we are effectively having four timeout configs:
> > session
> > > timeout, rebalance timeout (configured as "max.poll.interval.ms" on
> > client
> > > side), and these two. Interplaying these timeout configs can be quite
> > hard
> > > for users with such complexity, and hence I'm wondering if we can
> > simplify
> > > the situation with as less possible timeout configs as possible. Here
> is
> > a
> > > concrete suggestion I'd like propose:
> > >
> > > 1.a) Instead of introducing a registration_timeout in addition to the
> > > session_timeout for static members, we can just reuse the
> session_timeout
> > > and ask users to set it to a larger value when they are upgrading a
> > dynamic
> > > client to a static client by setting the "member.name" at the same
> time.
> > > By
> > > default, the broker-side min.session.timeout is 6 seconds and
> > > max.session.timeout is 5 minutes, which seems reasonable to me (we can
> of
> > > course modify this broker config to enlarge the valid interval if we
> want
> > > in practice). And then we should also consider removing the condition
> for
> > > marking a client as failed if the rebalance timeout has reached while
> the
> > > JoinGroup was not received, so that the semantics of session_timeout
> and
> > > rebalance_timeout are totally separated: the former is only used to
> > > determine if a consumer member of the group should be marked as failed
> > and
> > > kicked out of the group, and the latter is only used to determine the
> > > longest time coordinator should wait for PREPARE_REBALANCE phase. In
> > other
> > > words if a member did not send the JoinGroup in time of the
> > > rebalance_timeout, we still include it in the new generation of the
> group
> > > and use its old subscription info to send to leader for assignment.
> Later
> > > if the member came back with HeartBeat request, we can still follow the
> > > normal path to bring it to the latest generation while checking that
> its
> > > sent JoinGroup request contains the same subscription info as we used
> to
> > > assign the partitions previously (which should be likely the case in
> > > practice). In addition, we should let static members to not send the
> > > LeaveGroup request when it is gracefully shutdown, so that a static
> > member
> > > can only be leaving the group if its session has timed out, OR it has
> > been
> > > indicated to not exist in the group any more (details below).
> > >
> > > 1.b) We have a parallel discussion about Incremental Cooperative
> > > Rebalancing, in which we will encode the "when to rebalance" logic at
> the
> > > application level, instead of at the protocol level. By doing this we
> can
> > > also enable a few other optimizations, e.g. at the Streams level to
> first
> > > build up the state store as standby tasks and then trigger a second
> > > rebalance to actually migrate the active tasks while keeping the actual
> > > rebalance latency and hence unavailability window to be small (
> > >
> > >
> >
> https://nam05.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FKAFKA-6145&amp;data=02%7C01%7C%7Cdde139857e7a4a3a83dd08d651d9c93e%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636786393153281080&amp;sdata=ZJlsB%2FHLhZykd9mtu5CINNNEqMBvX75bdhqR3IlxGI8%3D&amp;reserved=0
> > ).
> > > I'd propose we align
> > > KIP-345 along with this idea, and hence do not add the
> expansion_timeout
> > as
> > > part of the protocol layer, but only do that at the application's
> > > coordinator / assignor layer (Connect, Streams, etc). We can still,
> > > deprecate the "*group.initial.rebalance.delay.ms
> > > <
> > >
> >
> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fgroup.initial.rebalance.delay.ms&amp;data=02%7C01%7C%7Cdde139857e7a4a3a83dd08d651d9c93e%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636786393153281080&amp;sdata=DDhjV41nPU3euYCQ3w8WPENuw9fPB6ah2j6rF0JjRBg%3D&amp;reserved=0
> > >*"
> > > though as part of this KIP
> > > since we have discussed about its limit and think it is actually not a
> > very
> > > good design and could be replaced with client-side logic above.
> > >
> > >
> > > 2. I'd like to see your thoughts on the upgrade path for this KIP. More
> > > specifically, let's say after we have upgraded broker version to be
> able
> > to
> > > recognize the new versions of JoinGroup request and the admin requests,
> > how
> > > should we upgrade the clients and enable static groups? On top of my
> head
> > > if we do a rolling bounce in which we set the member.name config as
> well
> > > as
> > > optionally increase the session.timeout config when we bounce each
> > > instance, then during this rolling bounces we will have a group
> contained
> > > with both dynamic members and static members. It means that we should
> > have
> > > the group to allow such scenario (i.e. we cannot reject JoinGroup
> > requests
> > > from dynamic members), and hence the "member.name" -> "member.id"
> > mapping
> > > will only be partial at this scenario. Also could you describe if the
> > > upgrade to the first version that support this feature would ever get
> any
> > > benefits, or only the future upgrade path for rolling bounces could get
> > > benefits out of this feature?
> > >
> > > If that's the case and we will do 1) as suggested above, do we still
> need
> > > the enableStaticMembership and enableDynamicMembership admin requests
> any
> > > more? Seems it is not necessary any more as we will only have the
> notion
> > of
> > > "dynamic or static members" that can co-exist in a group while there no
> > > notion of "dynamic or static groups", and hence these two requests are
> > not
> > > needed anymore.
> > >
> > >
> > > 3. We need to briefly talk about the implications for ACL as we
> introduce
> > > new admin requests that are related to a specific group.id. For
> example,
> > > we
> > > need to make sure that whoever created the group or joined the group
> can
> > > actually send admin requests for the group, otherwise the application
> > > owners need to bother the Kafka operators on a multi-tenant cluster
> every
> > > time they want to send any admin requests for their groups which would
> be
> > > an operational nightmare.
> > >
> > >
> > > 4. I like Jason's suggestion of adding an optional field for the list
> of
> > > member names, and I'm wondering if that can be done as part of the
> > > forceStaticRebalance request: i.e. by passing a list of members, we
> will
> > > enforce a rebalance immediately since it indicates that some static
> > member
> > > will be officially kicked out of the group and some new static members
> > may
> > > be added. So back to 1.a) above, a static member can only be kicked out
> > of
> > > the group if a) its session (arguably long period of time) has timed
> out,
> > > and b) this admin request explicitly state that it is no longer part of
> > the
> > > group. As for execution I'm fine with keeping it as a future work of
> this
> > > KIP if you'd like to make its scope smaller.
> > >
> > > Following are minor comments:
> > >
> > > 5. I'm not sure if we need to include "member.name" as part of the
> > > OffsetCommitRequest for fencing purposes, as I think the memberId plus
> > the
> > > generation number should be sufficient for fencing even with static
> > > members.
> > >
> > > 6. As mentioned above, if we agree to do 1) we can get rid of the "
> > > LEAVE_GROUP_ON_CLOSE_CONFIG" config.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Sat, Nov 17, 2018 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Boyang,
> > > >
> > > > Thanks for the proposal! This is very useful. I have some comments
> > below:
> > > >
> > > > 1) The motivation currently explicitly states that the goal is to
> > improve
> > > > performance for heavy state application. It seems that the motivation
> > can
> > > > be stronger with the following use-case. Currently for MirrorMaker
> > > cluster
> > > > with e.g. 100 MirrorMaker processes, it will take a long time to
> > rolling
> > > > bounce the entire MirrorMaker cluster. Each MirrorMaker process
> restart
> > > > will trigger a rebalance which currently pause the consumption of the
> > all
> > > > partitions of the MirrorMaker cluster. With the change stated in this
> > > > patch, as long as a MirrorMaker can restart within the specified
> > timeout
> > > > (e.g. 2 minutes), then we only need constant number of rebalance
> (e.g.
> > > for
> > > > leader restart) for the entire rolling bounce, which will
> significantly
> > > > improves the availability of the MirrorMaker pipeline. In my opinion,
> > the
> > > > main benefit of the KIP is to avoid unnecessary rebalance if the
> > consumer
> > > > process can be restarted within soon, which helps performance even if
> > > > overhead of state shuffling for a given process is small.
> > > >
> > > > 2) In order to simplify the KIP reading, can you follow the writeup
> > style
> > > > of other KIP (e.g. KIP-98) and list the interface change such as new
> > > > configs (e.g. registration timeout), new request/response, new
> > > AdminClient
> > > > API and new error code (e.g. DUPLICATE_STATIC_MEMBER)? Currently some
> > of
> > > > these are specified in the Proposed Change section which makes it a
> bit
> > > > inconvenient to understand the new interface that will be exposed to
> > > user.
> > > > Explanation of the current two-phase rebalance protocol probably can
> be
> > > > moved out of public interface section.
> > > >
> > > > 3) There are currently two version of JoinGroupRequest in the KIP and
> > > only
> > > > one of them has field memberId. This seems confusing.
> > > >
> > > > 4) It is mentioned in the KIP that "An admin API to force rebalance
> > could
> > > > be helpful here, but we will make a call once we finished the major
> > > > implementation". So this seems to be still an open question in the
> > > current
> > > > design. We probably want to agree on this before voting for the KIP.
> > > >
> > > > 5) The KIP currently adds new config MEMBER_NAME for consumer. Can
> you
> > > > specify the name of the config key and the default config value?
> > Possible
> > > > default values include empty string or null (similar to
> transaction.id
> > > in
> > > > producer config).
> > > >
> > > > 6) Regarding the use of the topic "static_member_map" to persist
> member
> > > > name map, currently if consumer coordinator broker goes offline,
> > > rebalance
> > > > is triggered and consumers will try connect to the new coordinator.
> If
> > > > these consumers can connect to the new coordinator within
> > > > max.poll.interval.ms which by default is 5 minutes, given that
> broker
> > > can
> > > > use a deterministic algorithm to determine the partition ->
> member_name
> > > > mapping, each consumer should get assigned the same set of partitions
> > > > without requiring state shuffling. So it is not clear whether we
> have a
> > > > strong use-case for this new logic. Can you help clarify what is the
> > > > benefit of using topic "static_member_map" to persist member name
> map?
> > > >
> > > > 7) Regarding the introduction of the expensionTimeoutMs config, it is
> > > > mentioned that "we are using expansion timeout to replace rebalance
> > > > timeout, which is configured by max.poll.intervals from client side,
> > and
> > > > using registration timeout to replace session timeout". Currently the
> > > > default max.poll.interval.ms is configured to be 5 minutes and there
> > > will
> > > > be only one rebalance if all new consumers can join within 5 minutes.
> > So
> > > it
> > > > is not clear whether we have a strong use-case for this new config.
> Can
> > > you
> > > > explain what is the benefit of introducing this new config?
> > > >
> > > > 8) It is mentioned that "To distinguish between previous version of
> > > > protocol, we will also increase the join group request version to v4
> > when
> > > > MEMBER_NAME is set" and "If the broker version is not the latest (<
> > v4),
> > > > the join group request shall be downgraded to v3 without setting the
> > > member
> > > > Id". It is probably simpler to just say that this feature is enabled
> if
> > > > JoinGroupRequest V4 is supported on both client and broker and
> > > MEMBER_NAME
> > > > is configured with non-empty string.
> > > >
> > > > 9) It is mentioned that broker may return NO_STATIC_MEMBER_INFO_SET
> > error
> > > > in OffsetCommitResponse for "commit requests under static
> membership".
> > > Can
> > > > you clarify how broker determines whether the commit request is under
> > > > static membership?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>

Reply via email to