Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1611845609 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Member 1 has a member epoch smaller than the group epoch. +ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) +.setRebalanceTimeoutMs(rebalanceTimeout) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(9) +.build(); + +// Member 2 has unrevoked partition. +ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) +.setState(MemberState.UNREVOKED_PARTITIONS) +.setRebalanceTimeoutMs(rebalanceTimeout) +
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1611840583 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Member 1 has a member epoch smaller than the group epoch. +ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) +.setRebalanceTimeoutMs(rebalanceTimeout) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(9) +.build(); + +// Member 2 has unrevoked partition. +ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) +.setState(MemberState.UNREVOKED_PARTITIONS) +.setRebalanceTimeoutMs(rebalanceTimeout) +
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1611838405 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +// The member should rejoin if any of the following conditions is met. +// 1) The group epoch is bumped so the member need to rejoin to catch up. +// 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch. +// 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment. 
+if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) { Review Comment: got it. thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dajac merged PR #15988: URL: https://github.com/apache/kafka/pull/15988 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dajac commented on PR #15988: URL: https://github.com/apache/kafka/pull/15988#issuecomment-2126326606 All the `org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignorTest` related failures are due to https://github.com/apache/kafka/pull/15972. I will merge it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610735381 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Member 1 has a member epoch smaller than the group epoch. +ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) +.setRebalanceTimeoutMs(rebalanceTimeout) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(9) +.build(); + +// Member 2 has unrevoked partition. +ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) +.setState(MemberState.UNREVOKED_PARTITIONS) +.setRebalanceTimeoutMs(rebalanceTimeout) +
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610729396 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12143,7 +12183,6 @@ public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) Review Comment: It actually doesn't matter. This was first added because I copied and pasted it from the downgrade unit test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610728790 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +// The member should rejoin if any of the following conditions is met. +// 1) The group epoch is bumped so the member need to rejoin to catch up. +// 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch. +// 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment. 
+if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) { Review Comment: > the helper checks that the latest state does in fact have all partitions released but we want it to rejoin to get the updated assignment Yes this is correct. > Will this member be updated to STABLE state in the next CurrentAssignmentBuilder#computeNextAssignment Yes it will in the reconciliation part in the `classicGroupJoinToConsumerGroup` https://github.com/apache/kafka/blob/27a6c156c49e375edea0e6f33a35c64c615db1b5/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1737-L1745 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610481770 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Member 1 has a member epoch smaller than the group epoch. +ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) +.setRebalanceTimeoutMs(rebalanceTimeout) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(9) +.build(); + +// Member 2 has unrevoked partition. +ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) +.setState(MemberState.UNREVOKED_PARTITIONS) +.setRebalanceTimeoutMs(rebalanceTimeout) +
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610471371 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Member 1 has a member epoch smaller than the group epoch. +ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) +.setRebalanceTimeoutMs(rebalanceTimeout) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(9) +.build(); + +// Member 2 has unrevoked partition. +ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) +.setState(MemberState.UNREVOKED_PARTITIONS) +.setRebalanceTimeoutMs(rebalanceTimeout) +
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610462585 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), Review Comment: nit: Collections.singletonList("foo") -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610460306 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), Review Comment: nit: I think we can use `Collections.singletonList("foo"),` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610459144 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12143,7 +12183,6 @@ public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) Review Comment: why were these removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610456745 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +// The member should rejoin if any of the following conditions is met. +// 1) The group epoch is bumped so the member need to rejoin to catch up. +// 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch. +// 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment. 
+if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) { Review Comment: i'm not sure i fully understand this part. UNRELEASED_PARTITIONS means that the member is waiting on partitions. However, i'm guessing the helper checks that the latest state does in fact have all partitions released but we want it to rejoin to get the updated assignment. Is this correct? Will this member be updated to STABLE state in the next CurrentAssignmentBuilder#computeNextAssignment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610010658 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4209,31 +4241,67 @@ private void removePendingSyncMember( * @param contextThe request context. * @param requestThe actual Heartbeat request. * - * @return The Heartbeat response. + * @return The coordinator result that contains the heartbeat response. */ -public HeartbeatResponseData classicGroupHeartbeat( +public CoordinatorResult classicGroupHeartbeat( Review Comment: makes sense. thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1609835255 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +// The member should rejoin if any of the following conditions is met. +// 1) The group epoch is bumped so the member need to rejoin to catch up. +// 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch. +// 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment. 
+if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) { +error = Errors.REBALANCE_IN_PROGRESS; +scheduleConsumerGroupJoinTimeoutIfAbsent(groupId, memberId, member.rebalanceTimeoutMs()); +} + +return new CoordinatorResult<>( +Collections.emptyList(), +new HeartbeatResponseData().setErrorCode(error.code()) +); +} + +/** + * Validates that (1) the instance id exists and is mapped to the member id + * if the group instance id is provided; and (2) the member id exists in the group. + * + * @param group The consumer group. + * @param memberId The member id. + * @param instanceIdThe instance id. + * + * @return The ConsumerGruopMember. Review Comment: nit: `The ConsumerGroupMember`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4209,31 +4241,67 @@ private void removePendingSyncMember( * @param contextThe request context. * @param requestThe actual Heartbeat request. * - * @return The Heartbeat response. + * @return The coordinator result that contains the heartbeat response. */ -public HeartbeatResponseData classicGroupHeartbeat( +public CoordinatorResult classicGroupHeartbeat( Review Comment: Yeah. This is correct. With the classic group type, the group state is not based on timeline data structures so using a read operation was fine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1608908254 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4209,31 +4241,67 @@ private void removePendingSyncMember( * @param contextThe request context. * @param requestThe actual Heartbeat request. * - * @return The Heartbeat response. + * @return The coordinator result that contains the heartbeat response. */ -public HeartbeatResponseData classicGroupHeartbeat( +public CoordinatorResult classicGroupHeartbeat( Review Comment: I guess it's because we now also read the ConsumerGroup-related state, which is backed by timeline data structures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1608907132 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) { +error = Errors.REBALANCE_IN_PROGRESS; +scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs()); Review Comment: > we cancel the join timeout when we first convert to consumer group We don't cancel the timeout in case the 
conversion fails and the state needs to be reverted. The classic group join timeout does nothing if the group is a consumer group. > when we have a group epoch bump we tell the classic group member we're rebalancing and they should send a join request Yes, correct, and the timeout here is for the member instead of the whole group. For each member, the rebalance will be something like: - heartbeat -- if there's an ongoing rebalance, schedule the join timeout - join -- cancel the join timeout; schedule the sync timeout - sync -- cancel the sync timeout; maybe schedule a join timeout if a new rebalance is ongoing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1608403874 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4209,31 +4241,67 @@ private void removePendingSyncMember( * @param contextThe request context. * @param requestThe actual Heartbeat request. * - * @return The Heartbeat response. + * @return The coordinator result that contains the heartbeat response. */ -public HeartbeatResponseData classicGroupHeartbeat( +public CoordinatorResult classicGroupHeartbeat( Review Comment: so this is similar to the offset fetch path. how come we want to access the uncommitted state now? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1608287216 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2180,36 +2202,58 @@ private void cancelConsumerGroupRebalanceTimeout( } /** - * Schedules a sync timeout for the member. + * Schedules a join timeout for the member. * * @param groupId The group id. * @param memberId The member id. * @param rebalanceTimeoutMsThe rebalance timeout. */ -private void scheduleConsumerGroupSyncTimeout( +private void scheduleConsumerGroupJoinTimeout( String groupId, String memberId, int rebalanceTimeoutMs ) { -String key = consumerGroupSyncKey(groupId, memberId); -timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { -try { -ConsumerGroup group = consumerGroup(groupId); -ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); -log.info("[GroupId {}] Member {} fenced from the group because its session expired.", -groupId, memberId); +timer.schedule( +consumerGroupJoinKey(groupId, memberId), +rebalanceTimeoutMs, +TimeUnit.MILLISECONDS, +true, +() -> consumerGroupFenceMemberOperation(groupId, memberId, "the member failed to join within timeout.") Review Comment: nit: `the classic member failed to join within the rebalance timeout`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual Heartbeat request. 
+ * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) { +error = Errors.REBALANCE_IN_PROGRESS; +scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs()); Review Comment: I think that we have an issue here. The issue is that the HB continues while the rebalance is on-going so it will keep overriding the timer. I wonder if we could add the timer only if it does not exist yet (e.g. `scheduleIfAbsent`). ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1753,6 +1753,7 @@ private CoordinatorResult classicGroupJoinToConsumerGro CompletableFuture appendFuture = new CompletableFuture<>(); appendFuture.whenComplete((__, t) -> { if (t == null) { +cancelConsumerGroupJoinTimeout(groupId, response.memberId()); Review Comment: Could we cover this change in an existing unit test? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2089,25 +2123,13 @@ private void scheduleConsumerGroupSessionTimeout( String memberId, int sessionTimeoutMs ) { -String key = consumerGroupSessionTimeoutKey(groupId, memberId); -timer.schedule(key, sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { -try { -ConsumerGroup group = consumerGroup(groupId); -ConsumerGroupMember member =
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1607136651 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -420,12 +420,11 @@ public CompletableFuture heartbeat( ); } -// Using a read operation is okay here as we ignore the last committed offset in the snapshot registry. -// This means we will read whatever is in the latest snapshot, which is how the old coordinator behaves. -return runtime.scheduleReadOperation( +return runtime.scheduleWriteOperation( "classic-group-heartbeat", topicPartitionFor(request.groupId()), -(coordinator, __) -> coordinator.classicGroupHeartbeat(context, request) +Duration.ofMillis(config.offsetCommitTimeoutMs), Review Comment: not necessarily a comment for this PR but i wonder if we should change the name of this config since it's being used for all writes. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4209,31 +4241,67 @@ private void removePendingSyncMember( * @param contextThe request context. * @param requestThe actual Heartbeat request. * - * @return The Heartbeat response. + * @return The coordinator result that contains the heartbeat response. */ -public HeartbeatResponseData classicGroupHeartbeat( +public CoordinatorResult classicGroupHeartbeat( Review Comment: maybe i'm missing something but i don't see where we actually initialize CoordinatorResult with records to write to the log ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat( } } +/** + * Handle a classic group HeartbeatRequest to a consumer group. 
A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ +private CoordinatorResult classicGroupHeartbeatToConsumerGroup( +ConsumerGroup group, +RequestContext context, +HeartbeatRequestData request +) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); +ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + +scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + +Errors error = Errors.NONE; +if (member.memberEpoch() < group.groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) { +error = Errors.REBALANCE_IN_PROGRESS; +scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs()); Review Comment: we are saying that we cancel the join timeout when we first convert to consumer group, then when we have a group epoch bump we tell the classic group member we're rebalancing and they should send a join request. is my understanding correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org