lucasbru commented on code in PR #22245:
URL: https://github.com/apache/kafka/pull/22245#discussion_r3272768189
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3189,6 +3283,81 @@ private ConsumerGroupMember
getOrMaybeSubscribeStaticConsumerGroupMember(
}
}
+ /**
+ * Gets an existing static Streams group member or creates/replaces one
for static membership.
+ *
+ * If the member is joining:
+ * 1. Creates a new static member when no member exists for the instance
ID.
+ * 2. Replaces the previous static member when the instance ID is released.
+ *
+ * If the member is not joining, validates static member identity and
member epoch
+ * and returns the existing static member.
+ *
+ * @param group The streams group.
+ * @param memberId The member id from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The static instance id from the request.
+ * @param ownedActiveTasks The owned active tasks from the request.
+ * @param ownedStandbyTasks The owned standby tasks from the request.
+ * @param ownedWarmupTasks The owned warmup tasks from the request.
+ * @param memberIsJoining Whether the member is joining (epoch 0).
+ * @param records The records accumulator used for member
replacement.
+ *
+ * @return The resolved streams group member.
+ */
+ private StreamsGroupMember getOrMaybeCreateStaticStreamsGroupMember(
+ StreamsGroup group,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks,
+ boolean memberIsJoining,
+ List<CoordinatorRecord> records
+ ) {
+ StreamsGroupMember existingStaticMemberOrNull =
group.staticMember(instanceId);
+ if (memberIsJoining) {
+ // A new static member joins or the existing static member rejoins.
+ if (existingStaticMemberOrNull == null) {
+ // New static member.
+ StreamsGroupMember newMember =
group.getOrCreateDefaultMember(memberId);
+ log.info("[GroupId {}][MemberId {}] Static member {} with
instance id {} joins the streams group.",
+ group.groupId(), memberId, memberId, instanceId);
+ return newMember;
+ } else {
+ throwIfInstanceIdIsUnreleased(existingStaticMemberOrNull,
group.groupId(), memberId, instanceId);
+
+ // Copy the member but with its new member id.
+ StreamsGroupMember newMember = new
StreamsGroupMember.Builder(existingStaticMemberOrNull, memberId)
+ .setMemberEpoch(0)
+ .setPreviousMemberEpoch(0)
+ .build();
+
+ // Generate the records to replace the member. We don't care
about the regular expression
+ // here because it is taken care of later after the static
membership replacement.
Review Comment:
nit: this code comment is carried over from the consumer group helper. Streams
groups have no regex subscriptions, so the sentence about the regular
expression does not apply here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3189,6 +3283,81 @@ private ConsumerGroupMember
getOrMaybeSubscribeStaticConsumerGroupMember(
}
}
+ /**
+ * Gets an existing static Streams group member or creates/replaces one
for static membership.
+ *
+ * If the member is joining:
+ * 1. Creates a new static member when no member exists for the instance
ID.
+ * 2. Replaces the previous static member when the instance ID is released.
+ *
+ * If the member is not joining, validates static member identity and
member epoch
+ * and returns the existing static member.
+ *
+ * @param group The streams group.
+ * @param memberId The member id from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The static instance id from the request.
+ * @param ownedActiveTasks The owned active tasks from the request.
+ * @param ownedStandbyTasks The owned standby tasks from the request.
+ * @param ownedWarmupTasks The owned warmup tasks from the request.
+ * @param memberIsJoining Whether the member is joining (epoch 0).
+ * @param records The records accumulator used for member
replacement.
+ *
+ * @return The resolved streams group member.
+ */
+ private StreamsGroupMember getOrMaybeCreateStaticStreamsGroupMember(
+ StreamsGroup group,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks,
+ boolean memberIsJoining,
+ List<CoordinatorRecord> records
+ ) {
+ StreamsGroupMember existingStaticMemberOrNull =
group.staticMember(instanceId);
+ if (memberIsJoining) {
+ // A new static member joins or the existing static member rejoins.
+ if (existingStaticMemberOrNull == null) {
+ // New static member.
+ StreamsGroupMember newMember =
group.getOrCreateDefaultMember(memberId);
+ log.info("[GroupId {}][MemberId {}] Static member {} with
instance id {} joins the streams group.",
+ group.groupId(), memberId, memberId, instanceId);
+ return newMember;
+ } else {
+ throwIfInstanceIdIsUnreleased(existingStaticMemberOrNull,
group.groupId(), memberId, instanceId);
+
+ // Copy the member but with its new member id.
+ StreamsGroupMember newMember = new
StreamsGroupMember.Builder(existingStaticMemberOrNull, memberId)
+ .setMemberEpoch(0)
+ .setPreviousMemberEpoch(0)
+ .build();
+
+ // Generate the records to replace the member. We don't care
about the regular expression
+ // here because it is taken care of later after the static
membership replacement.
+ replaceStreamsMember(records, group,
existingStaticMemberOrNull, newMember);
+
+ log.info("[GroupId {}][MemberId {}] Static member with
instance id {} re-joins the stream group " +
Review Comment:
nit: typo in this log message: "stream group" -> "streams group"
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3530,6 +3699,7 @@ private boolean hasMemberSubscriptionChanged(
* when a member is first created.
*
* @param groupId The group id.
+ * @param instanceId The instance id.
Review Comment:
Can we expand this? Whether instanceId is null or non-null changes the return
semantics here (always true on a metadata change vs. only on an epoch-relevant
change per KIP-1071), which is not obvious from the doc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]