This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 157409cc53c MINOR: Introduce TopicRegexResolver and Move Regex
Resolution Logic Out of GroupMetadataManager (#21103)
157409cc53c is described below
commit 157409cc53c2bd41438b0c1d895d631d35d49ef6
Author: Aneesh Garg <[email protected]>
AuthorDate: Fri Dec 12 17:43:53 2025 +0530
MINOR: Introduce TopicRegexResolver and Move Regex Resolution Logic Out of
GroupMetadataManager (#21103)
This patch introduces a dedicated `TopicRegexResolver` component
responsible for resolving regex-based topic subscriptions. The logic was
previously embedded within `GroupMetadataManager`, coupling group
metadata operations with topic pattern matching.
By extracting this logic into a dedicated resolver, we improve
modularity and make the topic resolution behavior easier to evolve and
test independently. `GroupMetadataManager` is updated to delegate regex
resolution to the new component, without altering existing semantics.
This is a non-functional refactor intended to improve code clarity and
maintainability.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 160 +++-----------------
.../group/modern/consumer/TopicRegexResolver.java | 166 +++++++++++++++++++++
.../modern/consumer/TopicRegexResolverTest.java | 146 ++++++++++++++++++
.../jmh/coordinator/RegexResolutionBenchmark.java | 10 +-
4 files changed, 338 insertions(+), 144 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 77b47dc00d5..5135516c28d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -77,7 +77,6 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
-import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
@@ -145,6 +144,7 @@ import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import
org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder;
import
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
+import org.apache.kafka.coordinator.group.modern.consumer.TopicRegexResolver;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroup.ShareGroupStatePartitionMetadataInfo;
@@ -164,9 +164,7 @@ import
org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import
org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
-import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
-import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
@@ -178,9 +176,6 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
-import com.google.re2j.Pattern;
-import com.google.re2j.PatternSyntaxException;
-
import org.slf4j.Logger;
import java.nio.ByteBuffer;
@@ -204,7 +199,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
@@ -212,8 +206,6 @@ import static
org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
-import static org.apache.kafka.common.resource.PatternType.LITERAL;
-import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC;
import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
@@ -514,9 +506,9 @@ public class GroupMetadataManager {
private final ShareGroupPartitionAssignor shareGroupAssignor;
/**
- * The authorizer to validate the regex subscription topics.
+ * The topic regex resolver to resolve the regex subscription topics.
*/
- private final Optional<Plugin<Authorizer>> authorizerPlugin;
+ private final TopicRegexResolver topicRegexResolver;
private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
@@ -551,8 +543,8 @@ public class GroupMetadataManager {
this.shareGroupStatePartitionMetadata = new
TimelineHashMap<>(snapshotRegistry, 0);
this.groupConfigManager = groupConfigManager;
this.shareGroupAssignor = shareGroupAssignor;
- this.authorizerPlugin = authorizerPlugin;
this.streamsGroupAssignors =
streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name,
Function.identity()));
+ this.topicRegexResolver = new TopicRegexResolver(() ->
authorizerPlugin, this.time);
this.topicHashCache = new HashMap<>();
}
@@ -2364,7 +2356,7 @@ public class GroupMetadataManager {
updatedMember,
records
);
- UpdateRegularExpressionsResult updateRegularExpressionsResult =
maybeUpdateRegularExpressions(
+ UpdateRegularExpressionStatus updateRegularExpressionStatus =
maybeUpdateRegularExpressions(
context,
group,
member,
@@ -2374,7 +2366,7 @@ public class GroupMetadataManager {
// The subscription has changed when either the subscribed topic names
or subscribed topic
// regex has changed.
- boolean hasSubscriptionChanged = subscribedTopicNamesChanged ||
updateRegularExpressionsResult.regexUpdated();
+ boolean hasSubscriptionChanged = subscribedTopicNamesChanged ||
updateRegularExpressionStatus.regexUpdated();
int groupEpoch = group.groupEpoch();
SubscriptionType subscriptionType = group.subscriptionType();
@@ -2388,7 +2380,7 @@ public class GroupMetadataManager {
// bumping the group epoch when the new subscribed topic regex has
not been resolved
// yet, since we will have to update the target assignment again
later.
subscribedTopicNamesChanged ||
- updateRegularExpressionsResult ==
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
+ updateRegularExpressionStatus ==
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
@@ -2566,7 +2558,7 @@ public class GroupMetadataManager {
// Maybe create tombstone for the regex if the joining member replaces
a static member
// with regex subscription.
- UpdateRegularExpressionsResult updateRegularExpressionsResult =
maybeUpdateRegularExpressions(
+ UpdateRegularExpressionStatus updateRegularExpressionStatus =
maybeUpdateRegularExpressions(
context,
group,
member,
@@ -2574,7 +2566,7 @@ public class GroupMetadataManager {
records
);
- boolean bumpGroupEpoch = hasMemberSubscriptionChanged ||
updateRegularExpressionsResult.regexUpdated();
+ boolean bumpGroupEpoch = hasMemberSubscriptionChanged ||
updateRegularExpressionStatus.regexUpdated();
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
@@ -3250,7 +3242,7 @@ public class GroupMetadataManager {
return value != null && !value.isEmpty();
}
- private enum UpdateRegularExpressionsResult {
+ private enum UpdateRegularExpressionStatus {
NO_CHANGE,
REGEX_UPDATED,
REGEX_UPDATED_AND_RESOLVED;
@@ -3273,7 +3265,7 @@ public class GroupMetadataManager {
* @param records The records accumulator.
* @return The result of the update.
*/
- private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
+ private UpdateRegularExpressionStatus maybeUpdateRegularExpressions(
AuthorizableRequestContext context,
ConsumerGroup group,
ConsumerGroupMember member,
@@ -3287,7 +3279,7 @@ public class GroupMetadataManager {
String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex();
boolean requireRefresh = false;
- UpdateRegularExpressionsResult updateRegularExpressionsResult =
UpdateRegularExpressionsResult.NO_CHANGE;
+ UpdateRegularExpressionStatus updateRegularExpressionStatus =
UpdateRegularExpressionStatus.NO_CHANGE;
// Check whether the member has changed its subscribed regex.
boolean subscribedTopicRegexChanged =
!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex);
@@ -3295,7 +3287,7 @@ public class GroupMetadataManager {
log.debug("[GroupId {}] Member {} updated its subscribed regex to:
{}.",
groupId, memberId, newSubscribedTopicRegex);
- updateRegularExpressionsResult =
UpdateRegularExpressionsResult.REGEX_UPDATED;
+ updateRegularExpressionStatus =
UpdateRegularExpressionStatus.REGEX_UPDATED;
if (isNotEmpty(oldSubscribedTopicRegex) &&
group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
// If the member was the last one subscribed to the regex, we
delete the
@@ -3316,11 +3308,11 @@ public class GroupMetadataManager {
// If the new regex is already resolved, we trigger a
rebalance
// by bumping the group epoch.
if
(group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent()) {
- updateRegularExpressionsResult =
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
+ updateRegularExpressionStatus =
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
}
}
} else if (isNotEmpty(oldSubscribedTopicRegex)) {
- updateRegularExpressionsResult =
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
+ updateRegularExpressionStatus =
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
}
}
@@ -3335,20 +3327,20 @@ public class GroupMetadataManager {
// 0. The group is subscribed to regular expressions. We also take the
one
// that the current may have just introduced.
if (!requireRefresh && group.subscribedRegularExpressions().isEmpty())
{
- return updateRegularExpressionsResult;
+ return updateRegularExpressionStatus;
}
// 1. There is no ongoing refresh for the group.
String key = group.groupId() + "-regex";
if (executor.isScheduled(key)) {
- return updateRegularExpressionsResult;
+ return updateRegularExpressionStatus;
}
// 2. The last refresh is older than 10s. If the group does not have
any regular
// expressions but the current member just brought a new one, we
should continue.
long lastRefreshTimeMs =
group.lastResolvedRegularExpressionRefreshTimeMs();
if (currentTimeMs <= lastRefreshTimeMs +
REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
- return updateRegularExpressionsResult;
+ return updateRegularExpressionStatus;
}
// 3.1 The group has unresolved regular expressions.
@@ -3372,124 +3364,12 @@ public class GroupMetadataManager {
Set<String> regexes =
Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
executor.schedule(
key,
- () -> refreshRegularExpressions(context, groupId, log, time,
metadataImage, authorizerPlugin, regexes),
+ () -> topicRegexResolver.resolveRegularExpressions(context,
groupId, log, metadataImage, regexes),
(result, exception) -> handleRegularExpressionsResult(groupId,
memberId, result, exception)
);
}
- return updateRegularExpressionsResult;
- }
-
- /**
- * Resolves the provided regular expressions. Note that this static method
is executed
- * as an asynchronous task in the executor. Hence, it should not access
any state from
- * the manager.
- *
- * @param context The request context.
- * @param groupId The group id.
- * @param log The log instance.
- * @param time The time instance.
- * @param image The metadata image to use for listing the topics.
- * @param authorizerPlugin The authorizer.
- * @param regexes The list of regular expressions that must be
resolved.
- * @return The list of resolved regular expressions.
- *
- * public for benchmarks.
- */
- public static Map<String, ResolvedRegularExpression>
refreshRegularExpressions(
- AuthorizableRequestContext context,
- String groupId,
- Logger log,
- Time time,
- CoordinatorMetadataImage image,
- Optional<Plugin<Authorizer>> authorizerPlugin,
- Set<String> regexes
- ) {
- long startTimeMs = time.milliseconds();
- log.debug("[GroupId {}] Refreshing regular expressions: {}", groupId,
regexes);
-
- Map<String, Set<String>> resolvedRegexes = new
HashMap<>(regexes.size());
- List<Pattern> compiledRegexes = new ArrayList<>(regexes.size());
- for (String regex : regexes) {
- resolvedRegexes.put(regex, new HashSet<>());
- try {
- compiledRegexes.add(Pattern.compile(regex));
- } catch (PatternSyntaxException ex) {
- // This should not happen because the regular expressions are
validated
- // when received from the members. If for some reason, it would
- // happen, we log it and ignore it.
- log.error("[GroupId {}] Couldn't parse regular expression '{}'
due to `{}`. Ignoring it.",
- groupId, regex, ex.getDescription());
- }
- }
-
- for (String topicName : image.topicNames()) {
- for (Pattern regex : compiledRegexes) {
- if (regex.matcher(topicName).matches()) {
- resolvedRegexes.get(regex.pattern()).add(topicName);
- }
- }
- }
-
- filterTopicDescribeAuthorizedTopics(
- context,
- authorizerPlugin,
- resolvedRegexes
- );
-
- long version = image.version();
- Map<String, ResolvedRegularExpression> result = new
HashMap<>(resolvedRegexes.size());
- for (Map.Entry<String, Set<String>> resolvedRegex :
resolvedRegexes.entrySet()) {
- result.put(
- resolvedRegex.getKey(),
- new ResolvedRegularExpression(resolvedRegex.getValue(),
version, startTimeMs)
- );
- }
-
- log.info("[GroupId {}] Scanned {} topics to refresh regular
expressions {} in {}ms.",
- groupId, image.topicNames().size(), resolvedRegexes.keySet(),
- time.milliseconds() - startTimeMs);
-
- return result;
- }
-
- /**
- * This method filters the topics in the resolved regexes
- * that the member is authorized to describe.
- *
- * @param context The request context.
- * @param authorizerPlugin The authorizer.
- * @param resolvedRegexes The map of the regex pattern and its set of
matched topics.
- */
- private static void filterTopicDescribeAuthorizedTopics(
- AuthorizableRequestContext context,
- Optional<Plugin<Authorizer>> authorizerPlugin,
- Map<String, Set<String>> resolvedRegexes
- ) {
- if (authorizerPlugin.isEmpty()) return;
-
- Map<String, Integer> topicNameCount = new HashMap<>();
- resolvedRegexes.values().forEach(topicNames ->
- topicNames.forEach(topicName ->
- topicNameCount.compute(topicName, Utils::incValue)
- )
- );
-
- List<Action> actions = topicNameCount.entrySet().stream().map(entry ->
{
- ResourcePattern resource = new ResourcePattern(TOPIC,
entry.getKey(), LITERAL);
- return new Action(DESCRIBE, resource, entry.getValue(), true,
false);
- }).collect(Collectors.toList());
-
- List<AuthorizationResult> authorizationResults =
authorizerPlugin.get().get().authorize(context, actions);
- Set<String> deniedTopics = new HashSet<>();
- IntStream.range(0, actions.size()).forEach(i -> {
- if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
- String deniedTopic = actions.get(i).resourcePattern().name();
- deniedTopics.add(deniedTopic);
- }
- });
-
- resolvedRegexes.forEach((__, topicNames) ->
topicNames.removeAll(deniedTopics));
+ return updateRegularExpressionStatus;
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolver.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolver.java
new file mode 100644
index 00000000000..b28f5b6c8b8
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolver.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.consumer;
+
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import org.apache.kafka.coordinator.group.Utils;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import com.google.re2j.Pattern;
+import com.google.re2j.PatternSyntaxException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class TopicRegexResolver {
+
+ private final Supplier<Optional<Plugin<Authorizer>>>
authorizerPluginSupplier;
+ private final Time time;
+
+ public TopicRegexResolver(
+ Supplier<Optional<Plugin<Authorizer>>> authorizerPluginSupplier,
+ Time time
+ ) {
+ this.authorizerPluginSupplier = authorizerPluginSupplier;
+ this.time = time;
+ }
+
+ /**
+ * Resolves the provided regular expressions.
+ *
+ * @param context The request context.
+ * @param groupId The group id.
+ * @param log The logger to use.
+ * @param metadataImage The metadata image to use for the resolution.
+ * @param regexes The list of regular expressions that must be
resolved.
+ * @return The list of resolved regular expressions.
+ *
+ * public for benchmarks.
+ */
+ public Map<String, ResolvedRegularExpression> resolveRegularExpressions(
+ AuthorizableRequestContext context,
+ String groupId,
+ Logger log,
+ CoordinatorMetadataImage metadataImage,
+ Set<String> regexes
+ ) {
+ long startTimeMs = time.milliseconds();
+ log.debug("[GroupId {}] Refreshing regular expressions: {}", groupId,
regexes);
+
+ Map<String, Set<String>> resolvedRegexes = new
HashMap<>(regexes.size());
+ List<Pattern> compiledRegexes = new ArrayList<>(regexes.size());
+ for (String regex : regexes) {
+ resolvedRegexes.put(regex, new HashSet<>());
+ try {
+ compiledRegexes.add(Pattern.compile(regex));
+ } catch (PatternSyntaxException ex) {
+ // This should not happen because the regular expressions are
validated
+ // when received from the members. If for some reason, it would
+ // happen, we log it and ignore it.
+ log.error("[GroupId {}] Couldn't parse regular expression '{}'
due to `{}`. Ignoring it.",
+ groupId, regex, ex.getDescription());
+ }
+ }
+
+ for (String topicName : metadataImage.topicNames()) {
+ for (Pattern regex : compiledRegexes) {
+ if (regex.matcher(topicName).matches()) {
+ resolvedRegexes.get(regex.pattern()).add(topicName);
+ }
+ }
+ }
+
+ filterTopicDescribeAuthorizedTopics(
+ context,
+ resolvedRegexes
+ );
+
+ long version = metadataImage.version();
+ Map<String, ResolvedRegularExpression> result = new
HashMap<>(resolvedRegexes.size());
+ for (Map.Entry<String, Set<String>> resolvedRegex :
resolvedRegexes.entrySet()) {
+ result.put(
+ resolvedRegex.getKey(),
+ new ResolvedRegularExpression(resolvedRegex.getValue(),
version, startTimeMs)
+ );
+ }
+
+ log.info("[GroupId {}] Scanned {} topics to refresh regular
expressions {} in {}ms.",
+ groupId, metadataImage.topicNames().size(),
resolvedRegexes.keySet(),
+ time.milliseconds() - startTimeMs);
+
+ return result;
+ }
+
+ /**
+ * This method filters the topics in the resolved regexes
+ * that the member is authorized to describe.
+ *
+ * @param context The request context.
+ * @param resolvedRegexes The map of the regex pattern and its set of
matched topics.
+ */
+ private void filterTopicDescribeAuthorizedTopics(
+ AuthorizableRequestContext context,
+ Map<String, Set<String>> resolvedRegexes
+ ) {
+ if (authorizerPluginSupplier.get().isEmpty()) return;
+
+ var authorizer = authorizerPluginSupplier.get().get().get();
+
+ Map<String, Integer> topicNameCount = new HashMap<>();
+ resolvedRegexes.values().forEach(topicNames ->
+ topicNames.forEach(topicName ->
+ topicNameCount.compute(topicName, Utils::incValue)
+ )
+ );
+
+ List<Action> actions = topicNameCount.entrySet().stream().map(entry ->
{
+ ResourcePattern resource = new ResourcePattern(TOPIC,
entry.getKey(), LITERAL);
+ return new Action(DESCRIBE, resource, entry.getValue(), true,
false);
+ }).collect(Collectors.toList());
+
+ List<AuthorizationResult> authorizationResults =
authorizer.authorize(context, actions);
+ Set<String> deniedTopics = new HashSet<>();
+ IntStream.range(0, actions.size()).forEach(i -> {
+ if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
+ String deniedTopic = actions.get(i).resourcePattern().name();
+ deniedTopics.add(deniedTopic);
+ }
+ });
+
+ resolvedRegexes.forEach((__, topicNames) ->
topicNames.removeAll(deniedTopics));
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolverTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolverTest.java
new file mode 100644
index 00000000000..0dcec5f4573
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/TopicRegexResolverTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TopicRegexResolverTest {
+
+ private final Logger log =
LoggerFactory.getLogger(TopicRegexResolverTest.class);
+
+ @Test
+ public void testBasicMatching() {
+ CoordinatorMetadataImage image = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "foo", 10)
+ .addTopic(Uuid.randomUuid(), "bar", 10)
+ .addTopic(Uuid.randomUuid(), "baz", 10)
+ .addTopic(Uuid.randomUuid(), "qux", 10)
+ .buildCoordinatorMetadataImage();
+
+ Time time = new MockTime(0L, 0L, 0L);
+
+ TopicRegexResolver resolver = new TopicRegexResolver(Optional::empty,
time);
+
+ var result = resolver.resolveRegularExpressions(
+ null,
+ "group-1",
+ log,
+ image,
+ Set.of("ba.*")
+ );
+
+ var resolved = result.get("ba.*");
+
+ assertEquals(Set.of("bar", "baz"), resolved.topics());
+ assertEquals(image.version(), resolved.version());
+ assertEquals(0L, resolved.timestamp());
+ }
+
+ @Test
+ public void testInvalidRegexIgnored() {
+ CoordinatorMetadataImage image = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "foo", 10)
+ .addTopic(Uuid.randomUuid(), "bar", 10)
+ .buildCoordinatorMetadataImage();
+
+ Time time = new MockTime(5L, 0L, 0L);
+
+ TopicRegexResolver resolver = new TopicRegexResolver(Optional::empty,
time);
+
+ var result = resolver.resolveRegularExpressions(
+ null,
+ "group-2",
+ log,
+ image,
+ Set.of("a.*")
+ );
+
+ var resolved = result.get("a.*");
+
+ assertTrue(resolved.topics().isEmpty());
+ assertEquals(image.version(), resolved.version());
+ assertEquals(5L, resolved.timestamp());
+ }
+
+ @Test
+ public void testAuthorizationFiltering() {
+ CoordinatorMetadataImage image = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "allow1", 10)
+ .addTopic(Uuid.randomUuid(), "deny1", 10)
+ .addTopic(Uuid.randomUuid(), "allow2", 10)
+ .buildCoordinatorMetadataImage();
+
+ Time time = new MockTime(10L, 0L, 0L);
+
+ Authorizer authorizer = mock(Authorizer.class);
+ when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+ List<Action> actions = invocation.getArgument(1);
+ var results = new ArrayList<>(actions.size());
+ for (Action action : actions) {
+ String topic = action.resourcePattern().name();
+ results.add("deny1".equals(topic) ? AuthorizationResult.DENIED
: AuthorizationResult.ALLOWED);
+ }
+ return results;
+ });
+
+ var plugin = Plugin.wrapInstance(authorizer, null,
"authorizer.class.name");
+
+ TopicRegexResolver resolver = new TopicRegexResolver(() ->
Optional.of(plugin), time);
+
+ var result = resolver.resolveRegularExpressions(
+ null,
+ "group-3",
+ log,
+ image,
+ Set.of("a.*", "d.*")
+ );
+
+ var resolved = result.get("a.*");
+
+ assertEquals(Set.of("allow1", "allow2"), resolved.topics());
+ assertEquals(image.version(), resolved.version());
+ assertEquals(10L, resolved.timestamp());
+
+ resolved = result.get("d.*");
+ assertTrue(resolved.topics().isEmpty());
+ assertEquals(image.version(), resolved.version());
+ assertEquals(10L, resolved.timestamp());
+ }
+}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
index aa91e88e9a5..289835cd1cd 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
-import org.apache.kafka.coordinator.group.GroupMetadataManager;
+import org.apache.kafka.coordinator.group.modern.consumer.TopicRegexResolver;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -119,13 +119,15 @@ public class RegexResolutionBenchmark {
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void run() {
- GroupMetadataManager.refreshRegularExpressions(
+ TopicRegexResolver resolver = new TopicRegexResolver(
+ Optional::empty,
+ TIME
+ );
+ resolver.resolveRegularExpressions(
null,
GROUP_ID,
LOG,
- TIME,
new KRaftCoordinatorMetadataImage(image),
- Optional.empty(),
regexes
);
}