Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


dajac commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1611845609


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Member 1 has a member epoch smaller than the group epoch.
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(9)
+.build();
+
+// Member 2 has unrevoked partition.
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1611840583


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Member 1 has a member epoch smaller than the group epoch.
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(9)
+.build();
+
+// Member 2 has unrevoked partition.
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1611838405


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are free.
+     *
+     * @param group      The ConsumerGroup.
+     * @param context    The request context.
+     * @param request    The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        // The member should rejoin if any of the following conditions is met.
+        // 1) The group epoch is bumped so the member need to rejoin to catch up.
+        // 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch.
+        // 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment.
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) {

Review Comment:
   got it. thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


dajac merged PR #15988:
URL: https://github.com/apache/kafka/pull/15988





Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


dajac commented on PR #15988:
URL: https://github.com/apache/kafka/pull/15988#issuecomment-2126326606

   All the `org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignorTest` related failures are due to https://github.com/apache/kafka/pull/15972. I will merge it.





Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610735381


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Member 1 has a member epoch smaller than the group epoch.
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(9)
+.build();
+
+// Member 2 has unrevoked partition.
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610729396


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12143,7 +12183,6 @@ public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions()
         // Consumer group with two members.
         // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
-            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)

Review Comment:
   It actually doesn't matter. This was first added because I copied and pasted it from the downgrade unit test.






Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610728790


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are free.
+     *
+     * @param group      The ConsumerGroup.
+     * @param context    The request context.
+     * @param request    The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        // The member should rejoin if any of the following conditions is met.
+        // 1) The group epoch is bumped so the member need to rejoin to catch up.
+        // 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch.
+        // 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment.
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) {

Review Comment:
   > the helper checks that the latest state does in fact have all partitions released but we want it to rejoin to get the updated assignment
   
   Yes, this is correct.
   
   > Will this member be updated to STABLE state in the next CurrentAssignmentBuilder#computeNextAssignment
   
   Yes, it will, in the reconciliation part of `classicGroupJoinToConsumerGroup`: https://github.com/apache/kafka/blob/27a6c156c49e375edea0e6f33a35c64c615db1b5/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1737-L1745
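
   For readers following along, the rejoin condition under discussion can be sketched in isolation. This is a simplified stand-in, not the coordinator code: `RejoinDecision`, its trimmed `MemberState` enum, and the boolean `waitingOnUnreleasedPartition` parameter are hypothetical names that replace the real `ConsumerGroup` and `ConsumerGroupMember` objects from the diff.

```java
// Hypothetical, simplified model of the check quoted in the diff above.
// None of these types are the real Kafka classes.
class RejoinDecision {
    // Trimmed-down stand-in for the member states named in the diff.
    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS }

    /**
     * Returns true when the heartbeat should tell the classic member to rejoin,
     * i.e. when the response would carry REBALANCE_IN_PROGRESS.
     *
     * @param waitingOnUnreleasedPartition assumed stand-in for the result of
     *        group.waitingOnUnreleasedPartition(member) in the diff.
     */
    static boolean shouldRejoin(
        int memberEpoch,
        int groupEpoch,
        MemberState state,
        boolean waitingOnUnreleasedPartition
    ) {
        return memberEpoch < groupEpoch                      // 1) member is behind the group epoch
            || state == MemberState.UNREVOKED_PARTITIONS     // 2) member still has partitions to revoke
            || (state == MemberState.UNRELEASED_PARTITIONS   // 3) pending partitions are now free
                && !waitingOnUnreleasedPartition);
    }

    public static void main(String[] args) {
        System.out.println(shouldRejoin(9, 10, MemberState.STABLE, false));
        System.out.println(shouldRejoin(10, 10, MemberState.UNRELEASED_PARTITIONS, true));
        System.out.println(shouldRejoin(10, 10, MemberState.UNRELEASED_PARTITIONS, false));
    }
}
```

   Under these assumptions, a member in UNRELEASED_PARTITIONS keeps getting a plain heartbeat response while other members still own its pending partitions, and is asked to rejoin only once they are released.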






Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610481770


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Member 1 has a member epoch smaller than the group epoch.
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(9)
+.build();
+
+// Member 2 has unrevoked partition.
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610471371


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Member 1 has a member epoch smaller than the group epoch.
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(9)
+.build();
+
+// Member 2 has unrevoked partition.
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610462585


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+        context.assertJoinTimeout(groupId, memberId, 1);
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        int sessionTimeout = 5000;
+
+        List protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Consumer group with a member using the classic protocol.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setClassicMemberMetadata(
+                        new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(sessionTimeout)
+                            .setSupportedProtocols(protocols)
+                    )
+                    .setMemberEpoch(10)
+                    .build()))
+            .build();
+
+        // Heartbeat to schedule the session timeout.
+        HeartbeatRequestData request = new HeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setGenerationId(10);
+        context.sendClassicGroupHeartbeat(request);
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+        // Advance clock by 1/2 of session timeout.
+        GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2));
+
+        HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response();
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+        // Advance clock by 1/2 of session timeout.
+        GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2));
+
+        heartbeatResponse = context.sendClassicGroupHeartbeat(request).response();
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        int sessionTimeout = 5000;
+        int rebalanceTimeout = 1;
+
+        List protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),

Review Comment:
   nit: Collections.singletonList("foo")






Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610460306


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+        context.assertJoinTimeout(groupId, memberId, 1);
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        int sessionTimeout = 5000;
+
+        List protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),

Review Comment:
   nit: i think we can use `Collections.singletonList("foo"),`




Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610459144


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12143,7 +12183,6 @@ public void 
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions()
 // Consumer group with two members.
 // Member 1 uses the classic protocol and member 2 uses the 
consumer protocol.
 GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)

Review Comment:
   why were these removed?




Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610456745


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+// The member should rejoin if any of the following conditions is met.
+// 1) The group epoch is bumped so the member need to rejoin to catch 
up.
+// 2) The member needs to revoke some partitions and rejoin to 
reconcile with the new epoch.
+// 3) The member's partitions pending assignment are free, so it can 
rejoin to get the complete assignment.
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.waitingOnUnreleasedPartition(member))) {

Review Comment:
   i'm not sure i fully understand this part.
   
   UNRELEASED_PARTITIONS means that the member is waiting on partitions. 
However, i'm guessing the helper checks that the latest state does in fact have 
all partitions released but we want it to rejoin to get the updated assignment. 
Is this correct?
   
   Will this member be updated to STABLE state in the next 
CurrentAssignmentBuilder#computeNextAssignment?
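
   For reference, the rejoin condition under discussion can be sketched in 
isolation. This is only a minimal illustration of the three-way check quoted in 
the hunk above: `RejoinCheck`, its `MemberState` enum and the boolean parameter 
are simplified stand-ins (assumptions), not the actual Kafka classes, and the 
boolean stands in for whatever `group.waitingOnUnreleasedPartition(member)` 
returns.

```java
// Simplified stand-ins for illustration only; these are not the Kafka classes.
class RejoinCheck {
    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS }

    // Returns true when the heartbeat would be answered with REBALANCE_IN_PROGRESS.
    // waitingOnUnreleasedPartition is a stand-in for the result of
    // group.waitingOnUnreleasedPartition(member) in the quoted hunk.
    static boolean shouldRejoin(
        int memberEpoch,
        int groupEpoch,
        MemberState state,
        boolean waitingOnUnreleasedPartition
    ) {
        return memberEpoch < groupEpoch                       // 1) group epoch was bumped
            || state == MemberState.UNREVOKED_PARTITIONS      // 2) member must revoke partitions
            || (state == MemberState.UNRELEASED_PARTITIONS    // 3) pending partitions are now free
                && !waitingOnUnreleasedPartition);
    }

    public static void main(String[] args) {
        // Member behind the group epoch.
        System.out.println(shouldRejoin(9, 10, MemberState.STABLE, false));
        // Member still waiting on partitions held by others.
        System.out.println(shouldRejoin(10, 10, MemberState.UNRELEASED_PARTITIONS, true));
        // Member whose pending partitions have all been released.
        System.out.println(shouldRejoin(10, 10, MemberState.UNRELEASED_PARTITIONS, false));
    }
}
```

   In this sketch, a member in UNRELEASED_PARTITIONS is only told to rejoin 
once it is no longer waiting on any partition, which matches the third case 
described in the Javadoc of the hunk.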




Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610010658


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
  * @param context The request context.
  * @param request The actual Heartbeat request.
  *
- * @return The Heartbeat response.
+ * @return The coordinator result that contains the heartbeat response.
  */
-public HeartbeatResponseData classicGroupHeartbeat(
+public CoordinatorResult 
classicGroupHeartbeat(

Review Comment:
   makes sense. thanks!




Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


dajac commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1609835255


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+// The member should rejoin if any of the following conditions is met.
+// 1) The group epoch is bumped so the member need to rejoin to catch 
up.
+// 2) The member needs to revoke some partitions and rejoin to 
reconcile with the new epoch.
+// 3) The member's partitions pending assignment are free, so it can 
rejoin to get the complete assignment.
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.waitingOnUnreleasedPartition(member))) {
+error = Errors.REBALANCE_IN_PROGRESS;
+scheduleConsumerGroupJoinTimeoutIfAbsent(groupId, memberId, 
member.rebalanceTimeoutMs());
+}
+
+return new CoordinatorResult<>(
+Collections.emptyList(),
+new HeartbeatResponseData().setErrorCode(error.code())
+);
+}
+
+/**
+ * Validates that (1) the instance id exists and is mapped to the member id
+ * if the group instance id is provided; and (2) the member id exists in 
the group.
+ *
+ * @param group The consumer group.
+ * @param memberId  The member id.
+ * @param instanceId The instance id.
+ *
+ * @return The ConsumerGruopMember.

Review Comment:
   nit: `The ConsumerGroupMember`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
  * @param context The request context.
  * @param request The actual Heartbeat request.
  *
- * @return The Heartbeat response.
+ * @return The coordinator result that contains the heartbeat response.
  */
-public HeartbeatResponseData classicGroupHeartbeat(
+public CoordinatorResult 
classicGroupHeartbeat(

Review Comment:
   Yeah. This is correct. With the classic group type, the group state is not 
based on timeline data structures so using a read operation was fine.




Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-21 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1608908254


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
  * @param context The request context.
  * @param request The actual Heartbeat request.
  *
- * @return The Heartbeat response.
+ * @return The coordinator result that contains the heartbeat response.
  */
-public HeartbeatResponseData classicGroupHeartbeat(
+public CoordinatorResult 
classicGroupHeartbeat(

Review Comment:
   I guess it's because we now also read ConsumerGroup-related state, which is 
stored in timeline data structures.




Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-21 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1608907132


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.hasUnreleasedPartitions(member))) {
+error = Errors.REBALANCE_IN_PROGRESS;
+scheduleConsumerGroupJoinTimeout(groupId, memberId, 
member.rebalanceTimeoutMs());

Review Comment:
   > we cancel the join timeout when we first convert to consumer group
   
   We don't cancel the timeout in case the conversion fails and the state needs 
to be reverted. The classic group join timeout does nothing if the group is a 
consumer group.
   
   > when we have a group epoch bump we tell the classic group member we're 
rebalancing and they should send a join request
   
   Yes correct, and the timeout here is for the member instead of the whole 
group. For each member, the rebalance will be something like
   - heartbeat -- if there's an ongoing rebalance, schedule the join timeout
   - join -- cancel the join timeout; schedule the sync timeout
   - sync -- cancel the sync timeout; maybe schedule a join timeout if a new 
rebalance is ongoing
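
   The per-member sequence above can be sketched as a small state holder. This 
is a toy model under stated assumptions: `MemberTimeoutLifecycle`, its method 
names and the `JOIN`/`SYNC` keys are hypothetical and only track which timeout 
is pending; they are not the coordinator's timer or its key format.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the per-member timeouts; names and keys are hypothetical.
class MemberTimeoutLifecycle {
    static final String JOIN = "join-timeout";
    static final String SYNC = "sync-timeout";

    // Timeouts currently pending for this member.
    private final Set<String> pending = new HashSet<>();

    // heartbeat: if there is an ongoing rebalance, schedule the join timeout.
    void onHeartbeat(boolean rebalanceInProgress) {
        if (rebalanceInProgress) {
            pending.add(JOIN);
        }
    }

    // join: cancel the join timeout; schedule the sync timeout.
    void onJoin() {
        pending.remove(JOIN);
        pending.add(SYNC);
    }

    // sync: cancel the sync timeout; maybe schedule a join timeout
    // if a new rebalance is ongoing.
    void onSync(boolean newRebalanceOngoing) {
        pending.remove(SYNC);
        if (newRebalanceOngoing) {
            pending.add(JOIN);
        }
    }

    boolean isPending(String key) {
        return pending.contains(key);
    }

    public static void main(String[] args) {
        MemberTimeoutLifecycle member = new MemberTimeoutLifecycle();
        member.onHeartbeat(true);
        System.out.println("join pending after heartbeat: " + member.isPending(JOIN));
        member.onJoin();
        System.out.println("sync pending after join: " + member.isPending(SYNC));
        member.onSync(false);
        System.out.println("join pending after sync: " + member.isPending(JOIN));
    }
}
```

   At most one of the two timeouts is pending for a member at any point in 
this model, which is the property the sequence relies on.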




Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-21 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1608403874


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
  * @param context The request context.
  * @param request The actual Heartbeat request.
  *
- * @return The Heartbeat response.
+ * @return The coordinator result that contains the heartbeat response.
  */
-public HeartbeatResponseData classicGroupHeartbeat(
+public CoordinatorResult 
classicGroupHeartbeat(

Review Comment:
   so this is similar to the offset fetch path. how come we want to access the 
uncommitted state now?




Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-21 Thread via GitHub


dajac commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1608287216


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2180,36 +2202,58 @@ private void cancelConsumerGroupRebalanceTimeout(
 }
 
 /**
- * Schedules a sync timeout for the member.
+ * Schedules a join timeout for the member.
  *
  * @param groupId   The group id.
  * @param memberId  The member id.
  * @param rebalanceTimeoutMs The rebalance timeout.
  */
-private void scheduleConsumerGroupSyncTimeout(
+private void scheduleConsumerGroupJoinTimeout(
 String groupId,
 String memberId,
 int rebalanceTimeoutMs
 ) {
-String key = consumerGroupSyncKey(groupId, memberId);
-timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
-try {
-ConsumerGroup group = consumerGroup(groupId);
-ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
-log.info("[GroupId {}] Member {} fenced from the group because 
its session expired.",
-groupId, memberId);
+timer.schedule(
+consumerGroupJoinKey(groupId, memberId),
+rebalanceTimeoutMs,
+TimeUnit.MILLISECONDS,
+true,
+() -> consumerGroupFenceMemberOperation(groupId, memberId, "the 
member failed to join within timeout.")

Review Comment:
   nit: `the classic member failed to join within the rebalance timeout`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.hasUnreleasedPartitions(member))) {
+error = Errors.REBALANCE_IN_PROGRESS;
+scheduleConsumerGroupJoinTimeout(groupId, memberId, 
member.rebalanceTimeoutMs());

Review Comment:
   I think that we have an issue here. The heartbeats continue while the 
rebalance is ongoing, so each one will keep overriding the timer. I wonder if we 
could add the timer only if it does not exist yet (e.g. `scheduleIfAbsent`).
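
   To illustrate the difference, here is a minimal sketch of the two 
semantics. `ToyTimer` is a hypothetical stand-in that only records deadlines 
per key; it is not the coordinator's timer, and the method signatures are 
assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy timer that only records a deadline per key.
class ToyTimer {
    private final Map<String, Long> deadlines = new HashMap<>();

    // Always (re)schedules: an existing deadline for the key is overridden.
    void schedule(String key, long nowMs, long delayMs) {
        deadlines.put(key, nowMs + delayMs);
    }

    // Schedules only when no deadline exists yet for the key.
    void scheduleIfAbsent(String key, long nowMs, long delayMs) {
        deadlines.putIfAbsent(key, nowMs + delayMs);
    }

    Long deadline(String key) {
        return deadlines.get(key);
    }

    public static void main(String[] args) {
        ToyTimer overriding = new ToyTimer();
        ToyTimer ifAbsent = new ToyTimer();
        // Three heartbeats, 500 ms apart, each asking for a 1000 ms join timeout.
        for (long now = 0; now <= 1000; now += 500) {
            overriding.schedule("join", now, 1000);
            ifAbsent.scheduleIfAbsent("join", now, 1000);
        }
        System.out.println("schedule deadline: " + overriding.deadline("join"));
        System.out.println("scheduleIfAbsent deadline: " + ifAbsent.deadline("join"));
    }
}
```

   With plain `schedule`, every heartbeat pushes the deadline further out, so 
a member that keeps heartbeating without rejoining would never hit the join 
timeout; with the if-absent variant the first deadline is kept.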



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1753,6 +1753,7 @@ private CoordinatorResult 
classicGroupJoinToConsumerGro
 CompletableFuture appendFuture = new CompletableFuture<>();
 appendFuture.whenComplete((__, t) -> {
 if (t == null) {
+cancelConsumerGroupJoinTimeout(groupId, response.memberId());

Review Comment:
   Could we cover this change in an existing unit test?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2089,25 +2123,13 @@ private void scheduleConsumerGroupSessionTimeout(
 String memberId,
 int sessionTimeoutMs
 ) {
-String key = consumerGroupSessionTimeoutKey(groupId, memberId);
-timer.schedule(key, sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () 
-> {
-try {
-ConsumerGroup group = consumerGroup(groupId);
-ConsumerGroupMember member = 

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-20 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1607136651


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -420,12 +420,11 @@ public CompletableFuture heartbeat(
 );
 }
 
-// Using a read operation is okay here as we ignore the last committed 
offset in the snapshot registry.
-// This means we will read whatever is in the latest snapshot, which 
is how the old coordinator behaves.
-return runtime.scheduleReadOperation(
+return runtime.scheduleWriteOperation(
 "classic-group-heartbeat",
 topicPartitionFor(request.groupId()),
-(coordinator, __) -> coordinator.classicGroupHeartbeat(context, 
request)
+Duration.ofMillis(config.offsetCommitTimeoutMs),

Review Comment:
   not necessarily a comment for this PR but i wonder if we should change the 
name of this config since it's being used for all writes.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
  * @param context The request context.
  * @param request The actual Heartbeat request.
  *
- * @return The Heartbeat response.
+ * @return The coordinator result that contains the heartbeat response.
  */
-public HeartbeatResponseData classicGroupHeartbeat(
+public CoordinatorResult 
classicGroupHeartbeat(

Review Comment:
   maybe i'm missing something but i don't see where we actually initialize 
CoordinatorResult with records to write to the log



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.hasUnreleasedPartitions(member))) {
+error = Errors.REBALANCE_IN_PROGRESS;
+scheduleConsumerGroupJoinTimeout(groupId, memberId, 
member.rebalanceTimeoutMs());

Review Comment:
   we are saying that we cancel the join timeout when we first convert to 
consumer group, then when we have a group epoch bump we tell the classic group 
member we're rebalancing and they should send a join request. is my 
understanding correct?




[PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-17 Thread via GitHub


dongnuo123 opened a new pull request, #15988:
URL: https://github.com/apache/kafka/pull/15988

   This patch implements the Heartbeat API for members that use the classic 
protocol in a ConsumerGroup.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

