dajac commented on code in PR #15587: URL: https://github.com/apache/kafka/pull/15587#discussion_r1553499653
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2415,6 +2415,20 @@ private CoordinatorResult<Void, Record> classicGroupJoinExistingMember( return EMPTY_RESULT; } + /** + * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as + * timeout operation. It additionally looks up the group by the id and checks the group type. + * completeClassicGroupJoin will only be called if the group is CLASSIC. + */ + private CoordinatorResult<Void, Record> completeClassicGroupJoin(String groupId) { + if (containsClassicGroup(groupId)) { + return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false)); Review Comment: I am not a fan of this pattern because you effectively have to look up the group twice. One option would be to use a try..catch to catch the exception thrown by getOrMaybeCreateClassicGroup. Another option would be to 1) do the lookup, 2) verify non-null and group type and return if it fails. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2415,6 +2415,20 @@ private CoordinatorResult<Void, Record> classicGroupJoinExistingMember( return EMPTY_RESULT; } + /** + * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as + * timeout operation. It additionally looks up the group by the id and checks the group type. + * completeClassicGroupJoin will only be called if the group is CLASSIC. + */ + private CoordinatorResult<Void, Record> completeClassicGroupJoin(String groupId) { + if (containsClassicGroup(groupId)) { + return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false)); + } else { + log.info("Group {} is null or not a classic group, skipping rebalance stage.", groupId); Review Comment: I wonder if we could use `debug` here. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2805,31 +2826,36 @@ private CoordinatorResult<Void, Record> maybeCompleteJoinElseSchedule( * Try to complete the join phase of the initial rebalance. * Otherwise, extend the rebalance. * - * @param group The group under initial rebalance. + * @param groupId The group under initial rebalance. * * @return The coordinator result that will be appended to the log. */ private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule( - ClassicGroup group, + String groupId, int delayMs, int remainingMs ) { - if (group.newMemberAdded() && remainingMs != 0) { - // A new member was added. Extend the delay. - group.setNewMemberAdded(false); - int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs); - int newRemainingMs = Math.max(remainingMs - delayMs, 0); - - timer.schedule( - classicGroupJoinKey(group.groupId()), - newDelayMs, - TimeUnit.MILLISECONDS, - false, - () -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs) - ); + if (containsClassicGroup(groupId)) { Review Comment: ditto. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) { group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, - () -> expirePendingSync(group, group.generationId())); + () -> expirePendingSync(group.groupId(), group.generationId())); } /** * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and * try complete the join phase. * - * @param group The group. + * @param groupId The group id. * @param memberId The member id. * * @return The coordinator result that will be appended to the log. 
*/ private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat( - ClassicGroup group, + String groupId, String memberId ) { - if (group.isInState(DEAD)) { - log.info("Received notification of heartbeat expiration for member {} after group {} " + - "had already been unloaded or deleted.", - memberId, group.groupId()); - } else if (group.isPendingMember(memberId)) { - log.info("Pending member {} in group {} has been removed after session timeout expiration.", - memberId, group.groupId()); - - return removePendingMemberAndUpdateClassicGroup(group, memberId); - } else if (!group.hasMemberId(memberId)) { - log.debug("Member {} has already been removed from the group.", memberId); - } else { - ClassicGroupMember member = group.member(memberId); - if (!member.hasSatisfiedHeartbeat()) { - log.info("Member {} in group {} has failed, removing it from the group.", - member.memberId(), group.groupId()); + if (containsClassicGroup(groupId)) { + ClassicGroup group = getOrMaybeCreateClassicGroup(groupId, false); + if (group.isInState(DEAD)) { + log.info("Received notification of heartbeat expiration for member {} after group {} " + + "had already been unloaded or deleted.", + memberId, group.groupId()); + } else if (group.isPendingMember(memberId)) { + log.info("Pending member {} in group {} has been removed after session timeout expiration.", + memberId, group.groupId()); + + return removePendingMemberAndUpdateClassicGroup(group, memberId); + } else if (!group.hasMemberId(memberId)) { + log.debug("Member {} has already been removed from the group.", memberId); + } else { + ClassicGroupMember member = group.member(memberId); + if (!member.hasSatisfiedHeartbeat()) { + log.info("Member {} in group {} has failed, removing it from the group.", + member.memberId(), group.groupId()); - return removeMemberAndUpdateClassicGroup( - group, - member, - "removing member " + member.memberId() + " on heartbeat expiration." 
- ); + return removeMemberAndUpdateClassicGroup( + group, + member, + "removing member " + member.memberId() + " on heartbeat expiration." + ); + } } + } else { + log.info("Received notification of heartbeat expiration for member {} after group {} " + Review Comment: nit: debug? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) { group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, - () -> expirePendingSync(group, group.generationId())); + () -> expirePendingSync(group.groupId(), group.generationId())); } /** * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and * try complete the join phase. * - * @param group The group. + * @param groupId The group id. * @param memberId The member id. * * @return The coordinator result that will be appended to the log. */ private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat( - ClassicGroup group, + String groupId, String memberId ) { - if (group.isInState(DEAD)) { - log.info("Received notification of heartbeat expiration for member {} after group {} " + - "had already been unloaded or deleted.", - memberId, group.groupId()); - } else if (group.isPendingMember(memberId)) { - log.info("Pending member {} in group {} has been removed after session timeout expiration.", - memberId, group.groupId()); - - return removePendingMemberAndUpdateClassicGroup(group, memberId); - } else if (!group.hasMemberId(memberId)) { - log.debug("Member {} has already been removed from the group.", memberId); - } else { - ClassicGroupMember member = group.member(memberId); - if (!member.hasSatisfiedHeartbeat()) { - log.info("Member {} in group {} has failed, removing it from the group.", - member.memberId(), group.groupId()); + if (containsClassicGroup(groupId)) { Review Comment: nit: Same comment here. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2954,38 +2980,42 @@ private void removeSyncExpiration(ClassicGroup group) { /** * Expire pending sync. * - * @param group The group. + * @param groupId The group id. * @param generationId The generation when the pending sync was originally scheduled. * * @return The coordinator result that will be appended to the log. * */ private CoordinatorResult<Void, Record> expirePendingSync( - ClassicGroup group, + String groupId, int generationId ) { - if (generationId != group.generationId()) { - log.error("Received unexpected notification of sync expiration for {} with an old " + - "generation {} while the group has {}.", group.groupId(), generationId, group.generationId()); - } else { - if (group.isInState(DEAD) || group.isInState(EMPTY) || group.isInState(PREPARING_REBALANCE)) { - log.error("Received unexpected notification of sync expiration after group {} already " + - "transitioned to {} state.", group.groupId(), group.stateAsString()); - } else if (group.isInState(COMPLETING_REBALANCE) || group.isInState(STABLE)) { - if (!group.hasReceivedSyncFromAllMembers()) { - Set<String> pendingSyncMembers = new HashSet<>(group.allPendingSyncMembers()); - pendingSyncMembers.forEach(memberId -> { - group.remove(memberId); - timer.cancel(classicGroupHeartbeatKey(group.groupId(), memberId)); - }); - - log.debug("Group {} removed members who haven't sent their sync requests: {}", - group.groupId(), pendingSyncMembers); - - return prepareRebalance(group, "Removing " + pendingSyncMembers + " on pending sync request expiration"); + if (containsClassicGroup(groupId)) { Review Comment: ditto. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2805,31 +2826,36 @@ private CoordinatorResult<Void, Record> maybeCompleteJoinElseSchedule( * Try to complete the join phase of the initial rebalance. 
* Otherwise, extend the rebalance. * - * @param group The group under initial rebalance. + * @param groupId The group under initial rebalance. * * @return The coordinator result that will be appended to the log. */ private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule( - ClassicGroup group, + String groupId, int delayMs, int remainingMs ) { - if (group.newMemberAdded() && remainingMs != 0) { - // A new member was added. Extend the delay. - group.setNewMemberAdded(false); - int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs); - int newRemainingMs = Math.max(remainingMs - delayMs, 0); - - timer.schedule( - classicGroupJoinKey(group.groupId()), - newDelayMs, - TimeUnit.MILLISECONDS, - false, - () -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs) - ); + if (containsClassicGroup(groupId)) { + ClassicGroup group = getOrMaybeCreateClassicGroup(groupId, false); + if (group.newMemberAdded() && remainingMs != 0) { + // A new member was added. Extend the delay. + group.setNewMemberAdded(false); + int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs); + int newRemainingMs = Math.max(remainingMs - delayMs, 0); + + timer.schedule( + classicGroupJoinKey(group.groupId()), + newDelayMs, + TimeUnit.MILLISECONDS, + false, + () -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), newDelayMs, newRemainingMs) + ); + } else { + // No more time remaining. Complete the join phase. + return completeClassicGroupJoin(group); + } } else { - // No more time remaining. Complete the join phase. - return completeClassicGroupJoin(group); + log.info("Group {} is null or not a classic group, skipping the initial rebalance stage.", groupId); Review Comment: ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org.