lucasbru commented on code in PR #22245:
URL: https://github.com/apache/kafka/pull/22245#discussion_r3273041114
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1718,17 +1759,41 @@ private void
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
}
}
+ /**
+ * Validates whether a static member exists for the given instanceId.
+ *
+ * @param staticMember The static member in the group.
+ * @param receivedInstanceId The instance id received in the request.
+ *
+ * @throws UnknownMemberIdException if no static member exists in the
group against the provided instance id.
+ */
+ private void throwIfStaticMemberIsUnknown(StreamsGroupMember staticMember,
String receivedInstanceId) {
+ if (staticMember == null) {
+ throw Errors.UNKNOWN_MEMBER_ID.exception("Instance id " +
receivedInstanceId + " is unknown.");
+ }
+ }
+
/**
* Checks whether the streams group can accept a new member or not based
on the
* max group size defined.
*
* @param group The streams group.
+ * @param instanceId The instance id.
*
* @throws GroupMaxSizeReachedException if the maximum capacity has been
reached.
*/
private void throwIfStreamsGroupIsFull(
Review Comment:
The consumer's throwIfConsumerGroupIsFull takes memberId to detect a rejoin
(`group.hasMember(memberId)`), while this one takes instanceId. Could we align
them, either by passing both or by following the same shape?
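
A rough sketch of the "same shape" option, not the actual coordinator code. `Group` is a hypothetical stand-in for StreamsGroup, `maxSize` is passed in rather than read from config, and IllegalStateException stands in for GroupMaxSizeReachedException:

```java
import java.util.HashSet;
import java.util.Set;

final class GroupFullCheckSketch {
    // Hypothetical stand-in for StreamsGroup, reduced to what the check needs.
    static final class Group {
        private final Set<String> memberIds = new HashSet<>();

        void add(String memberId) {
            memberIds.add(memberId);
        }

        boolean hasMember(String memberId) {
            return memberIds.contains(memberId);
        }

        int numMembers() {
            return memberIds.size();
        }
    }

    // Keyed on memberId: a member that is already in the group is a rejoin
    // and is never rejected, even when the group is at capacity.
    // IllegalStateException is a placeholder for GroupMaxSizeReachedException.
    static void throwIfGroupIsFull(Group group, String memberId, int maxSize) {
        boolean isKnownMember = !memberId.isEmpty() && group.hasMember(memberId);
        if (group.numMembers() >= maxSize && !isKnownMember) {
            throw new IllegalStateException(
                "The group has reached its maximum capacity of " + maxSize + " members.");
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.add("member-1");
        // Rejoin of an existing member passes even though the group is full.
        throwIfGroupIsFull(group, "member-1", 1);
        try {
            // A new member is rejected once the limit is reached.
            throwIfGroupIsFull(group, "member-2", 1);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If instanceId is still needed for the static-member replacement case, it could be passed as an extra parameter alongside memberId rather than instead of it.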
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3189,6 +3283,81 @@ private ConsumerGroupMember
getOrMaybeSubscribeStaticConsumerGroupMember(
}
}
+ /**
+ * Gets an existing static Streams group member or creates/replaces one
for static membership.
+ *
+ * If the member is joining:
+ * 1. Creates a new static member when no member exists for the instance
ID.
+ * 2. Replaces the previous static member when the instance ID is released.
+ *
+ * If the member is not joining, validates static member identity and
member epoch
+ * and returns the existing static member.
+ *
+ * @param group The streams group.
+ * @param memberId The member id from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The static instance id from the request.
+ * @param ownedActiveTasks The owned active tasks from the request.
+ * @param ownedStandbyTasks The owned standby tasks from the request.
+ * @param ownedWarmupTasks The owned warmup tasks from the request.
+ * @param memberIsJoining Whether the member is joining (epoch 0).
+ * @param records The records accumulator used for member
replacement.
+ *
+ * @return The resolved streams group member.
+ */
+ private StreamsGroupMember getOrMaybeCreateStaticStreamsGroupMember(
Review Comment:
nit: continuation indentation here (and in replaceStreamsMember,
streamsGroupStaticMemberGroupLeave, and the new throwIfXxx overloads) is 8
spaces, but getOrMaybeCreateDynamicStreamsGroupMember above and the consumer
counterparts use 4. Worth normalizing.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1685,6 +1687,26 @@ private void
throwIfInstanceIdIsUnreleased(ConsumerGroupMember member, String gr
}
}
+ /**
+ * Validates if the received instanceId has been released from the group
+ *
+ * @param member The streams group member.
+ * @param groupId The streams group id.
+ * @param receivedMemberId The member id received in the request.
+ * @param receivedInstanceId The instance id received in the request.
+ *
+ * @throws UnreleasedInstanceIdException if the instance id received in
the request is still in use by an existing static member.
+ */
+ private void throwIfInstanceIdIsUnreleased(StreamsGroupMember member,
String groupId, String receivedMemberId, String receivedInstanceId) {
Review Comment:
LEAVE_GROUP_STATIC_MEMBER_EPOCH is already statically imported (from
ConsumerGroupHeartbeatRequest, same value) - the consumer overload right above
uses the static import. Could we do the same here for consistency?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8966,6 +9240,35 @@ private Map<String, String>
streamsGroupAssignmentConfigs(String groupId) {
));
}
+ private boolean hasUserEndpointChanged(StreamsGroupMember maybeOldMember,
StreamsGroupMember updatedMember) {
Review Comment:
Could this be static? hasEpochRelevantMemberConfigChanged next to it is, and
neither uses instance state.
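
For illustration only, a sketch of what the static form could look like. `Member` and `Endpoint` are hypothetical stand-ins for StreamsGroupMember and its endpoint type, and the comparison logic (including how a missing old member is treated) is an assumption, not what the PR does:

```java
import java.util.Objects;
import java.util.Optional;

final class EndpointChangeSketch {
    // Hypothetical stand-ins; the real StreamsGroupMember carries more fields.
    record Endpoint(String host, int port) { }

    record Member(String memberId, Optional<Endpoint> userEndpoint) { }

    // Only the two arguments are read, so nothing forces this to be an
    // instance method. Assumption: with no old member, any endpoint on the
    // updated member counts as a change.
    static boolean hasUserEndpointChanged(Member maybeOldMember, Member updatedMember) {
        if (maybeOldMember == null) {
            return updatedMember.userEndpoint().isPresent();
        }
        return !Objects.equals(maybeOldMember.userEndpoint(), updatedMember.userEndpoint());
    }

    public static void main(String[] args) {
        Member before = new Member("m1", Optional.of(new Endpoint("host-a", 9092)));
        Member after = new Member("m1", Optional.of(new Endpoint("host-b", 9092)));
        System.out.println(hasUserEndpointChanged(before, after));
    }
}
```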
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4276,6 +4475,45 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
);
}
+ /**
+ * Handles the case when a static member decides to leave the group.
+ * The member is not actually fenced from the group, and instead it's
+ * member epoch is updated to -2 to reflect that a member using the given
+ * instance id decided to leave the group and would be back within session
+ * timeout.
+ *
+ * @param group The group.
+ * @param member The static member in the group for the instance id.
+ *
+ * @return A CoordinatorResult with a single record signifying that the
static member is leaving.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
streamsGroupStaticMemberGroupLeave(
Review Comment:
consumerGroupStaticMemberGroupLeave doesn't do this state cleanup (UNREVOKED
-> STABLE). The streams behavior is arguably more correct, but it's worth a
code comment explaining why we need it here. Otherwise a future "align with
consumer" pass might strip it.