dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1579345622


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);

Review Comment:
   What's the reason for doing this only if the member is not new?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);

Review Comment:
   I think that we should rather do this after the request/group/member 
validations.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.

Review Comment:
   I'd like to ensure that we are on the same page here. When a static member 
joins, it replaces any existing member with the same instance id, isn't it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }

Review Comment:
   Could we keep this in common with the classic path?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +

Review Comment:
   nit: Let's use the same format than the other logs.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());

Review Comment:
   In this case, I wonder if we should just remove the member without 
cancelling the timers. The issue is that the timers are not reverted back if 
the operation fails. What's your take on this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+                    log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);

Review Comment:
   Should we log the member id of the fenced member too?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2131,7 +2471,29 @@ public void replay(
      *
      * @return The result that contains records to append if the join group 
phase completes.
      */
-    public CoordinatorResult<Void, Record> classicGroupJoin(
+    public CoordinatorResult<Void, Record> groupJoin(

Review Comment:
   I feel like that it would be great if we could keep the previous name here 
because the classic protocol is used in all cases.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+                    log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+
+                ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);

Review Comment:
   There are parts in common for almost all the branches (e.g. 
validateGenerationIdAndGetOwnedPartition and 
throwIfClassicProtocolIsNotSupported). I wonder if we could do those afterwards 
in all cases in order to reduce the complexity of those branches. Thoughts?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+                    log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+
+                ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = false;
+        if (!updatedMember.equals(member)) {
+            records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+
+            if 
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
+                log.info("[GroupId {}] Member {} updated its subscribed topics 
to: {}.",
+                    groupId, memberId, updatedMember.subscribedTopicNames());
+                bumpGroupEpoch = true;
+            }
+
+            if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+                log.info("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
+                    groupId, memberId, updatedMember.subscribedTopicRegex());
+                bumpGroupEpoch = true;
+            }
+        }
+
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+            String preferredServerAssignor = 
group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+            try {
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
+                        .build();
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
+
+                log.info("[GroupId {}] Computed a new target assignment for 
epoch {} with '{}' assignor: {}.",
+                    groupId, groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = 
assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                    groupEpoch, ex.getMessage());
+                log.error("[GroupId {}] {}.", groupId, msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            group::currentPartitionEpoch,
+            targetAssignmentEpoch,
+            targetAssignment,
+            ownedTopicPartitions,
+            records
+        );
+
+        if (newMemberCreated) {
+            scheduleConsumerGroupSessionTimeout(groupId, memberId);
+        }
+
+        // TODO: what will happen to the client if the generation id in 
response is 0?

Review Comment:
   I suppose that it will never happen because we never have a consumer group 
staying at epoch 0. If it would happen, I think that the client does not care. 
What was your concern?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+                    log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+
+                ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = false;
+        if (!updatedMember.equals(member)) {
+            records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+
+            if 
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
+                log.info("[GroupId {}] Member {} updated its subscribed topics 
to: {}.",
+                    groupId, memberId, updatedMember.subscribedTopicNames());
+                bumpGroupEpoch = true;
+            }
+
+            if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+                log.info("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
+                    groupId, memberId, updatedMember.subscribedTopicRegex());
+                bumpGroupEpoch = true;
+            }
+        }
+
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+            String preferredServerAssignor = 
group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+            try {
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
+                        .build();
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
+
+                log.info("[GroupId {}] Computed a new target assignment for 
epoch {} with '{}' assignor: {}.",
+                    groupId, groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = 
assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                    groupEpoch, ex.getMessage());
+                log.error("[GroupId {}] {}.", groupId, msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            group::currentPartitionEpoch,
+            targetAssignmentEpoch,
+            targetAssignment,
+            ownedTopicPartitions,
+            records
+        );
+
+        if (newMemberCreated) {
+            scheduleConsumerGroupSessionTimeout(groupId, memberId);
+        }
+
+        // TODO: what will happen to the client if the generation id in 
response is 0?
+        responseFuture.complete(new JoinGroupResponseData()
+            .setMemberId(updatedMember.memberId())
+            .setGenerationId(updatedMember.memberEpoch())
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setProtocolName(protocols.stream().findAny().get().name())
+        );

Review Comment:
   This is incorrect. The issue is that it would send the response before the 
records are persisted/committed. I wonder if we could rather set the response 
to the `CoordinatorResult` below. Have you considered this?
   
   On a similar line of thoughts, I wonder if we could rationalize the flows in 
this method. We sometimes completes the response future, or we throw 
exceptions, or we complete the response future with an error. I wonder if we 
could for instance use exceptions for errors and the CoordinatorResult for the 
responses. I am not sure whether it is actually possible.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
         }
     }
 
