Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac merged PR #15533:
URL: https://github.com/apache/kafka/pull/15533


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


lianetm commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-2007369164

   LGTM, thanks!  





Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-200748

   @jeffkbkim @jolshan @lianetm Thanks for your comments. I addressed them.





Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1530489017


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1904,16 +2000,7 @@ public void testReconciliationProcess() {
 new ConsumerGroupHeartbeatResponseData()
 .setMemberId(memberId1)
 .setMemberEpoch(11)
-.setHeartbeatIntervalMs(5000)

Review Comment:
   Yep. This is correct.






Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-2007332396

   > They are the only three optional fields, right?
   
   They are actually mandatory in full requests.
   
   > Also, my understanding is that there are cases where we don't set 
subscribedTopicNames and rebalanceTimeoutMs but do set ownedTopicPartitions. Do 
you have examples of this case?
   
   rebalanceTimeoutMs will be set once when joining because it never changes 
afterwards. subscribedTopicNames is set when joining too or when it is updated 
on the client. ownedTopicPartitions is set when joining too or when the 
ownership changes on the client. The client "acks" its owned partitions. 
Overall, in non-joining requests, they are all set independently.
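
A minimal sketch of the three situations described above, assuming a simplified request type. `HeartbeatRequestSketch` and its factory methods are hypothetical names used for illustration only; they are not the real client or `ConsumerGroupHeartbeatRequestData` API. A null list (or -1 for the timeout) stands for a field that was not sent.

```java
import java.util.List;

// Hypothetical, simplified view of the optional heartbeat request fields.
// A null list (or -1 for the timeout) means the field was not sent.
final class HeartbeatRequestSketch {
    final int memberEpoch;
    final int rebalanceTimeoutMs;
    final List<String> subscribedTopicNames;
    final List<String> ownedTopicPartitions;

    private HeartbeatRequestSketch(int memberEpoch, int rebalanceTimeoutMs,
                                   List<String> subscribedTopicNames,
                                   List<String> ownedTopicPartitions) {
        this.memberEpoch = memberEpoch;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.subscribedTopicNames = subscribedTopicNames;
        this.ownedTopicPartitions = ownedTopicPartitions;
    }

    // Joining (epoch 0): all three fields are set, including the rebalance
    // timeout, which never changes afterwards.
    static HeartbeatRequestSketch joining(int rebalanceTimeoutMs, List<String> topics,
                                          List<String> owned) {
        return new HeartbeatRequestSketch(0, rebalanceTimeoutMs, topics, owned);
    }

    // The subscription was updated on the client: only the topic names are sent.
    static HeartbeatRequestSketch subscriptionUpdate(int memberEpoch, List<String> topics) {
        return new HeartbeatRequestSketch(memberEpoch, -1, topics, null);
    }

    // The ownership changed on the client: it "acks" its owned partitions only.
    static HeartbeatRequestSketch ownershipAck(int memberEpoch, List<String> owned) {
        return new HeartbeatRequestSketch(memberEpoch, -1, null, owned);
    }
}
```

Because the fields are set independently, a single non-joining request could also carry both a subscription update and an ownership ack; the sketch keeps them separate only for readability.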
   





Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1530481757


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1190,10 +1190,11 @@ private CoordinatorResult consumerGr
 .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
 
 // The assignment is only provided in the following cases:
-// 1. The member reported its owned partitions;
-// 2. The member just joined or rejoined to group (epoch equals to zero);
-// 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
+// 1. The member sent a full request. It does so when joining or rejoining the group; or
+//    on any errors (e.g. timeout).
+// 2. The member's assignment has been updated.
+boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null);

Review Comment:
   @jolshan Yes. Your understanding is correct. I updated the comment to 
explain it as it was explained in the description.
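
The two cases listed in the code comment of the hunk can be sketched as follows. This is an illustration under assumptions, not the coordinator code: `HeartbeatResponseSketch` and `build` are hypothetical names, the assignment is reduced to a list of strings, and the hunk does not show how `isFullRequest` is combined with the assignment-changed check, so the `||` below is inferred from the comment.

```java
import java.util.List;

// Hypothetical, simplified response; not ConsumerGroupHeartbeatResponseData.
final class HeartbeatResponseSketch {
    final int memberEpoch;
    final int heartbeatIntervalMs;
    final List<String> assignment; // null means the assignment is omitted

    private HeartbeatResponseSketch(int memberEpoch, int heartbeatIntervalMs,
                                    List<String> assignment) {
        this.memberEpoch = memberEpoch;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.assignment = assignment;
    }

    // The epoch and heartbeat interval are always returned. The assignment is
    // attached only when (1) the member sent a full request or (2) the member's
    // assignment has been updated.
    static HeartbeatResponseSketch build(int memberEpoch, int heartbeatIntervalMs,
                                         boolean isFullRequest, boolean assignmentChanged,
                                         List<String> currentAssignment) {
        List<String> assignment =
            (isFullRequest || assignmentChanged) ? currentAssignment : null;
        return new HeartbeatResponseSketch(memberEpoch, heartbeatIntervalMs, assignment);
    }
}
```

With this shape, a plain keep-alive heartbeat for an unchanged member gets a response without an assignment.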



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1650,6 +1650,102 @@ public void testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferen
 .setTopicPartitions(Collections.emptyList(;
 }
 
+@Test
+public void testConsumerGroupHeartbeatFullResponse() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+// Create a context with one consumer group containing two members.

Review Comment:
   Updated the comment.






Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


jolshan commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1529489216


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1904,16 +2000,7 @@ public void testReconciliationProcess() {
 new ConsumerGroupHeartbeatResponseData()
 .setMemberId(memberId1)
 .setMemberEpoch(11)
-.setHeartbeatIntervalMs(5000)

Review Comment:
   so we are no longer sending a reassignment because we were incorrectly doing 
so before? (ie they weren't full requests and didn't need new assignments)






Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


jolshan commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1529486638


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1190,10 +1190,11 @@ private CoordinatorResult consumerGr
 .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
 
 // The assignment is only provided in the following cases:
-// 1. The member reported its owned partitions;
-// 2. The member just joined or rejoined to group (epoch equals to zero);
-// 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
+// 1. The member sent a full request. It does so when joining or rejoining the group; or
+//    on any errors (e.g. timeout).
+// 2. The member's assignment has been updated.
+boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null);

Review Comment:
   I think I'm missing how 
   ```(rebalanceTimeoutMs != -1 && subscribedTopicNames != null && 
ownedTopicPartitions != null)```
   maps to 
   ```//  It does so when joining or rejoining the group; or on any errors 
(e.g. timeout).```
   
   Are we instead saying that this particular case -- ie setting all these 3 
fields indicates a "full" request? I think we should update the comment to make 
this a bit clearer.






Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


jeffkbkim commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-2005338241

   > This patch changes the logic to check ownedTopicPartitions, 
subscribedTopicNames and rebalanceTimeoutMs as they are the only three non 
optional fields.
   
   They are the only three optional fields, right?
   
   Also, my understanding is that there are cases where we don't set 
subscribedTopicNames and rebalanceTimeoutMs but do set ownedTopicPartitions. Do 
you have examples of this case?





Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


lianetm commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1528798734


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1650,6 +1650,102 @@ public void testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferen
 .setTopicPartitions(Collections.emptyList(;
 }
 
+@Test
+public void testConsumerGroupHeartbeatFullResponse() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+// Create a context with one consumer group containing two members.

Review Comment:
   group containing 1 member here right? 2 partitions






Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


lianetm commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1528793006


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1190,10 +1190,11 @@ private CoordinatorResult consumerGr
 .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
 
 // The assignment is only provided in the following cases:
-// 1. The member reported its owned partitions;
-// 2. The member just joined or rejoined to group (epoch equals to zero);
-// 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
+// 1. The member sent a full request. It does so when joining or rejoining the group; or
+//    on any errors (e.g. timeout).
+// 2. The member's assignment has been updated.
+boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null);

Review Comment:
   nice, perfectly aligned with the client side. Just for the record, along 
with the `rebalanceTimeout`, `topics` and `assignment`, the client will also 
include the server assignor in any full request, but only if it's configured, 
so agree on not including it here. 






[PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-14 Thread via GitHub


dajac opened a new pull request, #15533:
URL: https://github.com/apache/kafka/pull/15533

   This patch fixes a bug in the logic which decides when a full 
ConsumerGroupHeartbeat response must be returned to the client. Prior to it, 
the logic only relied on the `ownedTopicPartitions` field to check whether the 
request was a full request. This is not enough because `ownedTopicPartitions` 
is also set in other situations. This patch changes the logic to check 
`ownedTopicPartitions`, `subscribedTopicNames` and `rebalanceTimeoutMs` as they 
are the only three non optional fields.
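
To make the difference concrete, here is a small sketch contrasting the two checks. It assumes plain parameters in place of the real request fields, and `FullRequestCheckSketch`, `oldCheck` and `newCheck` are hypothetical names; the conditions themselves are taken from the diff in this PR, with the separate `hasAssignedPartitionsChanged` trigger left out.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: plain parameters stand in for the fields of the real
// ConsumerGroupHeartbeat request; this is not the GroupMetadataManager code.
final class FullRequestCheckSketch {
    // Before the patch: reporting owned partitions (or epoch 0) was enough.
    static boolean oldCheck(int memberEpoch, List<String> ownedTopicPartitions) {
        return memberEpoch == 0 || ownedTopicPartitions != null;
    }

    // After the patch: epoch 0, or all three fields present in the request.
    static boolean newCheck(int memberEpoch, int rebalanceTimeoutMs,
                            List<String> subscribedTopicNames,
                            List<String> ownedTopicPartitions) {
        return memberEpoch == 0
            || (rebalanceTimeoutMs != -1
                && subscribedTopicNames != null
                && ownedTopicPartitions != null);
    }

    public static void main(String[] args) {
        // A member at epoch 5 that only acknowledges its owned partitions.
        List<String> owned = Arrays.asList("foo-0", "foo-1");
        System.out.println(oldCheck(5, owned));           // true
        System.out.println(newCheck(5, -1, null, owned)); // false
    }
}
```

The ownership-only heartbeat is the case the old check misclassified: it was treated as a full request although neither the subscription nor the rebalance timeout was sent.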
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

