dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1551577818


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2814,7 +2820,9 @@ private CoordinatorResult<Void, Record> 
tryCompleteInitialRebalanceElseSchedule(
         int delayMs,
         int remainingMs
     ) {
-        if (group.newMemberAdded() && remainingMs != 0) {
+        if (!containsClassicGroup(group.groupId())) {
+            log.info("Group {} is null or not a classic group, skipping the 
initial rebalance stage.", group.groupId());
+        } else if (group.newMemberAdded() && remainingMs != 0) {

Review Comment:
   Same comment.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2451,6 +2451,8 @@ private CoordinatorResult<Void, Record> 
completeClassicGroupJoin(
 
         if (group.isInState(DEAD)) {
             log.info("Group {} is dead, skipping rebalance stage.", groupId);
+        } else if (!containsClassicGroup(group.groupId())) {
+            log.info("Group {} is null or not a classic group, skipping 
rebalance stage.", groupId);

Review Comment:
   I find this check a little unexpected here because the method received a 
`ClassicGroup`. It seems that `completeClassicGroupJoin` is called from two 
different contexts. In the first one, it is called in the regular processing of 
the request so the `ClassicGroup` is available and we know that it is fine. In 
the second one, it is called from an expired timer. In this case, we need to 
validate that the group still exists. Therefore, I wonder whether we should add 
overload to `completeClassicGroupJoin` which takes the group id and looks up 
the group based on it. We could then use it in the "timer context". What do you 
think?
   
   



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2963,7 +2971,9 @@ private CoordinatorResult<Void, Record> expirePendingSync(
         ClassicGroup group,
         int generationId
     ) {
-        if (generationId != group.generationId()) {
+        if (!containsClassicGroup(group.groupId())) {

Review Comment:
   It looks like that we could change the param here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2553,6 +2555,10 @@ private CoordinatorResult<Void, Record> 
expireClassicGroupMemberHeartbeat(
             log.info("Received notification of heartbeat expiration for member 
{} after group {} " +
                     "had already been unloaded or deleted.",
                 memberId, group.groupId());
+        } else if (!containsClassicGroup(group.groupId())) {
+            log.info("Received notification of heartbeat expiration for member 
{} after group {} " +
+                    "had already been deleted or upgraded.",
+                memberId, group.groupId());

Review Comment:
   I have a similar comment for this one. However, in this case, we could just 
replace the `group` argument by `groupId` and do the lookup here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to