bbejeck commented on code in PR #18044:
URL: https://github.com/apache/kafka/pull/18044#discussion_r1870232370


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -148,7 +149,7 @@ public int hashCode() {
 
     private final String groupId;
 
-    private String memberId = "";
+    private final String memberId = Uuid.randomUuid().toString();

Review Comment:
   member id generated here, can be final since it's not going to change for 
the lifetime of the client



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -325,24 +329,24 @@ public void testStreamsRequestValidation() {
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberEpoch(0)
+                .setMemberId(memberId)
                 .setRebalanceTimeoutMs(5000)
                 .setActiveTasks(Collections.emptyList())
                 .setStandbyTasks(Collections.emptyList())));
         assertEquals("WarmupTasks must be empty when (re-)joining.", 
ex.getMessage());
 
-        // MemberId must be non-empty in all requests except for the first one 
where it
-        // could be empty (epoch != 0).
+        // MemberId must be non-empty in all requests
         ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
-                .setMemberEpoch(1)));
+                .setMemberEpoch(0)));

Review Comment:
   Member id is present in first heartbeat request, so member epoch is set to 0 
here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -449,7 +454,7 @@ public void 
testJoiningNonExistingStreamsGroupNoMissingTopics() {
         assertEquals(Errors.NONE.code(), response.errorCode());
         assertFalse(response.memberId().isEmpty());
         assertEquals(1, response.memberEpoch());
-        assertTrue(response.activeTasks().isEmpty());
+        assertFalse(response.activeTasks().isEmpty());

Review Comment:
   Updated this condition since the member id is present in the initial 
request, the client doesn't need to make a second request solely for task 
assignment.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -930,8 +930,4 @@ public void onAllTasksLostCallbackCompleted(final 
StreamsOnAllTasksLostCallbackC
             future.complete(null);
         }
     }
-
-    private String memberIdInfoForLog() {
-        return (memberId == null || memberId.isEmpty()) ? "<no ID>" : memberId;
-    }

Review Comment:
   Removed - as stated above.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -464,29 +469,34 @@ public void 
testJoiningNonExistingStreamsGroupNoMissingTopics() {
             .build();
         
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member)));
         
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1)));
-        assertTrue(coordinatorRecords.contains(
-            
CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
                 groupId,
                 member.memberId(),
-                Collections.emptyMap(),
+                Map.of(subtopologyId, new HashSet<>(List.of(0, 1, 2))),

Review Comment:
   Same here regarding task assignment



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -464,29 +469,34 @@ public void 
testJoiningNonExistingStreamsGroupNoMissingTopics() {
             .build();
         
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member)));
         
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1)));
-        assertTrue(coordinatorRecords.contains(
-            
CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
                 groupId,
                 member.memberId(),
-                Collections.emptyMap(),
+                Map.of(subtopologyId, new HashSet<>(List.of(0, 1, 2))),
                 Collections.emptyMap(),
                 Collections.emptyMap()
-            )
-        ));
+        )));
         assertTrue(coordinatorRecords.contains(
             CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord(
                 groupId,
                 topology
             )
         ));
+
+        StreamsGroupHeartbeatRequestData.TaskIds ownedActiveTasks = new 
StreamsGroupHeartbeatRequestData.TaskIds();
+        ownedActiveTasks.setSubtopologyId(subtopologyId);
+        ownedActiveTasks.setPartitions(List.of(0, 1, 2));
         StreamsGroupMember updatedMember = new 
org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder(member)
             .withTargetAssignment(
                 1,
-                new 
org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap())
+                new 
org.apache.kafka.coordinator.group.streams.Assignment(Map.of(subtopologyId, 
Set.of(0, 1, 2)), Collections.emptyMap(), Collections.emptyMap())
             )
-            .withOwnedActiveTasks(Collections.emptyList())
+            .withOwnedActiveTasks(List.of(ownedActiveTasks))
             .withOwnedStandbyTasks(Collections.emptyList())
             .withOwnedWarmupTasks(Collections.emptyList())
+            .withCurrentActiveTaskProcessId((s, p) -> null)
+            .withCurrentStandbyTaskProcessIds((s, p) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((s, p) -> Collections.emptySet())

Review Comment:
   Also, here, all changes made to reflect that assignment can occur in the 
initial request.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -292,11 +293,12 @@ public void testStreamsRequestValidation() {
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("   ")));
         assertEquals("GroupId can't be empty.", ex.getMessage());
-
+        
         // RebalanceTimeoutMs must be present in the first request (epoch == 
0).
         ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
+                .setMemberId(memberId)

Review Comment:
   Adding the member id as it will be present in the initial heartbeat request 
- same for below



##########
clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java:
##########
@@ -37,6 +37,14 @@ public class StreamsGroupHeartbeatRequest extends 
AbstractRequest {
      */
     public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
 
+    /**
+     * The version from which consumers are required to generate their own 
member id.
+     *
+     * <p>Starting from this version, member id must be generated by the 
consumer instance
+     * instead of being provided by the server.</p>
+     */
+    public static final int 
STREAMS_CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION = 1;
+

Review Comment:
   Following the pattern from the KIP-1082 implementation



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -629,14 +641,10 @@ public void 
testJoiningExistingNotReadyStreamsGroupMissingTopics() {
                 )
         );
         StreamsGroupHeartbeatRequestData heartbeatToCreateGroup =
-            buildFirstStreamsGroupHeartbeatRequest(groupId, topology, 
processId, rebalanceTimeoutMs);
+            buildFirstStreamsGroupHeartbeatRequest(groupId, topology, 
processId, rebalanceTimeoutMs, memberId);
         prepareStreamsGroupAssignment(assignor, 
heartbeatToCreateGroup.memberId(), "subtopology-id");
-        context.streamsGroupHeartbeat(heartbeatToCreateGroup);

Review Comment:
   Removed this initial heartbeat request as the first one contains the member 
id making assignment and group creation possible with the first heartbeat 
request.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -717,14 +725,15 @@ public void 
testJoiningExistingNotReadyStreamsGroupMissingTopics() {
     }
 
     private StreamsGroupHeartbeatRequestData 
buildFirstStreamsGroupHeartbeatRequest(
-        final String groupId,
-        final StreamsGroupHeartbeatRequestData.Topology topology,
-        final String processId,
-        final int rebalanceTimeoutMs) {
+            final String groupId,
+            final StreamsGroupHeartbeatRequestData.Topology topology,
+            final String processId,
+            final int rebalanceTimeoutMs,
+            final String memberId) {
 
         return new StreamsGroupHeartbeatRequestData()
             .setGroupId(groupId)
-            .setMemberId("")
+            .setMemberId(memberId)

Review Comment:
   member id always present in heartbeat requests now



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -676,13 +684,13 @@ public void 
testJoiningExistingNotReadyStreamsGroupMissingTopics() {
         StreamsGroupHeartbeatResponseData response = 
result.response().responseData();
         assertEquals(Errors.NONE.code(), response.errorCode());
         assertFalse(response.memberId().isEmpty());
-        assertEquals(2, response.memberEpoch());
+        assertEquals(1, response.memberEpoch());
         assertTrue(response.activeTasks().isEmpty());
         assertTrue(response.standbyTasks().isEmpty());
         assertTrue(response.warmupTasks().isEmpty());
         List<CoordinatorRecord> coordinatorRecords = result.records();
-        assertEquals(5, coordinatorRecords.size());
-        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId,
 2)));
+        assertEquals(7, coordinatorRecords.size());
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId,
 1)));

Review Comment:
   Member epoch reduced due to group creation and assignment occurring in first 
epoch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to