+    /**
+     * Validates if the received classic member protocols are supported by the 
group.
+     *
+     * @param group         The ConsumerGroup.
+     * @param memberId      The joining member id.
+     * @param protocolType  The joining member protocol type.
+     * @param protocols     The joining member protocol collection.
+     */
+    private void throwIfClassicProtocolIsNotSupported(
+        ConsumerGroup group,
+        String memberId,
+        String protocolType,
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+        }
+    }
+
+    /**
+     * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+     * All the protocols have the same subscription, so the method picks a 
random one.
+     *
+     * @param protocols The JoinGroupRequestProtocolCollection.
+     * @return The Subscription.
+     */
+    private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        try {
+            return ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+            );
+        } catch (SchemaException e) {
+            throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+        }
+    }
+
+    /**
+     * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+     *
+     * @param member        The joining member.
+     * @param subscription  The Subscription.
+     * @return The owned partitions if valid, otherwise return null.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
validateGenerationIdAndGetOwnedPartition(

Review Comment:
   Overall, I wonder if we could just return the owned partitions without 
thinking too much about them. I need to think about it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
         }
     }
 
+    /**
+     * Validates if the received classic member protocols are supported by the 
group.
+     *
+     * @param group         The ConsumerGroup.
+     * @param memberId      The joining member id.
+     * @param protocolType  The joining member protocol type.
+     * @param protocols     The joining member protocol collection.
+     */
+    private void throwIfClassicProtocolIsNotSupported(
+        ConsumerGroup group,
+        String memberId,
+        String protocolType,
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+        }
+    }
+
+    /**
+     * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+     * All the protocols have the same subscription, so the method picks a 
random one.
+     *
+     * @param protocols The JoinGroupRequestProtocolCollection.
+     * @return The Subscription.
+     */
+    private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        try {
+            return ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+            );
+        } catch (SchemaException e) {
+            throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+        }
+    }
+
+    /**
+     * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+     *
+     * @param member        The joining member.
+     * @param subscription  The Subscription.
+     * @return The owned partitions if valid, otherwise return null.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
validateGenerationIdAndGetOwnedPartition(
+        ConsumerGroupMember member,
+        ConsumerPartitionAssignor.Subscription subscription
+    ) {
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedPartitions =
+            toTopicPartitions(subscription.ownedPartitions(), 
metadataImage.topics());
+        if (subscription.generationId().isPresent() && 
subscription.generationId().get() == member.memberEpoch()) {
+            return ownedPartitions;
+        } else {
+            // If the generation id is not provided or doesn't match the 
member epoch, it's still safe to
+            // accept the ownedPartitions that is a subset of the assigned 
partition. Otherwise, set the
+            // ownedPartition to be null. When a new assignment is provided, 
the consumer will stop fetching
+            // from and revoke the partitions it does not own.
+            if (isSubset(ownedPartitions, member.assignedPartitions())) {
+                return ownedPartitions;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * @return The ConsumerGroupHeartbeatRequestData.TopicPartitions list 
converted from the TopicPartitions list.
+     */
+    private static List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
toTopicPartitions(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        return ConsumerGroup.topicPartitionMapFromList(partitions, 
topicsImage).entrySet().stream().map(

Review Comment:
   nit: I would not use `topicPartitionMapFromList` here because you basically 
process the partitions twice. It seems that we could combine. I would also 
avoid using the stream API.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2122,6 +2435,33 @@ public void replay(
         }
     }
 
+    /**
+     * Complete the responseFuture with INVALID_SESSION_TIMEOUT error if 
session timeout doesn't
+     * fall in the interval (classicGroupMinSessionTimeoutMs, 
classicGroupMaxSessionTimeoutMs).
+     *
+     * @param memberId          The member id.
+     * @param sessionTimeoutMs  The session timeout.
+     * @param responseFuture    The response future.
+     * @return A boolean indicating whether the session timeout is valid.
+     */
+    private boolean validateClassicGroupSessionTimeout(
+        String memberId,
+        int sessionTimeoutMs,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs ||
+            sessionTimeoutMs > classicGroupMaxSessionTimeoutMs

Review Comment:
   nit: It looks like this one could fit on one line.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+                    log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+
+                ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = false;
+        if (!updatedMember.equals(member)) {
+            records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+
+            if 
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
+                log.info("[GroupId {}] Member {} updated its subscribed topics 
to: {}.",
+                    groupId, memberId, updatedMember.subscribedTopicNames());
+                bumpGroupEpoch = true;
+            }
+
+            if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+                log.info("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
+                    groupId, memberId, updatedMember.subscribedTopicRegex());
+                bumpGroupEpoch = true;
+            }
+        }
+
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+            String preferredServerAssignor = 
group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+            try {
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
+                        .build();
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
+
+                log.info("[GroupId {}] Computed a new target assignment for 
epoch {} with '{}' assignor: {}.",
+                    groupId, groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = 
assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                    groupEpoch, ex.getMessage());
+                log.error("[GroupId {}] {}.", groupId, msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            group::currentPartitionEpoch,
+            targetAssignmentEpoch,
+            targetAssignment,
+            ownedTopicPartitions,
+            records
+        );
+
+        if (newMemberCreated) {
+            scheduleConsumerGroupSessionTimeout(groupId, memberId);
+        }
+
+        // TODO: what will happen to the client if the generation id in 
response is 0?
+        responseFuture.complete(new JoinGroupResponseData()
+            .setMemberId(updatedMember.memberId())
+            .setGenerationId(updatedMember.memberEpoch())
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setProtocolName(protocols.stream().findAny().get().name())

Review Comment:
   Could we directly get the first element from `protocols` without using the 
stream API? Thinking about this, I also wonder whether we have any validation 
of the protocols. For instance, what would happen if we receive no protocols?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
         }
     }
 
