This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c66d66dc67b KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533) c66d66dc67b is described below commit c66d66dc67b3aacb60f438bb4c5c1c132e8be4f2 Author: David Jacot <dja...@confluent.io> AuthorDate: Tue Mar 19 20:48:41 2024 +0000 KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533) This patch fixes a bug in the logic which decides when a full ConsumerGroupHeartbeat response must be returned to the client. Prior to this patch, the logic relied only on the `ownedTopicPartitions` field to check whether the request was a full request, and hence whether a full response was needed. This is not enough because `ownedTopicPartitions` is also set in other situations. This patch changes the logic to check `ownedTopicPartitions`, `subscribedTopicNames` and `rebalanceTimeoutMs` as they are the only three non-optional fields. Reviewers: Lianet Magrans <liane...@gmail.com>, Jeff Kim <jeff....@confluent.io>, Justine Olshan <jols...@confluent.io> --- .../coordinator/group/GroupMetadataManager.java | 11 +- .../group/GroupMetadataManagerTest.java | 114 ++++++++++++++++++--- 2 files changed, 105 insertions(+), 20 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 48f0618c55d..0a789fa9630 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1227,10 +1227,13 @@ public class GroupMetadataManager { .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs); // The assignment is only provided in the following cases: - // 1. The member reported its owned partitions; - // 2. The member just joined or rejoined to group (epoch equals to zero); - // 3. 
The member's assignment has been updated. - if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { + // 1. The member sent a full request. It does so when joining or rejoining the group with zero + // as the member epoch; or on any errors (e.g. timeout). We use all the non-optional fields + // (rebalanceTimeoutMs, subscribedTopicNames and ownedTopicPartitions) to detect a full request + // as those must be set in a full request. + // 2. The member's assignment has been updated. + boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null); + if (isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index e9304407cdd..43703059915 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -1650,6 +1650,102 @@ public class GroupMetadataManagerTest { .setTopicPartitions(Collections.emptyList()))); } + @Test + public void testConsumerGroupHeartbeatFullResponse() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + // Create a context with an empty consumer group. 
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .build(); + + // Prepare new assignment for the group. + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1) + ))); + } + } + )); + + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result; + + // A full response should be sent back on joining. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Collections.singletonList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1))))), + result.response() + ); + + // Otherwise, a partial response should be sent back. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(result.response().memberEpoch())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000), + result.response() + ); + + // A full response should be sent back when the member sends + // a full request again. 
+ result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(result.response().memberEpoch()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Collections.singletonList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1))))), + result.response() + ); + } + @Test public void testReconciliationProcess() { String groupId = "fooup"; @@ -1904,16 +2000,7 @@ public class GroupMetadataManagerTest { new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) .setMemberEpoch(11) - .setHeartbeatIntervalMs(5000) - .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(Arrays.asList( - new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(fooTopicId) - .setPartitions(Arrays.asList(0, 1)), - new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(barTopicId) - .setPartitions(Collections.singletonList(0)) - ))), + .setHeartbeatIntervalMs(5000), result.response() ); @@ -3057,12 +3144,7 @@ public class GroupMetadataManagerTest { new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) .setMemberEpoch(2) - .setHeartbeatIntervalMs(5000) - .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(Collections.singletonList( - new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(fooTopicId) - .setPartitions(Arrays.asList(0, 1))))), + .setHeartbeatIntervalMs(5000), result.response() );