dajac commented on code in PR #20055:
URL: https://github.com/apache/kafka/pull/20055#discussion_r2174845778


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java:
##########
@@ -83,11 +125,35 @@ public ShareGroupMember build() {
             // when the member is updated.
             return new ShareGroupMember.Builder(member)
                 .setState(MemberState.STABLE)
-                .setAssignedPartitions(targetAssignment.partitions())
+                
.setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), 
member.subscribedTopicNames()))
                 .updateMemberEpoch(targetAssignmentEpoch)
                 .build();
+        } else if (hasSubscriptionChanged) {
+            return new ShareGroupMember.Builder(member)
+                
.setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), 
member.subscribedTopicNames()))
+                .build();
+        } else {
+            return member;
         }
+    }
 
-        return member;
+    private Map<Uuid, Set<Integer>> filterAssignedPartitions(
+        Map<Uuid, Set<Integer>> partitions,
+        Set<String> subscribedTopicNames
+    ) {
+        TopicIds.TopicResolver topicResolver = new 
TopicIds.CachedTopicResolver(metadataImage.topics());

Review Comment:
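   Same question about the cache: do we need `CachedTopicResolver` here, or would a plain lookup in `metadataImage.topics()` suffice?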



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -20382,19 +20382,15 @@ public void 
testConsumerGroupMemberJoinsWithUpdatedRegex() {
                 .setMemberEpoch(10)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setTopicPartitions(List.of(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
-                    ))
+                    .setTopicPartitions(List.of())
                 ),
-            result.response()
+            result1.response()
         );
 
         ConsumerGroupMember expectedMember1 = new 
ConsumerGroupMember.Builder(memberId1)
             .setState(MemberState.STABLE)
             .setMemberEpoch(10)
-            .setPreviousMemberEpoch(0)
+            .setPreviousMemberEpoch(10)

Review Comment:
   Could you elaborate on why the expected previous member epoch changes from 0 to 10 here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3056,6 +3064,22 @@ private boolean hasMemberSubscriptionChanged(
         return false;
     }
 
+    /**
+     * Check whether the member has updated its subscribed topic regular 
expression.
+     *
+     * @param member        The old member.
+     * @param updatedMember The new member.
+     * @return A boolean indicating whether the subscribed topic regular 
expression has changed.
+     */
+    private boolean hasMemberRegularExpressionChanged(

Review Comment:
   nit: Could this method be `static`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -317,4 +445,30 @@ private ConsumerGroupMember computeNextAssignment(
                 .build();
         }
     }
+
+    /**
+     * Gets the set of topic IDs that the member is subscribed to.
+     *
+     * @return The set of topic IDs that the member is subscribed to.
+     */
+    private Set<Uuid> subscribedTopicIds() {
+        Set<String> subscriptions = member.subscribedTopicNames();
+        String subscribedTopicRegex = member.subscribedTopicRegex();
+        if (subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty()) {
+            ResolvedRegularExpression resolvedRegularExpression = 
resolvedRegularExpressions.get(subscribedTopicRegex);
+            if (resolvedRegularExpression != null) {
+                if (subscriptions.isEmpty()) {
+                    subscriptions = resolvedRegularExpression.topics;
+                } else if (!resolvedRegularExpression.topics.isEmpty()) {
+                    subscriptions = new UnionSet<>(subscriptions, 
resolvedRegularExpression.topics);
+                }
+            } else {
+                // Treat an unresolved regex as matching no topics, to be 
conservative.
+            }
+        }
+
+        TopicIds.TopicResolver topicResolver = new 
TopicIds.CachedTopicResolver(metadataImage.topics());

Review Comment:
   Do we need the cache in this case? It seems that we will check every topic 
ID only once, so the cache does not bring any benefit, does it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -151,14 +210,18 @@ public ConsumerGroupMember build() {
                 // If the member provides its owned partitions. We verify if 
it still
                 // owns any of the revoked partitions. If it does, we cannot 
progress.
                 if 
(ownsRevokedPartitions(member.partitionsPendingRevocation())) {
-                    return member;
+                    if (hasSubscriptionChanged) {
+                        return 
updateCurrentAssignment(member.assignedPartitions());
+                    } else {
+                        return member;
+                    }
                 }
 
                 // When the member has revoked all the pending partitions, it 
can
                 // transition to the next epoch (current + 1) and we can 
reconcile
                 // its state towards the latest target assignment.
                 return computeNextAssignment(
-                    member.memberEpoch() + 1,
+                    Math.min(member.memberEpoch() + 1, targetAssignmentEpoch),

Review Comment:
   Why do we need this change? Is it to be more defensive?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2305,6 +2305,9 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
             group::currentPartitionEpoch,
             targetAssignmentEpoch,
             targetAssignment,
+            group.resolvedRegularExpressions(),
+            // Force consistency with the subscription when the subscription 
has changed.
+            bumpGroupEpoch || hasMemberRegularExpressionChanged(member, 
updatedMember),

Review Comment:
   I am not really happy with this. Could we somehow make 
hasMemberRegularExpressionChanged part of the checks that we already do at the 
beginning of this method? We may be able to refactor it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -138,6 +195,8 @@ public ConsumerGroupMember build() {
                         member.memberEpoch(),
                         member.assignedPartitions()
                     );
+                } else if (hasSubscriptionChanged) {
+                    return 
updateCurrentAssignment(member.assignedPartitions());

Review Comment:
   nit: Should we update the top-level comment describing the states?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java:
##########
@@ -83,11 +125,35 @@ public ShareGroupMember build() {
             // when the member is updated.
             return new ShareGroupMember.Builder(member)
                 .setState(MemberState.STABLE)
-                .setAssignedPartitions(targetAssignment.partitions())
+                
.setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), 
member.subscribedTopicNames()))

Review Comment:
   Do we really need filterAssignedPartitions here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -215,6 +278,64 @@ private boolean ownsRevokedPartitions(
         return false;
     }
 
+    /**
+     * Updates the current assignment, removing any partitions that are not 
part of the subscribed topics.
+     * This method is a lot faster than running the full reconciliation logic 
in computeNextAssignment.
+     *
+     * @param memberAssignedPartitions  The assigned partitions of the member 
to use.
+     * @return A new ConsumerGroupMember.
+     */
+    private ConsumerGroupMember updateCurrentAssignment(
+        Map<Uuid, Set<Integer>> memberAssignedPartitions
+    ) {
+        Set<Uuid> subscribedTopicIds = subscribedTopicIds();
+
+        // Reuse the original map if no topics need to be removed.
+        Map<Uuid, Set<Integer>> newAssignedPartitions = 
memberAssignedPartitions;
+        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new 
HashMap<>(member.partitionsPendingRevocation());
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
memberAssignedPartitions.entrySet()) {
+            if (!subscribedTopicIds.contains(entry.getKey())) {
+                if (newAssignedPartitions == memberAssignedPartitions) {
+                    newAssignedPartitions = new 
HashMap<>(memberAssignedPartitions);
+                    newPartitionsPendingRevocation = new 
HashMap<>(member.partitionsPendingRevocation());
+                }
+                newAssignedPartitions.remove(entry.getKey());
+                newPartitionsPendingRevocation.merge(
+                    entry.getKey(),
+                    entry.getValue(),
+                    (existing, additional) -> {
+                        existing = new HashSet<>(existing);
+                        existing.addAll(additional);
+                        return existing;
+                    }
+                );
+            }
+        }
+
+        if (newAssignedPartitions == memberAssignedPartitions) {
+            // If no partitions were removed, we can return the member as is.
+            return member;
+        }
+
+        if (ownsRevokedPartitions(newPartitionsPendingRevocation)) {

Review Comment:
   I wonder if we really need this check here or if we could just transition to 
UNREVOKED_PARTITIONS if we revoked at least one partition above. I may be 
missing something though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to