This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 157409cc53c MINOR: Introduce TopicRegexResolver and Move Regex Resolution Logic Out of GroupMetadataManager (#21103)
157409cc53c is described below

commit 157409cc53c2bd41438b0c1d895d631d35d49ef6
Author: Aneesh Garg <[email protected]>
AuthorDate: Fri Dec 12 17:43:53 2025 +0530

    MINOR: Introduce TopicRegexResolver and Move Regex Resolution Logic Out of GroupMetadataManager (#21103)
    
    This patch introduces a dedicated `TopicRegexResolver` component
    responsible for resolving regex-based topic subscriptions. The logic was
    previously embedded within `GroupMetadataManager`, coupling group
    metadata operations with topic pattern matching.
    
    By extracting this logic into a dedicated resolver, we improve
    modularity and make the topic resolution behavior easier to evolve and
    test independently. `GroupMetadataManager` is updated to delegate regex
    resolution to the new component, without altering existing semantics.
    
    This is a pure refactor with no intended change in behavior; its goal is
    to improve code clarity and maintainability.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 160 +++-----------------
 .../group/modern/consumer/TopicRegexResolver.java  | 166 +++++++++++++++++++++
 .../modern/consumer/TopicRegexResolverTest.java    | 146 ++++++++++++++++++
 .../jmh/coordinator/RegexResolutionBenchmark.java  |  10 +-
 4 files changed, 338 insertions(+), 144 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 77b47dc00d5..5135516c28d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -77,7 +77,6 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
-import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
@@ -145,6 +144,7 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import 
org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
+import org.apache.kafka.coordinator.group.modern.consumer.TopicRegexResolver;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroup.ShareGroupStatePartitionMetadataInfo;
@@ -164,9 +164,7 @@ import 
org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
 import 
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
 import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import 
org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
-import org.apache.kafka.server.authorizer.Action;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
-import org.apache.kafka.server.authorizer.AuthorizationResult;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
 import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
@@ -178,9 +176,6 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineHashSet;
 
-import com.google.re2j.Pattern;
-import com.google.re2j.PatternSyntaxException;
-
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
@@ -204,7 +199,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
-import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
 import static 
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
 import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
@@ -212,8 +206,6 @@ import static 
org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
-import static org.apache.kafka.common.resource.PatternType.LITERAL;
-import static org.apache.kafka.common.resource.ResourceType.TOPIC;
 import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC;
 import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
 import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
@@ -514,9 +506,9 @@ public class GroupMetadataManager {
     private final ShareGroupPartitionAssignor shareGroupAssignor;
 
     /**
-     * The authorizer to validate the regex subscription topics.
+     * The topic regex resolver to resolve the regex subscription topics.
      */
-    private final Optional<Plugin<Authorizer>> authorizerPlugin;
+    private final TopicRegexResolver topicRegexResolver;
 
     private GroupMetadataManager(
         SnapshotRegistry snapshotRegistry,
@@ -551,8 +543,8 @@ public class GroupMetadataManager {
         this.shareGroupStatePartitionMetadata = new 
TimelineHashMap<>(snapshotRegistry, 0);
         this.groupConfigManager = groupConfigManager;
         this.shareGroupAssignor = shareGroupAssignor;
-        this.authorizerPlugin = authorizerPlugin;
         this.streamsGroupAssignors = 
streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name, 
Function.identity()));
+        this.topicRegexResolver = new TopicRegexResolver(() -> 
authorizerPlugin, this.time);
         this.topicHashCache = new HashMap<>();
     }
 
@@ -2364,7 +2356,7 @@ public class GroupMetadataManager {
             updatedMember,
             records
         );
-        UpdateRegularExpressionsResult updateRegularExpressionsResult = 
maybeUpdateRegularExpressions(
+        UpdateRegularExpressionStatus updateRegularExpressionStatus = 
maybeUpdateRegularExpressions(
             context,
             group,
             member,
@@ -2374,7 +2366,7 @@ public class GroupMetadataManager {
 
         // The subscription has changed when either the subscribed topic names 
or subscribed topic
         // regex has changed.
-        boolean hasSubscriptionChanged = subscribedTopicNamesChanged || 
updateRegularExpressionsResult.regexUpdated();
+        boolean hasSubscriptionChanged = subscribedTopicNamesChanged || 
updateRegularExpressionStatus.regexUpdated();
         int groupEpoch = group.groupEpoch();
         SubscriptionType subscriptionType = group.subscriptionType();
 
@@ -2388,7 +2380,7 @@ public class GroupMetadataManager {
             // bumping the group epoch when the new subscribed topic regex has 
not been resolved
             // yet, since we will have to update the target assignment again 
later.
             subscribedTopicNamesChanged ||
-            updateRegularExpressionsResult == 
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
+            updateRegularExpressionStatus == 
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
 
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
             // The subscription metadata is updated in two cases:
@@ -2566,7 +2558,7 @@ public class GroupMetadataManager {
 
         // Maybe create tombstone for the regex if the joining member replaces 
a static member
         // with regex subscription.
-        UpdateRegularExpressionsResult updateRegularExpressionsResult = 
maybeUpdateRegularExpressions(
+        UpdateRegularExpressionStatus updateRegularExpressionStatus = 
maybeUpdateRegularExpressions(
             context,
             group,
             member,
@@ -2574,7 +2566,7 @@ public class GroupMetadataManager {
             records
         );
 
-        boolean bumpGroupEpoch = hasMemberSubscriptionChanged || 
updateRegularExpressionsResult.regexUpdated();
+        boolean bumpGroupEpoch = hasMemberSubscriptionChanged || 
updateRegularExpressionStatus.regexUpdated();
 
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
             // The subscription metadata is updated in two cases:
@@ -3250,7 +3242,7 @@ public class GroupMetadataManager {
         return value != null && !value.isEmpty();
     }
 
-    private enum UpdateRegularExpressionsResult {
+    private enum UpdateRegularExpressionStatus {
         NO_CHANGE,
         REGEX_UPDATED,
         REGEX_UPDATED_AND_RESOLVED;
@@ -3273,7 +3265,7 @@ public class GroupMetadataManager {
      * @param records       The records accumulator.
      * @return The result of the update.
      */
-    private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
+    private UpdateRegularExpressionStatus maybeUpdateRegularExpressions(
         AuthorizableRequestContext context,
         ConsumerGroup group,
         ConsumerGroupMember member,
@@ -3287,7 +3279,7 @@ public class GroupMetadataManager {
         String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex();
 
         boolean requireRefresh = false;
-        UpdateRegularExpressionsResult updateRegularExpressionsResult = 
UpdateRegularExpressionsResult.NO_CHANGE;
+        UpdateRegularExpressionStatus updateRegularExpressionStatus = 
UpdateRegularExpressionStatus.NO_CHANGE;
 
         // Check whether the member has changed its subscribed regex.
         boolean subscribedTopicRegexChanged = 
!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex);
@@ -3295,7 +3287,7 @@ public class GroupMetadataManager {
             log.debug("[GroupId {}] Member {} updated its subscribed regex to: 
{}.",
                 groupId, memberId, newSubscribedTopicRegex);
 
-            updateRegularExpressionsResult = 
UpdateRegularExpressionsResult.REGEX_UPDATED;
+            updateRegularExpressionStatus = 
UpdateRegularExpressionStatus.REGEX_UPDATED;
 
             if (isNotEmpty(oldSubscribedTopicRegex) && 
group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
                 // If the member was the last one subscribed to the regex, we 
delete the
@@ -3316,11 +3308,11 @@ public class GroupMetadataManager {
                     // If the new regex is already resolved, we trigger a 
rebalance
                     // by bumping the group epoch.
                     if 
(group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent()) {
-                        updateRegularExpressionsResult = 
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
+                        updateRegularExpressionStatus = 
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
                     }
                 }
             } else if (isNotEmpty(oldSubscribedTopicRegex)) {
-                updateRegularExpressionsResult = 
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
+                updateRegularExpressionStatus = 
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
             }
         }
 
@@ -3335,20 +3327,20 @@ public class GroupMetadataManager {
         // 0. The group is subscribed to regular expressions. We also take the 
one
         //    that the current may have just introduced.
         if (!requireRefresh && group.subscribedRegularExpressions().isEmpty()) 
{
-            return updateRegularExpressionsResult;
+            return updateRegularExpressionStatus;
         }
 
         // 1. There is no ongoing refresh for the group.
         String key = group.groupId() + "-regex";
         if (executor.isScheduled(key)) {
-            return updateRegularExpressionsResult;
+            return updateRegularExpressionStatus;
         }
 
         // 2. The last refresh is older than 10s. If the group does not have 
any regular
         //    expressions but the current member just brought a new one, we 
should continue.
         long lastRefreshTimeMs = 
group.lastResolvedRegularExpressionRefreshTimeMs();
         if (currentTimeMs <= lastRefreshTimeMs + 
REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
-            return updateRegularExpressionsResult;
+            return updateRegularExpressionStatus;
         }
 
         // 3.1 The group has unresolved regular expressions.
@@ -3372,124 +3364,12 @@ public class GroupMetadataManager {
             Set<String> regexes = 
Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
             executor.schedule(
                 key,
-                () -> refreshRegularExpressions(context, groupId, log, time, 
metadataImage, authorizerPlugin, regexes),
+                () -> topicRegexResolver.resolveRegularExpressions(context, 
groupId, log, metadataImage, regexes),
                 (result, exception) -> handleRegularExpressionsResult(groupId, 
memberId, result, exception)
             );
         }
 
-        return updateRegularExpressionsResult;
-    }
-
-    /**
-     * Resolves the provided regular expressions. Note that this static method 
is executed
-     * as an asynchronous task in the executor. Hence, it should not access 
any state from
-     * the manager.
-     *
-     * @param context       The request context.
-     * @param groupId       The group id.
-     * @param log           The log instance.
-     * @param time          The time instance.
-     * @param image         The metadata image to use for listing the topics.
-     * @param authorizerPlugin    The authorizer.
-     * @param regexes       The list of regular expressions that must be 
resolved.
-     * @return The list of resolved regular expressions.
-     *
-     * public for benchmarks.
-     */
-    public static Map<String, ResolvedRegularExpression> 
refreshRegularExpressions(
-        AuthorizableRequestContext context,
-        String groupId,
-        Logger log,
-        Time time,
-        CoordinatorMetadataImage image,
-        Optional<Plugin<Authorizer>> authorizerPlugin,
-        Set<String> regexes
-    ) {
-        long startTimeMs = time.milliseconds();
-        log.debug("[GroupId {}] Refreshing regular expressions: {}", groupId, 
regexes);
-
-        Map<String, Set<String>> resolvedRegexes = new 
HashMap<>(regexes.size());
-        List<Pattern> compiledRegexes = new ArrayList<>(regexes.size());
-        for (String regex : regexes) {
-            resolvedRegexes.put(regex, new HashSet<>());
-            try {
-                compiledRegexes.add(Pattern.compile(regex));
-            } catch (PatternSyntaxException ex) {
-                // This should not happen because the regular expressions are 
validated
-                // when received from the members. If for some reason, it would
-                // happen, we log it and ignore it.
-                log.error("[GroupId {}] Couldn't parse regular expression '{}' 
due to `{}`. Ignoring it.",
-                    groupId, regex, ex.getDescription());
-            }
-        }
-
-        for (String topicName : image.topicNames()) {
-            for (Pattern regex : compiledRegexes) {
-                if (regex.matcher(topicName).matches()) {
-                    resolvedRegexes.get(regex.pattern()).add(topicName);
-                }
-            }
-        }
-
-        filterTopicDescribeAuthorizedTopics(
-            context,
-            authorizerPlugin,
-            resolvedRegexes
-        );
-
-        long version = image.version();
-        Map<String, ResolvedRegularExpression> result = new 
HashMap<>(resolvedRegexes.size());
-        for (Map.Entry<String, Set<String>> resolvedRegex : 
resolvedRegexes.entrySet()) {
-            result.put(
-                resolvedRegex.getKey(),
-                new ResolvedRegularExpression(resolvedRegex.getValue(), 
version, startTimeMs)
-            );
-        }
-
-        log.info("[GroupId {}] Scanned {} topics to refresh regular 
expressions {} in {}ms.",
-            groupId, image.topicNames().size(), resolvedRegexes.keySet(),
-            time.milliseconds() - startTimeMs);
-
-        return result;
-    }
-
-    /**
-     * This method filters the topics in the resolved regexes
-     * that the member is authorized to describe.
-     *
-     * @param context           The request context.
-     * @param authorizerPlugin  The authorizer.
-     * @param resolvedRegexes   The map of the regex pattern and its set of 
matched topics.
-     */
-    private static void filterTopicDescribeAuthorizedTopics(
-        AuthorizableRequestContext context,
-        Optional<Plugin<Authorizer>> authorizerPlugin,
-        Map<String, Set<String>> resolvedRegexes
-    ) {
-        if (authorizerPlugin.isEmpty()) return;
-
-        Map<String, Integer> topicNameCount = new HashMap<>();
-        resolvedRegexes.values().forEach(topicNames ->
-            topicNames.forEach(topicName ->
-                topicNameCount.compute(topicName, Utils::incValue)
-            )
-        );
-
-        List<Action> actions = topicNameCount.entrySet().stream().map(entry -> 
{
-            ResourcePattern resource = new ResourcePattern(TOPIC, 
entry.getKey(), LITERAL);
-            return new Action(DESCRIBE, resource, entry.getValue(), true, 
false);
-        }).collect(Collectors.toList());
-
-        List<AuthorizationResult> authorizationResults = 
authorizerPlugin.get().get().authorize(context, actions);
-        Set<String> deniedTopics = new HashSet<>();
-        IntStream.range(0, actions.size()).forEach(i -> {
-            if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
-                String deniedTopic = actions.get(i).resourcePattern().name();
-                deniedTopics.add(deniedTopic);
-            }
-        });
-
-        resolvedRegexes.forEach((__, topicNames) -> 
topicNames.removeAll(deniedTopics));
+        return updateRegularExpressionStatus;
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolver.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolver.java
new file mode 100644
index 00000000000..b28f5b6c8b8
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolver.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.consumer;
+
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import org.apache.kafka.coordinator.group.Utils;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import com.google.re2j.Pattern;
+import com.google.re2j.PatternSyntaxException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class TopicRegexResolver {
+
+    private final Supplier<Optional<Plugin<Authorizer>>> 
authorizerPluginSupplier;
+    private final Time time;
+
+    public TopicRegexResolver(
+        Supplier<Optional<Plugin<Authorizer>>> authorizerPluginSupplier,
+        Time time
+    ) {
+        this.authorizerPluginSupplier = authorizerPluginSupplier;
+        this.time = time;
+    }
+
+    /**
+     * Resolves the provided regular expressions.
+     *
+     * @param context           The request context.
+     * @param groupId           The group id.
+     * @param log               The logger to use.
+     * @param metadataImage     The metadata image to use for the resolution.
+     * @param regexes           The list of regular expressions that must be 
resolved.
+     * @return The list of resolved regular expressions.
+     *
+     * public for benchmarks.
+     */
+    public Map<String, ResolvedRegularExpression> resolveRegularExpressions(
+        AuthorizableRequestContext context,
+        String groupId,
+        Logger log,
+        CoordinatorMetadataImage metadataImage,
+        Set<String> regexes
+    ) {
+        long startTimeMs = time.milliseconds();
+        log.debug("[GroupId {}] Refreshing regular expressions: {}", groupId, 
regexes);
+
+        Map<String, Set<String>> resolvedRegexes = new 
HashMap<>(regexes.size());
+        List<Pattern> compiledRegexes = new ArrayList<>(regexes.size());
+        for (String regex : regexes) {
+            resolvedRegexes.put(regex, new HashSet<>());
+            try {
+                compiledRegexes.add(Pattern.compile(regex));
+            } catch (PatternSyntaxException ex) {
+                // This should not happen because the regular expressions are 
validated
+                // when received from the members. If for some reason, it would
+                // happen, we log it and ignore it.
+                log.error("[GroupId {}] Couldn't parse regular expression '{}' 
due to `{}`. Ignoring it.",
+                        groupId, regex, ex.getDescription());
+            }
+        }
+
+        for (String topicName : metadataImage.topicNames()) {
+            for (Pattern regex : compiledRegexes) {
+                if (regex.matcher(topicName).matches()) {
+                    resolvedRegexes.get(regex.pattern()).add(topicName);
+                }
+            }
+        }
+
+        filterTopicDescribeAuthorizedTopics(
+            context,
+            resolvedRegexes
+        );
+
+        long version = metadataImage.version();
+        Map<String, ResolvedRegularExpression> result = new 
HashMap<>(resolvedRegexes.size());
+        for (Map.Entry<String, Set<String>> resolvedRegex : 
resolvedRegexes.entrySet()) {
+            result.put(
+                resolvedRegex.getKey(),
+                new ResolvedRegularExpression(resolvedRegex.getValue(), 
version, startTimeMs)
+            );
+        }
+
+        log.info("[GroupId {}] Scanned {} topics to refresh regular 
expressions {} in {}ms.",
+            groupId, metadataImage.topicNames().size(), 
resolvedRegexes.keySet(),
+            time.milliseconds() - startTimeMs);
+
+        return result;
+    }
+
+    /**
+     * This method filters the topics in the resolved regexes
+     * that the member is authorized to describe.
+     *
+     * @param context           The request context.
+     * @param resolvedRegexes   The map of the regex pattern and its set of 
matched topics.
+     */
+    private void filterTopicDescribeAuthorizedTopics(
+        AuthorizableRequestContext context,
+        Map<String, Set<String>> resolvedRegexes
+    ) {
+        if (authorizerPluginSupplier.get().isEmpty()) return;
+
+        var authorizer = authorizerPluginSupplier.get().get().get();
+
+        Map<String, Integer> topicNameCount = new HashMap<>();
+        resolvedRegexes.values().forEach(topicNames ->
+            topicNames.forEach(topicName ->
+                topicNameCount.compute(topicName, Utils::incValue)
+            )
+        );
+
+        List<Action> actions = topicNameCount.entrySet().stream().map(entry -> 
{
+            ResourcePattern resource = new ResourcePattern(TOPIC, 
entry.getKey(), LITERAL);
+            return new Action(DESCRIBE, resource, entry.getValue(), true, 
false);
+        }).collect(Collectors.toList());
+
+        List<AuthorizationResult> authorizationResults = 
authorizer.authorize(context, actions);
+        Set<String> deniedTopics = new HashSet<>();
+        IntStream.range(0, actions.size()).forEach(i -> {
+            if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
+                String deniedTopic = actions.get(i).resourcePattern().name();
+                deniedTopics.add(deniedTopic);
+            }
+        });
+
+        resolvedRegexes.forEach((__, topicNames) -> 
topicNames.removeAll(deniedTopics));
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolverTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolverTest.java
new file mode 100644
index 00000000000..0dcec5f4573
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolverTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TopicRegexResolverTest {
+
+    private final Logger log = 
LoggerFactory.getLogger(TopicRegexResolverTest.class);
+
+    @Test
+    public void testBasicMatching() {
+        CoordinatorMetadataImage image = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "foo", 10)
+            .addTopic(Uuid.randomUuid(), "bar", 10)
+            .addTopic(Uuid.randomUuid(), "baz", 10)
+            .addTopic(Uuid.randomUuid(), "qux", 10)
+            .buildCoordinatorMetadataImage();
+
+        Time time = new MockTime(0L, 0L, 0L);
+
+        TopicRegexResolver resolver = new TopicRegexResolver(Optional::empty, 
time);
+
+        var result = resolver.resolveRegularExpressions(
+            null,
+            "group-1",
+            log,
+            image,
+            Set.of("ba.*")
+        );
+
+        var resolved = result.get("ba.*");
+
+        assertEquals(Set.of("bar", "baz"), resolved.topics());
+        assertEquals(image.version(), resolved.version());
+        assertEquals(0L, resolved.timestamp());
+    }
+
+    @Test
+    public void testInvalidRegexIgnored() {
+        CoordinatorMetadataImage image = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "foo", 10)
+            .addTopic(Uuid.randomUuid(), "bar", 10)
+            .buildCoordinatorMetadataImage();
+
+        Time time = new MockTime(5L, 0L, 0L);
+
+        TopicRegexResolver resolver = new TopicRegexResolver(Optional::empty, 
time);
+
+        var result = resolver.resolveRegularExpressions(
+            null,
+            "group-2",
+            log,
+            image,
+            Set.of("a.*")
+        );
+
+        var resolved = result.get("a.*");
+
+        assertTrue(resolved.topics().isEmpty());
+        assertEquals(image.version(), resolved.version());
+        assertEquals(5L, resolved.timestamp());
+    }
+
+    @Test
+    public void testAuthorizationFiltering() {
+        CoordinatorMetadataImage image = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "allow1", 10)
+            .addTopic(Uuid.randomUuid(), "deny1", 10)
+            .addTopic(Uuid.randomUuid(), "allow2", 10)
+            .buildCoordinatorMetadataImage();
+
+        Time time = new MockTime(10L, 0L, 0L);
+
+        Authorizer authorizer = mock(Authorizer.class);
+        when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+            List<Action> actions = invocation.getArgument(1);
+            var results = new ArrayList<>(actions.size());
+            for (Action action : actions) {
+                String topic = action.resourcePattern().name();
+                results.add("deny1".equals(topic) ? AuthorizationResult.DENIED 
: AuthorizationResult.ALLOWED);
+            }
+            return results;
+        });
+
+        var plugin = Plugin.wrapInstance(authorizer, null, 
"authorizer.class.name");
+
+        TopicRegexResolver resolver = new TopicRegexResolver(() -> 
Optional.of(plugin), time);
+
+        var result = resolver.resolveRegularExpressions(
+            null,
+            "group-3",
+            log,
+            image,
+            Set.of("a.*", "d.*")
+        );
+
+        var resolved = result.get("a.*");
+
+        assertEquals(Set.of("allow1", "allow2"), resolved.topics());
+        assertEquals(image.version(), resolved.version());
+        assertEquals(10L, resolved.timestamp());
+
+        resolved = result.get("d.*");
+        assertTrue(resolved.topics().isEmpty());
+        assertEquals(image.version(), resolved.version());
+        assertEquals(10L, resolved.timestamp());
+    }
+}
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
index aa91e88e9a5..289835cd1cd 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
-import org.apache.kafka.coordinator.group.GroupMetadataManager;
+import org.apache.kafka.coordinator.group.modern.consumer.TopicRegexResolver;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
@@ -119,13 +119,15 @@ public class RegexResolutionBenchmark {
     @Threads(1)
     @OutputTimeUnit(TimeUnit.MILLISECONDS)
     public void run() {
-        GroupMetadataManager.refreshRegularExpressions(
+        TopicRegexResolver resolver = new TopicRegexResolver(
+            Optional::empty,
+            TIME
+        );
+        resolver.resolveRegularExpressions(
             null,
             GROUP_ID,
             LOG,
-            TIME,
             new KRaftCoordinatorMetadataImage(image),
-            Optional.empty(),
             regexes
         );
     }

Reply via email to