dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397540


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+        throwIfConsumerGroupIsFull(group, memberId);
+        throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // A dynamic member (re-)joins.
+            throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+            newMemberCreated = !group.hasMember(memberId);
+            member = group.getOrMaybeCreateMember(memberId, true);
+            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMember(records, groupId, member.memberId());
+                    log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+                        "Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions =
+            validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+            String preferredServerAssignor = 
group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+            try {
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
+                        .build();
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
+
+                log.info("[GroupId {}] Computed a new target assignment for 
epoch {} with '{}' assignor: {}.",
+                    groupId, groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = 
assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                    groupEpoch, ex.getMessage());
+                log.error("[GroupId {}] {}.", groupId, msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+
+        /**
+         * TODO:
+         *      joinGroup - sync timeout
+         *      syncGroup - (join timeout)
+         *      heartbeat - (join timeout)
+         *      => scheduleConsumerGroupRebalanceTimeout is not necessary
+         */

Review Comment:
   Yes correct.
   I'll make sure we remove all the the TODOs 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to