[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r781968801

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -1690,6 +1690,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 joinGroupRequest.data.protocolType,
 protocols,
 sendResponseCallback,
+Option(joinGroupRequest.data.reason()),

Review comment:
nit: You can remove the parentheses after `reason`.

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to '" + exception.getMessage() + "' (" + exception.getClass().getSimpleName() + ")";

Review comment:
Also, it might be better to use `synchronized (AbstractCoordinator.this) { }` to mutate both `rejoinReason` and `rejoinNeeded`, to ensure that they stay consistent with each other.

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to '" + exception.getMessage() + "' (" + exception.getClass().getSimpleName() + ")";

Review comment:
nit: Would it make sense to use `String.format` like we did at L460?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
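For illustration, the two `AbstractCoordinator` suggestions in this message (one lock around both fields, and `String.format` for the message) could be sketched roughly as follows. `RejoinState` and its methods are hypothetical stand-ins, not the actual coordinator code; only the field names `rejoinReason` and `rejoinNeeded` and the message pattern come from the review. The sketch locks on `this`, whereas the review suggests `synchronized (AbstractCoordinator.this)` because the real assignment sits in a nested context.

```java
import java.util.Objects;

// Hypothetical stand-in for the coordinator state discussed in the review;
// this is NOT the real AbstractCoordinator.
final class RejoinState {
    private boolean rejoinNeeded = false;
    private String rejoinReason = "";

    // Build the reason with String.format instead of string concatenation.
    static String describeFailure(RuntimeException exception) {
        return String.format("rebalance failed due to '%s' (%s)",
                exception.getMessage(),
                exception.getClass().getSimpleName());
    }

    // Both fields change under the same lock, so a reader holding the lock
    // never sees a new reason paired with a stale flag (or the reverse).
    void onJoinFailure(RuntimeException exception) {
        Objects.requireNonNull(exception, "exception");
        synchronized (this) {
            rejoinReason = describeFailure(exception);
            rejoinNeeded = true;
        }
    }

    synchronized boolean rejoinNeeded() {
        return rejoinNeeded;
    }

    synchronized String rejoinReason() {
        return rejoinReason;
    }
}
```

Readers take the same lock in this sketch; whether the real class needs that depends on how `rejoinNeeded` and `rejoinReason` are read elsewhere, which the review does not show.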
dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r771371068

## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ##
@@ -1249,7 +1260,7 @@ class GroupCoordinator(val brokerId: Int,
 // for new members. If the new member is still there, we expect it to retry.
 completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
-maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
+maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId. Member joined due to $reason")

Review comment:
The output log is quite hard to follow at the moment. Example:

```
[2021-12-17 11:29:16,061] INFO [GroupCoordinator 0]: Preparing to rebalance group test in state PreparingRebalance with old generation 1 (__consumer_offsets-48) (reason: Adding new member console-consumer-1d5a9905-c271-4700-a817-62fc9b9f28fc with group instance id None. Member joined due to rebalance failed due to class org.apache.kafka.common.errors.MemberIdRequiredException error: The group member needs to have a valid member id before actually entering a consumer group.) (kafka.coordinator.group.GroupCoordinator)
```

How about doing the following? For each reason, we could add `; client reason: $reason`. With this, we will always have (reason: ...; client reason: ...) in each rebalance log. It might be clearer. What do you think?

## File path: clients/src/main/resources/common/message/JoinGroupResponse.json ##
@@ -31,7 +31,9 @@
 // Version 6 is the first flexible version.
 //
 // Starting from version 7, the broker sends back the Protocol Type to the client (KIP-559).
- "validVersions": "0-7",
+ //
+ // Version 8 adds the Reason field (KIP-800).

Review comment:
nit: Should we rather say `Version 8 is the same as version 7.` here?
## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to " + exception.getClass() + " error: " + exception.getMessage();

Review comment:
Example on the broker side:

```
[2021-12-17 11:29:16,061] INFO [GroupCoordinator 0]: Preparing to rebalance group test in state PreparingRebalance with old generation 1 (__consumer_offsets-48) (reason: Adding new member console-consumer-1d5a9905-c271-4700-a817-62fc9b9f28fc with group instance id None. Member joined due to rebalance failed due to class org.apache.kafka.common.errors.MemberIdRequiredException error: The group member needs to have a valid member id before actually entering a consumer group.) (kafka.coordinator.group.GroupCoordinator)
```

* Should we only get the `getSimpleName` of the class?
* There are many `:` in the log. I wonder if we could remove the one we've put here.

Perhaps we could use the following pattern: `rebalance failed due to '$message' ($class)`. What do you think?

## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ##
@@ -181,6 +182,7 @@ class GroupCoordinator(val brokerId: Int,
 responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
 case Some(group) =>
 group.inLock {
+val joinReason = reason.getOrElse("unknown reason")

Review comment:
If we do this, it might be better to not use an `Option` after all. We could simply provide the default reason to `handleJoinGroup` if none is provided. Also, how about using `not provided` instead of `unknown reason`?
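A hedged sketch of how the two broker-side suggestions in this message might fit together: substitute `not provided` once, at the boundary, when the request carries no reason, then append the client reason to each rebalance reason with a `; client reason:` separator. The sketch is in Java for illustration even though the broker code is Scala, and `RebalanceReasons`, `clientReasonOrDefault`, and `withClientReason` are hypothetical names that do not appear in the PR.

```java
// Hypothetical helper, written only to illustrate the review suggestions.
final class RebalanceReasons {
    // Default wording proposed in the review for requests without a reason.
    static final String NOT_PROVIDED = "not provided";

    private RebalanceReasons() {
    }

    // Resolve the nullable value from the request once, so that downstream
    // code can take a plain String instead of an Option.
    static String clientReasonOrDefault(String reasonFromRequest) {
        return reasonFromRequest == null ? NOT_PROVIDED : reasonFromRequest;
    }

    // Keep the broker's own reason first and the client's reason second,
    // separated as "<broker reason>; client reason: <client reason>".
    static String withClientReason(String brokerReason, String clientReason) {
        return brokerReason + "; client reason: " + clientReason;
    }
}
```

With this shape every rebalance log carries both parts in a fixed order, so the client-supplied text can be located without parsing the broker's wording.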
dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r769662220

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ##
@@ -2300,21 +2300,32 @@ public ConsumerGroupMetadata groupMetadata() {
 * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
 * use this API.
 *
+ * @param reason The reason why the new rebalance is needed.
+ *
 * @throws java.lang.IllegalStateException if the consumer does not use group subscription
 */
 @Override
-public void enforceRebalance() {
+public void enforceRebalance(final String reason) {
 acquireAndEnsureOpen();
 try {
 if (coordinator == null) {
 throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
 }
-coordinator.requestRejoin("rebalance enforced by user");
+String defaultReason = "rebalance enforced by user";

Review comment:
nit: Should we define this one as a constant?

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 resetJoinGroupFuture();
+rejoinReason = exception.getMessage();

Review comment:
I wonder if we should put a bit more information here. For instance, we could say `rebalance failed due to %s error: %s` where the first `%s` would be the exception's class and the second the message. What do you think?

## File path: clients/src/main/resources/common/message/JoinGroupRequest.json ##
@@ -54,6 +56,9 @@
 "about": "The protocol name." },
 { "name": "Metadata", "type": "bytes", "versions": "0+",
 "about": "The protocol metadata." }
-]}
+]},
+{ "name": "Reason", "type": "string",
+ "versions": "8+", "nullableVersions": "8+","default": "null",

Review comment:
Could we also update `RequestResponseTest` to cover this change?
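One possible shape for the constant suggested in the `enforceRebalance` comment of this message. This is a sketch only: `EnforceRebalanceReason`, `DEFAULT_REASON`, and `resolve` are hypothetical names, and falling back to the default when the caller passes `null` or an empty string is an assumption that the diff excerpt does not confirm.

```java
// Hypothetical helper illustrating the "define it as a constant" nit;
// the real KafkaConsumer code may be organised differently.
final class EnforceRebalanceReason {
    // The literal comes from the diff; the constant's name is made up here.
    static final String DEFAULT_REASON = "rebalance enforced by user";

    private EnforceRebalanceReason() {
    }

    // Assumption: a missing or empty user reason falls back to the default.
    static String resolve(String userReason) {
        if (userReason == null || userReason.isEmpty()) {
            return DEFAULT_REASON;
        }
        return userReason;
    }
}
```

Hoisting the literal into a named constant keeps the default wording in one place, which also lets tests refer to it instead of repeating the string.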
## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ##
@@ -541,8 +579,17 @@ private void expectDisconnectInJoinGroup(
 }, null, true);
 }
+private void expectJoinGroup(
+String expectedMemberId,
+int responseGeneration,
+String responseMemberId

Review comment:
nit: Indentation is off.

## File path: clients/src/main/resources/common/message/JoinGroupRequest.json ##
@@ -30,7 +30,9 @@
 // Version 6 is the first flexible version.
 //
 // Version 7 is the same as version 6.
- "validVersions": "0-7",
+ //
+ // Version 8 adds the Reason field (KIP-800).
+ "validVersions": "0-8",

Review comment:
We need to bump the request version as well.

## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ##
@@ -163,6 +163,7 @@ class GroupCoordinator(val brokerId: Int,
 protocolType: String,
 protocols: List[(String, Array[Byte])],
 responseCallback: JoinCallback,
+ reason: String = null,

Review comment:
I would rather use `Option[String]` here.

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -133,6 +133,7 @@ public boolean hasNotJoinedGroup() {
 protected final ConsumerNetworkClient client;
 private Node coordinator = null;
+private String rejoinReason = "initialized abstract coordinator";

Review comment:
The reason is a bit weird. I wonder if we could just leave it empty in the beginning.

## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ##
@@ -181,6 +182,8 @@ class GroupCoordinator(val brokerId: Int,
 responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
 case Some(group) =>
 group.inLock {
+if (reason != null)
+ info(s"memberId=$memberId with groupInstanceId=$groupInstanceId is attempting to join groupId=$groupId due to: $reason")

Review comment:
We already log a few messages when a member (re-)joins. I wonder if we should pass the reason down and use it in all the existing messages that we already have. What do you think? I need to look at this a bit more myself.

## File path: clients/src/main/resources/common/message/JoinGroupRequest.json ##
@@ -54,6 +56,9 @@
 "about": "The protocol name." },
 { "name": "Metadata", "type": "bytes", "versions": "0+",
 "about": "The protocol metadata." }
-]}
+]},
+{ "name": "Reason",