Boyang Chen created KAFKA-7018:
----------------------------------
Summary: persist memberId for consumer restart
Key: KAFKA-7018
URL: https://issues.apache.org/jira/browse/KAFKA-7018
Project: Kafka
Issue Type: Improvement
Components: consumer, streams
Reporter: Boyang Chen
Assignee: Boyang Chen
In the group coordinator, there is logic that skips the rebalance for join
group requests from existing follower consumers:
{code:java}
case Empty | Stable =>
  if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
    // if the member id is unknown, register the member to the group
    addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId,
      clientHost, protocolType, protocols, group, responseCallback)
  } else {
    val member = group.get(memberId)
    if (group.isLeader(memberId) || !member.matches(protocols)) {
      // force a rebalance if a member has changed metadata or if the leader
      // sends JoinGroup.
      // The latter allows the leader to trigger rebalances for changes
      // affecting assignment which do not affect the member metadata
      // (such as topic metadata changes for the consumer)
      updateMemberAndRebalance(group, member, protocols, responseCallback)
    } else {
      // for followers with no actual change to their metadata, just return
      // group information for the current generation which will allow them
      // to issue SyncGroup
      responseCallback(JoinGroupResult(
        members = Map.empty,
        memberId = memberId,
        generationId = group.generationId,
        subProtocol = group.protocolOrNull,
        leaderId = group.leaderOrNull,
        error = Errors.NONE))
    }
  }
{code}
While looking at the AbstractCoordinator, I found that the generation is
hard-coded to NO_GENERATION on restart, which means we send UNKNOWN_MEMBER_ID
in the first join group request. As a result, the coordinator treats the
restarted consumer as a new member, so rebalances keep being triggered until
the session timeout expires.
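To make the effect concrete, here is a rough sketch of the branch quoted above as a toy Java model. It is not the coordinator code: the class JoinGroupModel, its fields, and the generated "member-N" ids are all hypothetical, and the protocol metadata is reduced to a single string. It only shows that a join with UNKNOWN_MEMBER_ID always counts as a rebalance, while a known follower with unchanged metadata does not.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the Empty | Stable branch quoted above.
// None of these names exist in Kafka; they are for illustration only.
class JoinGroupModel {
    // Stand-in for JoinGroupRequest.UNKNOWN_MEMBER_ID.
    static final String UNKNOWN_MEMBER_ID = "";

    private final Map<String, String> protocolsByMember = new HashMap<>();
    private String leaderId;
    private int nextId = 0;
    int rebalances = 0;

    // Returns the member id that the modeled coordinator answers with.
    String handleJoin(String memberId, String protocols) {
        if (memberId.equals(UNKNOWN_MEMBER_ID)) {
            // Unknown member id: register a new member and rebalance.
            String newId = "member-" + (nextId++);
            protocolsByMember.put(newId, protocols);
            if (leaderId == null) {
                leaderId = newId; // assumption: first member becomes leader
            }
            rebalances++;
            return newId;
        }
        String known = protocolsByMember.get(memberId);
        if (known == null) {
            // This model simply rejects ids it has never handed out.
            throw new IllegalArgumentException("unknown member: " + memberId);
        }
        if (memberId.equals(leaderId) || !known.equals(protocols)) {
            // Leader rejoin or changed metadata: force a rebalance.
            protocolsByMember.put(memberId, protocols);
            rebalances++;
        }
        // Follower with unchanged metadata: no rebalance.
        return memberId;
    }
}
```

In this model, a follower that rejoins with its old id leaves the rebalance counter unchanged, whereas the same follower rejoining with UNKNOWN_MEMBER_ID increments it.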
I'm trying to clarify the following things before we extend the discussion:
# Whether my understanding of the above logic is right (I hope [~mjsax] can
help me double-check).
# Whether it makes sense to persist the last memberId for consumers. We
currently only need this feature in Streams applications, but it would do no
harm to use it for consumers in general. It would be a nice-to-have feature on
consumer restart when loading-previous-memberId is configured to true. If
loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
# The behavior could also be changed on the broker side, but I suspect that is
very risky. A client-side change should take the least effort. The end goal is
to avoid excessive rebalances when the same consumer restarts, so if you feel
a server-side change could also help, we can discuss it further.
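The client-side idea in the second point could look roughly like the sketch below. This is only an illustration under stated assumptions: MemberIdStore, its file-based storage, and the loadPreviousMemberId flag are hypothetical and not part of the Kafka client API, and the constant is a local stand-in for JoinGroupRequest.UNKNOWN_MEMBER_ID.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the Kafka client API.
class MemberIdStore {
    // Local stand-in for JoinGroupRequest.UNKNOWN_MEMBER_ID.
    static final String UNKNOWN_MEMBER_ID = "";

    private final Path file;
    // Models the proposed loading-previous-memberId setting.
    private final boolean loadPreviousMemberId;

    MemberIdStore(Path file, boolean loadPreviousMemberId) {
        this.file = file;
        this.loadPreviousMemberId = loadPreviousMemberId;
    }

    // Persist the member id assigned by the coordinator.
    void save(String memberId) throws IOException {
        Files.write(file, memberId.getBytes(StandardCharsets.UTF_8));
    }

    // Load the previous member id, or fall back to UNKNOWN_MEMBER_ID when
    // the feature is disabled, the file is missing, or the file is empty.
    String loadOrUnknown() {
        if (!loadPreviousMemberId) {
            return UNKNOWN_MEMBER_ID;
        }
        try {
            String id = new String(Files.readAllBytes(file),
                    StandardCharsets.UTF_8).trim();
            return id.isEmpty() ? UNKNOWN_MEMBER_ID : id;
        } catch (IOException e) {
            // Loading failed: behave exactly like today's restart path.
            return UNKNOWN_MEMBER_ID;
        }
    }
}
```

A consumer would call save after a successful join and loadOrUnknown before its first join group request on restart. Falling back to UNKNOWN_MEMBER_ID on any failure keeps the current behavior as the default.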
Thank you for helping out! [~mjsax] [~guozhang]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)