[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715689#comment-16715689 ]
ASF GitHub Bot commented on KAFKA-7610:
---------------------------------------

guozhangwang closed pull request #5962: KAFKA-7610; Proactively timeout new group members if rebalance is delayed
URL: https://github.com/apache/kafka/pull/5962

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 5f16acb6a85..93775182570 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -26,11 +26,11 @@ import kafka.server.DelayedOperation
 private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       group: GroupMetadata,
                                       member: MemberMetadata,
-                                      heartbeatDeadline: Long,
-                                      sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout, Some(group.lock)) {
+                                      deadline: Long,
+                                      timeoutMs: Long)
+  extends DelayedOperation(timeoutMs, Some(group.lock)) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
-  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, deadline, forceComplete _)
+  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, deadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()
 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index db89f14592f..007c6eea75a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -600,7 +600,7 @@ class GroupCoordinator(val brokerId: Int,
       case Empty | Dead =>
       case PreparingRebalance =>
         for (member <- group.allMemberMetadata) {
-          group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
+          group.maybeInvokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
         }
 
         joinPurgatory.checkAndComplete(GroupKey(group.groupId))
@@ -674,14 +674,18 @@ class GroupCoordinator(val brokerId: Int,
    * Complete existing DelayedHeartbeats for the given member and schedule the next one
    */
   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+    completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
+  }
+
+  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
     // complete current heartbeat expectation
     member.latestHeartbeat = time.milliseconds()
     val memberKey = MemberKey(member.groupId, member.memberId)
     heartbeatPurgatory.checkAndComplete(memberKey)
 
     // reschedule the next heartbeat expiration deadline
-    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
+    val deadline = member.latestHeartbeat + timeoutMs
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, deadline, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
   }
@@ -702,11 +706,23 @@ class GroupCoordinator(val brokerId: Int,
     val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
+
+    member.isNew = true
+
     // update the newMemberAdded flag to indicate that the join group can be further delayed
     if (group.is(PreparingRebalance) && group.generationId == 0)
       group.newMemberAdded = true
 
     group.add(member, callback)
+
+    // The session timeout does not affect new members since they do not have their memberId and
+    // cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted
+    // while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request
+    // timeout during a long rebalance), they may simply retry which will lead to a lot of defunct
+    // members in the rebalance. To prevent this going on indefinitely, we timeout JoinGroup requests
+    // for new members. If the new member is still there, we expect it to retry.
+    completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
+
     maybePrepareRebalance(group, s"Adding new member $memberId")
     member
   }
@@ -751,7 +767,13 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String) {
+    // New members may timeout with a pending JoinGroup while the group is still rebalancing, so we have
+    // to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
+    // will retry the JoinGroup request if it is still active.
+    group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))
+
     group.remove(member.memberId)
+
     group.currentState match {
       case Dead | Empty =>
       case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
@@ -813,8 +835,9 @@ class GroupCoordinator(val brokerId: Int,
           leaderId = group.leaderOrNull,
           error = Errors.NONE)
 
-        group.invokeJoinCallback(member, joinResult)
+        group.maybeInvokeJoinCallback(member, joinResult)
         completeAndScheduleNextHeartbeatExpiration(group, member)
+        member.isNew = false
       }
     }
   }
@@ -823,7 +846,7 @@ class GroupCoordinator(val brokerId: Int,
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
     group.inLock {
-      if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
+      if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving)
         forceComplete()
       else false
     }
@@ -831,7 +854,7 @@ class GroupCoordinator(val brokerId: Int,
   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
     group.inLock {
-      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
+      if (!member.shouldKeepAlive(heartbeatDeadline)) {
         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
         removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
       }
@@ -844,11 +867,6 @@ class GroupCoordinator(val brokerId: Int,
 
   def partitionFor(group: String): Int = groupManager.partitionFor(group)
 
-  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
-    member.awaitingJoinCallback != null ||
-      member.awaitingSyncCallback != null ||
-      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
-
   private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
 
   private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
@@ -865,6 +883,7 @@ object GroupCoordinator {
   val NoMembers = List[MemberSummary]()
   val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+  val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000
 
   def apply(config: KafkaConfig,
             zkClient: KafkaZkClient,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index cbe78e980b6..e2d9c5f4760 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -220,7 +220,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
     member.awaitingJoinCallback = callback
     if (member.awaitingJoinCallback != null)
-      numMembersAwaitingJoin += 1;
+      numMembersAwaitingJoin += 1
   }
 
   def remove(memberId: String) {
@@ -300,19 +300,19 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     member.supportedProtocols = protocols
 
     if (callback != null && member.awaitingJoinCallback == null) {
-      numMembersAwaitingJoin += 1;
+      numMembersAwaitingJoin += 1
     } else if (callback == null && member.awaitingJoinCallback != null) {
-      numMembersAwaitingJoin -= 1;
+      numMembersAwaitingJoin -= 1
     }
     member.awaitingJoinCallback = callback
   }
 
-  def invokeJoinCallback(member: MemberMetadata,
-                         joinGroupResult: JoinGroupResult) : Unit = {
+  def maybeInvokeJoinCallback(member: MemberMetadata,
+                              joinGroupResult: JoinGroupResult) : Unit = {
     if (member.awaitingJoinCallback != null) {
       member.awaitingJoinCallback(joinGroupResult)
       member.awaitingJoinCallback = null
-      numMembersAwaitingJoin -= 1;
+      numMembersAwaitingJoin -= 1
     }
   }
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index b082b9bad2e..8649b3e70f7 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -64,6 +64,7 @@ private[group] class MemberMetadata(val memberId: String,
   var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null
   var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
+  var isNew: Boolean = false
 
   def protocols = supportedProtocols.map(_._1).toSet
 
@@ -78,6 +79,13 @@ private[group] class MemberMetadata(val memberId: String,
     }
   }
 
+  def shouldKeepAlive(deadlineMs: Long): Boolean = {
+    if (awaitingJoinCallback != null)
+      !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+    else awaitingSyncCallback != null ||
+      latestHeartbeat + sessionTimeoutMs > deadlineMs
+  }
+
   /**
    * Check if the provided protocol metadata matches the currently stored metadata.
   */
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c1623427f0f..1ef695e94b0 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -56,8 +56,8 @@ class GroupCoordinatorTest extends JUnitSuite {
   val ClientId = "consumer-test"
   val ClientHost = "localhost"
-  val ConsumerMinSessionTimeout = 10
-  val ConsumerMaxSessionTimeout = 1000
+  val GroupMinSessionTimeout = 10
+  val GroupMaxSessionTimeout = 10 * 60 * 1000
   val DefaultRebalanceTimeout = 500
   val DefaultSessionTimeout = 500
   val GroupInitialRebalanceDelay = 50
@@ -80,8 +80,8 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Before
   def setUp() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
-    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, GroupMinSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, GroupMaxSessionTimeout.toString)
     props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString)
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
@@ -194,7 +194,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooSmall() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMinSessionTimeout - 1)
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -203,7 +203,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooLarge() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMaxSessionTimeout + 1)
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -262,6 +262,49 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
   }
 
+  @Test
+  def testNewMemberJoinExpiration(): Unit = {
+    // This tests new member expiration during a protracted rebalance. We first create a
+    // group with one member which uses a large value for session timeout and rebalance timeout.
+    // We then join with one new member and let the rebalance hang while we await the first member.
+    // The new member join timeout expires and its JoinGroup request is failed.
+
+    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+    val rebalanceTimeout = GroupCoordinator.NewMemberJoinTimeoutMs * 2
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      sessionTimeout, rebalanceTimeout)
+    val firstMemberId = firstJoinResult.memberId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
+    assertTrue(groupOpt.isDefined)
+    val group = groupOpt.get
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+    EasyMock.reset(replicaManager)
+
+    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      rebalanceTimeout, sessionTimeout)
+    assertFalse(responseFuture.isCompleted)
+
+    assertEquals(2, group.allMembers.size)
+    assertEquals(1, group.allMemberMetadata.count(_.isNew))
+
+    val newMember = group.allMemberMetadata.find(_.isNew).get
+    assertNotEquals(firstMemberId, newMember.memberId)
+
+    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 1)
+    assertTrue(responseFuture.isCompleted)
+
+    val response = Await.result(responseFuture, Duration(0, TimeUnit.MILLISECONDS))
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, response.error)
+    assertEquals(1, group.allMembers.size)
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+    assertEquals(firstMemberId, group.allMembers.head)
+  }
+
   @Test
   def testJoinGroupInconsistentGroupProtocol() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
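The heart of the patch is the keep-alive decision that moves from GroupCoordinator.shouldKeepMemberAlive into MemberMetadata.shouldKeepAlive: a member that is still waiting on its first JoinGroup response is now only kept alive until the fixed NewMemberJoinTimeoutMs window elapses, instead of indefinitely. Below is a minimal, self-contained sketch of that decision; the Member case class, its field names, and the surrounding object are illustrative stand-ins rather than Kafka's actual types, though the constant and the branch structure mirror the diff above.

object KeepAliveSketch {
  // Mirrors GroupCoordinator.NewMemberJoinTimeoutMs from the patch: a new member
  // gets a fixed five-minute window to complete its first JoinGroup.
  val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000

  // Simplified stand-in for MemberMetadata, keeping only the fields the decision reads.
  case class Member(isNew: Boolean,
                    awaitingJoin: Boolean,
                    awaitingSync: Boolean,
                    latestHeartbeatMs: Long,
                    sessionTimeoutMs: Long)

  // Same shape as MemberMetadata.shouldKeepAlive in the patch: a new member with a
  // pending JoinGroup is kept alive only until the join timeout, not until its
  // (possibly much longer) session timeout.
  def shouldKeepAlive(member: Member, deadlineMs: Long): Boolean =
    if (member.awaitingJoin)
      !member.isNew || member.latestHeartbeatMs + NewMemberJoinTimeoutMs > deadlineMs
    else
      member.awaitingSync || member.latestHeartbeatMs + member.sessionTimeoutMs > deadlineMs

  def main(args: Array[String]): Unit = {
    val pendingNewMember = Member(isNew = true, awaitingJoin = true, awaitingSync = false,
      latestHeartbeatMs = 0L, sessionTimeoutMs = NewMemberJoinTimeoutMs + 5000L)
    // Before the join window closes, the pending member survives heartbeat expiration.
    println(shouldKeepAlive(pendingNewMember, deadlineMs = NewMemberJoinTimeoutMs - 1)) // true
    // After it, the member is expired even though its session timeout has not elapsed.
    println(shouldKeepAlive(pendingNewMember, deadlineMs = NewMemberJoinTimeoutMs + 1)) // false
  }
}

Once expiration fires, the coordinator fails the pending JoinGroup with UNKNOWN_MEMBER_ID, so a live client simply retries, while a dead one no longer leaves a dangling entry in the group metadata cache.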
> Detect consumer failures in initial JoinGroup
> ---------------------------------------------
>
>                 Key: KAFKA-7610
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7610
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Jason Gustafson
>            Assignee: Boyang Chen
>            Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect
> failures after a consumer joins the group. However, we have no mechanism to
> detect failures during a consumer's initial JoinGroup when its memberId is
> empty. When a client fails (e.g. due to a disconnect), the newly created
> MemberMetadata will be left in the group metadata cache. Typically when this
> happens, the client simply retries the JoinGroup, and every retry results in a
> new dangling member being created and left in the group. These members are
> doomed to a session timeout when the group finally finishes the rebalance, but
> until then they occupy memory. In extreme cases, when a rebalance is delayed
> (possibly due to a buggy application), this cycle can repeat and the cache can
> grow quite large.
> There are a couple of options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP
> connection fails. This is difficult at the moment because we do not have a
> mechanism to propagate disconnects from the network layer. A potential option
> is to treat the disconnect as just another type of request and pass it to the
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of
> time, we can return earlier with the generated memberId and an error code
> (say REBALANCE_IN_PROGRESS) to indicate that a retry is needed to complete the
> rebalance. The consumer can then poll for the rebalance using its assigned
> memberId, and we can detect failures through the session timeout. This option
> would require a KIP (and some more thought).
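As a rough illustration of the early-return handshake proposed in option 2 above, here is a hedged sketch. Everything in it, the names, the error strings, and the retry loop, is hypothetical and only sketches the idea, not Kafka's wire protocol; as the ticket notes, the real change would need a KIP.

object EarlyJoinSketch {
  // Hypothetical response shape; the real JoinGroup response carries more fields.
  case class JoinGroupResponse(memberId: String, error: String)

  // Coordinator side (sketch): instead of parking an initial JoinGroup (empty
  // memberId) in purgatory, answer at once with a generated memberId and a
  // retriable error so the client comes back with an identity we can track.
  def handleJoin(memberId: String, generateId: () => String, rebalanceComplete: Boolean): JoinGroupResponse =
    if (memberId.isEmpty)
      JoinGroupResponse(generateId(), "REBALANCE_IN_PROGRESS")
    else if (!rebalanceComplete)
      JoinGroupResponse(memberId, "REBALANCE_IN_PROGRESS")
    else
      JoinGroupResponse(memberId, "NONE")

  def main(args: Array[String]): Unit = {
    // Consumer side (sketch): adopt the assigned memberId and keep rejoining
    // until the rebalance completes.
    var attempts = 0
    var memberId = "" // empty on the very first attempt
    var error = "REBALANCE_IN_PROGRESS"
    while (error == "REBALANCE_IN_PROGRESS") {
      attempts += 1
      val response = handleJoin(memberId, () => "consumer-" + attempts, rebalanceComplete = attempts >= 2)
      memberId = response.memberId
      error = response.error
    }
    println(s"joined after $attempts attempts as $memberId") // joined after 2 attempts as consumer-1
  }
}

The point of the handshake is that the very first response already hands the client a memberId, so from then on the coordinator can tie the client's liveness to its session timeout instead of holding an anonymous JoinGroup in purgatory.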