dajac commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1404037483
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -691,6 +708,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); } + } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + throwIfEmptyString(request.memberId(), "MemberId can't be empty."); + throwIfNull(request.instanceId(), "InstanceId can't be null for Static Member. GroupId: " Review Comment: I would rather use `InstanceId can't be null.` here in order to be consistent with the other error messages. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -751,6 +772,53 @@ private void throwIfConsumerGroupIsFull( } } + /** + * Validates and throws an error when the validation fails for static member. + * @param groupId The group id + * @param instanceId The instance id + * @param member The existing static member in the group. + * @param memberEpoch The member epoch with which the static member sends heartbeat. + * @param memberId The member id with which the member joins now. + * + * @throws UnknownMemberIdException if member sends heartbeat with a non-zero epoch and no static member exists for + * the instance id. + * @throws org.apache.kafka.common.errors.FencedInstanceIdException If member joins with non-zero epoch but there + * already exists a static member with a different memberId. + * @throws org.apache.kafka.common.errors.UnreleasedInstanceIdException A new member is trying to leave the group + * but the existing static member hasn't requested leaving the group. + + */ + private void throwIfStaticMemberValidationFails( Review Comment: I suppose that we could remove this one now. Could we? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -849,21 +922,53 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // Get or create the member. if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); - final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); - throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); - - if (memberEpoch == 0) { - log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); + if (createIfNotExists) { + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + } + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + if (memberEpoch == 0) { + // A new static member joins or the existing static member rejoins. + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId); + } else { + // Static member rejoins with a different member id so it should replace + // the previous instance iff the previous member had sent a Leave group. + throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member); + // Replace the current member. 
+ staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); + } + } else { + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(memberId, instanceId, member); Review Comment: nit: I would put `member` as the first argument to be consistent with the other helpers. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. + throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId + " trying to join group " + + groupId + " but the instance id " + instanceId + " is already in use by member " + member.memberId()); + } + } + + private void throwIfInstanceIdIsFenced(String memberId, String instanceId, ConsumerGroupMember member) { + if (!member.memberId().equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={}", + memberId, instanceId, member.memberId()); + throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " for static member with groupInstanceId " + instanceId + + " is fenced by existing memberId " + member.memberId()); + } + } + + private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, String instanceId) { + if (staticMember == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static member found against instance id: " + instanceId); + } + } + + private void removeMemberAndCancelTimers( + List<Record> records, 
+ String groupId, + String memberId + ) { + // Write tombstones for the departed static member. + removeMember(records, groupId, memberId); + // Cancel all the timers of the departed static member. + cancelTimers(groupId, memberId); + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (instanceId == null) { + ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId); + records = consumerGroupFenceMember(group, member); + } else { + ConsumerGroupMember member = group.staticMember(instanceId); + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(memberId, instanceId, member); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + log.info("[GroupId {}] Static Member {} with instance id {} temporarily left the consumer group", Review Comment: nit: `.` at the end. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ########## @@ -223,20 +238,30 @@ public TargetAssignmentResult build() throws PartitionAssignorException { Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>(); // Prepare the member spec for all members. - members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( - member, - targetAssignment.getOrDefault(memberId, Assignment.EMPTY), - subscriptionMetadata - ))); + members.forEach((memberId, member) -> { Review Comment: nit: Let's revert this change as it is not necessary. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. 
+ throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId + " trying to join group " + + groupId + " but the instance id " + instanceId + " is already in use by member " + member.memberId()); + } + } + + private void throwIfInstanceIdIsFenced(String memberId, String instanceId, ConsumerGroupMember member) { + if (!member.memberId().equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={}", + memberId, instanceId, member.memberId()); + throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " for static member with groupInstanceId " + instanceId + + " is fenced by existing memberId " + member.memberId()); + } + } + + private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, String instanceId) { + if (staticMember == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static member found against instance id: " + instanceId); + } + } + + private void removeMemberAndCancelTimers( + List<Record> records, + String groupId, + String memberId + ) { + // Write tombstones for the departed static member. + removeMember(records, groupId, memberId); + // Cancel all the timers of the departed static member. + cancelTimers(groupId, memberId); + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. 
*/ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (instanceId == null) { + ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId); + records = consumerGroupFenceMember(group, member); + } else { + ConsumerGroupMember member = group.staticMember(instanceId); + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(memberId, instanceId, member); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + log.info("[GroupId {}] Static Member {} with instance id {} temporarily left the consumer group", + group.groupId(), memberId, instanceId); + records.add(consumerGroupStaticMemberGroupLeave(group, member)); + } else { + log.info("[GroupId {}] Static Member {} with instance id {} left the consumer group", Review Comment: nit: `.` at the end. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testGroupEpochBumpWhenNewStaticMemberJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. 
+ String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to ensure that metadata needs to be recomputed. 
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + ))); + put(memberId3, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 3 joins the consumer group. 
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setInstanceId(memberId3) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) + .setMemberEpoch(11) + .setInstanceId(memberId3) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + )), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + )), + 
RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) + ); + + assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); + assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); + } + + @Test + public void testStaticMemberGetsBackAssignmentUponRejoin() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String member2RejoinId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 
3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + })) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1) + ))); + // When the member rejoins, it gets the same assignments. + put(member2RejoinId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 2 leaves the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(-2) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + // Member epoch of the response would be set to -2. 
+ assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(-2), + result.response() + ); + + // The departing static member will have it's epoch set to -2. + ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember.Builder(member2) + .setMemberEpoch(-2) + .build(); + + List<Record> expectedRecords = Collections.singletonList( + RecordHelpers.newCurrentAssignmentRecord(groupId, member2UpdatedEpoch) + ); + + assertEquals(result.records(), expectedRecords); + + // Member 2 rejoins the group with the same instance id. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> rejoinResult = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setMemberId(member2RejoinId) + .setGroupId(groupId) + .setInstanceId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(member2RejoinId) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(3, 4, 5)), + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(barTopicId) + .setPartitions(Collections.singletonList(2)) + ))), + rejoinResult.response() + ); + + ConsumerGroupMember expectedRejoinedMember = new ConsumerGroupMember.Builder(member2RejoinId) + .setMemberEpoch(10) + .setInstanceId(memberId2) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + 
mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) Review Comment: nit: Indentation. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testGroupEpochBumpWhenNewStaticMemberJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to 
ensure that metadata needs to be recomputed. + .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + ))); + put(memberId3, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 3 joins the consumer group. 
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setInstanceId(memberId3) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) + .setMemberEpoch(11) + .setInstanceId(memberId3) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + )), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + )), + 
RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) + ); + + assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); + assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); + } + + @Test + public void testStaticMemberGetsBackAssignmentUponRejoin() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String member2RejoinId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 
3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + })) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1) + ))); + // When the member rejoins, it gets the same assignments. + put(member2RejoinId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 2 leaves the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(-2) Review Comment: Let's replace `-2` with the relevant constant. There are other cases in this file. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -766,6 +834,11 @@ private void throwIfMemberEpochIsInvalid( int receivedMemberEpoch, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions ) { + // If a static member rejoins, it's previous epoch would be -2. In such a + // case, we don't need to fence the member. + if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) { + return; + } Review Comment: Is this early return still necessary with the latest implementation? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. + throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId + " trying to join group " + + groupId + " but the instance id " + instanceId + " is already in use by member " + member.memberId()); + } + } + + private void throwIfInstanceIdIsFenced(String memberId, String instanceId, ConsumerGroupMember member) { + if (!member.memberId().equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={}", Review Comment: I wonder if we could follow the structure of the other log messages here: `[GroupId {}] Static Member {} with instance id {}....`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -849,21 +922,53 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // Get or create the member.
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); - final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); - throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); - - if (memberEpoch == 0) { - log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); + if (createIfNotExists) { + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + } + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + if (memberEpoch == 0) { + // A new static member joins or the existing static member rejoins. + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId); + } else { + // Static member rejoins with a different member id so it should replace + // the previous instance iff the previous member had sent a Leave group. Review Comment: nit: `Leave` -> `leave`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. 
+ throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId + " trying to join group " + + groupId + " but the instance id " + instanceId + " is already in use by member " + member.memberId()); + } + } + + private void throwIfInstanceIdIsFenced(String memberId, String instanceId, ConsumerGroupMember member) { + if (!member.memberId().equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={}", + memberId, instanceId, member.memberId()); + throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " for static member with groupInstanceId " + instanceId + + " is fenced by existing memberId " + member.memberId()); + } + } + + private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, String instanceId) { + if (staticMember == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static member found against instance id: " + instanceId); + } + } + + private void removeMemberAndCancelTimers( + List<Record> records, + String groupId, + String memberId + ) { + // Write tombstones for the departed static member. + removeMember(records, groupId, memberId); + // Cancel all the timers of the departed static member. + cancelTimers(groupId, memberId); + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. 
*/ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (instanceId == null) { + ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId); + records = consumerGroupFenceMember(group, member); + } else { + ConsumerGroupMember member = group.staticMember(instanceId); + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(memberId, instanceId, member); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + log.info("[GroupId {}] Static Member {} with instance id {} temporarily left the consumer group", + group.groupId(), memberId, instanceId); + records.add(consumerGroupStaticMemberGroupLeave(group, member)); Review Comment: nit: Could we also use `records = ` here? With this, we could remove `new ArrayList<>()` when `records` is declared, I think. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -849,21 +922,53 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // Get or create the member. 
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); - final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); - throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); - - if (memberEpoch == 0) { - log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); + if (createIfNotExists) { + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + } + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + if (memberEpoch == 0) { + // A new static member joins or the existing static member rejoins. + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId); + } else { + // Static member rejoins with a different member id so it should replace + // the previous instance iff the previous member had sent a Leave group. + throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member); + // Replace the current member. + staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); Review Comment: Should we also log something in this case? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { Review Comment: * Could we move those helpers next to `throwIfMemberEpochIsInvalid`? Could we also add some javadoc to each of them? * I wonder if we could also find better names for the params because it is not clear whether `memberId` and `instanceId` are the ones of the existing member or the ones received in the request. We could perhaps use `receivedMemberId`, etc. What do you think? This also applies to the other helpers. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. Review Comment: Should we also log something here? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. 
+ throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId + " trying to join group " + + groupId + " but the instance id " + instanceId + " is already in use by member " + member.memberId()); + } + } + + private void throwIfInstanceIdIsFenced(String memberId, String instanceId, ConsumerGroupMember member) { + if (!member.memberId().equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={}", + memberId, instanceId, member.memberId()); + throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " for static member with groupInstanceId " + instanceId + + " is fenced by existing memberId " + member.memberId()); + } + } + + private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, String instanceId) { + if (staticMember == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static member found against instance id: " + instanceId); Review Comment: nit: `Instance id {} is unknown.`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ########## @@ -223,20 +238,30 @@ public TargetAssignmentResult build() throws PartitionAssignorException { Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>(); // Prepare the member spec for all members. - members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( - member, - targetAssignment.getOrDefault(memberId, Assignment.EMPTY), - subscriptionMetadata - ))); + members.forEach((memberId, member) -> { + memberSpecs.put(memberId, createAssignmentMemberSpec( + member, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata + )); + }); // Update the member spec if updated or deleted members. 
updatedMembers.forEach((memberId, updatedMemberOrNull) -> { if (updatedMemberOrNull == null) { memberSpecs.remove(memberId); } else { + ConsumerGroupMember member = members.get(memberId); + Assignment assignment; + // A new static member joins and needs to replace an existing departed one. + if (member == null && staticMembers.containsKey(updatedMemberOrNull.instanceId())) { + assignment = targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()), Assignment.EMPTY); Review Comment: I am not sure to follow this one. My understanding is that we populate `staticMembers` only when `addOrUpdateMember` is called. In the main flow, we basically call this only once with the new or updated member. Let's imagine that a new static member joins. We will add its static id with its member id to `staticMembers`. Therefore here, we basically get back its member id and end up with no assignment. Did I get this right? I think that this could work but we would need to pass the `staticMembers` mapping from the `ConsumerGroup` to the builder, like we pass the members. If we have this, we could use it here to find the previous member with the static id if the member is new and has a static id. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. 
+ throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId + " trying to join group " + + groupId + " but the instance id " + instanceId + " is already in use by member " + member.memberId()); Review Comment: nit: `Static member {} with instance id {} cannot join the group because the instance id is owned by member {}.`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. + throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId + " trying to join group " + + groupId + " but the instance id " + instanceId + " is already in use by member " + member.memberId()); + } + } + + private void throwIfInstanceIdIsFenced(String memberId, String instanceId, ConsumerGroupMember member) { + if (!member.memberId().equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={}", + memberId, instanceId, member.memberId()); + throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " for static member with groupInstanceId " + instanceId + + " is fenced by existing memberId " + member.memberId()); Review Comment: nit: `Static member {} with instance id {} was fenced by member {}.`? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ########## @@ -691,6 +735,91 @@ public void testDeleteMember() { assertEquals(expectedAssignment, result.targetAssignment()); } + @Test + public void testStaticMemberReplace() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); + + context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty()); + context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty()); + context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty()); Review Comment: This should not be here. I think that you mix in two different things. `addGroupMember` is basically what is used to build what will be passed to `withMembers` and `withTargetAssignment` whereas `updateMemberSubscription` is for `addOrUpdateMember`. Therefore, the test does not reproduce how we use it. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testGroupEpochBumpWhenNewStaticMemberJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to ensure that metadata needs to be recomputed. 
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + ))); + put(memberId3, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); Review Comment: nit: Indentation seems off here. I think that it should be 4 spaces earlier. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -849,21 +922,53 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // Get or create the member. 
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); - final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); - throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); - - if (memberEpoch == 0) { - log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); + if (createIfNotExists) { + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + } + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + if (memberEpoch == 0) { + // A new static member joins or the existing static member rejoins. + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId); + } else { + // Static member rejoins with a different member id so it should replace + // the previous instance iff the previous member had sent a Leave group. + throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member); + // Replace the current member. + staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); Review Comment: To close on this one, it is indeed correct to set the assigned partitions here. Without it, the reconciler checks if the partitions in the target assignment are still owned and they are effectively still owned until the previous member is removed. 
This only happens when the records are processed. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ########## @@ -691,6 +735,91 @@ public void testDeleteMember() { assertEquals(expectedAssignment, result.targetAssignment()); } + @Test + public void testStaticMemberReplace() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); + + context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty()); + context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty()); + context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty()); + + // Static member 3 leaves + context.removeMemberSubscription("member-3", "member-3"); + + // Another static member joins with the same instance id as the departed one + context.addGroupMember("member-3-a", "member-3", Arrays.asList("foo", "bar", "zar"), new HashMap<>()); Review Comment: This one is incorrect as well because the newly added member is not added via `withMembers`. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1004,27 +1118,102 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) { + if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + // The new member can't join. + throw Errors.UNRELEASED_INSTANCE_ID.exception("Member " + memberId + " trying to join group " + + groupId + " but the instance id " + instanceId + " is already in use by member " + member.memberId()); + } + } + + private void throwIfInstanceIdIsFenced(String memberId, String instanceId, ConsumerGroupMember member) { + if (!member.memberId().equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={}", + memberId, instanceId, member.memberId()); + throw Errors.FENCED_INSTANCE_ID.exception("Member " + memberId + " for static member with groupInstanceId " + instanceId + + " is fenced by existing memberId " + member.memberId()); + } + } + + private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, String instanceId) { + if (staticMember == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static member found against instance id: " + instanceId); + } + } + + private void removeMemberAndCancelTimers( + List<Record> records, + String groupId, + String memberId + ) { + // Write tombstones for the departed static member. + removeMember(records, groupId, memberId); + // Cancel all the timers of the departed static member. + cancelTimers(groupId, memberId); + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. 
* * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (instanceId == null) { + ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId); + records = consumerGroupFenceMember(group, member); + } else { + ConsumerGroupMember member = group.staticMember(instanceId); + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(memberId, instanceId, member); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + log.info("[GroupId {}] Static Member {} with instance id {} temporarily left the consumer group", + group.groupId(), memberId, instanceId); + records.add(consumerGroupStaticMemberGroupLeave(group, member)); + } else { + log.info("[GroupId {}] Static Member {} with instance id {} left the consumer group", + group.groupId(), memberId, instanceId); + records = consumerGroupFenceMember(group, member); + } + } return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)); + .setMemberEpoch(memberEpoch)); + } + + /** + * Handles the case when a static member decides to leave the group. 
+ * The member is not actually fenced from the group, and instead it's + * member epoch is updated to -2 to reflect that a member using the given + * instance id decided to leave the group and would be back within session + * timeout. + * + * @param group The group. + * @param member The static member in the group for the instance id. + * + * @return A ConsumerGroupCurrentMemberAssignment record signifying that the static member is leaving. + */ + private Record consumerGroupStaticMemberGroupLeave( + ConsumerGroup group, + ConsumerGroupMember member + ) { + // We will write a member epoch of -2 for this departing static member. + ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member) + .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH) Review Comment: I think that we could also `setPartitionsPendingRevocation` to empty because we know that the member has revoked all its partitions when it leaves. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testGroupEpochBumpWhenNewStaticMemberJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with two static members. 
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to ensure that metadata needs to be recomputed. 
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + ))); + put(memberId3, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 3 joins the consumer group. 
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setInstanceId(memberId3) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) + .setMemberEpoch(11) + .setInstanceId(memberId3) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + )), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + )), + 
RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) + ); + + assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); + assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); + } + + @Test + public void testStaticMemberGetsBackAssignmentUponRejoin() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String member2RejoinId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 
3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + })) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1) + ))); + // When the member rejoins, it gets the same assignments. + put(member2RejoinId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 2 leaves the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(-2) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + // Member epoch of the response would be set to -2. 
+ assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(-2), + result.response() + ); + + // The departing static member will have it's epoch set to -2. + ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember.Builder(member2) + .setMemberEpoch(-2) + .build(); + + List<Record> expectedRecords = Collections.singletonList( + RecordHelpers.newCurrentAssignmentRecord(groupId, member2UpdatedEpoch) + ); + + assertEquals(result.records(), expectedRecords); + + // Member 2 rejoins the group with the same instance id. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> rejoinResult = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setMemberId(member2RejoinId) + .setGroupId(groupId) + .setInstanceId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(member2RejoinId) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(3, 4, 5)), + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(barTopicId) + .setPartitions(Collections.singletonList(2)) + ))), + rejoinResult.response() + ); + + ConsumerGroupMember expectedRejoinedMember = new ConsumerGroupMember.Builder(member2RejoinId) + .setMemberEpoch(10) + .setInstanceId(memberId2) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + 
mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + List<Record> expectedRecordsAfterRejoin = Arrays.asList( + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2), + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedRejoinedMember), + RecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedRejoinedMember) + ); + + assertRecordsEquals(expectedRecordsAfterRejoin, rejoinResult.records()); + // Verify that there are no timers. + context.assertNoSessionTimeout(groupId, memberId2); + context.assertNoRevocationTimeout(groupId, memberId2); + } + + @Test + public void testNoGroupEpochBumpWhenStaticMemberLeaves() { Review Comment: nit: Temporarily leave? The member heartbeats with epoch -2, so it only leaves the group temporarily; consider reflecting that in the name, e.g. `testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves`. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -3328,6 +4202,87 @@ public void testSessionTimeoutExpiration() { context.assertNoRevocationTimeout(groupId, memberId); } + @Test + public void testSessionTimeoutExpirationStaticMember() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build()) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Session timer is scheduled on first heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setInstanceId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + assertEquals(1, result.response().memberEpoch()); + + // Verify that there is a session time. + context.assertSessionTimeout(groupId, memberId, 45000); + + // Static member sends a temporary leave group request + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() Review Comment: nit: Indentation. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testGroupEpochBumpWhenNewStaticMemberJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. 
+ String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to ensure that metadata needs to be recomputed. 
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + ))); + put(memberId3, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 3 joins the consumer group. 
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setInstanceId(memberId3) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) + .setMemberEpoch(11) + .setInstanceId(memberId3) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + )), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + )), + 
RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) + ); + + assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); + assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); + } + + @Test + public void testStaticMemberGetsBackAssignmentUponRejoin() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String member2RejoinId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 
3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + })) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1) + ))); + // When the member rejoins, it gets the same assignments. + put(member2RejoinId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 2 leaves the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(-2) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + // Member epoch of the response would be set to -2. 
+ assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(-2), + result.response() + ); + + // The departing static member will have it's epoch set to -2. + ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember.Builder(member2) + .setMemberEpoch(-2) + .build(); + + List<Record> expectedRecords = Collections.singletonList( + RecordHelpers.newCurrentAssignmentRecord(groupId, member2UpdatedEpoch) + ); + + assertEquals(result.records(), expectedRecords); + + // Member 2 rejoins the group with the same instance id. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> rejoinResult = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setMemberId(member2RejoinId) + .setGroupId(groupId) + .setInstanceId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(member2RejoinId) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(3, 4, 5)), + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(barTopicId) + .setPartitions(Collections.singletonList(2)) Review Comment: nit: Indentation is incorrect. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1919,6 +1920,879 @@ public void testLeavingMemberBumpsGroupEpoch() { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testGroupEpochBumpWhenNewStaticMemberJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. 
+ String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to ensure that metadata needs to be recomputed. 
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + ))); + put(memberId3, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 3 joins the consumer group. 
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setInstanceId(memberId3) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) + .setMemberEpoch(11) + .setInstanceId(memberId3) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + )), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + )), + 
RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) + ); + + assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); + assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); + } + + @Test + public void testStaticMemberGetsBackAssignmentUponRejoin() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String member2RejoinId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 
3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + })) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1) + ))); + // When the member rejoins, it gets the same assignments. + put(member2RejoinId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 2 leaves the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(-2) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + // Member epoch of the response would be set to -2. 
+ assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(-2), + result.response() + ); + + // The departing static member will have it's epoch set to -2. + ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember.Builder(member2) + .setMemberEpoch(-2) + .build(); + + List<Record> expectedRecords = Collections.singletonList( + RecordHelpers.newCurrentAssignmentRecord(groupId, member2UpdatedEpoch) + ); + + assertEquals(result.records(), expectedRecords); + + // Member 2 rejoins the group with the same instance id. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> rejoinResult = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setMemberId(member2RejoinId) + .setGroupId(groupId) + .setInstanceId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(member2RejoinId) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(3, 4, 5)), + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(barTopicId) + .setPartitions(Collections.singletonList(2)) + ))), + rejoinResult.response() + ); + + ConsumerGroupMember expectedRejoinedMember = new ConsumerGroupMember.Builder(member2RejoinId) + .setMemberEpoch(10) + .setInstanceId(memberId2) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + 
mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + List<Record> expectedRecordsAfterRejoin = Arrays.asList( + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2), + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedRejoinedMember), + RecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedRejoinedMember) + ); + + assertRecordsEquals(expectedRecordsAfterRejoin, rejoinResult.records()); + // Verify that there are no timers. + context.assertNoSessionTimeout(groupId, memberId2); + context.assertNoRevocationTimeout(groupId, memberId2); + } + + @Test + public void testNoGroupEpochBumpWhenStaticMemberLeaves() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. 
+ String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to ensure that metadata needs to be recomputed. + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two static members. 
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + // Member 2 leaves the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(-2) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + // member epoch of the response would be set to -2 + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(-2), + result.response() + ); + + ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember.Builder(member2).setMemberEpoch(-2).build(); + List<Record> expectedRecords = Collections.singletonList( + RecordHelpers.newCurrentAssignmentRecord(groupId, member2UpdatedEpoch) + ); + + assertEquals(result.records(), expectedRecords); + } + + @Test + public void testLeavingStaticMemberBumpsGroupEpoch() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. 
+ String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to ensure that metadata needs to be recomputed. 
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + // Member 2 leaves the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setInstanceId(memberId2) + .setMemberId(memberId2) + .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH), + result.response() + ); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2), + // Subscription metadata is recomputed because zar is no longer there. 
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testShouldThrownUnreleasedInstanceIdExceptionWhenNewMemberJoinsWithInUseInstanceId() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with one static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10)) + .build(); + + // Member 2 joins the consumer group with an in-use instance id. 
+ assertThrows(UnreleasedInstanceIdException.class, () -> context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + } + + @Test + public void testShouldThrownUnknownMemberIdExceptionWhenUnknownStaticMemberJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with one static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10)) + .build(); + + // Member 2 joins the consumer group with a non-zero epoch + assertThrows(UnknownMemberIdException.class, () -> context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId2) + 
.setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + } + + @Test + public void testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferentMemberIdJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with one static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10)) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("unknown-" + memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(11) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + } + + @Test + public void testConsumerGroupMemberEpochValidationForStaticMember() { + String groupId = "fooup"; + // Use a static member id as it 
makes the test easier. + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) + .setInstanceId(memberId) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setTargetMemberEpoch(100) + .setRebalanceTimeoutMs(5000) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2, 3))) + .build(); + + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId, member)); + + context.replay(RecordHelpers.newGroupEpochRecord(groupId, 100)); + + context.replay(RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3) + ))); + + context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 100)); + + context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId, member)); + + // Member epoch is greater than the expected epoch. + assertThrows(FencedMemberEpochException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setInstanceId(memberId) + .setMemberEpoch(200) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")))); + + // Member epoch is smaller than the expected epoch. 
+ assertThrows(FencedMemberEpochException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setInstanceId(memberId) + .setMemberEpoch(50) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")))); + + // Member joins with previous epoch but without providing partitions. + assertThrows(FencedMemberEpochException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setInstanceId(memberId) + .setMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")))); + + // Member joins with previous epoch and has a subset of the owned partitions. This + // is accepted as the response with the bumped epoch may have been lost. In this + // case, we provide back the correct epoch to the member. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setInstanceId(memberId) + .setMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.singletonList(new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(1, 2))))); + assertEquals(100, result.response().memberEpoch()); + } + + @Test + public void testShouldThrowUnknownMemberIdExceptionWhenUnknownStaticMemberLeaves() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with one static member. 
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10)) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setInstanceId("unknown-" + memberId1) + .setMemberEpoch(-2) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + } + + @Test + public void testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferentMemberIdLeaves() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with one static member. 
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10)) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("unknown-" + memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(-2) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + } + + @Test + public void testShouldThrowInvalidRequestExceptionWhenInstanceIdIsNullForStaticMember() { Review Comment: We already have `testConsumerHeartbeatRequestValidation` so I wonder if we could just add the new case there. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org