dajac commented on a change in pull request #10863: URL: https://github.com/apache/kafka/pull/10863#discussion_r652188256
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
               group.maybeInvokeJoinCallback(member, joinResult)
               completeAndScheduleNextHeartbeatExpiration(group, member)
               member.isNew = false
+
+              group.addPendingSyncMember(member.memberId)
             }
+
+            schedulePendingSync(group)
+          }
+        }
+      }
+  }
+
+  private def removePendingSyncMember(
+    group: GroupMetadata,
+    memberId: String
+  ): Unit = {
+    group.removePendingSyncMember(memberId)
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(

Review comment:
   Definitely. I will add that. I cannot really think of any other workable alternatives. At least we know that the leader has sent the SyncGroup, since the group transitioned to Stable. Throwing out a few ideas:
   * Could we store the pending set in the log? That does not seem doable, as the set could be updated before the write is acknowledged. After a failover, we could therefore believe that a member is still pending when it is not.
   * We could transition to Stable only when all pending members have joined. We already discussed this and agreed that it is not ideal, as it would delay processing.
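For readers following the thread, here is a rough sketch of the in-memory bookkeeping the comment is reasoning about: a set of member IDs that still owe a SyncGroup, filled when the join phase completes and drained as members sync or the timer expires. `PendingSyncTracker` and its methods are hypothetical names for illustration only. They are not the `GroupMetadata` or `GroupCoordinator` code from this PR, and the real change may differ.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the pending-sync bookkeeping discussed above.
// None of these names come from Kafka's GroupMetadata.
final class PendingSyncTracker {
  // Held only in memory, so it does not survive a coordinator failover.
  private val pending = mutable.Set.empty[String]

  // Called for each member once the join phase completes.
  def add(memberId: String): Unit = pending += memberId

  // Called when a member's SyncGroup arrives. Returns true when no member
  // is left pending, i.e. a sync timer could be completed early.
  def remove(memberId: String): Boolean = {
    pending -= memberId
    pending.isEmpty
  }

  // Called when the sync timer fires: returns the members that never sent
  // a SyncGroup and clears the set.
  def expire(): Set[String] = {
    val missing = pending.toSet
    pending.clear()
    missing
  }
}

object PendingSyncDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new PendingSyncTracker
    Seq("member-a", "member-b", "member-c").foreach(tracker.add)

    println(tracker.remove("member-a")) // false: two members still pending
    println(tracker.expire())           // the members that never synced
  }
}
```

Because the set lives only in memory, it is gone after a failover, which is the limitation the first bullet weighs against persisting it in the log.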