dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1559052702
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
}
}
+ /**
+ * Validates the online upgrade if the Classic Group receives a
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the
classic group.
+ */
+ private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+ if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+ log.debug("Online upgrade is invalid because the consumer group {}
migration config is {} so online upgrade is not enabled.",
+ classicGroup.groupId(), consumerGroupMigrationPolicy);
+ return false;
+ } else if (classicGroup.isInState(DEAD)) {
Review Comment:
Could this really happen? I would have thought that a Dead group would be
treated as equivalent to Empty and converted automatically.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -411,6 +432,20 @@ public int numMembers() {
return members.size();
}
+ /**
+ * @return The number of members that use the legacy protocol.
+ */
+ public int numLegacyProtocolMember() {
+ return (int) members.values().stream().filter(member -> member.useLegacyProtocol()).count();
Review Comment:
It may be better to maintain this count in the group state instead of having
to go through all members. Is it possible?
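
A minimal sketch of what maintaining the count could look like, assuming every membership change goes through one update path and one remove path. `MemberSketch` and `GroupSketch` are hypothetical stand-ins, not the real `ConsumerGroupMember` and `ConsumerGroup`; the real group state is snapshot-based, so the counter would presumably need to be timeline-backed rather than a plain `int`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for ConsumerGroupMember.
class MemberSketch {
    final String memberId;
    final boolean usesLegacyProtocol;

    MemberSketch(String memberId, boolean usesLegacyProtocol) {
        this.memberId = memberId;
        this.usesLegacyProtocol = usesLegacyProtocol;
    }
}

// Hypothetical, simplified stand-in for ConsumerGroup.
class GroupSketch {
    private final Map<String, MemberSketch> members = new HashMap<>();

    // Kept in sync with `members` so that reading the count is O(1).
    private int numLegacyProtocolMembers = 0;

    void updateMember(MemberSketch newMember) {
        MemberSketch oldMember = members.put(newMember.memberId, newMember);
        adjustLegacyCount(oldMember, newMember);
    }

    void removeMember(String memberId) {
        MemberSketch oldMember = members.remove(memberId);
        adjustLegacyCount(oldMember, null);
    }

    int numLegacyProtocolMembers() {
        return numLegacyProtocolMembers;
    }

    // Subtracts the old member's contribution and adds the new one's.
    private void adjustLegacyCount(MemberSketch oldMember, MemberSketch newMember) {
        if (oldMember != null && oldMember.usesLegacyProtocol) {
            numLegacyProtocolMembers--;
        }
        if (newMember != null && newMember.usesLegacyProtocol) {
            numLegacyProtocolMembers++;
        }
    }
}
```

The trade-off is that every code path that adds, replaces, or removes a member has to go through these two methods, otherwise the counter drifts from the map.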
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
}
}
+ /**
+ * Validates the online upgrade if the Classic Group receives a
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the
classic group.
+ */
+ private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+ if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+ log.debug("Online upgrade is invalid because the consumer group {}
migration config is {} so online upgrade is not enabled.",
+ classicGroup.groupId(), consumerGroupMigrationPolicy);
+ return false;
+ } else if (classicGroup.isInState(DEAD)) {
+ log.debug("Online upgrade is invalid because the classic group {}
is in DEAD state.", classicGroup.groupId());
+ return false;
+ } else if (!classicGroup.usesConsumerGroupProtocol()) {
+ log.debug("Online upgrade is invalid because the classic group {}
has protocol type {} and doesn't use the consumer group protocol.",
Review Comment:
nit: `Cannot upgrade classic group {} to consumer group because the group
does not use the consumer embedded protocol.`
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
}
}
+ /**
+ * Validates the online upgrade if the Classic Group receives a
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the
classic group.
+ */
+ private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+ if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+ log.debug("Online upgrade is invalid because the consumer group {}
migration config is {} so online upgrade is not enabled.",
+ classicGroup.groupId(), consumerGroupMigrationPolicy);
+ return false;
+ } else if (classicGroup.isInState(DEAD)) {
+ log.debug("Online upgrade is invalid because the classic group {}
is in DEAD state.", classicGroup.groupId());
+ return false;
+ } else if (!classicGroup.usesConsumerGroupProtocol()) {
+ log.debug("Online upgrade is invalid because the classic group {}
has protocol type {} and doesn't use the consumer group protocol.",
+ classicGroup.groupId(),
classicGroup.protocolType().orElse(""));
+ return false;
+ } else if (classicGroup.size() > consumerGroupMaxSize) {
+ log.debug("Online upgrade is invalid because the classic group {}
size {} exceeds the consumer group maximum size {}.",
Review Comment:
nit: Same idea.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
}
}
+ /**
+ * Validates the online upgrade if the Classic Group receives a
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the
classic group.
+ */
+ private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+ if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+ log.debug("Online upgrade is invalid because the consumer group {}
migration config is {} so online upgrade is not enabled.",
Review Comment:
nit: `Cannot upgrade classic group {} to consumer group because the online
upgrade is disabled.`
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
}
}
+ /**
+ * Validates the online upgrade if the Classic Group receives a
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the
classic group.
+ */
+ private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+ if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+ log.debug("Online upgrade is invalid because the consumer group {}
migration config is {} so online upgrade is not enabled.",
+ classicGroup.groupId(), consumerGroupMigrationPolicy);
+ return false;
+ } else if (classicGroup.isInState(DEAD)) {
+ log.debug("Online upgrade is invalid because the classic group {}
is in DEAD state.", classicGroup.groupId());
+ return false;
+ } else if (!classicGroup.usesConsumerGroupProtocol()) {
+ log.debug("Online upgrade is invalid because the classic group {}
has protocol type {} and doesn't use the consumer group protocol.",
+ classicGroup.groupId(),
classicGroup.protocolType().orElse(""));
+ return false;
+ } else if (classicGroup.size() > consumerGroupMaxSize) {
+ log.debug("Online upgrade is invalid because the classic group {}
size {} exceeds the consumer group maximum size {}.",
+ classicGroup.groupId(), classicGroup.size(),
consumerGroupMaxSize);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Creates a ConsumerGroup corresponding to the given classic group.
+ *
+ * @param classicGroup The ClassicGroup to convert.
+ * @param records The list of Records.
+ * @return The created ConsumerGroup.
+ */
+ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup,
List<Record> records) {
+ // The upgrade is always triggered by a new member joining the classic
group, which always results in
+ // updatedMember.subscribedTopicNames changing, the group epoch being
bumped, and triggering a new rebalance.
+ // If the ClassicGroup is rebalancing, inform the awaiting consumers
of another ongoing rebalance
+ // so that they will rejoin for the new rebalance.
+ classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+ classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+ classicGroup.createGroupTombstoneRecords(records);
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
logContext, classicGroup.groupId(), metrics);
+ consumerGroup.fromClassicGroup(classicGroup, records,
metadataImage.topics());
Review Comment:
nit: It may be better to have a static method -
`ConsumerGroup.fromClassicGroup(....)` - which creates the `ConsumerGroup` and
populates it from the classic group.
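
A rough sketch of the static factory shape, assuming the constructor can be kept private to the class. `ClassicGroupStub` and `ConsumerGroupStub` are hypothetical stand-ins, and records are modelled as plain strings purely for illustration.

```java
import java.util.List;

// Hypothetical, simplified stand-in for ClassicGroup.
class ClassicGroupStub {
    final String groupId;
    final int generationId;

    ClassicGroupStub(String groupId, int generationId) {
        this.groupId = groupId;
        this.generationId = generationId;
    }
}

// Hypothetical, simplified stand-in for ConsumerGroup.
class ConsumerGroupStub {
    private final String groupId;
    private int groupEpoch;

    private ConsumerGroupStub(String groupId) {
        this.groupId = groupId;
    }

    // Creates the group and populates it in one step, so callers never
    // observe an instance that has been constructed but not yet populated.
    static ConsumerGroupStub fromClassicGroup(ClassicGroupStub classicGroup, List<String> records) {
        ConsumerGroupStub group = new ConsumerGroupStub(classicGroup.groupId);
        group.groupEpoch = classicGroup.generationId;
        // Placeholder for the real record; a string is used only for the sketch.
        records.add("GroupEpoch(" + group.groupId + ", " + group.groupEpoch + ")");
        return group;
    }

    String groupId() {
        return groupId;
    }

    int groupEpoch() {
        return groupEpoch;
    }
}
```

With this shape, `convertToConsumerGroup` would reduce to a single call such as `ConsumerGroupStub.fromClassicGroup(classicGroup, records)`.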
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+
+ /**
+ * Set the attributes of the consumer group according to a classic group.
+ * Add the records for creating and updating the consumer group.
+ *
+ * @param classicGroup The converted classic group.
+ * @param records The list to which the new records are added.
+ */
+ public void fromClassicGroup(
+ ClassicGroup classicGroup,
+ List<Record> records,
+ TopicsImage topicsImage
+ ) {
+ setGroupEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newGroupEpochRecord(groupId(),
classicGroup.generationId()));
+ // SubscriptionMetadata will be computed in the following
consumerGroupHeartbeat.
+
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(),
Collections.emptyMap()));
+
+ setTargetAssignmentEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(),
classicGroup.generationId()));
+
+ classicGroup.allMembers().forEach(member -> {
+ try {
+ ConsumerPartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+ Map<Uuid, Set<Integer>> partitions =
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+ ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get())));
+
+ ConsumerGroupMember newMember = new
ConsumerGroupMember.Builder(member.memberId())
+ .setMemberEpoch(classicGroup.generationId())
+ .setPreviousMemberEpoch(classicGroup.generationId())
+ .setInstanceId(member.groupInstanceId().orElse(null))
+ .setRackId(subscription.rackId().orElse(null))
+ .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+ .setClientId(member.clientId())
+ .setClientHost(member.clientHost())
+ .setSubscribedTopicNames(subscription.topics())
+ .setAssignedPartitions(partitions)
+ .setSupportedProtocols(member.supportedProtocols())
+ .build();
+ updateMember(newMember);
+
+
records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+
records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+ records.add(RecordHelpers.newTargetAssignmentRecord(groupId(),
member.memberId(), partitions));
+ } catch (SchemaException e) {
+ log.warn("Failed to parse Consumer Protocol " +
ConsumerProtocol.PROTOCOL_TYPE + ":" +
+ classicGroup.protocolName().get() + " of group " + groupId
+ ".", e);
+ }
+ });
+ }
+
+ /**
+ * Converts the list of TopicPartition to a map of topic id and partition
set.
+ */
+ private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+ List<TopicPartition> partitions,
+ TopicsImage topicsImage
+ ) {
+ Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+ partitions.forEach(topicPartition -> {
+ TopicImage topicImage =
topicsImage.getTopic(topicPartition.topic());
+ if (topicImage != null) {
+ topicPartitionMap.computeIfAbsent(topicImage.id(), __ -> new
HashSet<>())
+ .add(topicPartition.partition());
+ }
+ });
+ return topicPartitionMap;
+ }
+
+ /**
+ * Checks whether at least one of the given protocols can be supported. A
+ * protocol can be supported if it is supported by all members that use the
+ * legacy protocol.
+ *
+ * @param memberProtocolType the member protocol type.
+ * @param memberProtocols the set of protocol names.
+ *
+ * @return a boolean based on the condition mentioned above.
+ */
+ public boolean supportsProtocols(String memberProtocolType, Set<String> memberProtocols) {
Review Comment:
nit: The name is a bit confusing. Would `supportsClassicProtocols` be better?
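
For context, here is a sketch of the check described in the Javadoc, using the suggested name and assuming a per-protocol counter map like the one this PR adds. `ClassicProtocolTracker` is a hypothetical class, and the protocol type argument is left out to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; not part of the PR.
class ClassicProtocolTracker {
    // Protocol name -> number of classic members that support it.
    private final Map<String, Integer> supportCounts = new HashMap<>();
    private int numClassicMembers = 0;

    void addClassicMember(Set<String> protocols) {
        numClassicMembers++;
        protocols.forEach(protocol -> supportCounts.merge(protocol, 1, Integer::sum));
    }

    void removeClassicMember(Set<String> protocols) {
        numClassicMembers--;
        // Drop the entry entirely once no member supports the protocol.
        protocols.forEach(protocol ->
            supportCounts.computeIfPresent(protocol, (name, count) -> count == 1 ? null : count - 1));
    }

    // True if at least one candidate protocol is supported by every classic
    // member. With zero classic members any candidate trivially qualifies.
    boolean supportsClassicProtocols(Set<String> candidateProtocols) {
        return candidateProtocols.stream()
            .anyMatch(protocol -> supportCounts.getOrDefault(protocol, 0) == numClassicMembers);
    }
}
```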
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+
+ /**
+ * Set the attributes of the consumer group according to a classic group.
+ * Add the records for creating and updating the consumer group.
+ *
+ * @param classicGroup The converted classic group.
+ * @param records The list to which the new records are added.
+ */
+ public void fromClassicGroup(
+ ClassicGroup classicGroup,
+ List<Record> records,
+ TopicsImage topicsImage
+ ) {
+ setGroupEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newGroupEpochRecord(groupId(),
classicGroup.generationId()));
+ // SubscriptionMetadata will be computed in the following
consumerGroupHeartbeat.
+
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(),
Collections.emptyMap()));
+
+ setTargetAssignmentEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(),
classicGroup.generationId()));
+
+ classicGroup.allMembers().forEach(member -> {
+ try {
+ ConsumerPartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+ Map<Uuid, Set<Integer>> partitions =
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+ ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get())));
+
+ ConsumerGroupMember newMember = new
ConsumerGroupMember.Builder(member.memberId())
+ .setMemberEpoch(classicGroup.generationId())
+ .setPreviousMemberEpoch(classicGroup.generationId())
+ .setInstanceId(member.groupInstanceId().orElse(null))
+ .setRackId(subscription.rackId().orElse(null))
+ .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+ .setClientId(member.clientId())
+ .setClientHost(member.clientHost())
+ .setSubscribedTopicNames(subscription.topics())
+ .setAssignedPartitions(partitions)
+ .setSupportedProtocols(member.supportedProtocols())
+ .build();
+ updateMember(newMember);
+
+
records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+
records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+ records.add(RecordHelpers.newTargetAssignmentRecord(groupId(),
member.memberId(), partitions));
+ } catch (SchemaException e) {
+ log.warn("Failed to parse Consumer Protocol " +
ConsumerProtocol.PROTOCOL_TYPE + ":" +
Review Comment:
nit: `Cannot upgrade the classic group to a consumer group because parsing
the Consumer Protocol fails....`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+
+ /**
+ * Set the attributes of the consumer group according to a classic group.
+ * Add the records for creating and updating the consumer group.
+ *
+ * @param classicGroup The converted classic group.
+ * @param records The list to which the new records are added.
+ */
+ public void fromClassicGroup(
+ ClassicGroup classicGroup,
+ List<Record> records,
+ TopicsImage topicsImage
+ ) {
+ setGroupEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newGroupEpochRecord(groupId(),
classicGroup.generationId()));
+ // SubscriptionMetadata will be computed in the following
consumerGroupHeartbeat.
+
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(),
Collections.emptyMap()));
+
+ setTargetAssignmentEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(),
classicGroup.generationId()));
+
+ classicGroup.allMembers().forEach(member -> {
+ try {
+ ConsumerPartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+ Map<Uuid, Set<Integer>> partitions =
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+ ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get())));
+
+ ConsumerGroupMember newMember = new
ConsumerGroupMember.Builder(member.memberId())
+ .setMemberEpoch(classicGroup.generationId())
+ .setPreviousMemberEpoch(classicGroup.generationId())
+ .setInstanceId(member.groupInstanceId().orElse(null))
+ .setRackId(subscription.rackId().orElse(null))
+ .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+ .setClientId(member.clientId())
+ .setClientHost(member.clientHost())
+ .setSubscribedTopicNames(subscription.topics())
+ .setAssignedPartitions(partitions)
+ .setSupportedProtocols(member.supportedProtocols())
+ .build();
+ updateMember(newMember);
+
+
records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+
records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+ records.add(RecordHelpers.newTargetAssignmentRecord(groupId(),
member.memberId(), partitions));
+ } catch (SchemaException e) {
+ log.warn("Failed to parse Consumer Protocol " +
ConsumerProtocol.PROTOCOL_TYPE + ":" +
+ classicGroup.protocolName().get() + " of group " + groupId
+ ".", e);
+ }
+ });
+ }
+
+ /**
+ * Converts the list of TopicPartition to a map of topic id and partition
set.
+ */
+ private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+ List<TopicPartition> partitions,
+ TopicsImage topicsImage
+ ) {
+ Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+ partitions.forEach(topicPartition -> {
+ TopicImage topicImage =
topicsImage.getTopic(topicPartition.topic());
+ if (topicImage != null) {
+ topicPartitionMap.computeIfAbsent(topicImage.id(), __ -> new
HashSet<>())
Review Comment:
nit: Let's add `.computeIfAbsent` on a new line too.
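
Roughly the layout I have in mind, shown on a self-contained variant of the helper. `TopicPartitionSketch` and the name-to-id map are hypothetical stand-ins for `TopicPartition` and `TopicsImage`, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for TopicPartition.
class TopicPartitionSketch {
    final String topic;
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

class PartitionMaps {
    // Groups partitions by topic id; topics missing from `topicIds` are skipped.
    static Map<UUID, Set<Integer>> topicPartitionMapFromList(
        List<TopicPartitionSketch> partitions,
        Map<String, UUID> topicIds
    ) {
        Map<UUID, Set<Integer>> topicPartitionMap = new HashMap<>();
        partitions.forEach(topicPartition -> {
            UUID topicId = topicIds.get(topicPartition.topic);
            if (topicId != null) {
                // Each chained call sits on its own line.
                topicPartitionMap
                    .computeIfAbsent(topicId, __ -> new HashSet<>())
                    .add(topicPartition.partition);
            }
        });
        return topicPartitionMap;
    }
}
```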
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
}
}
+ public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+ return
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+ !classicGroup.isInState(DEAD) &&
+
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null))
&&
+ classicGroup.size() <= consumerGroupMaxSize;
+ }
+
+ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup,
List<Record> records) {
+ classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+ classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+ createGroupTombstoneRecords(classicGroup, records);
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
classicGroup.groupId(), metrics);
+ classicGroup.convertToConsumerGroup(consumerGroup, records,
metadataImage.topics());
+
+ consumerGroup.members().forEach((memberId, __) ->
+ scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(),
memberId)
Review Comment:
Understood. Let's add a comment about it too.
##########
group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
##########
@@ -35,6 +35,13 @@
{ "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+",
"default": -1,
"about": "The rebalance timeout" },
{ "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+",
"type": "string",
- "about": "The server assignor to use; or null if not used." }
+ "about": "The server assignor to use; or null if not used." },
+ { "name": "SupportedProtocols", "type":
"[]ClassicJoinGroupRequestProtocol", "versions": "0+",
+ "about": "The list of protocols that the member supports if the consumer
uses the legacy protocol.", "fields": [
+ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+ "about": "The protocol name." },
+ { "name": "Metadata", "type": "bytes", "versions": "0+",
+ "about": "The protocol metadata." }
+ ]}
Review Comment:
I think that it may be better to introduce a struct field
`ClassicMemberMetadata` which then contains the `SupportedProtocols` field. I
would also use a nullable tagged field for `ClassicMemberMetadata` in order to
make it optional. What do you think?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -311,6 +337,11 @@ public ConsumerGroupMember build() {
*/
private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
+ /**
+ * The list of supported protocols if the consumer uses the legacy protocol.
+ */
+ private final ConsumerGroupMemberMetadataValue.ClassicJoinGroupRequestProtocolCollection supportedProtocols;
Review Comment:
nit: `supportedClassicProtocols`
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+
+ /**
+ * Set the attributes of the consumer group according to a classic group.
+ * Add the records for creating and updating the consumer group.
+ *
+ * @param classicGroup The converted classic group.
+ * @param records The list to which the new records are added.
+ */
+ public void fromClassicGroup(
Review Comment:
The order of the records is still incorrect. Please go back to my
previous comment about it. We can also discuss it offline if needed.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -180,12 +192,19 @@ public static class DeadlineAndEpoch {
*/
private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+ /**
+ * Map of protocol names to the number of members that use legacy protocol and support them.
+ */
+ private final TimelineHashMap<String, Integer> legacyProtocolMembersSupportedProtocols;
Review Comment:
I am not sure I fully understand why we need to maintain this. Could you
elaborate a bit more?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -504,6 +544,10 @@ private static String lookupTopicNameById(
}
}
+ public boolean useLegacyProtocol() {
Review Comment:
nit: useClassicProtocol?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+
+ /**
+ * Set the attributes of the consumer group according to a classic group.
+ * Add the records for creating and updating the consumer group.
+ *
+ * @param classicGroup The converted classic group.
+ * @param records The list to which the new records are added.
+ */
+ public void fromClassicGroup(
+ ClassicGroup classicGroup,
+ List<Record> records,
+ TopicsImage topicsImage
+ ) {
+ setGroupEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newGroupEpochRecord(groupId(),
classicGroup.generationId()));
+ // SubscriptionMetadata will be computed in the following
consumerGroupHeartbeat.
+
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(),
Collections.emptyMap()));
+
+ setTargetAssignmentEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(),
classicGroup.generationId()));
+
+ classicGroup.allMembers().forEach(member -> {
Review Comment:
nit: `classicMember`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
}
}
+ /**
+ * Validates the online upgrade if the Classic Group receives a
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the
classic group.
+ */
+ private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+ if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+ log.debug("Online upgrade is invalid because the consumer group {}
migration config is {} so online upgrade is not enabled.",
Review Comment:
I also wonder if we should log those as `info`. This seems quite useful in
production.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+
+ /**
+ * Set the attributes of the consumer group according to a classic group.
+ * Add the records for creating and updating the consumer group.
+ *
+ * @param classicGroup The converted classic group.
+ * @param records The list to which the new records are added.
+ */
+ public void fromClassicGroup(
+ ClassicGroup classicGroup,
+ List<Record> records,
+ TopicsImage topicsImage
+ ) {
+ setGroupEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newGroupEpochRecord(groupId(),
classicGroup.generationId()));
+ // SubscriptionMetadata will be computed in the following
consumerGroupHeartbeat.
+
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(),
Collections.emptyMap()));
+
+ setTargetAssignmentEpoch(classicGroup.generationId());
+ records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(),
classicGroup.generationId()));
+
+ classicGroup.allMembers().forEach(member -> {
+ try {
+ ConsumerPartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+ Map<Uuid, Set<Integer>> partitions =
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+ ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get())));
+
+ ConsumerGroupMember newMember = new
ConsumerGroupMember.Builder(member.memberId())
+ .setMemberEpoch(classicGroup.generationId())
+ .setPreviousMemberEpoch(classicGroup.generationId())
+ .setInstanceId(member.groupInstanceId().orElse(null))
+ .setRackId(subscription.rackId().orElse(null))
+ .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+ .setClientId(member.clientId())
+ .setClientHost(member.clientHost())
+ .setSubscribedTopicNames(subscription.topics())
+ .setAssignedPartitions(partitions)
+ .setSupportedProtocols(member.supportedProtocols())
+ .build();
+ updateMember(newMember);
+
+
records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+
records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+ records.add(RecordHelpers.newTargetAssignmentRecord(groupId(),
member.memberId(), partitions));
+ } catch (SchemaException e) {
+ log.warn("Failed to parse Consumer Protocol " +
ConsumerProtocol.PROTOCOL_TYPE + ":" +
+ classicGroup.protocolName().get() + " of group " + groupId
+ ".", e);
Review Comment:
Shouldn't we throw an exception to fail the request too? It could be a
`GroupIdNotFoundException` with an appropriate error message.
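
A sketch of the fail-fast variant, under a few assumptions: `GroupIdNotFoundExceptionSketch` is a local stand-in for `GroupIdNotFoundException`, `parseSubscription` is a hypothetical parser standing in for `ConsumerProtocol.deserializeSubscription`, and `IllegalArgumentException` plays the role of `SchemaException`. Records are plain strings for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Local stand-in so that the sketch compiles on its own.
class GroupIdNotFoundExceptionSketch extends RuntimeException {
    GroupIdNotFoundExceptionSketch(String message) {
        super(message);
    }
}

class UpgradeSketch {
    // Hypothetical parser; rejects empty metadata to simulate a schema error.
    static String parseSubscription(byte[] metadata) {
        if (metadata == null || metadata.length == 0) {
            throw new IllegalArgumentException("empty metadata");
        }
        return new String(metadata, StandardCharsets.UTF_8);
    }

    // Converts all members or none: records are staged locally and only
    // appended to `records` once every member has been parsed.
    static void convertMembers(String groupId, List<byte[]> memberMetadata, List<String> records) {
        List<String> staged = new ArrayList<>();
        for (byte[] metadata : memberMetadata) {
            try {
                staged.add(parseSubscription(metadata));
            } catch (IllegalArgumentException e) {
                // Fail the request instead of logging and continuing.
                throw new GroupIdNotFoundExceptionSketch(
                    "Cannot upgrade the classic group " + groupId +
                    " to a consumer group because parsing the Consumer Protocol failed: " +
                    e.getMessage());
            }
        }
        records.addAll(staged);
    }
}
```

Staging the records means a failure on a later member does not leave records from earlier members in the caller's list.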