dajac commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r652188256



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          schedulePendingSync(group)
+        }
+      }
+    }
+  }
+
+  private def removePendingSyncMember(
+    group: GroupMetadata,
+    memberId: String
+  ): Unit = {
+    group.removePendingSyncMember(memberId)
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(

Review comment:
       Definitely. I will add that.
   
   I cannot really think of any other doable alternatives. At least, we know 
that the leader has sent the SyncGroup as the group transitioned to stable.
   
   Throwing few ideas:
   * Could we store the pending set in the log? That does not seem really 
doable as the set could be updated before the write is acknowledged. Therefore 
we could think that a member is pending after a failover but it might not be.
   * We could transition to Stable only when all pending members have joined. 
We already discussed this and agreed that this is not ideal as it would delay 
processing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to