[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715689#comment-16715689 ]

ASF GitHub Bot commented on KAFKA-7610:
---------------------------------------

guozhangwang closed pull request #5962: KAFKA-7610; Proactively timeout new 
group members if rebalance is delayed
URL: https://github.com/apache/kafka/pull/5962

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 5f16acb6a85..93775182570 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -26,11 +26,11 @@ import kafka.server.DelayedOperation
 private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       group: GroupMetadata,
                                       member: MemberMetadata,
-                                      heartbeatDeadline: Long,
-                                      sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout, Some(group.lock)) {
+                                      deadline: Long,
+                                      timeoutMs: Long)
+  extends DelayedOperation(timeoutMs, Some(group.lock)) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
-  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, deadline, forceComplete _)
+  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, deadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()
 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index db89f14592f..007c6eea75a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -600,7 +600,7 @@ class GroupCoordinator(val brokerId: Int,
         case Empty | Dead =>
         case PreparingRebalance =>
           for (member <- group.allMemberMetadata) {
-            group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
+            group.maybeInvokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
           }
 
           joinPurgatory.checkAndComplete(GroupKey(group.groupId))
@@ -674,14 +674,18 @@ class GroupCoordinator(val brokerId: Int,
   * Complete existing DelayedHeartbeats for the given member and schedule the next one
    */
   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+    completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
+  }
+
+  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
     // complete current heartbeat expectation
     member.latestHeartbeat = time.milliseconds()
     val memberKey = MemberKey(member.groupId, member.memberId)
     heartbeatPurgatory.checkAndComplete(memberKey)
 
     // reschedule the next heartbeat expiration deadline
-    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
+    val deadline = member.latestHeartbeat + timeoutMs
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, deadline, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
   }
 
@@ -702,11 +706,23 @@ class GroupCoordinator(val brokerId: Int,
     val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
+
+    member.isNew = true
+
     // update the newMemberAdded flag to indicate that the join group can be further delayed
     if (group.is(PreparingRebalance) && group.generationId == 0)
       group.newMemberAdded = true
 
     group.add(member, callback)
+
+    // The session timeout does not affect new members since they do not have their memberId and
+    // cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted
+    // while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request
+    // timeout during a long rebalance), they may simply retry which will lead to a lot of defunct
+    // members in the rebalance. To prevent this going on indefinitely, we timeout JoinGroup requests
+    // for new members. If the new member is still there, we expect it to retry.
+    completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
+
     maybePrepareRebalance(group, s"Adding new member $memberId")
     member
   }
@@ -751,7 +767,13 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String) {
+    // New members may timeout with a pending JoinGroup while the group is still rebalancing, so we have
+    // to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
+    // will retry the JoinGroup request if it is still active.
+    group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))
+
     group.remove(member.memberId)
+
     group.currentState match {
       case Dead | Empty =>
       case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
@@ -813,8 +835,9 @@ class GroupCoordinator(val brokerId: Int,
               leaderId = group.leaderOrNull,
               error = Errors.NONE)
 
-            group.invokeJoinCallback(member, joinResult)
+            group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
+            member.isNew = false
           }
         }
       }
@@ -823,7 +846,7 @@ class GroupCoordinator(val brokerId: Int,
 
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
     group.inLock {
-      if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
+      if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving)
         forceComplete()
       else false
     }
@@ -831,7 +854,7 @@ class GroupCoordinator(val brokerId: Int,
 
   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
     group.inLock {
-      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
+      if (!member.shouldKeepAlive(heartbeatDeadline)) {
         info(s"Member ${member.memberId} in group ${group.groupId} has failed, 
removing it from the group")
         removeMemberAndUpdateGroup(group, member, s"removing member 
${member.memberId} on heartbeat expiration")
       }
@@ -844,11 +867,6 @@ class GroupCoordinator(val brokerId: Int,
 
   def partitionFor(group: String): Int = groupManager.partitionFor(group)
 
-  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
-    member.awaitingJoinCallback != null ||
-      member.awaitingSyncCallback != null ||
-      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
-
   private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
 
   private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
@@ -865,6 +883,7 @@ object GroupCoordinator {
   val NoMembers = List[MemberSummary]()
   val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+  val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000
 
   def apply(config: KafkaConfig,
             zkClient: KafkaZkClient,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index cbe78e980b6..e2d9c5f4760 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -220,7 +220,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
     member.awaitingJoinCallback = callback
     if (member.awaitingJoinCallback != null)
-      numMembersAwaitingJoin += 1;
+      numMembersAwaitingJoin += 1
   }
 
   def remove(memberId: String) {
@@ -300,19 +300,19 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     member.supportedProtocols = protocols
 
     if (callback != null && member.awaitingJoinCallback == null) {
-      numMembersAwaitingJoin += 1;
+      numMembersAwaitingJoin += 1
     } else if (callback == null && member.awaitingJoinCallback != null) {
-      numMembersAwaitingJoin -= 1;
+      numMembersAwaitingJoin -= 1
     }
     member.awaitingJoinCallback = callback
   }
 
-  def invokeJoinCallback(member: MemberMetadata,
-                         joinGroupResult: JoinGroupResult) : Unit = {
+  def maybeInvokeJoinCallback(member: MemberMetadata,
+                              joinGroupResult: JoinGroupResult) : Unit = {
     if (member.awaitingJoinCallback != null) {
       member.awaitingJoinCallback(joinGroupResult)
       member.awaitingJoinCallback = null
-      numMembersAwaitingJoin -= 1;
+      numMembersAwaitingJoin -= 1
     }
   }
 
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index b082b9bad2e..8649b3e70f7 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -64,6 +64,7 @@ private[group] class MemberMetadata(val memberId: String,
   var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null
   var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
+  var isNew: Boolean = false
 
   def protocols = supportedProtocols.map(_._1).toSet
 
@@ -78,6 +79,13 @@ private[group] class MemberMetadata(val memberId: String,
     }
   }
 
+  def shouldKeepAlive(deadlineMs: Long): Boolean = {
+    if (awaitingJoinCallback != null)
+      !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+    else awaitingSyncCallback != null ||
+      latestHeartbeat + sessionTimeoutMs > deadlineMs
+  }
+
   /**
    * Check if the provided protocol metadata matches the currently stored metadata.
    */
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c1623427f0f..1ef695e94b0 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -56,8 +56,8 @@ class GroupCoordinatorTest extends JUnitSuite {
 
   val ClientId = "consumer-test"
   val ClientHost = "localhost"
-  val ConsumerMinSessionTimeout = 10
-  val ConsumerMaxSessionTimeout = 1000
+  val GroupMinSessionTimeout = 10
+  val GroupMaxSessionTimeout = 10 * 60 * 1000
   val DefaultRebalanceTimeout = 500
   val DefaultSessionTimeout = 500
   val GroupInitialRebalanceDelay = 50
@@ -80,8 +80,8 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Before
   def setUp() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
-    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, GroupMinSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, GroupMaxSessionTimeout.toString)
     props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString)
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
@@ -194,7 +194,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooSmall() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMinSessionTimeout - 1)
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -203,7 +203,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooLarge() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMaxSessionTimeout + 1)
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -262,6 +262,49 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
   }
 
+  @Test
+  def testNewMemberJoinExpiration(): Unit = {
+    // This tests new member expiration during a protracted rebalance. We first create a
+    // group with one member which uses a large value for session timeout and rebalance timeout.
+    // We then join with one new member and let the rebalance hang while we await the first member.
+    // The new member join timeout expires and its JoinGroup request is failed.
+
+    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+    val rebalanceTimeout = GroupCoordinator.NewMemberJoinTimeoutMs * 2
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      sessionTimeout, rebalanceTimeout)
+    val firstMemberId = firstJoinResult.memberId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
+    assertTrue(groupOpt.isDefined)
+    val group = groupOpt.get
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+    EasyMock.reset(replicaManager)
+
+    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      rebalanceTimeout, sessionTimeout)
+    assertFalse(responseFuture.isCompleted)
+
+    assertEquals(2, group.allMembers.size)
+    assertEquals(1, group.allMemberMetadata.count(_.isNew))
+
+    val newMember = group.allMemberMetadata.find(_.isNew).get
+    assertNotEquals(firstMemberId, newMember.memberId)
+
+    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 1)
+    assertTrue(responseFuture.isCompleted)
+
+    val response = Await.result(responseFuture, Duration(0, TimeUnit.MILLISECONDS))
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, response.error)
+    assertEquals(1, group.allMembers.size)
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+    assertEquals(firstMemberId, group.allMembers.head)
+  }
+
   @Test
   def testJoinGroupInconsistentGroupProtocol() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Detect consumer failures in initial JoinGroup
> ---------------------------------------------
>
>                 Key: KAFKA-7610
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7610
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Jason Gustafson
>            Assignee: Boyang Chen
>            Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a new 
> dangling member being created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).
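
A note on the fix, for readers who do not want to walk the full diff: the merged
patch takes a more direct route than either option above. It bounds how long a
brand-new member (one that has no memberId yet and therefore cannot heartbeat)
may sit in the JoinGroup purgatory, and fails the request with UNKNOWN_MEMBER_ID
after a fixed timeout so that a still-live client simply retries. The standalone
Scala sketch below illustrates only that idea; the names here (ToyCoordinator,
PendingMember, addNewMember) are invented for illustration, and the real code
uses the MemberMetadata/DelayedHeartbeat purgatory machinery shown in the diff
rather than a scheduler.

import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for a member whose JoinGroup response is parked.
final case class PendingMember(memberId: String, joinCallback: String => Unit)

// Illustrative only: the real coordinator tracks MemberMetadata inside
// GroupMetadata and uses a DelayedOperation purgatory, not an executor.
class ToyCoordinator(newMemberJoinTimeoutMs: Long = 5 * 60 * 1000L) {
  private val pending = TrieMap.empty[String, PendingMember]
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // A brand-new member sends no heartbeats, so the session timeout cannot
  // protect the group; bound its pending JoinGroup with a fixed timeout.
  def addNewMember(memberId: String, joinCallback: String => Unit): Unit = {
    pending.put(memberId, PendingMember(memberId, joinCallback))
    scheduler.schedule(new Runnable {
      override def run(): Unit = expireIfStillPending(memberId)
    }, newMemberJoinTimeoutMs, TimeUnit.MILLISECONDS)
  }

  // Called when the rebalance completes and the JoinGroup response is sent.
  def completeJoin(memberId: String): Unit =
    pending.remove(memberId).foreach(_.joinCallback("NONE"))

  // If the JoinGroup is still parked when the timer fires, fail it with
  // UNKNOWN_MEMBER_ID and drop the member; a live client will retry.
  private def expireIfStillPending(memberId: String): Unit =
    pending.remove(memberId).foreach(_.joinCallback("UNKNOWN_MEMBER_ID"))
}

In the patch itself this fixed bound is GroupCoordinator.NewMemberJoinTimeoutMs,
hard-coded to 5 * 60 * 1000 ms (5 minutes), and the expiration path goes through
removeMemberAndUpdateGroup so the failed member is also removed from the group
metadata.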



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
