Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-16 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1567506931


##
group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
##
@@ -35,6 +35,20 @@
 { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
   "about": "The rebalance timeout" },
 { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", 
"type": "string",
-  "about": "The server assignor to use; or null if not used." }
+  "about": "The server assignor to use; or null if not used." },
+{ "name": "ClassicMemberMetadata", "versions": "0+", "nullableVersions": 
"0+", "type": "ClassicMemberMetadata",
+  "about": "The classic member metadata which includes the supported 
protocols of the consumer if it uses the classic protocol. The metadata is null 
if the consumer uses the consumer protocol.",
+  "fields": [
+{ "name": "SupportedProtocols", "type": "[]ClassicProtocol", 
"versions": "0+", "taggedVersions": "0+", "tag": 0,

Review Comment:
   no I just put it here by mistake. Thanks for the catch!






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-16 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1567506439


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given consumer group.
+     *
+     * @param records   The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+

Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-16 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1567372730


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));

Review Comment:
   nit: Should we move the closing parenthesis to new line to be consistent?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));

Review Comment:
   ditto.






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-16 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1566918622


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
 }
 }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.
+     */
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        // The upgrade is always triggered by a new member joining the classic group, which always results in
+        // updatedMember.subscribedTopicNames changing, the group epoch being bumped, and triggering a new rebalance.
+        // If the ClassicGroup is rebalancing, inform the awaiting consumers of another ongoing rebalance
+        // so that they will rejoin for the new rebalance.
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        classicGroup.createGroupTombstoneRecords(records);
+        ConsumerGroup consumerGroup;
+        try {
+            consumerGroup = ConsumerGroup.fromClassicGroup(
+                snapshotRegistry,
+                metrics,
+                classicGroup,
+                metadataImage.topics(),
+                log
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot upgrade the classic group " + classicGroup.groupId() + ": fail to parse " +

Review Comment:
   nit: `log.warn("Cannot upgrade the classic group " + classicGroup.groupId() + " to consumer group because the embedded consumer protocol is malformed: " + e.getMessage() + ".", e);`



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
 }
 }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.

Review Comment:
   nit: There is an extra space after `return`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online upgrade if the 

Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-15 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1566001316


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -1046,4 +1051,101 @@ public void testIsInStatesCaseInsensitive() {
 assertTrue(group.isInStates(Collections.singleton("stable"), 1));
 assertFalse(group.isInStates(Collections.singleton("empty"), 1));
 }
+
+    @Test
+    public void testClassicMembersSupportedProtocols() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> rangeProtocol = new ArrayList<>();
+        rangeProtocol.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+            .setName("range")
+            .setMetadata(new byte[0]));
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> roundRobinAndRangeProtocols = new ArrayList<>();
+        roundRobinAndRangeProtocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+            .setName("roundrobin")
+            .setMetadata(new byte[0]));
+        roundRobinAndRangeProtocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+            .setName("range")
+            .setMetadata(new byte[0]));
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member-1")
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(rangeProtocol))
+            .build();
+        consumerGroup.updateMember(member1);
+
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member-2")
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(roundRobinAndRangeProtocols))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        assertEquals(2, consumerGroup.classicMembersSupportedProtocols().get("range"));
+        assertEquals(1, consumerGroup.classicMembersSupportedProtocols().get("roundrobin"));
+        assertTrue(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE,
+            new HashSet<>(Arrays.asList("range", "sticky"))));
+        assertFalse(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE,
+            new HashSet<>(Arrays.asList("sticky", "roundrobin"))));
+
+        member2 = new ConsumerGroupMember.Builder(member2)
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(rangeProtocol))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        assertEquals(2, consumerGroup.classicMembersSupportedProtocols().get("range"));
+        assertFalse(consumerGroup.classicMembersSupportedProtocols().containsKey("roundrobin"));
+
+        member1 = new ConsumerGroupMember.Builder(member1)
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(roundRobinAndRangeProtocols))
+            .build();
+        consumerGroup.updateMember(member1);
+        member2 = new ConsumerGroupMember.Builder(member2)
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(roundRobinAndRangeProtocols))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        assertEquals(2, consumerGroup.classicMembersSupportedProtocols().get("range"));
+        assertEquals(2, consumerGroup.classicMembersSupportedProtocols().get("roundrobin"));
+        assertTrue(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE,
+            new HashSet<>(Arrays.asList("sticky", "roundrobin"))));
+    }
+
+    @Test
+    public void testAllUseClassicProtocol() {

Review Comment:
   nit: testAllMembersUseClassicProtocol






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-15 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1565866968


##
group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
##
@@ -35,6 +35,20 @@
 { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
   "about": "The rebalance timeout" },
 { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", 
"type": "string",
-  "about": "The server assignor to use; or null if not used." }
+  "about": "The server assignor to use; or null if not used." },
+{ "name": "ClassicMemberMetadata", "versions": "0+", "nullableVersions": 
"0+", "type": "ClassicMemberMetadata",

Review Comment:
   Let's make it a tagged field in order to not break the backward compatibility of the record. You can do it by adding `"taggedVersions": "0+", "tag": 0`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1090,177 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(
+                classicGroupMember.metadata(classicGroup.protocolName().get()),
+                log,
+                "group upgrade"
+            );
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(subscription.ownedPartitions(), topicsImage);
+
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateMember(newMember);
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given consumer group.
+     *
+     * @param consumerGroup The consumer group to create.
+     * @param records       The list to which the new records are added.
+     */
+    public static void createConsumerGroupRecords(
+        ConsumerGroup consumerGroup,
+        List<Record> records
+    ) {
+        String groupId = consumerGroup.groupId;
+
+        consumerGroup.members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId, consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId, consumerGroup.groupEpoch()));
+
+        consumerGroup.members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(groupId, consumerGroupMemberId, consumerGroup.targetAssignment(consumerGroupMemberId).partitions()))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId, consumerGroup.groupEpoch()));
+
+        consumerGroup.members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId, consumerGroupMember))
+        );
+    }
+
+/**
+ * Converts the list 

Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-10 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560320708


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -180,12 +192,19 @@ public static class DeadlineAndEpoch {
  */
 private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+    /**
+     * Map of protocol names to the number of members that use legacy protocol and support them.
+     */
+    private final TimelineHashMap<String, Integer> legacyProtocolMembersSupportedProtocols;

Review Comment:
   In `supportsClassicProtocols(String memberProtocolType, Set<String> memberProtocols)`, we need it to check if at least one of the given protocols in the `JoinGroupRequest` can be supported if a consumer using the classic protocol joins the group.
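   As a self-contained sketch of that check (illustrative only; the class and method names below are made up and are not the actual ConsumerGroup API), the group can keep a per-protocol count of the classic-protocol members that support it and admit a join only if at least one requested protocol is supported by all of them:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ClassicProtocolSupportCheck {
        // Protocol name -> number of classic-protocol members supporting it.
        private final Map<String, Integer> supportedProtocolCounts = new HashMap<>();
        private int classicProtocolMembers = 0;

        public void addClassicMember(Set<String> protocols) {
            classicProtocolMembers++;
            protocols.forEach(p -> supportedProtocolCounts.merge(p, 1, Integer::sum));
        }

        // True if at least one requested protocol is supported by every classic-protocol member.
        public boolean supportsClassicProtocols(Set<String> requestedProtocols) {
            return requestedProtocols.stream().anyMatch(p ->
                supportedProtocolCounts.getOrDefault(p, 0) == classicProtocolMembers);
        }

        public static void main(String[] args) {
            ClassicProtocolSupportCheck group = new ClassicProtocolSupportCheck();
            group.addClassicMember(new HashSet<>(Arrays.asList("range")));
            group.addClassicMember(new HashSet<>(Arrays.asList("range", "roundrobin")));
            System.out.println(group.supportsClassicProtocols(new HashSet<>(Arrays.asList("range", "sticky"))));      // true
            System.out.println(group.supportsClassicProtocols(new HashSet<>(Arrays.asList("sticky", "roundrobin")))); // false
        }
    }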






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-10 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560320708


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -180,12 +192,19 @@ public static class DeadlineAndEpoch {
  */
 private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+    /**
+     * Map of protocol names to the number of members that use legacy protocol and support them.
+     */
+    private final TimelineHashMap<String, Integer> legacyProtocolMembersSupportedProtocols;

Review Comment:
   We will need it to check if at least one of the given protocols in the 
`JoinGroupRequest` can be supported if a consumer using the classic protocol 
joins the group.






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-10 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560315985


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {

Review Comment:
   Yeah, it makes sense. Actually I think we don't set classicGroup to dead at all. We can delete the else if.






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-10 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1559052702


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {

Review Comment:
   Could this really happen? I would have thought that it would be 
automatically converted as Dead or equivalent to Empty.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -411,6 +432,20 @@ public int numMembers() {
 return members.size();
 }
 
+    /**
+     * @return The number of members that use the legacy protocol.
+     */
+    public int numLegacyProtocolMember() {
+        return (int) members.values().stream().filter(member -> member.useLegacyProtocol()).count();

Review Comment:
   It may be better to maintain this count in the group state instead of having 
to go through all members. Is it possible?
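   A minimal sketch of that idea (illustrative names only, not the real ConsumerGroup API): keep the count as group state and adjust it whenever a member is added, replaced, or removed, so reading it is O(1) instead of a scan over all members.

    import java.util.HashMap;
    import java.util.Map;

    public class LegacyProtocolMemberCounter {
        // memberId -> whether that member uses the legacy protocol.
        private final Map<String, Boolean> usesLegacyProtocolByMemberId = new HashMap<>();
        private int legacyProtocolMembers = 0;

        public void updateMember(String memberId, boolean usesLegacyProtocol) {
            Boolean previous = usesLegacyProtocolByMemberId.put(memberId, usesLegacyProtocol);
            if (Boolean.TRUE.equals(previous)) legacyProtocolMembers--;
            if (usesLegacyProtocol) legacyProtocolMembers++;
        }

        public void removeMember(String memberId) {
            Boolean previous = usesLegacyProtocolByMemberId.remove(memberId);
            if (Boolean.TRUE.equals(previous)) legacyProtocolMembers--;
        }

        public int numLegacyProtocolMembers() {
            return legacyProtocolMembers;  // O(1), no stream over members
        }

        public static void main(String[] args) {
            LegacyProtocolMemberCounter group = new LegacyProtocolMemberCounter();
            group.updateMember("member-1", true);
            group.updateMember("member-2", false);
            group.updateMember("member-2", true);   // member re-joins with the legacy protocol
            group.removeMember("member-1");
            System.out.println(group.numLegacyProtocolMembers()); // 1
        }
    }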



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {
+            log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.",

Review Comment:
   nit: `Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.`



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {
+            log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.",
+                classicGroup.groupId(), classicGroup.protocolType().orElse(""));
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.debug("Online upgrade is invalid because the classic group {} size {} exceeds the consumer group maximum size {}.",

Review Comment:
   nit: Same idea.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online upgrade if the Classic Group receives a 

Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-07 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1555022535


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records       The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId));
+
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));

Review Comment:
   >keep a reference to those in the member's state
   
   It was planned to be added in the downgrade conversion, but let me bring them into this PR because we will need them when implementing the APIs.






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-07 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1555022109


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records       The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {

Review Comment:
   `Group epoch` and `Target assignment epoch` only apply to the group. Should 
we add the 5 records for each member?






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-07 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1555021412


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -1244,6 +1267,24 @@ public boolean completeSyncFuture(
 return false;
 }
 
+    /**
+     * Complete all the awaiting sync future with the give error.
+     *
+     * @param error  the error to complete the future with.
+     */
+    public void completeAllSyncFutures(
+        Errors error
+    ) {
+        members.forEach((__, member) -> completeSyncFuture(
+            member,
+            new SyncGroupResponseData()
+                .setProtocolName(protocolName.orElse(null))
+                .setProtocolType(protocolType.orElse(null))
+                .setAssignment(member.assignment())

Review Comment:
   We did set them in `resetAndPropagateAssignmentWithError(group, Errors.REBALANCE_IN_PROGRESS);` but I don't think they are necessary.






Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1553514811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
 }
 }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&

Review Comment:
   I think that we have `usesConsumerGroupProtocol()` in the `ClassicGroup` 
class. Could we use it?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {

Review Comment:
   Does it have to be public? Should we add some javadoc?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
 }
 }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        createGroupTombstoneRecords(classicGroup, records);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, classicGroup.groupId(), metrics);
+        classicGroup.convertToConsumerGroup(consumerGroup, records, metadataImage.topics());

Review Comment:
   I was wondering whether it would make more sense the other way around and 
have something like `ConsumerGroup.fromClassicGroup()`. I guess that it 
does not really matter in the end.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
 }
 }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }

Review Comment:
   I wonder whether we should log something (with the reason) when the upgrade 
is disallowed. Have you considered it?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records       The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId));
+
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));

Review Comment:
   * `deserializeAssignment` and `deserializeSubscription` could throw a `SchemaException`, if I'm not mistaken, when the bytes are incorrect. We should handle those, I suppose.
   * We also discussed offline the need to 

[PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-04 Thread via GitHub


dongnuo123 opened a new pull request, #15662:
URL: https://github.com/apache/kafka/pull/15662

   Based on https://github.com/apache/kafka/pull/15411, the PR contains triggering of group conversion from a classic group to a consumer group.
   
   In consumerGroupHeartbeat, the online migration will be triggered if a joining request (the request member epoch is 0) to a non-empty classic group is received. The converting method will create a new consumer group according to the existing classic group. The new consumer group's reference will be used in the following member joining process but will not be persisted to the timeline data structure until the records are replayed.
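   As a rough, self-contained illustration of that triggering rule (the types and names below are simplified stand-ins, not the real group coordinator classes or configs):

    public class OnlineUpgradeTriggerSketch {
        enum GroupType { CLASSIC, CONSUMER }

        static final class Group {
            final GroupType type;
            final int size;
            Group(GroupType type, int size) { this.type = type; this.size = size; }
        }

        // A ConsumerGroupHeartbeat with member epoch 0 (a join) addressed to a
        // non-empty classic group starts the conversion, provided the migration
        // policy allows upgrades.
        static boolean shouldTriggerOnlineUpgrade(Group group, int requestMemberEpoch, boolean upgradePolicyAllowsUpgrade) {
            return requestMemberEpoch == 0
                && group.type == GroupType.CLASSIC
                && group.size > 0
                && upgradePolicyAllowsUpgrade;
        }

        public static void main(String[] args) {
            Group classicGroup = new Group(GroupType.CLASSIC, 3);
            System.out.println(shouldTriggerOnlineUpgrade(classicGroup, 0, true)); // true
            System.out.println(shouldTriggerOnlineUpgrade(classicGroup, 5, true)); // false: not a joining request
        }
    }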
   
   A special case to consider is when the replay of the conversion and joining records fails. A possible solution is discussed in https://github.com/apache/kafka/pull/15587.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-03 Thread via GitHub


dongnuo123 commented on code in PR #15593:
URL: https://github.com/apache/kafka/pull/15593#discussion_r1550242669


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records       The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId));
+
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));
+
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(memberId)
+                .setMemberEpoch(generationId)
+                .setPreviousMemberEpoch(generationId)
+                .setInstanceId(member.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                .setClientId(member.clientId())
+                .setClientHost(member.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .build();
+            consumerGroup.updateMember(newMember);
+
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+            records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), memberId, partitions));
+        });

Review Comment:
   Need to schedule session timeouts
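   As a self-contained illustration of the idea (using a plain ScheduledExecutorService in place of the coordinator's own timer, so the names here are not the real coordinator API):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class SessionTimeoutSketch {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
            List<String> convertedMemberIds = Arrays.asList("member-1", "member-2");
            long sessionTimeoutMs = 100;

            // Each member carried over by the conversion needs its session timeout
            // re-scheduled; otherwise a member that never heartbeats again would
            // stay in the converted group forever.
            convertedMemberIds.forEach(memberId ->
                timer.schedule(
                    () -> System.out.println("Session expired for " + memberId),
                    sessionTimeoutMs,
                    TimeUnit.MILLISECONDS
                )
            );

            Thread.sleep(200);   // let the (shortened) timeouts fire for the demo
            timer.shutdown();
        }
    }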


