squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2535873655
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -77,6 +79,8 @@
*/
public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
+ private static final Logger log =
LoggerFactory.getLogger(ConsumerGroup.class);
Review Comment:
We have to use the `LogContext` from the `GroupMetadataManager`, otherwise
we won't prefix log lines with `[GroupCoordinator id=%d]`. When adding
`LogContext` to the constructor, it should go as the first parameter.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1047,7 +1051,7 @@ void removePartitionEpochs(
assignedPartitions.forEach(partitionId -> {
Integer prevValue =
partitionsOrNull.remove(partitionId);
if (prevValue != expectedEpoch) {
- throw new IllegalStateException(
+ log.warn(
Review Comment:
When we hit this case we must not have removed the entry from the map. We
can replace the `remove` above with a `get` and `remove` if we don't hit this
check.
The same for streams
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24057,4 +24057,196 @@ private Map<Uuid,
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsRes
return responseTopics.stream()
.collect(Collectors.toMap(DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic::topicId,
Function.identity()));
}
+
+ @Test
+ public void testConsumerGroupResolvesOnCompaction() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberIdA = "memberA";
+ String memberIdB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockTime time = new MockTime(0, 0, 0);
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withTime(time)
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .build())
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(fooTopicName,
fooTopicHash))))
+ .build();
+
+ // Member A is assigned partition bar-0,1 and member B is assigned
partition bar-2
+ assignor.prepareGroupAssignment(new GroupAssignment(
Review Comment:
We should be `replay`ing the records directly, as if we are loading the
group coordinator. The records can be created using
`GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord`.
The same for streams
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1034,7 +1038,7 @@ private void maybeRemovePartitionEpoch(
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
- * @throws IllegalStateException if the epoch does not match the expected
one.
+ * @throws IllegalStateException if the epoch does not exist.
Review Comment:
Thinking about this again, I think we can also hit the IllegalStateException
case
```
Member A is assigned topic 1 partition X
[removed by compaction] Member A is unassigned topic 1 partition X
Member B is assigned topic 1 partition X
Member B is unassigned topic 1 partition X
Member A is assigned topic 2 partition Y
```
so we have to also turn that into log message and add another test.
The same for streams.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]