[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
jeffkbkim commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r772798218 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) { final RuntimeException exception = future.exception(); resetJoinGroupFuture(); +rejoinReason = "rebalance failed due to " + exception.getClass() + " error: " + exception.getMessage(); Review comment: thanks for the suggestions. i'll go ahead with david's suggestion as we're updating `Member joined due to` to `client reason:` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
jeffkbkim commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r772797072 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -181,6 +182,7 @@ class GroupCoordinator(val brokerId: Int, responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID)) case Some(group) => group.inLock { +val joinReason = reason.getOrElse("unknown reason") Review comment: good idea! i'll update the leave group as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
jeffkbkim commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r772795815 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -486,6 +487,43 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() { ensureActiveGroup(rejoinedGeneration, memberId); } +@Test +public void testRejoinReason() { +setupCoordinator(); + +String memberId = "memberId"; +int generation = 5; + +// test initial reason +mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +expectJoinGroup("", "initialized abstract coordinator", generation, memberId); + +// successful sync group response should reset reason +expectSyncGroup(generation, memberId); +ensureActiveGroup(generation, memberId); +assertEquals("", coordinator.rejoinReason()); + +// Force a rebalance +expectJoinGroup(memberId, "Manual test trigger", generation, memberId); +expectSyncGroup(generation, memberId); +coordinator.requestRejoin("Manual test trigger"); +ensureActiveGroup(generation, memberId); +assertEquals("", coordinator.rejoinReason()); + +// max group size reached + mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED)); +coordinator.requestRejoin("Manual test trigger 2"); +try { +coordinator.joinGroupIfNeeded(mockTime.timer(100L)); +} catch (GroupMaxSizeReachedException e) { Review comment: ah gotcha. makes sense, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
jeffkbkim commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r770279783 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -559,8 +606,10 @@ private void expectJoinGroup( return false; } JoinGroupRequestData joinGroupRequest = ((JoinGroupRequest) body).data(); +boolean isReasonMatching = expectedReason == null || joinGroupRequest.reason().equals(expectedReason); Review comment: i added a comment. the reason is never set to null in AbstractCoordinator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
jeffkbkim commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r770278660 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -486,6 +487,43 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() { ensureActiveGroup(rejoinedGeneration, memberId); } +@Test +public void testRejoinReason() { +setupCoordinator(); + +String memberId = "memberId"; +int generation = 5; + +// test initial reason +mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +expectJoinGroup("", "initialized abstract coordinator", generation, memberId); + +// successful sync group response should reset reason +expectSyncGroup(generation, memberId); +ensureActiveGroup(generation, memberId); +assertEquals("", coordinator.rejoinReason()); + +// Force a rebalance +expectJoinGroup(memberId, "Manual test trigger", generation, memberId); +expectSyncGroup(generation, memberId); +coordinator.requestRejoin("Manual test trigger"); +ensureActiveGroup(generation, memberId); +assertEquals("", coordinator.rejoinReason()); + +// max group size reached + mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED)); +coordinator.requestRejoin("Manual test trigger 2"); +try { +coordinator.joinGroupIfNeeded(mockTime.timer(100L)); +} catch (GroupMaxSizeReachedException e) { Review comment: we do not want to stop when an exception is thrown - we need to verify that the reason is updated to the appropriate exception message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
jeffkbkim commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r770278248 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -181,6 +182,8 @@ class GroupCoordinator(val brokerId: Int, responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID)) case Some(group) => group.inLock { +if (reason != null) + info(s"memberId=$memberId with groupInstanceId=$groupInstanceId is attempting to join groupId=$groupId due to: $reason") Review comment: yeah, that was my original thoughts as well. i believe you are referring to `maybePrepareRebalance(group: GroupMetadata, reason: String)`. methods that are referencing this are: - `addMemberAndRebalance` - `doSyncGroup` (NA) - `removeMemberAndUpdateGroup` (NA) - `updateMemberAndRebalance` - `updateStaticMemberAndRebalance` i've passed the reason into all parent methods. also, i've updated `updateStaticMemberAndRebalance`'s direct call to `prepareRebalance()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
jeffkbkim commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r770277817 ## File path: clients/src/main/resources/common/message/JoinGroupRequest.json ## @@ -30,7 +30,9 @@ // Version 6 is the first flexible version. // // Version 7 is the same as version 6. - "validVersions": "0-7", + // + // Version 8 adds the Reason field (KIP-800). + "validVersions": "0-8", Review comment: bumped the response version -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
jeffkbkim commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r763464514 ## File path: clients/src/main/resources/common/message/JoinGroupRequest.json ## @@ -30,7 +30,9 @@ // Version 6 is the first flexible version. // // Version 7 is the same as version 6. Review comment: thanks for the comment. these lines aren't being removed. i think github ui is marking red for json comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org