[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2022-01-11 Thread GitBox


dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r781968801



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1690,6 +1690,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 joinGroupRequest.data.protocolType,
 protocols,
 sendResponseCallback,
+Option(joinGroupRequest.data.reason()),

Review comment:
   nit: You can remove the parentheses after `reason`.

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to '" + exception.getMessage() + "' (" + exception.getClass().getSimpleName() + ")";

Review comment:
   Also, it might be better to use `synchronized (AbstractCoordinator.this) { }` to mutate both `rejoinReason` and `rejoinNeeded`, to ensure that they are consistent with each other.
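A minimal sketch of the suggestion, assuming a simplified class that models only the two fields under review. `RejoinState`, `requestRejoin`, `clearRejoin` and `snapshot` are hypothetical names, not the actual `AbstractCoordinator` API. Inside an inner class the monitor would be `AbstractCoordinator.this`; in this flat sketch it is simply `this`.

```java
// Hypothetical, simplified model of the two coordinator fields under review;
// this is not the real AbstractCoordinator.
class RejoinState {
    private boolean rejoinNeeded = false;
    // Starts empty rather than with a placeholder reason.
    private String rejoinReason = "";

    // Both fields change inside one synchronized block, so they always change together.
    void requestRejoin(final String reason) {
        synchronized (this) {
            rejoinReason = reason;
            rejoinNeeded = true;
        }
    }

    // Clearing the request also happens under the same monitor.
    void clearRejoin() {
        synchronized (this) {
            rejoinReason = "";
            rejoinNeeded = false;
        }
    }

    // A synchronized method locks the same monitor as synchronized (this),
    // so readers always observe a consistent (rejoinNeeded, rejoinReason) pair.
    synchronized String snapshot() {
        return rejoinNeeded + ":" + rejoinReason;
    }

    public static void main(String[] args) {
        RejoinState state = new RejoinState();
        state.requestRejoin("rebalance failed due to 'boom' (IllegalStateException)");
        System.out.println(state.snapshot());
        state.clearRejoin();
        System.out.println(state.snapshot());
    }
}
```

Any reader that takes the same lock sees either both old values or both new ones, never a new `rejoinNeeded` paired with a stale `rejoinReason`.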

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to '" + exception.getMessage() + "' (" + exception.getClass().getSimpleName() + ")";

Review comment:
   nit: Would it make sense to use `String.format` like we did at L460?
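A minimal sketch of the `String.format` variant, assuming the same `'<message>' (<SimpleClassName>)` layout as the concatenation in the hunk. `RejoinReasons.rebalanceFailed` is a hypothetical helper; the code under review builds the string inline.

```java
// Hypothetical helper; the code under review builds this string inline in joinGroupIfNeeded.
class RejoinReasons {
    static String rebalanceFailed(final RuntimeException exception) {
        // Produces: rebalance failed due to '<message>' (<SimpleClassName>)
        return String.format("rebalance failed due to '%s' (%s)",
            exception.getMessage(),
            exception.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        // Prints: rebalance failed due to 'boom' (IllegalStateException)
        System.out.println(rebalanceFailed(new IllegalStateException("boom")));
    }
}
```

One behavioural note: like string concatenation, `%s` renders a `null` message as the text `null`, so both forms behave the same when an exception carries no message.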




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-17 Thread GitBox


dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r771371068



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1249,7 +1260,7 @@ class GroupCoordinator(val brokerId: Int,
 // for new members. If the new member is still there, we expect it to retry.
 completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
 
-maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
+maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId. Member joined due to $reason")

Review comment:
   The output log is quite hard to follow at the moment. Example:
   
   ```
   [2021-12-17 11:29:16,061] INFO [GroupCoordinator 0]: Preparing to rebalance group test in state PreparingRebalance with old generation 1 (__consumer_offsets-48) (reason: Adding new member console-consumer-1d5a9905-c271-4700-a817-62fc9b9f28fc with group instance id None. Member joined due to rebalance failed due to class org.apache.kafka.common.errors.MemberIdRequiredException error: The group member needs to have a valid member id before actually entering a consumer group.) (kafka.coordinator.group.GroupCoordinator)
   ```
   
   How about the following? For each reason, we could append `; client reason: $reason`. With this, every rebalance log would have the form `(reason: ...; client reason: ...)`. It might be clearer. What do you think?

##
File path: clients/src/main/resources/common/message/JoinGroupResponse.json
##
@@ -31,7 +31,9 @@
   // Version 6 is the first flexible version.
   //
   // Starting from version 7, the broker sends back the Protocol Type to the client (KIP-559).
-  "validVersions": "0-7",
+  //
+  // Version 8 adds the Reason field (KIP-800).

Review comment:
   nit: Should we rather say "Version 8 is the same as version 7." here?

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to " + exception.getClass() + " error: " + exception.getMessage();

Review comment:
   Example on the broker side:
   ```
   [2021-12-17 11:29:16,061] INFO [GroupCoordinator 0]: Preparing to rebalance group test in state PreparingRebalance with old generation 1 (__consumer_offsets-48) (reason: Adding new member console-consumer-1d5a9905-c271-4700-a817-62fc9b9f28fc with group instance id None. Member joined due to rebalance failed due to class org.apache.kafka.common.errors.MemberIdRequiredException error: The group member needs to have a valid member id before actually entering a consumer group.) (kafka.coordinator.group.GroupCoordinator)
   ```
   
   * Should we use only the `getSimpleName()` of the class?
   * There are many `:` in the log. I wonder if we could remove the one we've put here. Perhaps we could use the following pattern: `rebalance failed due to '$message' ($class)`. What do you think?
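The `class org.apache.kafka...` prefix in the log comes from concatenating a `Class` object, which calls `Class.toString()` (`"class "` plus the fully qualified name). The sketch below compares the current and proposed formats; it uses `IllegalStateException` instead of `MemberIdRequiredException` only so that it runs without Kafka on the classpath, and `ClassNameDemo` is a hypothetical name.

```java
class ClassNameDemo {
    // Current format: concatenating e.getClass() yields "class <fully.qualified.Name>".
    static String current(final RuntimeException e) {
        return "rebalance failed due to " + e.getClass() + " error: " + e.getMessage();
    }

    // Proposed format: getSimpleName() drops the package, and the extra ':' disappears.
    static String proposed(final RuntimeException e) {
        return "rebalance failed due to '" + e.getMessage() + "' (" + e.getClass().getSimpleName() + ")";
    }

    public static void main(String[] args) {
        RuntimeException e = new IllegalStateException("boom");
        // Prints: rebalance failed due to class java.lang.IllegalStateException error: boom
        System.out.println(current(e));
        // Prints: rebalance failed due to 'boom' (IllegalStateException)
        System.out.println(proposed(e));
    }
}
```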

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -181,6 +182,7 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
 case Some(group) =>
   group.inLock {
+val joinReason = reason.getOrElse("unknown reason")

Review comment:
   If we do this, it might be better not to use an `Option` after all. We could simply provide the default reason to `handleJoinGroup` if none is provided. Also, how about using `not provided` instead of `unknown reason`?
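A sketch of the "plain `String` with a default" alternative, in Java for illustration. It assumes the default is resolved once where the request is decoded, so the handler never sees `null`. `JoinReasonDefault`, `resolveReason` and this `handleJoinGroup` stand-in are hypothetical and much simpler than the real Scala signatures.

```java
// Hypothetical sketch: resolve the default at the call site and pass a non-null String down.
class JoinReasonDefault {
    static final String DEFAULT_JOIN_REASON = "not provided";

    // The protocol field is nullable (e.g. when an older client sends no reason).
    static String resolveReason(final String reasonFromRequest) {
        return reasonFromRequest != null ? reasonFromRequest : DEFAULT_JOIN_REASON;
    }

    // Stand-in for the handler: the reason is always non-null here, so no Option is needed.
    static String handleJoinGroup(final String memberId, final String reason) {
        return "member " + memberId + " joining; reason: " + reason;
    }

    public static void main(String[] args) {
        System.out.println(handleJoinGroup("m-1", resolveReason(null)));
        System.out.println(handleJoinGroup("m-2", resolveReason("rebalance enforced by user")));
    }
}
```

The trade-off: an `Option` lets the handler distinguish "no reason sent" from any given text, while a resolved default keeps every downstream log call free of `getOrElse`.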








[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-15 Thread GitBox


dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r769662220



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2300,21 +2300,32 @@ public ConsumerGroupMetadata groupMetadata() {
  * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
  * use this API.
  *
+ * @param reason The reason why the new rebalance is needed.
+ *
  * @throws java.lang.IllegalStateException if the consumer does not use group subscription
  */
 @Override
-public void enforceRebalance() {
+public void enforceRebalance(final String reason) {
 acquireAndEnsureOpen();
 try {
 if (coordinator == null) {
 throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
 }
-coordinator.requestRejoin("rebalance enforced by user");
+String defaultReason = "rebalance enforced by user";

Review comment:
   nit: Should we define this one as a constant?
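A sketch of pulling the default into a named constant. The no-argument overload and the fallback for a `null` or empty reason are assumptions for illustration: the hunk is cut off before it shows how `defaultReason` is used. `RebalanceEnforcer` and its private `requestRejoin` are hypothetical stand-ins for `KafkaConsumer` and `coordinator.requestRejoin(...)`.

```java
// Hypothetical stand-in for the consumer; only the reason handling is modelled.
class RebalanceEnforcer {
    // Named constant instead of a local String literal inside the method.
    static final String DEFAULT_REASON = "rebalance enforced by user";

    private String lastRequestedReason = "";

    // Assumed convenience overload that keeps the no-argument call working.
    void enforceRebalance() {
        enforceRebalance(null);
    }

    void enforceRebalance(final String reason) {
        // Assumption: fall back to the default when no usable reason is supplied.
        final String effectiveReason =
            (reason == null || reason.isEmpty()) ? DEFAULT_REASON : reason;
        requestRejoin(effectiveReason);
    }

    // Stand-in for coordinator.requestRejoin(...): just records the reason.
    private void requestRejoin(final String reason) {
        lastRequestedReason = reason;
    }

    String lastRequestedReason() {
        return lastRequestedReason;
    }

    public static void main(String[] args) {
        RebalanceEnforcer consumer = new RebalanceEnforcer();
        consumer.enforceRebalance();
        System.out.println(consumer.lastRequestedReason());
        consumer.enforceRebalance("assignment changed");
        System.out.println(consumer.lastRequestedReason());
    }
}
```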

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 
 resetJoinGroupFuture();
+rejoinReason = exception.getMessage();

Review comment:
   I wonder if we should put a bit more information here. For instance, we could say `rebalance failed due to %s error: %s`, where the first `%s` would be the exception's class and the second the message. What do you think?

##
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##
@@ -54,6 +56,9 @@
 "about": "The protocol name." },
   { "name": "Metadata", "type": "bytes", "versions": "0+",
 "about": "The protocol metadata." }
-]}
+]},
+{ "name": "Reason", "type": "string",
+  "versions": "8+", "nullableVersions": "8+","default": "null",

Review comment:
   Could we also update `RequestResponseTest` to cover this change?

##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -541,8 +579,17 @@ private void expectDisconnectInJoinGroup(
 }, null, true);
 }
 
+private void expectJoinGroup(
+String expectedMemberId,
+int responseGeneration,
+String responseMemberId

Review comment:
   nit: Indentation is off.

##
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##
@@ -30,7 +30,9 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is the same as version 6.
-  "validVersions": "0-7",
+  //
+  // Version 8 adds the Reason field (KIP-800).
+  "validVersions": "0-8",

Review comment:
   We need to bump the request version as well.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -163,6 +163,7 @@ class GroupCoordinator(val brokerId: Int,
   protocolType: String,
   protocols: List[(String, Array[Byte])],
   responseCallback: JoinCallback,
+  reason: String = null,

Review comment:
   I would rather use `Option[String]` here.
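For comparison, a Java analogue of the `Option[String]` suggestion. `Optional.ofNullable` maps a `null` protocol field to `Optional.empty()`, much as Scala's `Option(null)` yields `None` (compare `Option(joinGroupRequest.data.reason())` in `KafkaApis`). `OptionalReason` and its methods are hypothetical names.

```java
import java.util.Optional;

class OptionalReason {
    // Wraps the nullable protocol field, like Scala's Option(x).
    static Optional<String> fromRequest(final String nullableReason) {
        return Optional.ofNullable(nullableReason);
    }

    // The default is applied only where the value is finally consumed.
    static String logReason(final Optional<String> reason) {
        return reason.orElse("unknown reason");
    }

    public static void main(String[] args) {
        System.out.println(logReason(fromRequest(null)));
        System.out.println(logReason(fromRequest("rebalance enforced by user")));
    }
}
```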

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -133,6 +133,7 @@ public boolean hasNotJoinedGroup() {
 protected final ConsumerNetworkClient client;
 
 private Node coordinator = null;
+private String rejoinReason = "initialized abstract coordinator";

Review comment:
   The reason is a bit weird. I wonder if we could just leave it empty in the beginning.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -181,6 +182,8 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
 case Some(group) =>
   group.inLock {
+if (reason != null)
+  info(s"memberId=$memberId with groupInstanceId=$groupInstanceId is attempting to join groupId=$groupId due to: $reason")

Review comment:
   We already log a few messages when a member (re-)joins. I wonder if we should pass the reason down and use it in all of those existing messages instead. What do you think? I need to look at this a bit more myself.

##
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##
@@ -54,6 +56,9 @@
 "about": "The protocol name." },
   { "name": "Metadata", "type": "bytes", "versions": "0+",
 "about": "The protocol metadata." }
-]}
+]},
+{ "name": "Reason",