cadonna commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1495579840

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1730,6 +1744,21 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
         }
     }
+    private void subscribeInternal(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            if (pattern == null || pattern.pattern().isEmpty())

Review Comment:
   I think we should move the check if the pattern is non-empty (and maybe some other validity checks) to class `SubscriptionPattern`. It should not be possible to create an instance of `SubscriptionPattern` with an empty pattern.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) {
         delegate.subscribe(pattern);
     }
+    /**
+     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against all topics existing at the time of check.
+     * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
+     * the max metadata age, the consumer will refresh metadata more often and check for matching topics.

Review Comment:
   I think this is not correct with the new pattern subscription since the assignment is done broker-side.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -84,6 +85,9 @@ private enum SubscriptionType {
     /* the pattern user has requested */
     private Pattern subscribedPattern;
+    /* RE2J compatible regex */
+    private SubscriptionPattern subscriptionPattern;

Review Comment:
   What about having a class that contains either a `SubscriptionPattern` or a `Pattern`?
   Something like:

   ```java
   import java.util.regex.Pattern;

   import org.apache.kafka.clients.consumer.SubscriptionPattern;

   public class JavaPatternOrSubscriptionPattern {
       // Exactly one of these two fields is non-null.
       private final Pattern javaPattern;
       private final SubscriptionPattern subscriptionPattern;

       private JavaPatternOrSubscriptionPattern(final Pattern pattern) {
           this.javaPattern = pattern;
           this.subscriptionPattern = null;
       }

       private JavaPatternOrSubscriptionPattern(final SubscriptionPattern pattern) {
           this.javaPattern = null;
           this.subscriptionPattern = pattern;
       }

       public static JavaPatternOrSubscriptionPattern javaPattern(final Pattern pattern) {
           return new JavaPatternOrSubscriptionPattern(pattern);
       }

       public static JavaPatternOrSubscriptionPattern subscriptionPattern(final SubscriptionPattern pattern) {
           return new JavaPatternOrSubscriptionPattern(pattern);
       }

       // Returns the regex string of whichever pattern is set.
       public String pattern() {
           return subscriptionPattern != null ? subscriptionPattern.pattern() : javaPattern.pattern();
       }

       @Override
       public String toString() {
           return "pattern = " + pattern();
       }

       // ...
   }
   ```

   That way we would encapsulate the code that ensures that only one of the two is set.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) {
         delegate.subscribe(pattern);
     }
+    /**
+     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against all topics existing at the time of check.
+     * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
+     * the max metadata age, the consumer will refresh metadata more often and check for matching topics.
+     * <p>
+     * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
+     * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
+     * is a change to the topics matching the provided pattern and when consumer group membership changes.
+     * Group rebalances only take place during an active call to {@link #poll(Duration)}.

Review Comment:
   This should also not be correct anymore with KIP-848, which introduces this method.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##########
@@ -495,6 +496,16 @@ public void subscribe(Pattern pattern) {
         subscribeInternal(pattern, Optional.empty());
     }
+    @Override
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
+        log.warn("Operation not supported in new consumer group protocol");
+    }
+
+    @Override
+    public void subscribe(SubscriptionPattern pattern) {
+        log.warn("Operation not supported in new consumer group protocol");

Review Comment:
   See above

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -86,7 +86,6 @@
 import static org.mockito.Mockito.when;
 public class OffsetsRequestManagerTest {
-

Review Comment:
   Could you please revert this change?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+/**
+ * A class that hold a regular expression which compatible with Google's RE2J engine. Visit
+ * <a href="https://github.com/google/re2j">this repository</a> for details on RE2J engine.

Review Comment:
   ```suggestion
    * Represents a regular expression used to subscribe to topics. The pattern
    * must be a Google RE2/J compatible pattern. Visit
    *
    * @see <a href="https://github.com/google/re2j">RE2/J regular expression engine</a>
   ```

##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##########
@@ -35,6 +38,8 @@
       "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
     { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
       "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
+    { "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },

Review Comment:
   Didn't we say we want to commit this change in a separate PR? https://github.com/apache/kafka/pull/15188#issuecomment-1918915086

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##########
@@ -495,6 +496,16 @@ public void subscribe(Pattern pattern) {
         subscribeInternal(pattern, Optional.empty());
     }
+    @Override
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
+        log.warn("Operation not supported in new consumer group protocol");

Review Comment:
   I think that should be an `IllegalStateException` or `IllegalArgumentException`. Otherwise, if the method returns successfully users think they are subscribed but they are actually not.
   Also the message is not correct. If users configure the consumer to use the new consumer protocol the `AsyncKafkaConsumer` is instantiated so they would never call this method on the `LegacyKafkaConsumer` when they use the new consumer protocol. Is this method not supported in the classic group protocol? Or do we plan to use the pattern also in the classic protocol consumer-side? \cc @dajac

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1730,6 +1744,21 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
         }
     }
+    private void subscribeInternal(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            if (pattern == null || pattern.pattern().isEmpty())
+                throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+            throwIfNoAssignorsConfigured();

Review Comment:
   Is this needed? Can the new pattern subscription also be used with a consumer-side assignor? \cc @dajac

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -582,6 +583,23 @@ public void testPartitionRevocationOnClose() {
         assertTrue(subscriptions.assignedPartitions().isEmpty());
         assertEquals(1, listener.revokedCount);
     }
+    @Test
+    public void testSubscribeUsingSubscriptionPattern() {
+        MockRebalanceListener listener = new MockRebalanceListener();
+        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
+        consumer = newConsumer(
+                mock(FetchBuffer.class),
+                mock(ConsumerInterceptors.class),
+                mock(ConsumerRebalanceListenerInvoker.class),
+                subscriptions,

Review Comment:
   You should pass in a mock here and check if method `subscribe()` is called with the correct parameters on the mock.
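
To illustrate the kind of interaction check meant above, here is a minimal, dependency-free sketch. It uses a hand-written recording double instead of Mockito so that it runs on its own; `SubscriptionSink`, `RecordingSink`, and `ToyConsumer` are hypothetical names made up for this illustration and are not part of the Kafka code base. In the actual test one would presumably pass a Mockito mock of `SubscriptionState` and use `verify(...)` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SubscribeInteractionSketch {

    /** Hypothetical collaborator; stands in for the object that would be mocked. */
    public interface SubscriptionSink {
        void subscribe(String pattern, Optional<Runnable> listener);
    }

    /** Hand-written recording double: remembers the arguments of every call. */
    public static final class RecordingSink implements SubscriptionSink {
        public final List<String> patterns = new ArrayList<>();
        public final List<Optional<Runnable>> listeners = new ArrayList<>();

        @Override
        public void subscribe(String pattern, Optional<Runnable> listener) {
            patterns.add(pattern);
            listeners.add(listener);
        }
    }

    /** Hypothetical consumer that validates its input and then delegates. */
    public static final class ToyConsumer {
        private final SubscriptionSink sink;

        public ToyConsumer(SubscriptionSink sink) {
            this.sink = sink;
        }

        public void subscribe(String pattern, Runnable listener) {
            if (pattern == null || pattern.isEmpty())
                throw new IllegalArgumentException("pattern must be non-empty");
            sink.subscribe(pattern, Optional.ofNullable(listener));
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        Runnable listener = () -> { };

        new ToyConsumer(sink).subscribe("t.*", listener);

        // Check the interaction (which call was made, with which arguments)
        // rather than the state that results from it.
        if (sink.patterns.size() != 1 || !sink.patterns.get(0).equals("t.*"))
            throw new AssertionError("unexpected subscribe() calls: " + sink.patterns);
        if (sink.listeners.get(0).orElse(null) != listener)
            throw new AssertionError("listener was not passed through");
        System.out.println("subscribe() called once with the expected arguments");
    }
}
```

The point of the design is that the test then only covers the delegation in the consumer, while the behavior of the real collaborator is covered by its own tests.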

##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -494,6 +501,14 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) {
             () -> consumer.subscribe(Pattern.compile("")));
     }
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "CONSUMER")

Review Comment:
   You also need tests for the classic group protocol.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##########
@@ -186,6 +187,19 @@ public void partitionAssignmentChangeOnPatternSubscription() {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
     }
+    @Test
+    public void testSubscriptionPatternInclusionInSubscriptionState() {
+        SubscriptionPattern pattern = new SubscriptionPattern("*t");
+
+        state.subscribe(pattern, Optional.of(rebalanceListener));
+        assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
+        assertTrue(state.subscriptionPattern().equals(pattern));

Review Comment:
   You also need to verify that the java pattern is not set. In the tests that verify that the java pattern is set you need to verify the subscription pattern is not set.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org