lucasbru commented on code in PR #22245:
URL: https://github.com/apache/kafka/pull/22245#discussion_r3273041114


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1718,17 +1759,41 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
         }
     }
 
+    /**
+     * Validates whether a static member exists for the given instanceId.
+     *
+     * @param staticMember          The static member in the group.
+     * @param receivedInstanceId    The instance id received in the request.
+     *
+     * @throws UnknownMemberIdException if no static member exists in the 
group against the provided instance id.
+     */
+    private void throwIfStaticMemberIsUnknown(StreamsGroupMember staticMember, 
String receivedInstanceId) {
+        if (staticMember == null) {
+            throw Errors.UNKNOWN_MEMBER_ID.exception("Instance id " + 
receivedInstanceId + " is unknown.");
+        }
+    }
+
     /**
      * Checks whether the streams group can accept a new member or not based 
on the
      * max group size defined.
      *
      * @param group     The streams group.
+     * @param instanceId   The instance id.
      *
      * @throws GroupMaxSizeReachedException if the maximum capacity has been 
reached.
      */
     private void throwIfStreamsGroupIsFull(

Review Comment:
   consumer's throwIfConsumerGroupIsFull takes memberId to detect rejoin 
(`group.hasMember(memberId)`), this one takes instanceId. Could we align - 
either pass both or follow the same shape?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3189,6 +3283,81 @@ private ConsumerGroupMember 
getOrMaybeSubscribeStaticConsumerGroupMember(
         }
     }
 
+    /**
+     * Gets an existing static Streams group member or creates/replaces one 
for static membership.
+     *
+     * If the member is joining:
+     * 1. Creates a new static member when no member exists for the instance 
ID.
+     * 2. Replaces the previous static member when the instance ID is released.
+     *
+     * If the member is not joining, validates static member identity and 
member epoch
+     * and returns the existing static member.
+     *
+     * @param group                 The streams group.
+     * @param memberId              The member id from the request.
+     * @param memberEpoch           The member epoch from the request.
+     * @param instanceId            The static instance id from the request.
+     * @param ownedActiveTasks      The owned active tasks from the request.
+     * @param ownedStandbyTasks     The owned standby tasks from the request.
+     * @param ownedWarmupTasks      The owned warmup tasks from the request.
+     * @param memberIsJoining       Whether the member is joining (epoch 0).
+     * @param records               The records accumulator used for member 
replacement.
+     *
+     * @return The resolved streams group member.
+     */
+    private StreamsGroupMember getOrMaybeCreateStaticStreamsGroupMember(

Review Comment:
   nit: continuation indentation here (and in replaceStreamsMember, 
streamsGroupStaticMemberGroupLeave, and the new throwIfXxx overloads) is 8 
spaces, but getOrMaybeCreateDynamicStreamsGroupMember above and the consumer 
counterparts use 4. Worth normalizing



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1685,6 +1687,26 @@ private void 
throwIfInstanceIdIsUnreleased(ConsumerGroupMember member, String gr
         }
     }
 
+    /**
+     * Validates if the received instanceId has been released from the group
+     *
+     * @param member                The streams group member.
+     * @param groupId               The streams group id.
+     * @param receivedMemberId      The member id received in the request.
+     * @param receivedInstanceId    The instance id received in the request.
+     *
+     * @throws UnreleasedInstanceIdException if the instance id received in 
the request is still in use by an existing static member.
+     */
+    private void throwIfInstanceIdIsUnreleased(StreamsGroupMember member, 
String groupId, String receivedMemberId, String receivedInstanceId) {

Review Comment:
   LEAVE_GROUP_STATIC_MEMBER_EPOCH is already statically imported (from 
ConsumerGroupHeartbeatRequest, same value) - the consumer overload right above 
uses the static import. Could we do the same here for consistency?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8966,6 +9240,35 @@ private Map<String, String> 
streamsGroupAssignmentConfigs(String groupId) {
         ));
     }
 
+    private boolean hasUserEndpointChanged(StreamsGroupMember maybeOldMember, 
StreamsGroupMember updatedMember) {

Review Comment:
   could this be static? hasEpochRelevantMemberConfigChanged next to it is - 
neither uses instance state



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4276,6 +4475,45 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
         );
     }
 
+    /**
+     * Handles the case when a static member decides to leave the group.
+     * The member is not actually fenced from the group, and instead it's
+     * member epoch is updated to -2 to reflect that a member using the given
+     * instance id decided to leave the group and would be back within session
+     * timeout.
+     *
+     * @param group     The group.
+     * @param member    The static member in the group for the instance id.
+     *
+     * @return A CoordinatorResult with a single record signifying that the 
static member is leaving.
+     */
+    private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
streamsGroupStaticMemberGroupLeave(

Review Comment:
   consumerGroupStaticMemberGroupLeave doesn't do this state cleanup (UNREVOKED 
-> STABLE). The streams behavior is arguably more correct, but worth a code 
comment explaining why we need it here - otherwise a future "align with 
consumer" pass might strip it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to