dajac commented on code in PR #15587: URL: https://github.com/apache/kafka/pull/15587#discussion_r1551577818
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2814,7 +2820,9 @@ private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule(
         int delayMs,
         int remainingMs
     ) {
-        if (group.newMemberAdded() && remainingMs != 0) {
+        if (!containsClassicGroup(group.groupId())) {
+            log.info("Group {} is null or not a classic group, skipping the initial rebalance stage.", group.groupId());
+        } else if (group.newMemberAdded() && remainingMs != 0) {

Review Comment:
   Same comment.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2451,6 +2451,8 @@ private CoordinatorResult<Void, Record> completeClassicGroupJoin(
         if (group.isInState(DEAD)) {
             log.info("Group {} is dead, skipping rebalance stage.", groupId);
+        } else if (!containsClassicGroup(group.groupId())) {
+            log.info("Group {} is null or not a classic group, skipping rebalance stage.", groupId);

Review Comment:
   I find this check a little unexpected here because the method receives a `ClassicGroup`. It seems that `completeClassicGroupJoin` is called from two different contexts. In the first one, it is called during the regular processing of the request, so the `ClassicGroup` is available and we know that it is fine. In the second one, it is called from an expired timer. In this case, we need to validate that the group still exists. Therefore, I wonder whether we should add an overload of `completeClassicGroupJoin` which takes the group id and looks up the group based on it. We could then use it in the "timer context". What do you think?
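
   To make the overload idea concrete, here is a minimal sketch. Everything in it is a hypothetical, simplified stand-in: `TimerContextLookupSketch`, the nested `Group`/`ClassicGroup`/`ConsumerGroup` types, the `groups` map, and the `String` return value are assumptions for illustration only, not the real `GroupMetadataManager` code (which returns a `CoordinatorResult<Void, Record>`).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the coordinator; not the real Kafka classes.
class TimerContextLookupSketch {
    interface Group {
        String groupId();
    }

    static final class ClassicGroup implements Group {
        private final String groupId;
        private boolean dead = false;

        ClassicGroup(String groupId) { this.groupId = groupId; }
        public String groupId() { return groupId; }
        boolean isDead() { return dead; }
        void markDead() { dead = true; }
    }

    static final class ConsumerGroup implements Group {
        private final String groupId;

        ConsumerGroup(String groupId) { this.groupId = groupId; }
        public String groupId() { return groupId; }
    }

    // Assumed registry of groups keyed by group id.
    private final Map<String, Group> groups = new HashMap<>();

    void put(Group group) { groups.put(group.groupId(), group); }
    void remove(String groupId) { groups.remove(groupId); }

    // Request context: the caller already holds a validated ClassicGroup,
    // so no existence or type check is needed here.
    String completeClassicGroupJoin(ClassicGroup group) {
        if (group.isDead()) {
            return "skipped: group " + group.groupId() + " is dead";
        }
        return "completed: " + group.groupId();
    }

    // Timer context: only the group id is captured when the timer is scheduled.
    // The group is looked up again at expiration, because it may have been
    // deleted or replaced by a non-classic group in the meantime.
    String completeClassicGroupJoin(String groupId) {
        Group group = groups.get(groupId);
        if (!(group instanceof ClassicGroup)) {
            return "skipped: group " + groupId + " is null or not a classic group";
        }
        return completeClassicGroupJoin((ClassicGroup) group);
    }

    public static void main(String[] args) {
        TimerContextLookupSketch manager = new TimerContextLookupSketch();
        manager.put(new ClassicGroup("g1"));
        System.out.println(manager.completeClassicGroupJoin("g1"));

        // Simulate an upgrade: the same id now maps to a non-classic group.
        manager.put(new ConsumerGroup("g1"));
        System.out.println(manager.completeClassicGroupJoin("g1"));
    }
}
```

   With this shape, the method that takes a `ClassicGroup` no longer needs the `containsClassicGroup` check, and the validation lives only in the path that can actually observe a stale group.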
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2963,7 +2971,9 @@ private CoordinatorResult<Void, Record> expirePendingSync(
         ClassicGroup group,
         int generationId
     ) {
-        if (generationId != group.generationId()) {
+        if (!containsClassicGroup(group.groupId())) {

Review Comment:
   It looks like we could change the param here.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2553,6 +2555,10 @@ private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat(
             log.info("Received notification of heartbeat expiration for member {} after group {} " +
                 "had already been unloaded or deleted.",
                 memberId, group.groupId());
+        } else if (!containsClassicGroup(group.groupId())) {
+            log.info("Received notification of heartbeat expiration for member {} after group {} " +
+                "had already been deleted or upgraded.",
+                memberId, group.groupId());

Review Comment:
   I have a similar comment for this one. However, in this case, we could just replace the `group` argument with `groupId` and do the lookup here.