dajac commented on code in PR #18046:
URL: https://github.com/apache/kafka/pull/18046#discussion_r1871021740
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -996,12 +1001,14 @@ public static ConsumerGroup fromClassicGroup(
if (Arrays.equals(classicGroupMember.assignment(), EMPTY_ASSIGNMENT)) {
assignedPartitions = Collections.emptyMap();
} else {
- assignedPartitions = toTopicPartitionMap(
- ConsumerProtocol.deserializeConsumerProtocolAssignment(
- ByteBuffer.wrap(classicGroupMember.assignment())
- ),
- topicsImage
+ ConsumerProtocolAssignment assignment = ConsumerProtocol.deserializeConsumerProtocolAssignment(
+ ByteBuffer.wrap(classicGroupMember.assignment())
);
+ if (assignment.userData() != null &&
+ assignment.userData().hasRemaining()) {
Review Comment:
nit: We could bring it back on the previous line. There is enough space.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1053,6 +1055,12 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
throw new GroupIdNotFoundException("Cannot upgrade the classic group " + classicGroup.groupId() +
" to consumer group because the embedded consumer protocol is malformed.");
+ } catch (UnsupportedVersionException e) {
+ log.warn("Cannot upgrade the classic group " + classicGroup.groupId() +
+ " to consumer group: " + e.getMessage() + ".", e);
+
+ throw new GroupIdNotFoundException("Cannot upgrade the classic group " + classicGroup.groupId() +
+ " to consumer group because a custom assignor is in use.");
Review Comment:
nit: I wonder if we could extend the error message a bit more. We actually
support upgrading while using a custom assignor, but only if it does not have
user data associated with the assignment. We could perhaps explain what they
should do in this case (e.g. move to a default assignor and then upgrade)?
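A minimal sketch of the condition discussed here, assuming the check from the `ConsumerGroup.java` hunk (`userData() != null && userData().hasRemaining()`). The class name, the helper names, and the wording of the extended message are hypothetical illustrations, not the actual Kafka code or the final message text:

```java
import java.nio.ByteBuffer;

// Hypothetical, self-contained illustration; not part of the Kafka code base.
class UserDataUpgradeCheck {

    // Mirrors the condition in the diff: user data counts as present only
    // when the buffer is non-null and has at least one remaining byte.
    static boolean hasUserData(ByteBuffer userData) {
        return userData != null && userData.hasRemaining();
    }

    // Hypothetical wording for a more actionable error message.
    static String upgradeErrorMessage(String groupId) {
        return "Cannot upgrade the classic group " + groupId +
            " to consumer group because the custom assignor attaches user data" +
            " to its assignments. Consider switching to a default assignor" +
            " and retrying the upgrade.";
    }

    public static void main(String[] args) {
        // The three cases match the arguments supplied to the parameterized test.
        System.out.println(hasUserData(null));                   // upgrade allowed
        System.out.println(hasUserData(ByteBuffer.allocate(0))); // upgrade allowed
        System.out.println(hasUserData(ByteBuffer.allocate(1))); // upgrade rejected
        System.out.println(upgradeErrorMessage("group-id"));
    }
}
```

With this shape, a null or empty buffer is treated as "no user data", so a custom assignor that never sets user data would still be upgradable.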
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10077,6 +10082,113 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
assertEquals(group, context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false));
}
+ /**
+ * Supplies the {@link Arguments} to {@link #testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer, boolean)}.
+ */
+ private static Stream<Arguments> testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource() {
+ return Stream.of(
+ Arguments.of(null, true),
+ Arguments.of(ByteBuffer.allocate(0), true),
+ Arguments.of(ByteBuffer.allocate(1), false)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource")
+ public void testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer userData, boolean expectUpgrade) {
+ String groupId = "group-id";
+ String memberId1 = "member-id-1";
+ String memberId2 = "member-id-2";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)
+ )),
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(barTopicId, 0)
+ ))
+ )));
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addTopic(barTopicId, barTopicName, 1)
+ .addRacks()
+ .build();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.UPGRADE.toString())
+ .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName, barTopicName),
+ null,
+ List.of(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(barTopicName, 0)
+ )
+ ))))
+ );
+
+ Map<String, byte[]> assignments = Map.of(
+ memberId1,
+ Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(barTopicName, 0)
+ ), userData)))
+ );
+
+ // Create a stable classic group with member 1.
+ ClassicGroup group = context.createClassicGroup(groupId);
+ group.setProtocolName(Optional.of("range"));
+ group.add(
+ new ClassicGroupMember(
+ memberId1,
+ Optional.empty(),
+ "client-id",
+ "client-host",
+ 10000,
+ 5000,
+ "consumer",
+ protocols,
+ assignments.get(memberId1)
+ )
+ );
+
+ group.transitionTo(PREPARING_REBALANCE);
+ group.transitionTo(COMPLETING_REBALANCE);
+ group.transitionTo(STABLE);
+
+ context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments, metadataImage.features().metadataVersion()));
+ context.commit();
+ group = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+
+ // A new member 2 with new protocol joins the classic group, triggering the upgrade.
+ try {
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setTopicPartitions(Collections.emptyList()));
+ assertTrue(expectUpgrade);
+ } catch (GroupIdNotFoundException ex) {
+ assertFalse(expectUpgrade);
+ assertEquals("Cannot upgrade the classic group group-id to consumer group because a custom assignor is in use.", ex.getMessage());
Review Comment:
nit: It is usually better to use assertThrows. Would it be possible to use
it in this case?
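One way the suggestion could look, sketched so that it runs without JUnit on the classpath. The local `assertThrows` helper is a simplified stand-in for JUnit 5's `Assertions.assertThrows` (it takes a `Runnable` rather than JUnit's `Executable`), `heartbeat` is a hypothetical stand-in for `context.consumerGroupHeartbeat(...)`, and `IllegalStateException` stands in for `GroupIdNotFoundException`:

```java
import java.nio.ByteBuffer;

// Hypothetical, self-contained illustration of branching on expectUpgrade
// and using an assertThrows-style helper instead of try/catch.
class AssertThrowsSketch {

    // Simplified stand-in for JUnit 5's Assertions.assertThrows: runs the
    // action and returns the thrown exception if it has the expected type.
    static <T extends Throwable> T assertThrows(Class<T> expected, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }

    // Hypothetical stand-in for the heartbeat that triggers the upgrade.
    static void heartbeat(String groupId, ByteBuffer userData) {
        if (userData != null && userData.hasRemaining()) {
            throw new IllegalStateException("Cannot upgrade the classic group " + groupId +
                " to consumer group because a custom assignor is in use.");
        }
    }

    // Test body restructured: the success path calls the method directly,
    // the failure path asserts on the thrown exception. Returns the error
    // message, or null when the upgrade was expected to succeed.
    static String runCase(ByteBuffer userData, boolean expectUpgrade) {
        if (expectUpgrade) {
            heartbeat("group-id", userData);
            return null;
        }
        IllegalStateException ex = assertThrows(
            IllegalStateException.class,
            () -> heartbeat("group-id", userData));
        return ex.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(runCase(null, true));
        System.out.println(runCase(ByteBuffer.allocate(0), true));
        System.out.println(runCase(ByteBuffer.allocate(1), false));
    }
}
```

Compared with the try/catch version, a case that is expected to fail but does not throw is reported as a failure by the helper itself, and an unexpected exception in the success path surfaces directly.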
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1053,6 +1055,12 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
throw new GroupIdNotFoundException("Cannot upgrade the classic group " + classicGroup.groupId() +
" to consumer group because the embedded consumer protocol is malformed.");
+ } catch (UnsupportedVersionException e) {
+ log.warn("Cannot upgrade the classic group " + classicGroup.groupId() +
+ " to consumer group: " + e.getMessage() + ".", e);
+
+ throw new GroupIdNotFoundException("Cannot upgrade the classic group " + classicGroup.groupId() +
+ " to consumer group because a custom assignor is in use.");
Review Comment:
Completely unrelated, but I wonder if we always provide such good error
messages when the upgrade is not possible. Do you mind checking?