hachikuji commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r652141670



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          schedulePendingSync(group)
+        }
+      }
+    }
+  }
+
+  private def removePendingSyncMember(
+    group: GroupMetadata,
+    memberId: String
+  ): Unit = {
+    group.removePendingSyncMember(memberId)
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(

Review comment:
       One thing we seem to be missing is a call to `removeSyncExpiration` when 
the group is unloaded in `onGroupUnloaded`. That brings up an interesting 
limitation. When a group is loaded, there is no way to tell which followers 
have or have not sent the SyncGroup. The only reasonable thing to do is 
probably assume that they all did indeed send it since we could not rely on the 
request being resent if it had already been successfully received.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when is 
completing the rebalance.
+ *
+ * Whenever a SyncGroup is received, checks that we received all the SyncGroup 
request from
+ * each member of the group; if yes, complete this operation.
+ *
+ * When the operation has expired, any known members that have not sent a 
SyncGroup requests
+ * are removed from the group. If any members is removed, the group is 
rebalanced.
+ */
+private[group] class DelayedSync(

Review comment:
       There are a set of metrics that get exposed automatically for each 
purgatory we add. I'm considering if these will be useful or if we should add a 
flag to let us disable them.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          schedulePendingSync(group)
+        }
+      }
+    }
+  }
+
+  private def removePendingSyncMember(
+    group: GroupMetadata,
+    memberId: String
+  ): Unit = {
+    group.removePendingSyncMember(memberId)
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    group.clearPendingSyncMembers()
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    val groupKey = GroupKey(group.groupId)
+    syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+    group: GroupMetadata
+  ): Unit = {
+    val delayedSync = new DelayedSync(this, group, group.generationId, 
group.rebalanceTimeoutMs)
+    val groupKey = GroupKey(group.groupId)
+    syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+  }
+
+  def tryCompletePendingSync(
+    group: GroupMetadata,
+    generationId: Int,
+    forceComplete: () => Boolean
+  ): Boolean = {
+    group.inLock {
+      if (generationId != group.generationId) {
+        forceComplete()
+      } else {
+        group.currentState match {
+          case Dead | Empty | PreparingRebalance =>
+            forceComplete()
+          case CompletingRebalance | Stable =>
+            if (group.hasReceivedSyncFromAllMembers())
+              forceComplete()
+            else false
+        }
+      }
+    }
+  }
+
+  def onExpirePendingSync(
+    group: GroupMetadata,
+    generationId: Int
+  ): Unit = {
+    group.inLock {
+      if (generationId != group.generationId) {
+        debug(s"Received unexpected notification of sync expiration for 
${group.groupId} " +
+          s" with an old generation $generationId.")
+      } else {
+        group.currentState match {
+          case Dead | Empty | PreparingRebalance =>
+            debug(s"Received unexpected notification of sync expiration after 
group ${group.groupId} " +
+              s"already transitioned to the ${group.currentState} state.")
+
+          case CompletingRebalance | Stable =>
+            if (!group.hasAllMembersJoined) {

Review comment:
       Is this check right? Sync expiration begins after all members have 
joined. Did you mean to check the pending sync set?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
             group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
             member.isNew = false
+
+            group.addPendingSyncMember(member.memberId)
           }
+
+          schedulePendingSync(group)
+        }
+      }
+    }
+  }
+
+  private def removePendingSyncMember(
+    group: GroupMetadata,
+    memberId: String
+  ): Unit = {
+    group.removePendingSyncMember(memberId)
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    group.clearPendingSyncMembers()
+    maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+    group: GroupMetadata
+  ): Unit = {
+    val groupKey = GroupKey(group.groupId)
+    syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+    group: GroupMetadata
+  ): Unit = {
+    val delayedSync = new DelayedSync(this, group, group.generationId, 
group.rebalanceTimeoutMs)
+    val groupKey = GroupKey(group.groupId)
+    syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+  }
+
+  def tryCompletePendingSync(
+    group: GroupMetadata,
+    generationId: Int,
+    forceComplete: () => Boolean
+  ): Boolean = {
+    group.inLock {
+      if (generationId != group.generationId) {
+        forceComplete()
+      } else {
+        group.currentState match {
+          case Dead | Empty | PreparingRebalance =>
+            forceComplete()
+          case CompletingRebalance | Stable =>
+            if (group.hasReceivedSyncFromAllMembers())
+              forceComplete()
+            else false
+        }
+      }
+    }
+  }
+
+  def onExpirePendingSync(
+    group: GroupMetadata,
+    generationId: Int
+  ): Unit = {
+    group.inLock {
+      if (generationId != group.generationId) {
+        debug(s"Received unexpected notification of sync expiration for 
${group.groupId} " +
+          s" with an old generation $generationId.")

Review comment:
       Might be useful to mention the current generation as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to