dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1553499653


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2415,6 +2415,20 @@ private CoordinatorResult<Void, Record> classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult<Void, Record> completeClassicGroupJoin(String groupId) {
+        if (containsClassicGroup(groupId)) {
+            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));

Review Comment:
   I am not a fan of this pattern because it effectively looks up the group 
twice. One option would be to wrap the call to `getOrMaybeCreateClassicGroup` 
in a try/catch and handle the exception it throws. Another option would be to 
1) do the lookup once, and 2) verify that the group is non-null and of the 
expected type, returning early if either check fails.
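
The two options above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual `GroupMetadataManager` code: the `Group` class, the `GroupLookupException` type, the in-memory map, and the string return values are all hypothetical stand-ins, and the real exception thrown by `getOrMaybeCreateClassicGroup` is not named in this thread.

```java
import java.util.HashMap;
import java.util.Map;

public class SingleLookupSketch {
    // Hypothetical group types; stands in for the real group type check.
    enum GroupType { CLASSIC, CONSUMER }

    // Hypothetical stand-in for a group object.
    static final class Group {
        final String groupId;
        final GroupType type;

        Group(String groupId, GroupType type) {
            this.groupId = groupId;
            this.type = type;
        }
    }

    // Hypothetical exception; the real method's exception type is an assumption here.
    static final class GroupLookupException extends RuntimeException {
        GroupLookupException(String message) {
            super(message);
        }
    }

    private final Map<String, Group> groups = new HashMap<>();

    void add(Group group) {
        groups.put(group.groupId, group);
    }

    // Analogue of a lookup that throws when the group is missing or not classic.
    Group getClassicGroupOrThrow(String groupId) {
        Group group = groups.get(groupId);
        if (group == null || group.type != GroupType.CLASSIC) {
            throw new GroupLookupException("Group " + groupId + " is missing or not classic.");
        }
        return group;
    }

    // Option 1: a single throwing lookup, with the failure handled in a catch block.
    String completeJoinTryCatch(String groupId) {
        try {
            Group group = getClassicGroupOrThrow(groupId);
            return "completed:" + group.groupId;
        } catch (GroupLookupException e) {
            return "skipped";
        }
    }

    // Option 2: a single map lookup, then verify non-null and type, returning early on failure.
    String completeJoinSingleLookup(String groupId) {
        Group group = groups.get(groupId);
        if (group == null || group.type != GroupType.CLASSIC) {
            return "skipped";
        }
        return "completed:" + group.groupId;
    }

    public static void main(String[] args) {
        SingleLookupSketch sketch = new SingleLookupSketch();
        sketch.add(new Group("classic-1", GroupType.CLASSIC));
        sketch.add(new Group("consumer-1", GroupType.CONSUMER));

        System.out.println(sketch.completeJoinTryCatch("classic-1"));
        System.out.println(sketch.completeJoinSingleLookup("consumer-1"));
        System.out.println(sketch.completeJoinSingleLookup("missing"));
    }
}
```

Either variant touches the underlying map once per call, whereas a `contains` check followed by a `get` touches it twice. Which one reads better in the real code likely depends on whether the not-found case should be treated as exceptional.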



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2415,6 +2415,20 @@ private CoordinatorResult<Void, Record> classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult<Void, Record> completeClassicGroupJoin(String groupId) {
+        if (containsClassicGroup(groupId)) {
+            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));
+        } else {
+            log.info("Group {} is null or not a classic group, skipping rebalance stage.", groupId);

Review Comment:
   I wonder if we could log this at `debug` level instead of `info`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2805,31 +2826,36 @@ private CoordinatorResult<Void, Record> maybeCompleteJoinElseSchedule(
      * Try to complete the join phase of the initial rebalance.
      * Otherwise, extend the rebalance.
      *
-     * @param group The group under initial rebalance.
+     * @param groupId The group under initial rebalance.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule(
-        ClassicGroup group,
+        String groupId,
         int delayMs,
         int remainingMs
     ) {
-        if (group.newMemberAdded() && remainingMs != 0) {
-            // A new member was added. Extend the delay.
-            group.setNewMemberAdded(false);
-            int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs);
-            int newRemainingMs = Math.max(remainingMs - delayMs, 0);
-
-            timer.schedule(
-                classicGroupJoinKey(group.groupId()),
-                newDelayMs,
-                TimeUnit.MILLISECONDS,
-                false,
-                () -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs)
-            );
+        if (containsClassicGroup(groupId)) {

Review Comment:
   Ditto: this also looks up the group twice.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) {
             group.rebalanceTimeoutMs(),
             TimeUnit.MILLISECONDS,
             false,
-            () -> expirePendingSync(group, group.generationId()));
+            () -> expirePendingSync(group.groupId(), group.generationId()));
     }
 
     /**
     * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and
      * try complete the join phase.
      *
-     * @param group     The group.
+     * @param groupId   The group id.
      * @param memberId  The member id.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat(
-        ClassicGroup group,
+        String groupId,
         String memberId
     ) {
-        if (group.isInState(DEAD)) {
-            log.info("Received notification of heartbeat expiration for member {} after group {} " +
-                    "had already been unloaded or deleted.",
-                memberId, group.groupId());
-        } else if (group.isPendingMember(memberId)) {
-            log.info("Pending member {} in group {} has been removed after session timeout expiration.",
-                memberId, group.groupId());
-
-            return removePendingMemberAndUpdateClassicGroup(group, memberId);
-        } else if (!group.hasMemberId(memberId)) {
-            log.debug("Member {} has already been removed from the group.", memberId);
-        } else {
-            ClassicGroupMember member = group.member(memberId);
-            if (!member.hasSatisfiedHeartbeat()) {
-                log.info("Member {} in group {} has failed, removing it from the group.",
-                    member.memberId(), group.groupId());
+        if (containsClassicGroup(groupId)) {
+            ClassicGroup group = getOrMaybeCreateClassicGroup(groupId, false);
+            if (group.isInState(DEAD)) {
+                log.info("Received notification of heartbeat expiration for member {} after group {} " +
+                        "had already been unloaded or deleted.",
+                    memberId, group.groupId());
+            } else if (group.isPendingMember(memberId)) {
+                log.info("Pending member {} in group {} has been removed after session timeout expiration.",
+                    memberId, group.groupId());
+
+                return removePendingMemberAndUpdateClassicGroup(group, memberId);
+            } else if (!group.hasMemberId(memberId)) {
+                log.debug("Member {} has already been removed from the group.", memberId);
+            } else {
+                ClassicGroupMember member = group.member(memberId);
+                if (!member.hasSatisfiedHeartbeat()) {
+                    log.info("Member {} in group {} has failed, removing it from the group.",
+                        member.memberId(), group.groupId());
 
-                return removeMemberAndUpdateClassicGroup(
-                    group,
-                    member,
-                    "removing member " + member.memberId() + " on heartbeat expiration."
-                );
+                    return removeMemberAndUpdateClassicGroup(
+                        group,
+                        member,
+                        "removing member " + member.memberId() + " on heartbeat expiration."
+                    );
+                }
             }
+        } else {
+            log.info("Received notification of heartbeat expiration for member {} after group {} " +

Review Comment:
   nit: Could we log this at `debug` level?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) {
             group.rebalanceTimeoutMs(),
             TimeUnit.MILLISECONDS,
             false,
-            () -> expirePendingSync(group, group.generationId()));
+            () -> expirePendingSync(group.groupId(), group.generationId()));
     }
 
     /**
     * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and
      * try complete the join phase.
      *
-     * @param group     The group.
+     * @param groupId   The group id.
      * @param memberId  The member id.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat(
-        ClassicGroup group,
+        String groupId,
         String memberId
     ) {
-        if (group.isInState(DEAD)) {
-            log.info("Received notification of heartbeat expiration for member {} after group {} " +
-                    "had already been unloaded or deleted.",
-                memberId, group.groupId());
-        } else if (group.isPendingMember(memberId)) {
-            log.info("Pending member {} in group {} has been removed after session timeout expiration.",
-                memberId, group.groupId());
-
-            return removePendingMemberAndUpdateClassicGroup(group, memberId);
-        } else if (!group.hasMemberId(memberId)) {
-            log.debug("Member {} has already been removed from the group.", memberId);
-        } else {
-            ClassicGroupMember member = group.member(memberId);
-            if (!member.hasSatisfiedHeartbeat()) {
-                log.info("Member {} in group {} has failed, removing it from the group.",
-                    member.memberId(), group.groupId());
+        if (containsClassicGroup(groupId)) {

Review Comment:
   nit: Same comment here: the group is looked up twice.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2954,38 +2980,42 @@ private void removeSyncExpiration(ClassicGroup group) {
     /**
      * Expire pending sync.
      *
-     * @param group           The group.
+     * @param groupId         The group id.
     * @param generationId    The generation when the pending sync was originally scheduled.
      *
      * @return The coordinator result that will be appended to the log.
      * */
     private CoordinatorResult<Void, Record> expirePendingSync(
-        ClassicGroup group,
+        String groupId,
         int generationId
     ) {
-        if (generationId != group.generationId()) {
-            log.error("Received unexpected notification of sync expiration for {} with an old " +
-                "generation {} while the group has {}.", group.groupId(), generationId, group.generationId());
-        } else {
-            if (group.isInState(DEAD) || group.isInState(EMPTY) || group.isInState(PREPARING_REBALANCE)) {
-                log.error("Received unexpected notification of sync expiration after group {} already " +
-                    "transitioned to {} state.", group.groupId(), group.stateAsString());
-            } else if (group.isInState(COMPLETING_REBALANCE) || group.isInState(STABLE)) {
-                if (!group.hasReceivedSyncFromAllMembers()) {
-                    Set<String> pendingSyncMembers = new HashSet<>(group.allPendingSyncMembers());
-                    pendingSyncMembers.forEach(memberId -> {
-                        group.remove(memberId);
-                        timer.cancel(classicGroupHeartbeatKey(group.groupId(), memberId));
-                    });
-
-                    log.debug("Group {} removed members who haven't sent their sync requests: {}",
-                        group.groupId(), pendingSyncMembers);
-
-                    return prepareRebalance(group, "Removing " + pendingSyncMembers + " on pending sync request expiration");
+        if (containsClassicGroup(groupId)) {

Review Comment:
   Ditto: this also looks up the group twice.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2805,31 +2826,36 @@ private CoordinatorResult<Void, Record> maybeCompleteJoinElseSchedule(
      * Try to complete the join phase of the initial rebalance.
      * Otherwise, extend the rebalance.
      *
-     * @param group The group under initial rebalance.
+     * @param groupId The group under initial rebalance.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule(
-        ClassicGroup group,
+        String groupId,
         int delayMs,
         int remainingMs
     ) {
-        if (group.newMemberAdded() && remainingMs != 0) {
-            // A new member was added. Extend the delay.
-            group.setNewMemberAdded(false);
-            int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs);
-            int newRemainingMs = Math.max(remainingMs - delayMs, 0);
-
-            timer.schedule(
-                classicGroupJoinKey(group.groupId()),
-                newDelayMs,
-                TimeUnit.MILLISECONDS,
-                false,
-                () -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs)
-            );
+        if (containsClassicGroup(groupId)) {
+            ClassicGroup group = getOrMaybeCreateClassicGroup(groupId, false);
+            if (group.newMemberAdded() && remainingMs != 0) {
+                // A new member was added. Extend the delay.
+                group.setNewMemberAdded(false);
+                int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs);
+                int newRemainingMs = Math.max(remainingMs - delayMs, 0);
+
+                timer.schedule(
+                    classicGroupJoinKey(group.groupId()),
+                    newDelayMs,
+                    TimeUnit.MILLISECONDS,
+                    false,
+                    () -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), newDelayMs, newRemainingMs)
+                );
+            } else {
+                // No more time remaining. Complete the join phase.
+                return completeClassicGroupJoin(group);
+            }
         } else {
-            // No more time remaining. Complete the join phase.
-            return completeClassicGroupJoin(group);
+            log.info("Group {} is null or not a classic group, skipping the initial rebalance stage.", groupId);

Review Comment:
   Ditto: could this be logged at `debug` level?


