dajac commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1808866121
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3199,7 +3201,7 @@ public
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
RequestContext context,
ConsumerGroupHeartbeatRequestData request
) throws ApiException {
- throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
+ throwIfConsumerGroupHeartbeatRequestIsInvalid(request,
context.apiVersion());
if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH ||
request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.
Review Comment:
Do we need to update share group logic too?
##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##########
@@ -19,7 +19,8 @@
"listeners": ["zkBroker", "broker"],
"name": "ConsumerGroupHeartbeatRequest",
// Version 1 adds SubscribedTopicRegex (KIP-848).
- "validVersions": "0-1",
+ // Version 2 requires the consumer to generate their own Member ID
+ "validVersions": "0-2",
Review Comment:
We must be careful with this change. Version 1 is actually unstable so if we
bump it, it would become stable. This is not what we want. Here, I suggest to
not bump and to modify version 1. We can do this because it is not stable yet.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1293,13 +1293,12 @@ private void throwIfNull(
* Validates the request.
*
* @param request The request to validate.
- *
+ * @param apiVersion The version of ConsumerGroupHeartbeat RPC
* @throws InvalidRequestException if the request is not valid.
* @throws UnsupportedAssignorException if the assignor is not supported.
*/
private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
- ConsumerGroupHeartbeatRequestData request
- ) throws InvalidRequestException, UnsupportedAssignorException {
+ ConsumerGroupHeartbeatRequestData request, short apiVersion) throws
InvalidRequestException, UnsupportedAssignorException {
Review Comment:
nit: Let's keep the style of the code coherent with the existing code please.
```
private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
ConsumerGroupHeartbeatRequestData request,
short apiVersion
) throws InvalidRequestException, UnsupportedAssignorException {
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10225,53 +10251,61 @@ public void
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
.setClassicMemberMetadata(null)
.build();
- List<CoordinatorRecord> expectedRecords = Arrays.asList(
- // The existing classic group tombstone.
-
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
-
- // Create the new consumer group with the static member.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedClassicMember),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
0),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, expectedClassicMember.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
0),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedClassicMember),
-
- // Remove the static member because the rejoining member replaces
it.
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
- // Create the new static member.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedReplacingConsumerMember),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedReplacingConsumerMember),
-
- // The static member rejoins the new consumer group.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedFinalConsumerMember),
-
- // The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
- {
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 1));
- }
- }),
+ List<CoordinatorRecord> expectedRecords = new ArrayList<>();
+ // The existing classic group tombstone.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+
+ // Create the new consumer group with the static member.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedClassicMember));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
0));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId,
+ expectedClassicMember.assignedPartitions()));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
0));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedClassicMember));
+
+ // Remove the static member because the rejoining member replaces it.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId));
+
+ // Create the new static member.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedReplacingConsumerMember));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId,
+ mkAssignment(mkTopicAssignment(fooTopicId, 0))));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedReplacingConsumerMember));
+
+ // The static member rejoins the new consumer group.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedFinalConsumerMember));
+
+ // The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
+ Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
1))));
+
+ // Newly joining static member bumps the group epoch. A new target
assignment is computed.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1));
+
+ // If the memberId is generated by the consumer itself, the consumer
should retain this memberId.
+ // As a result, the record won't contain a new target assignment
record.
+ // If the memberId is not consumer-generated, add a new target
assignment record to the expected records,
+ // since a different memberId will be considered as a new member.
+ if (!isConsumerGeneratedMemberId) {
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId,
+ mkAssignment(mkTopicAssignment(fooTopicId, 0))));
+ }
- // Newly joining static member bumps the group epoch. A new target
assignment is computed.
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1));
- // The newly created static member takes the assignment from the
existing member.
- // Bump its member epoch and transition to STABLE.
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedFinalConsumerMember)
- );
+ // The newly created static member takes the assignment from the
existing member.
+ // Bump its member epoch and transition to STABLE.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedFinalConsumerMember));
assertRecordsEquals(expectedRecords, result.records());
context.assertSessionTimeout(groupId, newMemberId, 45000);
}
- @Test
- public void testConsumerGroupHeartbeatFromExistingClassicStaticMember() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
testConsumerGroupHeartbeatFromExistingClassicStaticMember(boolean
isConsumerGeneratedMemberId) {
Review Comment:
We should perhaps use the last version in this test and generate the member
id on the client side.
##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -569,7 +569,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
protected def consumerGroupHeartbeat(
groupId: String,
- memberId: String = "",
+ memberId: String = Uuid.randomUuid().toString,
Review Comment:
You can pass the version and use it. See `commitOffset` in this file as an
example. We should also update ConsumerGroupHeartbeatRequestTest.scala btw.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##########
@@ -599,13 +599,20 @@ public MemberState consumerGroupMemberState(
.state();
}
+
public CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> consumerGroupHeartbeat(
ConsumerGroupHeartbeatRequestData request
+ ) {
+ return
this.consumerGroupHeartbeat(ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(),
request);
+ }
+
+ public CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> consumerGroupHeartbeat(
+ short apiVersion, ConsumerGroupHeartbeatRequestData request
Review Comment:
nit1: Let's put apiVersion as the second argument. This is usually what we
do.
nit2: Let's put one argument per line to follow the style of the code in
this file.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -266,7 +275,9 @@ public String groupId() {
}
/**
- * @return Member ID assigned by the server to this member when it joins
the consumer group.
+ * Returns the Member ID that is generated at startup and remains
unchanged for the entire lifetime of the process.
+ *
+ * @return Member ID that does not change during the process's lifetime.
Review Comment:
nit: Let's keep the old style to reduce the comment.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10117,8 +10130,9 @@ public void
testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw
assertEquals(group,
context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false));
}
- @Test
- public void
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember(boolean
isConsumerGeneratedMemberId) {
Review Comment:
Why are we changing those tests in particular?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1317,6 +1316,9 @@ private void
throwIfConsumerGroupHeartbeatRequestIsInvalid(
if (request.subscribedTopicNames() == null ||
request.subscribedTopicNames().isEmpty()) {
throw new InvalidRequestException("SubscribedTopicNames must
be set in first request.");
}
+ if (apiVersion >= 2) {
Review Comment:
nit: Let's add a constant to ConsumerGroupHeartbeatRequest for the required
version. It will make the code clearer too.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##########
@@ -599,13 +599,20 @@ public MemberState consumerGroupMemberState(
.state();
}
+
Review Comment:
nit: We can remove this empty line.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10225,53 +10251,61 @@ public void
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
.setClassicMemberMetadata(null)
.build();
- List<CoordinatorRecord> expectedRecords = Arrays.asList(
- // The existing classic group tombstone.
-
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
-
- // Create the new consumer group with the static member.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedClassicMember),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
0),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, expectedClassicMember.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
0),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedClassicMember),
-
- // Remove the static member because the rejoining member replaces
it.
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
- // Create the new static member.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedReplacingConsumerMember),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedReplacingConsumerMember),
-
- // The static member rejoins the new consumer group.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedFinalConsumerMember),
-
- // The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
- {
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 1));
- }
- }),
+ List<CoordinatorRecord> expectedRecords = new ArrayList<>();
+ // The existing classic group tombstone.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+
+ // Create the new consumer group with the static member.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedClassicMember));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
0));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId,
+ expectedClassicMember.assignedPartitions()));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
0));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedClassicMember));
+
+ // Remove the static member because the rejoining member replaces it.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId));
+
+ // Create the new static member.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedReplacingConsumerMember));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId,
+ mkAssignment(mkTopicAssignment(fooTopicId, 0))));
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedReplacingConsumerMember));
+
+ // The static member rejoins the new consumer group.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedFinalConsumerMember));
+
+ // The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
+ Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
1))));
+
+ // Newly joining static member bumps the group epoch. A new target
assignment is computed.
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1));
+
+ // If the memberId is generated by the consumer itself, the consumer
should retain this memberId.
+ // As a result, the record won't contain a new target assignment
record.
+ // If the memberId is not consumer-generated, add a new target
assignment record to the expected records,
+ // since a different memberId will be considered as a new member.
+ if (!isConsumerGeneratedMemberId) {
+
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId,
+ mkAssignment(mkTopicAssignment(fooTopicId, 0))));
+ }
Review Comment:
Hum... I am not sure about this change. My understanding is that we wanted
to test a static member rejoining, assuming after a restart so the member id
must be different.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]