dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580183181


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group {}. " +
+                        "Created a new member id {} and requesting the member to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.

Review Comment:
   If a static member joins without a member id, we replace any existing member 
   with the same instance id. 
   If it joins with a member id, we treat the request as a normal rejoin when 
   the static member exists and the member id matches; otherwise we throw an 
   exception.
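
   The rule above can be sketched in isolation. This is only a minimal illustration of the decision, not the code in this PR: the class `StaticMemberJoinSketch`, the `Outcome` enum, the in-memory map, and the use of `IllegalStateException` are all hypothetical stand-ins, and the empty string is assumed to represent an unknown member id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the static-member join decision.
class StaticMemberJoinSketch {
    // Assumption: an empty string stands for "no member id supplied".
    static final String UNKNOWN_MEMBER_ID = "";

    enum Outcome { NEW_STATIC_MEMBER, REPLACE_EXISTING, REJOIN }

    // Hypothetical registry: instance id -> member id currently bound to it.
    private final Map<String, String> staticMembers = new HashMap<>();

    // newMemberId is the id the coordinator would allocate for an unknown member.
    Outcome join(String instanceId, String memberId, String newMemberId) {
        String existing = staticMembers.get(instanceId);
        if (memberId.equals(UNKNOWN_MEMBER_ID)) {
            // No member id: bind the instance id to the new member,
            // replacing whatever member held it before.
            staticMembers.put(instanceId, newMemberId);
            return existing == null ? Outcome.NEW_STATIC_MEMBER : Outcome.REPLACE_EXISTING;
        }
        if (existing != null && existing.equals(memberId)) {
            // Known static member with a matching member id: normal rejoin.
            return Outcome.REJOIN;
        }
        // Stand-in exception; the real coordinator would use its own error types.
        throw new IllegalStateException(
            "Member id " + memberId + " does not match instance id " + instanceId);
    }

    String memberIdFor(String instanceId) {
        return staticMembers.get(instanceId);
    }

    public static void main(String[] args) {
        StaticMemberJoinSketch group = new StaticMemberJoinSketch();
        System.out.println(group.join("instance-1", UNKNOWN_MEMBER_ID, "m1"));
        System.out.println(group.join("instance-1", "m1", "unused"));
        System.out.println(group.join("instance-1", UNKNOWN_MEMBER_ID, "m2"));
    }
}
```

   In this sketch, a join that carries a stale member id after a replacement fails, because the instance id is already bound to the newer member.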