+    /**
+     * Validates if the received classic member protocols are supported by the 
group.
+     *
+     * @param group         The ConsumerGroup.
+     * @param memberId      The joining member id.
+     * @param protocolType  The joining member protocol type.
+     * @param protocols     The joining member protocol collection.
+     */
+    private void throwIfClassicProtocolIsNotSupported(
+        ConsumerGroup group,
+        String memberId,
+        String protocolType,
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+        }
+    }
+
+    /**
+     * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+     * All the protocols have the same subscription, so the method picks a 
random one.
+     *
+     * @param protocols The JoinGroupRequestProtocolCollection.
+     * @return The Subscription.
+     */
+    private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        try {
+            return ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+            );
+        } catch (SchemaException e) {
+            throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+        }
+    }
+
+    /**
+     * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+     *
+     * @param member        The joining member.
+     * @param subscription  The Subscription.
+     * @return The owned partitions if valid, otherwise return null.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
validateGenerationIdAndGetOwnedPartition(
+        ConsumerGroupMember member,
+        ConsumerPartitionAssignor.Subscription subscription
+    ) {
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedPartitions =
+            toTopicPartitions(subscription.ownedPartitions(), 
metadataImage.topics());
+        if (subscription.generationId().isPresent() && 
subscription.generationId().get() == member.memberEpoch()) {
+            return ownedPartitions;
+        } else {
+            // If the generation id is not provided or doesn't match the 
member epoch, it's still safe to
+            // accept the ownedPartitions that is a subset of the assigned 
partition. Otherwise, set the
+            // ownedPartition to be null. When a new assignment is provided, 
the consumer will stop fetching
+            // from and revoke the partitions it does not own.
+            if (isSubset(ownedPartitions, member.assignedPartitions())) {
+                return ownedPartitions;
+            } else {
+                return null;

Review Comment:
   Returning `null` maybe an issue here because the CurrentAssignmentBuilder 
may not be able to progress with `null` in some cases. We need to double check.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java:
##########
@@ -43,4 +48,12 @@ public String name() {
     public GroupAssignment assign(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) throws 
PartitionAssignorException {
         return prepareGroupAssignment;
     }
+
+    /**
+     * Package private for testing.
+     */

Review Comment:
   It is actually find to make it public as we are in tests.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> consumerGroupJoin(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions = null;
+
+        if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+            return EMPTY_RESULT;
+        }
+        throwIfConsumerGroupIsFull(group, memberId);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for 
another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group 
{}. " +
+                        "Created a new member id {} and requesting the member 
to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                // A dynamic member (re-)joins.
+                throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+                newMemberCreated = !group.hasMember(memberId);
+
+                member = group.getOrMaybeCreateMember(memberId, true);
+                if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
+        } else {
+            throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+                    log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+
+                ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = false;
+        if (!updatedMember.equals(member)) {
+            records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+
+            if 
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
+                log.info("[GroupId {}] Member {} updated its subscribed topics 
to: {}.",
+                    groupId, memberId, updatedMember.subscribedTopicNames());
+                bumpGroupEpoch = true;
+            }
+
+            if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+                log.info("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
+                    groupId, memberId, updatedMember.subscribedTopicRegex());
+                bumpGroupEpoch = true;
+            }
+        }

Review Comment:
   This block seems to be an easy one to extract into a method and share it 
with the other code path. Should we do it right away?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to