dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1404037483


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -691,6 +708,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
             if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
             }
+        } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+            throwIfNull(request.instanceId(), "InstanceId can't be null for 
Static Member. GroupId: "

Review Comment:
   I would rather use `InstanceId can't be null.` here in order to be 
consistent with the other error messages.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -751,6 +772,53 @@ private void throwIfConsumerGroupIsFull(
         }
     }
 
+    /**
+     * Validates and throws an error when the validation fails for static 
member.
+     * @param groupId     The group id
+     * @param instanceId  The instance id
+     * @param member      The existing static member in the group.
+     * @param memberEpoch The member epoch with which the static member sends 
heartbeat.
+     * @param memberId    The member id with which the member joins now.
+     *
+     * @throws UnknownMemberIdException if member sends heartbeat with a 
non-zero epoch and no static member exists for
+     * the instance id.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException If 
member joins with non-zero epoch but there
+     * already exists a static member with a different memberId.
+     * @throws org.apache.kafka.common.errors.UnreleasedInstanceIdException A 
new member is trying to leave the group
+     * but the existing static member hasn't requested leaving the group.
+
+     */
+    private void throwIfStaticMemberValidationFails(

Review Comment:
   I suppose that we could remove this one now. Could we?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -849,21 +922,53 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            if (memberEpoch == 0) {
+                // A new static member joins or the existing static member 
rejoins.
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, 
createIfNotExists);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group", groupId, memberId, instanceId);
+                } else {
+                    // Static member rejoins with a different member id so it 
should replace
+                    // the previous instance iff the previous member had sent 
a Leave group.
+                    throwIfInstanceIdIsUnreleased(groupId, memberId, 
instanceId, member);
+                    // Replace the current member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+                }
+            } else {
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(memberId, instanceId, member);

Review Comment:
   nit: I would put `member` as the first argument to be consistent with the 
other helpers.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId 
+ " trying to join group "
+                + groupId + " but the instance id " + instanceId + " is 
already in use by member " + member.memberId());
+        }
+    }
+
+    private void throwIfInstanceIdIsFenced(String memberId, String instanceId, 
ConsumerGroupMember member) {
+        if (!member.memberId().equals(memberId)) {
+            log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                    "is fenced by existing memberId={}",
+                memberId, instanceId, member.memberId());
+            throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " 
for static member with groupInstanceId " + instanceId +
+                " is fenced by existing memberId " + member.memberId());
+        }
+    }
+
+    private void throwIfStaticMemberIsUnknown(ConsumerGroupMember 
staticMember, String instanceId) {
+        if (staticMember == null) {
+            throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static 
member found against instance id: " +  instanceId);
+        }
+    }
+
+    private void removeMemberAndCancelTimers(
+        List<Record> records,
+        String groupId,
+        String memberId
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, groupId, memberId);
+        // Cancel all the timers of the departed static member.
+        cancelTimers(groupId, memberId);
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
-
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (instanceId == null) {
+            ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+            log.info("[GroupId {}] Member {} left the consumer group.", 
groupId, memberId);
+            records = consumerGroupFenceMember(group, member);
+        } else {
+            ConsumerGroupMember member = group.staticMember(instanceId);
+            throwIfStaticMemberIsUnknown(member, instanceId);
+            throwIfInstanceIdIsFenced(memberId, instanceId, member);
+            if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+                log.info("[GroupId {}] Static Member {} with instance id {} 
temporarily left the consumer group",

Review Comment:
   nit: Add a `.` at the end of the log message.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -223,20 +238,30 @@ public TargetAssignmentResult build() throws 
PartitionAssignorException {
         Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
 
         // Prepare the member spec for all members.
-        members.forEach((memberId, member) -> memberSpecs.put(memberId, 
createAssignmentMemberSpec(
-            member,
-            targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-            subscriptionMetadata
-        )));
+        members.forEach((memberId, member) -> {

Review Comment:
   nit: Let's revert this change as it is not necessary.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId 
+ " trying to join group "
+                + groupId + " but the instance id " + instanceId + " is 
already in use by member " + member.memberId());
+        }
+    }
+
+    private void throwIfInstanceIdIsFenced(String memberId, String instanceId, 
ConsumerGroupMember member) {
+        if (!member.memberId().equals(memberId)) {
+            log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                    "is fenced by existing memberId={}",
+                memberId, instanceId, member.memberId());
+            throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " 
for static member with groupInstanceId " + instanceId +
+                " is fenced by existing memberId " + member.memberId());
+        }
+    }
+
+    private void throwIfStaticMemberIsUnknown(ConsumerGroupMember 
staticMember, String instanceId) {
+        if (staticMember == null) {
+            throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static 
member found against instance id: " +  instanceId);
+        }
+    }
+
+    private void removeMemberAndCancelTimers(
+        List<Record> records,
+        String groupId,
+        String memberId
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, groupId, memberId);
+        // Cancel all the timers of the departed static member.
+        cancelTimers(groupId, memberId);
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
-
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (instanceId == null) {
+            ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+            log.info("[GroupId {}] Member {} left the consumer group.", 
groupId, memberId);
+            records = consumerGroupFenceMember(group, member);
+        } else {
+            ConsumerGroupMember member = group.staticMember(instanceId);
+            throwIfStaticMemberIsUnknown(member, instanceId);
+            throwIfInstanceIdIsFenced(memberId, instanceId, member);
+            if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+                log.info("[GroupId {}] Static Member {} with instance id {} 
temporarily left the consumer group",
+                    group.groupId(), memberId, instanceId);
+                records.add(consumerGroupStaticMemberGroupLeave(group, 
member));
+            } else {
+                log.info("[GroupId {}] Static Member {} with instance id {} 
left the consumer group",

Review Comment:
   nit: Add a `.` at the end of the log message.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+            ));
+
+        // Member 3 joins the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setInstanceId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setInstanceId(memberId3)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
+        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+    }
+
+    @Test
+    public void testStaticMemberGetsBackAssignmentUponRejoin() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String member2RejoinId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    }
+                }))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)
+                    )));
+                    // When the member rejoins, it gets the same assignments.
+                    put(member2RejoinId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(-2)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // Member epoch of the response would be set to -2.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(-2),
+            result.response()
+        );
+
+        // The departing static member will have it's epoch set to -2.
+        ConsumerGroupMember member2UpdatedEpoch = new 
ConsumerGroupMember.Builder(member2)
+            .setMemberEpoch(-2)
+            .build();
+
+        List<Record> expectedRecords = Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch)
+        );
+
+        assertEquals(result.records(), expectedRecords);
+
+        // Member 2 rejoins the group with the same instance id.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
rejoinResult = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(member2RejoinId)
+                .setGroupId(groupId)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(member2RejoinId)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Arrays.asList(
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(fooTopicId)
+                        .setPartitions(Arrays.asList(3, 4, 5)),
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(barTopicId)
+                        .setPartitions(Collections.singletonList(2))
+                ))),
+                rejoinResult.response()
+        );
+
+        ConsumerGroupMember expectedRejoinedMember = new 
ConsumerGroupMember.Builder(member2RejoinId)
+            .setMemberEpoch(10)
+            .setInstanceId(memberId2)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))

Review Comment:
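   nit: Indentation. The `mkTopicAssignment` arguments should be indented by 4 
spaces relative to `.setAssignedPartitions(`, not 8.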



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+            ));
+
+        // Member 3 joins the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setInstanceId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setInstanceId(memberId3)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
+        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+    }
+
+    @Test
+    public void testStaticMemberGetsBackAssignmentUponRejoin() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String member2RejoinId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    }
+                }))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)
+                    )));
+                    // When the member rejoins, it gets the same assignments.
+                    put(member2RejoinId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(-2)

Review Comment:
   Let's replace `-2` with the relevant constant. There are other cases in this 
file.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -766,6 +834,11 @@ private void throwIfMemberEpochIsInvalid(
         int receivedMemberEpoch,
         List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions
     ) {
+        // If a static member rejoins, it's previous epoch would be -2. In 
such a
+        // case, we don't need to fence the member.
+        if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && 
receivedMemberEpoch == 0) {
+            return;
+        }

Review Comment:
   Is this check still necessary with the latest implementation?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId 
+ " trying to join group "
+                + groupId + " but the instance id " + instanceId + " is 
already in use by member " + member.memberId());
+        }
+    }
+
+    private void throwIfInstanceIdIsFenced(String memberId, String instanceId, 
ConsumerGroupMember member) {
+        if (!member.memberId().equals(memberId)) {
+            log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                    "is fenced by existing memberId={}",

Review Comment:
   I wonder if we could follow the structure of the other log messages here: 
`[GroupId {}] Static Member {} with instance id {}....`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -849,21 +922,53 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            if (memberEpoch == 0) {
+                // A new static member joins or the existing static member 
rejoins.
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, 
createIfNotExists);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group", groupId, memberId, instanceId);
+                } else {
+                    // Static member rejoins with a different member id so it 
should replace
+                    // the previous instance iff the previous member had sent 
a Leave group.

Review Comment:
   nit: `Leave` -> `leave`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId 
+ " trying to join group "
+                + groupId + " but the instance id " + instanceId + " is 
already in use by member " + member.memberId());
+        }
+    }
+
+    private void throwIfInstanceIdIsFenced(String memberId, String instanceId, 
ConsumerGroupMember member) {
+        if (!member.memberId().equals(memberId)) {
+            log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                    "is fenced by existing memberId={}",
+                memberId, instanceId, member.memberId());
+            throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " 
for static member with groupInstanceId " + instanceId +
+                " is fenced by existing memberId " + member.memberId());
+        }
+    }
+
+    private void throwIfStaticMemberIsUnknown(ConsumerGroupMember 
staticMember, String instanceId) {
+        if (staticMember == null) {
+            throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static 
member found against instance id: " +  instanceId);
+        }
+    }
+
+    private void removeMemberAndCancelTimers(
+        List<Record> records,
+        String groupId,
+        String memberId
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, groupId, memberId);
+        // Cancel all the timers of the departed static member.
+        cancelTimers(groupId, memberId);
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
-
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (instanceId == null) {
+            ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+            log.info("[GroupId {}] Member {} left the consumer group.", 
groupId, memberId);
+            records = consumerGroupFenceMember(group, member);
+        } else {
+            ConsumerGroupMember member = group.staticMember(instanceId);
+            throwIfStaticMemberIsUnknown(member, instanceId);
+            throwIfInstanceIdIsFenced(memberId, instanceId, member);
+            if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+                log.info("[GroupId {}] Static Member {} with instance id {} 
temporarily left the consumer group",
+                    group.groupId(), memberId, instanceId);
+                records.add(consumerGroupStaticMemberGroupLeave(group, 
member));

Review Comment:
   nit: Could we also use `records = ` here? With this, we could remove `new 
ArrayList<>()` when `records` is declared, I think.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -849,21 +922,53 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            if (memberEpoch == 0) {
+                // A new static member joins or the existing static member 
rejoins.
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, 
createIfNotExists);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group", groupId, memberId, instanceId);
+                } else {
+                    // Static member rejoins with a different member id so it 
should replace
+                    // the previous instance iff the previous member had sent 
a Leave group.
+                    throwIfInstanceIdIsUnreleased(groupId, memberId, 
instanceId, member);
+                    // Replace the current member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());

Review Comment:
   Should we also log something in this case?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {

Review Comment:
   * Could we move those helpers next to `throwIfMemberEpochIsInvalid`? Could 
we also add some javadoc to each of them?
   * I wonder if we could also find better names for the params because it is 
not clear whether `memberId` and `instanceId` are the ones of the existing 
member or the ones received in the request. We could perhaps use 
`receivedMemberId`, etc. What do you think? This also applies to the other 
helpers.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.

Review Comment:
   Should we also log something here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId 
+ " trying to join group "
+                + groupId + " but the instance id " + instanceId + " is 
already in use by member " + member.memberId());
+        }
+    }
+
+    private void throwIfInstanceIdIsFenced(String memberId, String instanceId, 
ConsumerGroupMember member) {
+        if (!member.memberId().equals(memberId)) {
+            log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                    "is fenced by existing memberId={}",
+                memberId, instanceId, member.memberId());
+            throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " 
for static member with groupInstanceId " + instanceId +
+                " is fenced by existing memberId " + member.memberId());
+        }
+    }
+
+    private void throwIfStaticMemberIsUnknown(ConsumerGroupMember 
staticMember, String instanceId) {
+        if (staticMember == null) {
+            throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static 
member found against instance id: " +  instanceId);

Review Comment:
   nit: `Instance id {} is unknown.`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -223,20 +238,30 @@ public TargetAssignmentResult build() throws 
PartitionAssignorException {
         Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
 
         // Prepare the member spec for all members.
-        members.forEach((memberId, member) -> memberSpecs.put(memberId, 
createAssignmentMemberSpec(
-            member,
-            targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-            subscriptionMetadata
-        )));
+        members.forEach((memberId, member) -> {
+            memberSpecs.put(memberId, createAssignmentMemberSpec(
+                member,
+                targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+                subscriptionMetadata
+            ));
+        });
 
         // Update the member spec if updated or deleted members.
         updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
             if (updatedMemberOrNull == null) {
                 memberSpecs.remove(memberId);
             } else {
+                ConsumerGroupMember member = members.get(memberId);
+                Assignment assignment;
+                // A new static member joins and needs to replace an existing 
departed one.
+                if (member == null && 
staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
+                    assignment = 
targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()),
 Assignment.EMPTY);

Review Comment:
   I am not sure I follow this one. My understanding is that we populate 
`staticMembers` only when `addOrUpdateMember` is called. In the main flow, we 
basically call this only once with the new or updated member.
   
   Let's imagine that a new static member joins. We will add its static id with 
its member id to `staticMembers`. Therefore here, we basically get back its 
member id and end up with no assignment. Did I get this right?
   
   I think that this could work but we would need to pass the `staticMembers` 
mapping from the `ConsumerGroup` to the builder, like we pass the members. If 
we have this, we could use it here to find the previous member with the static 
id if the member is new and has a static id.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId 
+ " trying to join group "
+                + groupId + " but the instance id " + instanceId + " is 
already in use by member " + member.memberId());

Review Comment:
   nit: `Static member {} with instance id {} cannot join the group because the 
instance id is owned by member {}.`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId 
+ " trying to join group "
+                + groupId + " but the instance id " + instanceId + " is 
already in use by member " + member.memberId());
+        }
+    }
+
+    private void throwIfInstanceIdIsFenced(String memberId, String instanceId, 
ConsumerGroupMember member) {
+        if (!member.memberId().equals(memberId)) {
+            log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                    "is fenced by existing memberId={}",
+                memberId, instanceId, member.memberId());
+            throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " 
for static member with groupInstanceId " + instanceId +
+                " is fenced by existing memberId " + member.memberId());

Review Comment:
   nit: `Static member {} with instance id {} was fenced by member {}.`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -691,6 +735,91 @@ public void testDeleteMember() {
         assertEquals(expectedAssignment, result.targetAssignment());
     }
 
+    @Test
+    public void testStaticMemberReplace() {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
+        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+
+        context.addGroupMember("member-1", "member-1", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", "member-2", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", "member-3", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6),
+            mkTopicAssignment(barTopicId, 5, 6)
+        ));
+
+        context.updateMemberSubscription("member-1", Arrays.asList("foo", 
"bar", "zar"), Optional.of("member-1"), Optional.empty());
+        context.updateMemberSubscription("member-2", Arrays.asList("foo", 
"bar", "zar"), Optional.of("member-2"), Optional.empty());
+        context.updateMemberSubscription("member-3", Arrays.asList("foo", 
"bar", "zar"), Optional.of("member-3"), Optional.empty());

Review Comment:
   This should not be here. I think that you are mixing two different things. 
`addGroupMember` is basically what is used to build what will be passed to 
`withMembers` and `withTargetAssignment` whereas `updateMemberSubscription` is 
for `addOrUpdateMember`. Therefore, the test does not reproduce how we use it.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+            ));

Review Comment:
   nit: Indentation seems off here. I think that it should be indented 4 spaces less.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -849,21 +922,53 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            if (memberEpoch == 0) {
+                // A new static member joins or the existing static member 
rejoins.
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, 
createIfNotExists);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group", groupId, memberId, instanceId);
+                } else {
+                    // Static member rejoins with a different member id so it 
should replace
+                    // the previous instance iff the previous member had sent 
a Leave group.
+                    throwIfInstanceIdIsUnreleased(groupId, memberId, 
instanceId, member);
+                    // Replace the current member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());

Review Comment:
   To close on this one, it is indeed correct to set the assigned partitions 
here. Without it, the reconciler checks whether the partitions in the target 
assignment are still owned, and they are effectively still owned until the 
previous member is removed, which only happens when the records are processed.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -691,6 +735,91 @@ public void testDeleteMember() {
         assertEquals(expectedAssignment, result.targetAssignment());
     }
 
+    @Test
+    public void testStaticMemberReplace() {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
+        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+
+        context.addGroupMember("member-1", "member-1", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", "member-2", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", "member-3", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6),
+            mkTopicAssignment(barTopicId, 5, 6)
+        ));
+
+        context.updateMemberSubscription("member-1", Arrays.asList("foo", 
"bar", "zar"), Optional.of("member-1"), Optional.empty());
+        context.updateMemberSubscription("member-2", Arrays.asList("foo", 
"bar", "zar"), Optional.of("member-2"), Optional.empty());
+        context.updateMemberSubscription("member-3", Arrays.asList("foo", 
"bar", "zar"), Optional.of("member-3"), Optional.empty());
+
+        // Static member 3 leaves
+        context.removeMemberSubscription("member-3", "member-3");
+
+        // Another static member joins with the same instance id as the 
departed one
+        context.addGroupMember("member-3-a", "member-3", Arrays.asList("foo", 
"bar", "zar"), new HashMap<>());

Review Comment:
   This one is incorrect as well because the newly added member is not added 
via `withMembers`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1004,27 +1118,102 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void throwIfInstanceIdIsUnreleased(String groupId, String 
memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId 
+ " trying to join group "
+                + groupId + " but the instance id " + instanceId + " is 
already in use by member " + member.memberId());
+        }
+    }
+
+    private void throwIfInstanceIdIsFenced(String memberId, String instanceId, 
ConsumerGroupMember member) {
+        if (!member.memberId().equals(memberId)) {
+            log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                    "is fenced by existing memberId={}",
+                memberId, instanceId, member.memberId());
+            throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " 
for static member with groupInstanceId " + instanceId +
+                " is fenced by existing memberId " + member.memberId());
+        }
+    }
+
+    private void throwIfStaticMemberIsUnknown(ConsumerGroupMember 
staticMember, String instanceId) {
+        if (staticMember == null) {
+            throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static 
member found against instance id: " +  instanceId);
+        }
+    }
+
+    private void removeMemberAndCancelTimers(
+        List<Record> records,
+        String groupId,
+        String memberId
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, groupId, memberId);
+        // Cancel all the timers of the departed static member.
+        cancelTimers(groupId, memberId);
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
-
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (instanceId == null) {
+            ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+            log.info("[GroupId {}] Member {} left the consumer group.", 
groupId, memberId);
+            records = consumerGroupFenceMember(group, member);
+        } else {
+            ConsumerGroupMember member = group.staticMember(instanceId);
+            throwIfStaticMemberIsUnknown(member, instanceId);
+            throwIfInstanceIdIsFenced(memberId, instanceId, member);
+            if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+                log.info("[GroupId {}] Static Member {} with instance id {} 
temporarily left the consumer group",
+                    group.groupId(), memberId, instanceId);
+                records.add(consumerGroupStaticMemberGroupLeave(group, 
member));
+            } else {
+                log.info("[GroupId {}] Static Member {} with instance id {} 
left the consumer group",
+                    group.groupId(), memberId, instanceId);
+                records = consumerGroupFenceMember(group, member);
+            }
+        }
         return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
             .setMemberId(memberId)
-            .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+            .setMemberEpoch(memberEpoch));
+    }
+
+    /**
+     * Handles the case when a static member decides to leave the group.
+     * The member is not actually fenced from the group, and instead it's
+     * member epoch is updated to -2 to reflect that a member using the given
+     * instance id decided to leave the group and would be back within session
+     * timeout.
+     *
+     * @param group      The group.
+     * @param member     The static member in the group for the instance id.
+     *
+     * @return A ConsumerGroupCurrentMemberAssignment record signifying that 
the static member is leaving.
+     */
+    private Record consumerGroupStaticMemberGroupLeave(
+        ConsumerGroup group,
+        ConsumerGroupMember member
+    ) {
+        // We will write a member epoch of -2 for this departing static member.
+        ConsumerGroupMember leavingStaticMember = new 
ConsumerGroupMember.Builder(member)
+            .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)

Review Comment:
   I think that we could also use `setPartitionsPendingRevocation` to set the 
partitions pending revocation to empty because we know that the member has 
revoked all its partitions when it leaves.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+            ));
+
+        // Member 3 joins the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setInstanceId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setInstanceId(memberId3)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
+        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+    }
+
+    @Test
+    public void testStaticMemberGetsBackAssignmentUponRejoin() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String member2RejoinId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    }
+                }))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)
+                    )));
+                    // When the member rejoins, it gets the same assignments.
+                    put(member2RejoinId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(-2)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // Member epoch of the response would be set to -2.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(-2),
+            result.response()
+        );
+
+        // The departing static member will have it's epoch set to -2.
+        ConsumerGroupMember member2UpdatedEpoch = new 
ConsumerGroupMember.Builder(member2)
+            .setMemberEpoch(-2)
+            .build();
+
+        List<Record> expectedRecords = Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch)
+        );
+
+        assertEquals(result.records(), expectedRecords);
+
+        // Member 2 rejoins the group with the same instance id.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
rejoinResult = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(member2RejoinId)
+                .setGroupId(groupId)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(member2RejoinId)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Arrays.asList(
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(fooTopicId)
+                        .setPartitions(Arrays.asList(3, 4, 5)),
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(barTopicId)
+                        .setPartitions(Collections.singletonList(2))
+                ))),
+                rejoinResult.response()
+        );
+
+        ConsumerGroupMember expectedRejoinedMember = new 
ConsumerGroupMember.Builder(member2RejoinId)
+            .setMemberEpoch(10)
+            .setInstanceId(memberId2)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecordsAfterRejoin = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedRejoinedMember),
+            RecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2))),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
expectedRejoinedMember)
+        );
+
+        assertRecordsEquals(expectedRecordsAfterRejoin, 
rejoinResult.records());
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId2);
+        context.assertNoRevocationTimeout(groupId, memberId2);
+    }
+
+    @Test
+    public void testNoGroupEpochBumpWhenStaticMemberLeaves() {

Review Comment:
   nit: Temporarily leave?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3328,6 +4202,87 @@ public void testSessionTimeoutExpiration() {
         context.assertNoRevocationTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testSessionTimeoutExpirationStaticMember() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setInstanceId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(90000)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Static member sends a temporary leave group request
+        result = context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()

Review Comment:
   nit: Indentation.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+            ));
+
+        // Member 3 joins the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setInstanceId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setInstanceId(memberId3)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
+        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+    }
+
+    @Test
+    public void testStaticMemberGetsBackAssignmentUponRejoin() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String member2RejoinId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    }
+                }))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)
+                    )));
+                    // When the member rejoins, it gets the same assignments.
+                    put(member2RejoinId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(-2)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // Member epoch of the response would be set to -2.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(-2),
+            result.response()
+        );
+
+        // The departing static member will have its epoch set to -2.
+        ConsumerGroupMember member2UpdatedEpoch = new 
ConsumerGroupMember.Builder(member2)
+            .setMemberEpoch(-2)
+            .build();
+
+        List<Record> expectedRecords = Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch)
+        );
+
+        assertEquals(result.records(), expectedRecords);
+
+        // Member 2 rejoins the group with the same instance id.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
rejoinResult = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(member2RejoinId)
+                .setGroupId(groupId)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(member2RejoinId)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Arrays.asList(
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(fooTopicId)
+                        .setPartitions(Arrays.asList(3, 4, 5)),
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(barTopicId)
+                        .setPartitions(Collections.singletonList(2))

Review Comment:
   nit: Indentation is incorrect. 



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+            ));
+
+        // Member 3 joins the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setInstanceId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setInstanceId(memberId3)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
+        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+    }
+
+    @Test
+    public void testStaticMemberGetsBackAssignmentUponRejoin() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String member2RejoinId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    }
+                }))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)
+                    )));
+                    // When the member rejoins, it gets the same assignments.
+                    put(member2RejoinId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(-2)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // Member epoch of the response would be set to -2.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(-2),
+            result.response()
+        );
+
+        // The departing static member will have its epoch set to -2.
+        ConsumerGroupMember member2UpdatedEpoch = new 
ConsumerGroupMember.Builder(member2)
+            .setMemberEpoch(-2)
+            .build();
+
+        List<Record> expectedRecords = Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch)
+        );
+
+        assertEquals(result.records(), expectedRecords);
+
+        // Member 2 rejoins the group with the same instance id.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
rejoinResult = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(member2RejoinId)
+                .setGroupId(groupId)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(member2RejoinId)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Arrays.asList(
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(fooTopicId)
+                        .setPartitions(Arrays.asList(3, 4, 5)),
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(barTopicId)
+                        .setPartitions(Collections.singletonList(2))
+                ))),
+                rejoinResult.response()
+        );
+
+        ConsumerGroupMember expectedRejoinedMember = new 
ConsumerGroupMember.Builder(member2RejoinId)
+            .setMemberEpoch(10)
+            .setInstanceId(memberId2)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecordsAfterRejoin = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedRejoinedMember),
+            RecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2))),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
expectedRejoinedMember)
+        );
+
+        assertRecordsEquals(expectedRecordsAfterRejoin, 
rejoinResult.records());
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId2);
+        context.assertNoRevocationTimeout(groupId, memberId2);
+    }
+
+    @Test
+    public void testNoGroupEpochBumpWhenStaticMemberLeaves() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            // Subscribes to the same topics as member 1.
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(-2)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // member epoch of the response would be set to -2
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(-2),
+            result.response()
+        );
+
+        ConsumerGroupMember member2UpdatedEpoch = new 
ConsumerGroupMember.Builder(member2).setMemberEpoch(-2).build();
+        List<Record> expectedRecords = Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch)
+        );
+
+        assertEquals(result.records(), expectedRecords);
+    }
+
+    @Test // Leaving with LEAVE_GROUP_MEMBER_EPOCH tombstones the static member and bumps the group epoch (10 -> 11).
+    public void testLeavingStaticMemberBumpsGroupEpoch() {
+        String groupId = "fooup";
+        // Each member's id doubles as its instance id (see setInstanceId below), which keeps the setup simple.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group (epoch 10) with two static members; only member 2 subscribes to zar.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 leaves the consumer group by sending LEAVE_GROUP_MEMBER_EPOCH together with its instance id.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setInstanceId(memberId2)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals( // The response echoes the member id and the leave epoch.
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+            result.response()
+        );
+
+        List<Record> expectedRecords = Arrays.asList( // Member 2's three tombstones, then the refreshed metadata and the new group epoch.
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
+            // Subscription metadata is recomputed because zar is no longer 
there.
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11) // The group was built at epoch 10, so 11 is the bumped epoch.
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test // A new member joining with an instance id that is still held by another member must get UnreleasedInstanceIdException.
+    public void 
testShouldThrownUnreleasedInstanceIdExceptionWhenNewMemberJoinsWithInUseInstanceId()
 { // NOTE(review): the name says "Thrown"; sibling tests in this file use "Throw".
+        String groupId = "fooup";
+        // Member 1's id doubles as its instance id; member 2 later presents that same instance id.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group (epoch 10) with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 joins (epoch 0) with a new member id but with member 1's in-use instance id.
+        assertThrows(UnreleasedInstanceIdException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId1)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test // A heartbeat with a non-zero epoch and an instance id that no group member holds must get UnknownMemberIdException.
+    public void 
testShouldThrownUnknownMemberIdExceptionWhenUnknownStaticMemberJoins() { // NOTE(review): the name says "Thrown"; sibling tests in this file use "Throw".
+        String groupId = "fooup";
+        // Member 1's id doubles as its instance id; member 2's id is never registered in the group.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group (epoch 10) with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 heartbeats with a non-zero epoch, using a member id and instance id the group does not contain.
+        assertThrows(UnknownMemberIdException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test // A known instance id presented with a different member id and a non-zero epoch must get FencedInstanceIdException.
+    public void 
testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferentMemberIdJoins()
 {
+        String groupId = "fooup";
+        // Member 1's id doubles as its instance id.
+        String memberId1 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group (epoch 10) with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assertThrows(FencedInstanceIdException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId("unknown-" + memberId1) // Member id that differs from the one stored for this instance id.
+                .setInstanceId(memberId1) // Instance id held by member 1.
+                .setMemberEpoch(11) // Non-zero epoch, i.e. not a fresh join.
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test // Checks which request epochs are fenced and which are accepted for a static member currently at epoch 100.
+    public void testConsumerGroupMemberEpochValidationForStaticMember() {
+        String groupId = "fooup";
+        // The member id doubles as the instance id (see setInstanceId below).
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) // Static member at epoch 100 (previous epoch 99) owning foo partitions 1, 2, 3.
+            .setInstanceId(memberId)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setTargetMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
1, 2, 3)))
+            .build();
+
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId, 
member));
+
+        context.replay(RecordHelpers.newGroupEpochRecord(groupId, 100)); // The group state is built by replaying records rather than via the builder.
+
+        context.replay(RecordHelpers.newTargetAssignmentRecord(groupId, 
memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2, 3)
+        )));
+
+        context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 
100));
+
+        context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId, 
member));
+
+        // Member epoch is greater than the expected epoch.
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setInstanceId(memberId)
+                    .setMemberEpoch(200)
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+        // Member epoch is smaller than the expected epoch.
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setInstanceId(memberId)
+                    .setMemberEpoch(50)
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+        // Member joins with previous epoch but without providing partitions.
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setInstanceId(memberId)
+                    .setMemberEpoch(99)
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+        // Member joins with previous epoch and has a subset of the owned 
partitions. This
+        // is accepted as the response with the bumped epoch may have been 
lost. In this
+        // case, we provide back the correct epoch to the member.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setInstanceId(memberId)
+                .setMemberEpoch(99)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.singletonList(new 
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(Arrays.asList(1, 2)))));
+        assertEquals(100, result.response().memberEpoch()); // The member is handed the current epoch (100), not the stale 99 it sent.
+    }
+
+    @Test // A leave request (epoch -2) whose instance id is not held by any group member must get UnknownMemberIdException.
+    public void 
testShouldThrowUnknownMemberIdExceptionWhenUnknownStaticMemberLeaves() {
+        String groupId = "fooup";
+        // Member 1's id doubles as its instance id.
+        String memberId1 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group (epoch 10) with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setInstanceId("unknown-" + memberId1) // Instance id that no member of the group holds.
+                .setMemberEpoch(-2) // Presumably LEAVE_GROUP_STATIC_MEMBER_EPOCH; TODO confirm and use the constant.
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test // A leave request (epoch -2) for a known instance id but with a different member id must get FencedInstanceIdException.
+    public void 
testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferentMemberIdLeaves()
 {
+        String groupId = "fooup";
+        // Member 1's id doubles as its instance id.
+        String memberId1 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group (epoch 10) with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assertThrows(FencedInstanceIdException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId("unknown-" + memberId1) // Member id that differs from the one stored for this instance id.
+                .setInstanceId(memberId1) // Instance id held by member 1.
+                .setMemberEpoch(-2) // Presumably LEAVE_GROUP_STATIC_MEMBER_EPOCH; TODO confirm and use the constant.
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void 
testShouldThrowInvalidRequestExceptionWhenInstanceIdIsNullForStaticMember() {

Review Comment:
   We already have `testConsumerHeartbeatRequestValidation` so I wonder if we 
could just add the new case there. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to