squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2579928090
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,296 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
Review Comment:
The indentation's not quite right here.
```suggestion
.addTopic(topicId, topicName, 2)
.buildCoordinatorMetadataImage())
```
The same for the other tests.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,296 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though member A just been unassigned partition 0.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-1.
+ // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+ // 3. Member B is assigned partition foo-1.
+ // 4. Member B is unassigned partition foo-1.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // We would like to not fail in these cases and allow both the assignment of member B to foo-1 and
+ // member A to bar-0 to succeed because the epochs are larger.
Review Comment:
The idea is that at step 5, there are supposed to be no epochs for any `foo` partition, so that we hit the `[GroupId {}] Cannot remove the epoch {} from {} because it does not have any epoch` path. Let's fix this test so that it actually exercises that path (and make that clear in this comment).
The same for the streams version.
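For example, something along these lines (an untested sketch, reusing the identifiers from the test above) would leave no `foo` epochs by the time step 5 is replayed, so the replay goes down the missing-epoch path:
```java
// Step 1: member A owns only foo-1, matching the scenario comment.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
    .setState(MemberState.STABLE)
    .setMemberEpoch(11)
    .setPreviousMemberEpoch(10)
    .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1)))
    .build()));

// Step 3: member B takes over only foo-1.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
    .setState(MemberState.STABLE)
    .setMemberEpoch(12)
    .setPreviousMemberEpoch(11)
    .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1)))
    .build()));

// Steps 4 and 5 can stay as they are: once member B's epoch 13 record releases foo-1,
// no foo partition has an epoch, so replaying member A's epoch 14 record hits the
// "does not have any epoch" path when member A's stale foo-1 ownership is dropped.
```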
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,296 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
Review Comment:
Could we call these `memberIdA` and `memberIdB`?
```suggestion
String memberIdA = "memberA";
String memberIdB = "memberB";
```
The same for the other tests.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,268 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testConsumerGroupAssignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long topicHash = computeTopicHash(topicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(topicHash))
+ .build();
+
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void testConsumerGroupUnassignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addTopic(barTopicId, barTopicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+ long barTopicHash = computeTopicHash(barTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(computeGroupHash(Map.of(fooTopicName, fooTopicHash, barTopicName, barTopicHash))))
+ .build();
+
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-1.
+ // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+ // 3. Member B is assigned partition foo-1.
+ // 4. Member B is unassigned partition foo-1.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // We would like to not fail in these cases and allow both the assignment of member B to foo-1 and
+ // member A to bar-0 to succeed because the epochs are larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
Review Comment:
We missed `foo-0`. It's still owned by the other member.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,296 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though member A just been unassigned partition 0.
Review Comment:
typo: I think we're missing a word here
```suggestion
// Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,296 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though member A just been unassigned partition 0.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-1.
+ // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+ // 3. Member B is assigned partition foo-1.
+ // 4. Member B is unassigned partition foo-1.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // We would like to not fail in these cases and allow both the assignment of member B to foo-1 and
+ // member A to bar-0 to succeed because the epochs are larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .build()));
+
+ // Verify member A only has ownership of partition bar-0. Member B has no partitions.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), group.members().get(memberA).assignedPartitions());
+ assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+ }
+
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A unsubscribes from topic bar at epoch 10: Member A { epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+ // 2. A new assignment is available at epoch 11 with member A unsubscribing from topic foo.
+ // 3. Member A yields bar. The epoch is bumped to 11: Member A { epoch: 11, assigned partitions: [], pending revocations: [foo] }
+ // 4. Member A yields topic foo. Member A { epoch: 11, assigned partitions: [], pending revocations: [] } [removed by compaction]
+ // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned partitions: [foo], pending revocations: [] }
+ // When record 4 is dropped by compaction, we want member B's assignment to be accepted with the same epoch.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .build()));
+
+ // Member A yields bar at epoch 11.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // Member A yields foo. [record removed by compaction]
+ // Member B is assigned foo at epoch 11.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // Verify partition foo-0 is assigned to member B at epoch 11.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 0)), group.members().get(memberB).assignedPartitions());
+ assertEquals(11, group.currentPartitionEpoch(fooTopicId, 0));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ String subtopology = "subtopology";
+ String topicName = "foo";
+ Uuid topicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned task 0.
+ // 2. Member A is unassigned task 0 [record removed by compaction].
+ // 3. Member B is assigned task 0.
+ // 4. Member A is assigned task 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned tasks as long as the epoch is larger.
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberA)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(0, 11))))
+ .build()));
+
+ // Task 0's owner is replaced by member B at epoch 12.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberB)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(0, 12))))
+ .build()));
Review Comment:
I don't understand why we aren't throwing an exception here, because we're missing the fix for `addTaskProcessIdFromActiveTasksWithEpochs`. Can you spot what I'm missing here?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,296 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though member A just been unassigned partition 0.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-1.
+ // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+ // 3. Member B is assigned partition foo-1.
+ // 4. Member B is unassigned partition foo-1.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // We would like to not fail in these cases and allow both the assignment of member B to foo-1 and
+ // member A to bar-0 to succeed because the epochs are larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .build()));
+
+ // Verify member A only has ownership of partition bar-0. Member B has no partitions.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), group.members().get(memberA).assignedPartitions());
+ assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+ }
+
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A unsubscribes from topic bar at epoch 10: Member A { epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+ // 2. A new assignment is available at epoch 11 with member A unsubscribing from topic foo.
+ // 3. Member A yields bar. The epoch is bumped to 11: Member A { epoch: 11, assigned partitions: [], pending revocations: [foo] }
+ // 4. Member A yields topic foo. Member A { epoch: 11, assigned partitions: [], pending revocations: [] } [removed by compaction]
+ // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned partitions: [foo], pending revocations: [] }
+ // When record 4 is dropped by compaction, we want member B's assignment to be accepted with the same epoch.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .build()));
+
+ // Member A yields bar at epoch 11.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // Member A yields foo. [record removed by compaction]
+ // Member B is assigned foo at epoch 11.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // Verify partition foo-0 is assigned to member B at epoch 11.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 0)), group.members().get(memberB).assignedPartitions());
+ assertEquals(11, group.currentPartitionEpoch(fooTopicId, 0));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ String subtopology = "subtopology";
+ String topicName = "foo";
+ Uuid topicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned task 0.
+ // 2. Member A is unassigned task 0 [record removed by compaction].
+ // 3. Member B is assigned task 0.
+ // 4. Member A is assigned task 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned tasks as long as the epoch is larger.
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberA)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(0, 11))))
+ .build()));
+
+ // Task 0's owner is replaced by member B at epoch 12.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberB)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(0, 12))))
+ .build()));
+
+ // Task 0 must remain with member B at epoch 12 even though member A just been unassigned task 0.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberA)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(1, 13))))
+ .build()));
+
+ // Check task 1 is assigned to member A and task 0 to member B.
+ StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+ assertEquals(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(1, 13))), group.members().get(memberA).assignedTasks());
+ assertEquals(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(0, 12))), group.members().get(memberB).assignedTasks());
+ }
+
+ @Test
+ public void testReplayStreamsGroupUnassignmentRecordSkippedWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ String subtopologyFoo = "subtopologyFoo";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String subtopologyBar = "subtopologyBar";
+ String barTopicName = "bar";
+ Uuid barTopicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned task foo-1.
+ // 2. Member A is unassigned task foo-1 [record removed by compaction].
+ // 3. Member B is assigned task foo-1.
+ // 4. Member B is unassigned task foo-1.
+ // 5. Member A is assigned task bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // We would like to not fail in these cases and allow both the assignment of member B to foo-1 and
+ // member A to bar-0 to succeed because the epochs are larger.
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberA)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, Map.of(0, 11, 1, 11))))
+ .build()));
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberB)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, Map.of(0, 11))))
+ .build()));
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberB)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .build()));
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberA)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyBar, Map.of(0, 13))))
+ .build()));
+
+ // Check task bar-0 is assigned to member A. Member B has no tasks.
+ StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+ assertEquals(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyBar, Map.of(0, 13))), group.members().get(memberA).assignedTasks());
+ }
Review Comment:
```suggestion
}
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,296 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though member A just been unassigned partition 0.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-1.
+ // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+ // 3. Member B is assigned partition foo-1.
+ // 4. Member B is unassigned partition foo-1.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // We would like to not fail in these cases and allow both the assignment of member B to foo-1 and
+ // member A to bar-0 to succeed because the epochs are larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2)))
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .build()));
+
+ // Verify member A only has ownership of partition bar-0. Member B has no partitions.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), group.members().get(memberA).assignedPartitions());
+ assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+ }
+
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A unsubscribes from topic bar at epoch 10: Member A { epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+ // 2. A new assignment is available at epoch 11 with member A unsubscribing from topic foo.
+ // 3. Member A yields bar. The epoch is bumped to 11: Member A { epoch: 11, assigned partitions: [], pending revocations: [foo] }
+ // 4. Member A yields topic foo. Member A { epoch: 11, assigned partitions: [], pending revocations: [] } [removed by compaction]
+ // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned partitions: [foo], pending revocations: [] }
+ // When record 4 is dropped by compaction, we want member B's assignment to be accepted with the same epoch.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .build()));
+
+ // Member A yields bar at epoch 11.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // Member A yields foo. [record removed by compaction]
+ // Member B is assigned foo at epoch 11.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // Verify partition foo-0 is assigned to member B at epoch 11.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 0)), group.members().get(memberB).assignedPartitions());
+ assertEquals(11, group.currentPartitionEpoch(fooTopicId, 0));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ String subtopology = "subtopology";
+ String topicName = "foo";
+ Uuid topicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned task 0.
+ // 2. Member A is unassigned task 0 [record removed by compaction].
+ // 3. Member B is assigned task 0.
+ // 4. Member A is assigned task 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned tasks as long as the epoch is larger.
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberA)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(0, 11))))
+ .build()));
+
+ // Task 0's owner is replaced by member B at epoch 12.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberB)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(0, 12))))
+ .build()));
+
+ // Task 0 must remain with member B at epoch 12 even though member A just been unassigned task 0.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberA)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(1, 13))))
+ .build()));
+
+ // Check task 1 is assigned to member A and task 0 to member B.
+ StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+ assertEquals(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(1, 13))), group.members().get(memberA).assignedTasks());
+ assertEquals(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, Map.of(0, 12))), group.members().get(memberB).assignedTasks());
Review Comment:
Can we assert on `group.currentActiveTaskProcessId` instead? `member.assignedTasks` will be trivially correct.
```suggestion
assertEquals(memberB, group.currentActiveTaskProcessId(subtopology, 0));
assertEquals(memberA, group.currentActiveTaskProcessId(subtopology, 1));
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -286,23 +287,37 @@ public void testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
mkTopicAssignment(fooTopicId, 1)))
.build();
- // m2 should not be able to acquire foo-1 because the partition is
- // still owned by another member.
- assertThrows(IllegalStateException.class, () -> consumerGroup.updateMember(m2));
+ // m2 can acquire foo-1 because the epoch is at least as large as m1's epoch.
+ consumerGroup.updateMember(m2);
+ assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 1)),
+ consumerGroup.getOrMaybeCreateMember("m1", false).assignedPartitions()
+ );
Review Comment:
I can't follow the purpose of this assert. (It looks like `getOrMaybeCreateMember` will always return the exact same `ConsumerGroupMember` instance passed into `updateMember`.)
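If the intent is to show that foo-1 is now tracked under m2's epoch, something along these lines (an untested sketch) might be a more direct check:
```java
// Hypothetical alternative assertion: foo-1's partition epoch should now match m2's epoch.
assertEquals(m2.memberEpoch(), consumerGroup.currentPartitionEpoch(fooTopicId, 1));
```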