dajac commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1819075308


##########
clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java:
##########
@@ -37,6 +37,14 @@ public class ConsumerGroupHeartbeatRequest extends 
AbstractRequest {
      */
     public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
 
+    /**
+     * The version from which consumers are required to generate their own 
memberId.

Review Comment:
   nit: Let's use `member id` here and below.



##########
clients/src/main/resources/common/message/DescribeGroupsResponse.json:
##########
@@ -48,7 +48,7 @@
       { "name": "Members", "type": "[]DescribedGroupMember", "versions": "0+",
         "about": "The group members.", "fields": [
         { "name": "MemberId", "type": "string", "versions": "0+",
-          "about": "The member ID assigned by the group coordinator." },
+          "about": "The member ID is either assigned by the group coordinator 
or generated by consumer itself." },

Review Comment:
   nit: I would simplify it to `The member id`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1293,21 +1294,28 @@ private void throwIfNull(
      * Validates the request.
      *
      * @param request The request to validate.
-     *
+     * @param apiVersion The version of ConsumerGroupHeartbeat RPC
      * @throws InvalidRequestException if the request is not valid.
      * @throws UnsupportedAssignorException if the assignor is not supported.
      */
     private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
-        ConsumerGroupHeartbeatRequestData request
+            ConsumerGroupHeartbeatRequestData request,
+            short apiVersion
     ) throws InvalidRequestException, UnsupportedAssignorException {
+        if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ||
+            request.memberEpoch() > 0 ||
+            request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH
+        ) {
+            throwIfNull(request.memberId(), "MemberId can't be null.");

Review Comment:
   Can it be null? It does not seem to be possible from the RPC definition.



##########
clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json:
##########
@@ -28,7 +28,7 @@
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
       "about": "The group identifier." },
     { "name": "MemberId", "type": "string", "versions": "0+",
-      "about": "The member ID generated by the coordinator. The member ID must 
be kept during the entire lifetime of the member." },
+      "about": "The member ID generated by the consumer. The member ID must be 
kept during the entire lifetime of the consumer process." },

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala:
##########
@@ -82,15 +84,17 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
     // We test DeleteGroups on empty and non-empty groups. Here we create the 
non-empty group.
     joinConsumerGroup(
       groupId = "grp-non-empty",
-      useNewProtocol = useNewProtocol
+      useNewProtocol = useNewProtocol,
+      memberId = Uuid.randomUuid.toString

Review Comment:
   It is a bit weird to have the member id here because the classic protocol 
does not use it. I wonder if we should just hide this detail and generate the 
member id within the joinConsumerGroup. What do you think?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -944,23 +941,20 @@ public boolean sameRequest(final OffsetFetchRequestState 
request) {
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 
             OffsetFetchRequest.Builder builder;
-            if (memberInfo.memberId.isPresent() && 
memberInfo.memberEpoch.isPresent()) {
-                builder = new OffsetFetchRequest.Builder(
-                        groupId,
-                        memberInfo.memberId.get(),
-                        memberInfo.memberEpoch.get(),
-                        true,
-                        new ArrayList<>(this.requestedPartitions),
-                        throwOnFetchStableOffsetUnsupported);
-            } else {
-                // Building request without passing member ID/epoch to leave 
the logic to choose
-                // default values when not present on the request builder.
-                builder = new OffsetFetchRequest.Builder(
-                        groupId,
-                        true,
-                        new ArrayList<>(this.requestedPartitions),
-                        throwOnFetchStableOffsetUnsupported);
-            }
+            // Building request without passing member ID/epoch to leave the 
logic to choose
+            // default values when not present on the request builder.
+            builder = memberInfo.memberEpoch.map(epoch -> new 
OffsetFetchRequest.Builder(
+                            groupId,
+                            memberInfo.memberId,
+                            epoch,
+                            true,
+                            new ArrayList<>(this.requestedPartitions),
+                            throwOnFetchStableOffsetUnsupported))
+                    .orElseGet(() -> new OffsetFetchRequest.Builder(
+                            groupId,
+                            true,
+                            new ArrayList<>(this.requestedPartitions),
+                            throwOnFetchStableOffsetUnsupported));

Review Comment:
   For my understanding, this means that we only set the member id when we have 
a valid epoch too. In other words, we don't set the member id if we are not 
actively part of a group. Is my understanding correct?
   
   I wonder if we should do the same for the groupMetadata in order to be 
consistent. What do you think?



##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##########
@@ -39,7 +39,7 @@
     { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "The top-level error message, or null if there was no error." },
     { "name": "MemberId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-      "about": "The member id generated by the coordinator. Only provided when 
the member joins with MemberEpoch == 0." },
+      "about": "The member id is generated by the consumer and provided by the 
consumer for all requests." },

Review Comment:
   I wonder if we should precise that in version 0, it is generated by the 
group coordinator. However in version 1+, it is basically the value received 
from the consumer?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1293,21 +1294,28 @@ private void throwIfNull(
      * Validates the request.
      *
      * @param request The request to validate.
-     *
+     * @param apiVersion The version of ConsumerGroupHeartbeat RPC
      * @throws InvalidRequestException if the request is not valid.
      * @throws UnsupportedAssignorException if the assignor is not supported.
      */
     private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
-        ConsumerGroupHeartbeatRequestData request
+            ConsumerGroupHeartbeatRequestData request,
+            short apiVersion

Review Comment:
   nit: The indentation is incorrect. It should be 4 spaces.



##########
clients/src/main/resources/common/message/OffsetFetchRequest.json:
##########
@@ -54,7 +54,7 @@
       { "name": "GroupId", "type": "string", "versions": "8+", "entityType": 
"groupId",
         "about": "The group ID."},
       { "name": "MemberId", "type": "string", "versions": "9+", 
"nullableVersions": "9+", "default": "null", "ignorable": true,
-        "about": "The member ID assigned by the group coordinator if using the 
new consumer protocol (KIP-848)." },
+        "about": "The member ID generated by the consumer if using the new 
consumer protocol (KIP-848, KIP-1082)." },

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10226,48 +10256,48 @@ public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
             .build();
 
         List<CoordinatorRecord> expectedRecords = Arrays.asList(
-            // The existing classic group tombstone.
-            
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
-
-            // Create the new consumer group with the static member.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedClassicMember),
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
0),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, expectedClassicMember.assignedPartitions()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedClassicMember),
-
-            // Remove the static member because the rejoining member replaces 
it.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
+                // The existing classic group tombstone.

Review Comment:
   Are they any changes in this block? 



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -281,13 +294,17 @@ public void testMemberIdGeneration() {
         ));
 
         CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
-            new ConsumerGroupHeartbeatRequestData()
-                .setGroupId("group-foo")
-                .setMemberEpoch(0)
-                .setServerAssignor("range")
-                .setRebalanceTimeoutMs(5000)
-                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
-                .setTopicPartitions(Collections.emptyList()));
+                // The consumer generates its own Member ID starting from 
version 1 of the ConsumerGroupHeartbeat RPC.
+                // Therefore, this test case is specific to earlier versions 
of the RPC.
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId("group-foo")
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList()),
+                (short) (CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION - 1)

Review Comment:
   nit: Let's use `0`. It is more explicit.



##########
core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala:
##########
@@ -242,50 +243,54 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
       assignment = assignment(List(0, 1, 2))
     )
 
-    // The joining request with a consumer group member 2 is accepted.
-    val memberId2 = consumerGroupHeartbeat(
-      groupId = groupId,
-      rebalanceTimeoutMs = 5 * 60 * 1000,
-      subscribedTopicNames = List("foo"),
-      topicPartitions = List.empty,
-      expectedError = Errors.NONE
-    ).memberId
-
-    // The group has become a consumer group.
-    assertEquals(
-      List(
-        new ListGroupsResponseData.ListedGroup()
-          .setGroupId(groupId)
-          .setProtocolType("consumer")
-          .setGroupState(ConsumerGroupState.RECONCILING.toString)
-          .setGroupType(Group.GroupType.CONSUMER.toString)
-      ),
-      listGroups(
-        statesFilter = List.empty,
-        typesFilter = List(Group.GroupType.CONSUMER.toString)
+    for (version <- ApiKeys.CONSUMER_GROUP_HEARTBEAT.oldestVersion() to 
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)) {

Review Comment:
   Testing all the versions in this suite does not seem necessary. We can focus 
on testing the last one. However, let's ensure that we test all the versions in 
ConsumerGroupHeartbeatRequestTest.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1335,23 +1342,22 @@ private void 
throwIfConsumerGroupHeartbeatRequestIsInvalid(
      * Validates the ShareGroupHeartbeat request.
      *
      * @param request The request to validate.
-     *
      * @throws InvalidRequestException if the request is not valid.
      * @throws UnsupportedAssignorException if the assignor is not supported.
      */
     private void throwIfShareGroupHeartbeatRequestIsInvalid(
         ShareGroupHeartbeatRequestData request
     ) throws InvalidRequestException, UnsupportedAssignorException {
+        throwIfNull(request.memberId(), "MemberId can't be null.");

Review Comment:
   ditto. I am not sure that it is necessary.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -281,13 +294,17 @@ public void testMemberIdGeneration() {
         ));
 
         CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
-            new ConsumerGroupHeartbeatRequestData()
-                .setGroupId("group-foo")
-                .setMemberEpoch(0)
-                .setServerAssignor("range")
-                .setRebalanceTimeoutMs(5000)
-                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
-                .setTopicPartitions(Collections.emptyList()));
+                // The consumer generates its own Member ID starting from 
version 1 of the ConsumerGroupHeartbeat RPC.
+                // Therefore, this test case is specific to earlier versions 
of the RPC.
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId("group-foo")
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList()),
+                (short) (CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION - 1)

Review Comment:
   nit: Indentation seems incorrect. There are other cases in this file. I let 
you check them.



##########
core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala:
##########
@@ -36,6 +37,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
     )
   )
   def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    // TODO fix

Review Comment:
   Is it still relevant?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to