lucasbru commented on code in PR #22245:
URL: https://github.com/apache/kafka/pull/22245#discussion_r3272768189


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3189,6 +3283,81 @@ private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
         }
     }
 
+    /**
+     * Gets an existing static Streams group member or creates/replaces one for static membership.
+     *
+     * If the member is joining:
+     * 1. Creates a new static member when no member exists for the instance ID.
+     * 2. Replaces the previous static member when the instance ID is released.
+     *
+     * If the member is not joining, validates static member identity and member epoch
+     * and returns the existing static member.
+     *
+     * @param group                 The streams group.
+     * @param memberId              The member id from the request.
+     * @param memberEpoch           The member epoch from the request.
+     * @param instanceId            The static instance id from the request.
+     * @param ownedActiveTasks      The owned active tasks from the request.
+     * @param ownedStandbyTasks     The owned standby tasks from the request.
+     * @param ownedWarmupTasks      The owned warmup tasks from the request.
+     * @param memberIsJoining       Whether the member is joining (epoch 0).
+     * @param records               The records accumulator used for member replacement.
+     *
+     * @return The resolved streams group member.
+     */
+    private StreamsGroupMember getOrMaybeCreateStaticStreamsGroupMember(
+            StreamsGroup group,
+            String memberId,
+            int memberEpoch,
+            String instanceId,
+            List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks,
+            List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks,
+            List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks,
+            boolean memberIsJoining,
+            List<CoordinatorRecord> records
+    ) {
+        StreamsGroupMember existingStaticMemberOrNull = group.staticMember(instanceId);
+        if (memberIsJoining) {
+            // A new static member joins or the existing static member rejoins.
+            if (existingStaticMemberOrNull == null) {
+                // New static member.
+                StreamsGroupMember newMember = group.getOrCreateDefaultMember(memberId);
+                log.info("[GroupId {}][MemberId {}] Static member {} with instance id {} joins the streams group.",
+                        group.groupId(), memberId, memberId, instanceId);
+                return newMember;
+            } else {
+                throwIfInstanceIdIsUnreleased(existingStaticMemberOrNull, group.groupId(), memberId, instanceId);
+
+                // Copy the member but with its new member id.
+                StreamsGroupMember newMember = new StreamsGroupMember.Builder(existingStaticMemberOrNull, memberId)
+                        .setMemberEpoch(0)
+                        .setPreviousMemberEpoch(0)
+                        .build();
+
+                // Generate the records to replace the member. We don't care about the regular expression
+                // here because it is taken care of later after the static membership replacement.

Review Comment:
   nit: this comment was carried over from the consumer helper. Streams groups have
   no regex subscriptions, so the part about the regular expression does not apply here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3189,6 +3283,81 @@ private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
         }
     }
 
+    /**
+     * Gets an existing static Streams group member or creates/replaces one for static membership.
+     *
+     * If the member is joining:
+     * 1. Creates a new static member when no member exists for the instance ID.
+     * 2. Replaces the previous static member when the instance ID is released.
+     *
+     * If the member is not joining, validates static member identity and member epoch
+     * and returns the existing static member.
+     *
+     * @param group                 The streams group.
+     * @param memberId              The member id from the request.
+     * @param memberEpoch           The member epoch from the request.
+     * @param instanceId            The static instance id from the request.
+     * @param ownedActiveTasks      The owned active tasks from the request.
+     * @param ownedStandbyTasks     The owned standby tasks from the request.
+     * @param ownedWarmupTasks      The owned warmup tasks from the request.
+     * @param memberIsJoining       Whether the member is joining (epoch 0).
+     * @param records               The records accumulator used for member replacement.
+     *
+     * @return The resolved streams group member.
+     */
+    private StreamsGroupMember getOrMaybeCreateStaticStreamsGroupMember(
+            StreamsGroup group,
+            String memberId,
+            int memberEpoch,
+            String instanceId,
+            List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks,
+            List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks,
+            List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks,
+            boolean memberIsJoining,
+            List<CoordinatorRecord> records
+    ) {
+        StreamsGroupMember existingStaticMemberOrNull = group.staticMember(instanceId);
+        if (memberIsJoining) {
+            // A new static member joins or the existing static member rejoins.
+            if (existingStaticMemberOrNull == null) {
+                // New static member.
+                StreamsGroupMember newMember = group.getOrCreateDefaultMember(memberId);
+                log.info("[GroupId {}][MemberId {}] Static member {} with instance id {} joins the streams group.",
+                        group.groupId(), memberId, memberId, instanceId);
+                return newMember;
+            } else {
+                throwIfInstanceIdIsUnreleased(existingStaticMemberOrNull, group.groupId(), memberId, instanceId);
+
+                // Copy the member but with its new member id.
+                StreamsGroupMember newMember = new StreamsGroupMember.Builder(existingStaticMemberOrNull, memberId)
+                        .setMemberEpoch(0)
+                        .setPreviousMemberEpoch(0)
+                        .build();
+
+                // Generate the records to replace the member. We don't care about the regular expression
+                // here because it is taken care of later after the static membership replacement.
+                replaceStreamsMember(records, group, existingStaticMemberOrNull, newMember);
+
+                log.info("[GroupId {}][MemberId {}] Static member with instance id {} re-joins the stream group " +

Review Comment:
   nit: stream -> streams



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3530,6 +3699,7 @@ private boolean hasMemberSubscriptionChanged(
      * when a member is first created.
      *
      * @param groupId       The group id.
+     * @param instanceId    The instance id.   

Review Comment:
   Can we expand this? Whether `instanceId` is null or non-null changes the return
   semantics here (always true on a metadata change vs. only on an epoch-relevant
   change per KIP-1071), which is not obvious from the doc.
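
   To make the distinction concrete, here is a minimal sketch of the two cases the
   doc could spell out. It is not the code in this PR: the class, the method name
   and both boolean parameters are hypothetical, and it assumes the parenthetical
   above maps in order (null `instanceId` reports every metadata change, non-null
   reports only epoch-relevant ones).

```java
/**
 * Hypothetical illustration only; this is not the GroupMetadataManager method
 * under review and none of these names exist in the PR.
 */
final class InstanceIdSemanticsSketch {

    private InstanceIdSemanticsSketch() { }

    /**
     * @param instanceId          The static instance id, or null for a dynamic member.
     * @param metadataChanged     Whether the member metadata differs from the stored member.
     * @param epochRelevantChange Whether that difference is one that matters for the epoch.
     *
     * @return Assumed semantics: with a null instance id, true on any metadata change;
     *         with a non-null instance id, true only on an epoch-relevant change.
     */
    static boolean reportsChange(
        String instanceId,
        boolean metadataChanged,
        boolean epochRelevantChange
    ) {
        if (!metadataChanged) {
            // Nothing differs, so there is nothing to report in either case.
            return false;
        }
        if (instanceId == null) {
            // Dynamic member (assumption): every metadata change is reported.
            return true;
        }
        // Static member (assumption): only epoch-relevant changes are reported.
        return epochRelevantChange;
    }

    public static void main(String[] args) {
        System.out.println(reportsChange(null, true, false));
        System.out.println(reportsChange("instance-1", true, false));
        System.out.println(reportsChange("instance-1", true, true));
    }
}
```

   If the mapping is the other way around, the doc should say so explicitly; either
   way, one sentence per case on the `@param instanceId` line would cover it.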



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.