dajac commented on code in PR #20055: URL: https://github.com/apache/kafka/pull/20055#discussion_r2174845778
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java: ########## @@ -83,11 +125,35 @@ public ShareGroupMember build() { // when the member is updated. return new ShareGroupMember.Builder(member) .setState(MemberState.STABLE) - .setAssignedPartitions(targetAssignment.partitions()) + .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames())) .updateMemberEpoch(targetAssignmentEpoch) .build(); + } else if (hasSubscriptionChanged) { + return new ShareGroupMember.Builder(member) + .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames())) + .build(); + } else { + return member; } + } - return member; + private Map<Uuid, Set<Integer>> filterAssignedPartitions( + Map<Uuid, Set<Integer>> partitions, + Set<String> subscribedTopicNames + ) { + TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics()); Review Comment: Same question about the cache. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -20382,19 +20382,15 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { .setMemberEpoch(10) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List.of( - new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(fooTopicId) - .setPartitions(List.of(0, 1, 2, 3, 4, 5)) - )) + .setTopicPartitions(List.of()) ), - result.response() + result1.response() ); ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) .setMemberEpoch(10) - .setPreviousMemberEpoch(0) + .setPreviousMemberEpoch(10) Review Comment: Could you elaborate? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3056,6 +3064,22 @@ private boolean hasMemberSubscriptionChanged( return false; } + /** + * Check whether the member has updated its subscribed topic regular expression. + * + * @param member The old member. + * @param updatedMember The new member. + * @return A boolean indicating whether the subscribed topic regular expression has changed. + */ + private boolean hasMemberRegularExpressionChanged( Review Comment: nit: static? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java: ########## @@ -317,4 +445,30 @@ private ConsumerGroupMember computeNextAssignment( .build(); } } + + /** + * Gets the set of topic IDs that the member is subscribed to. + * + * @return The set of topic IDs that the member is subscribed to. + */ + private Set<Uuid> subscribedTopicIds() { + Set<String> subscriptions = member.subscribedTopicNames(); + String subscribedTopicRegex = member.subscribedTopicRegex(); + if (subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty()) { + ResolvedRegularExpression resolvedRegularExpression = resolvedRegularExpressions.get(subscribedTopicRegex); + if (resolvedRegularExpression != null) { + if (subscriptions.isEmpty()) { + subscriptions = resolvedRegularExpression.topics; + } else if (!resolvedRegularExpression.topics.isEmpty()) { + subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics); + } + } else { + // Treat an unresolved regex as matching no topics, to be conservative. + } + } + + TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics()); Review Comment: Do we need the cache in this case? It seems that we will check every topic id once so the cache does not bring any benefits, does it? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java: ########## @@ -151,14 +210,18 @@ public ConsumerGroupMember build() { // If the member provides its owned partitions. We verify if it still // owns any of the revoked partitions. If it does, we cannot progress. if (ownsRevokedPartitions(member.partitionsPendingRevocation())) { - return member; + if (hasSubscriptionChanged) { + return updateCurrentAssignment(member.assignedPartitions()); + } else { + return member; + } } // When the member has revoked all the pending partitions, it can // transition to the next epoch (current + 1) and we can reconcile // its state towards the latest target assignment. return computeNextAssignment( - member.memberEpoch() + 1, + Math.min(member.memberEpoch() + 1, targetAssignmentEpoch), Review Comment: Why do we need this change? Is it to be more defensive? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2305,6 +2305,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> group::currentPartitionEpoch, targetAssignmentEpoch, targetAssignment, + group.resolvedRegularExpressions(), + // Force consistency with the subscription when the subscription has changed. + bumpGroupEpoch || hasMemberRegularExpressionChanged(member, updatedMember), Review Comment: I am not really happy with this. Could we somehow make hasMemberRegularExpressionChanged part of the checks that we already do at the beginning of this method? We may be able to refactor it. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java: ########## @@ -138,6 +195,8 @@ public ConsumerGroupMember build() { member.memberEpoch(), member.assignedPartitions() ); + } else if (hasSubscriptionChanged) { + return updateCurrentAssignment(member.assignedPartitions()); Review Comment: nit: Should we update the top-level comment that describes the states? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java: ########## @@ -83,11 +125,35 @@ public ShareGroupMember build() { // when the member is updated. return new ShareGroupMember.Builder(member) .setState(MemberState.STABLE) - .setAssignedPartitions(targetAssignment.partitions()) + .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames())) Review Comment: Do we really need filterAssignedPartitions here? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java: ########## @@ -215,6 +278,64 @@ private boolean ownsRevokedPartitions( return false; } + /** + * Updates the current assignment, removing any partitions that are not part of the subscribed topics. + * This method is a lot faster than running the full reconciliation logic in computeNextAssignment. + * + * @param memberAssignedPartitions The assigned partitions of the member to use. + * @return A new ConsumerGroupMember. + */ + private ConsumerGroupMember updateCurrentAssignment( + Map<Uuid, Set<Integer>> memberAssignedPartitions + ) { + Set<Uuid> subscribedTopicIds = subscribedTopicIds(); + + // Reuse the original map if no topics need to be removed. 
+ Map<Uuid, Set<Integer>> newAssignedPartitions = memberAssignedPartitions; + Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation()); + for (Map.Entry<Uuid, Set<Integer>> entry : memberAssignedPartitions.entrySet()) { + if (!subscribedTopicIds.contains(entry.getKey())) { + if (newAssignedPartitions == memberAssignedPartitions) { + newAssignedPartitions = new HashMap<>(memberAssignedPartitions); + newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation()); + } + newAssignedPartitions.remove(entry.getKey()); + newPartitionsPendingRevocation.merge( + entry.getKey(), + entry.getValue(), + (existing, additional) -> { + existing = new HashSet<>(existing); + existing.addAll(additional); + return existing; + } + ); + } + } + + if (newAssignedPartitions == memberAssignedPartitions) { + // If no partitions were removed, we can return the member as is. + return member; + } + + if (ownsRevokedPartitions(newPartitionsPendingRevocation)) { Review Comment: I wonder if we really need this check here or if we could just transition to UNREVOKED_PARTITIONS if we revoked at least one partition above. I may be missing something though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org