cadonna commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1495579840


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1730,6 +1744,21 @@ private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListen
         }
     }
 
+    private void subscribeInternal(SubscriptionPattern pattern, 
Optional<ConsumerRebalanceListener> listener) {
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            if (pattern == null || pattern.pattern().isEmpty())

Review Comment:
   I think we should move the check that the pattern is non-empty (and maybe 
some other validity checks) into the `SubscriptionPattern` class. It should not 
be possible to create an instance of `SubscriptionPattern` with an empty pattern.
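
   A minimal sketch of what that could look like, assuming the class only wraps 
the regex string. `ValidatedSubscriptionPattern` is a hypothetical name, not the 
class in this PR, and the exception message is reused from the check in 
`subscribeInternal()`.

   ```java
   // Hypothetical stand-in for SubscriptionPattern; not the class from the PR.
   final class ValidatedSubscriptionPattern {

       private final String pattern;

       ValidatedSubscriptionPattern(final String pattern) {
           // Reject invalid input at construction time so that no instance
           // can ever hold a null or empty pattern string.
           if (pattern == null || pattern.isEmpty()) {
               throw new IllegalArgumentException("Topic pattern to subscribe to cannot be "
                   + (pattern == null ? "null" : "empty"));
           }
           this.pattern = pattern;
       }

       String pattern() {
           return pattern;
       }
   }
   ```

   With this in place, the caller would only need a null check on the 
`SubscriptionPattern` reference itself.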



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) {
         delegate.subscribe(pattern);
     }
 
+    /**
+     * Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
+     * The pattern matching will be done periodically against all topics 
existing at the time of check.
+     * This can be controlled through the {@code metadata.max.age.ms} 
configuration: by lowering
+     * the max metadata age, the consumer will refresh metadata more often and 
check for matching topics.

Review Comment:
   I think this is not correct with the new pattern subscription since the 
assignment is done broker-side.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -84,6 +85,9 @@ private enum SubscriptionType {
     /* the pattern user has requested */
     private Pattern subscribedPattern;
 
+    /* RE2J compatible regex */
+    private SubscriptionPattern subscriptionPattern;

Review Comment:
   What about having a class that contains either a `SubscriptionPattern` or a 
`Pattern`?
   Something like:
   
   ```java
   import java.util.regex.Pattern;
   import org.apache.kafka.clients.consumer.SubscriptionPattern;
   
   public class JavaPatternOrSubscriptionPattern {
   
       // At most one of the two fields is set; the other one is null.
       private final Pattern javaPattern;
   
       private final SubscriptionPattern subscriptionPattern;
   
       private JavaPatternOrSubscriptionPattern(final Pattern pattern) {
           this.javaPattern = pattern;
           this.subscriptionPattern = null;
       }
   
       private JavaPatternOrSubscriptionPattern(final SubscriptionPattern pattern) {
           this.javaPattern = null;
           this.subscriptionPattern = pattern;
       }
   
       public static JavaPatternOrSubscriptionPattern javaPattern(final Pattern pattern) {
           return new JavaPatternOrSubscriptionPattern(pattern);
       }
   
       public static JavaPatternOrSubscriptionPattern subscriptionPattern(final SubscriptionPattern pattern) {
           return new JavaPatternOrSubscriptionPattern(pattern);
       }
   
       // Returns the regex string of whichever pattern is set.
       public String pattern() {
           return subscriptionPattern != null ? subscriptionPattern.pattern() : javaPattern.pattern();
       }
   
       @Override
       public String toString() {
           return "pattern = " + pattern();
       }
   
       ...
   }
   ```
   
   In this way we would encapsulate the code that ensures that only one of the 
two is set.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) {
         delegate.subscribe(pattern);
     }
 
+    /**
+     * Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
+     * The pattern matching will be done periodically against all topics 
existing at the time of check.
+     * This can be controlled through the {@code metadata.max.age.ms} 
configuration: by lowering
+     * the max metadata age, the consumer will refresh metadata more often and 
check for matching topics.
+     * <p>
+     * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for 
details on the
+     * use of the {@link ConsumerRebalanceListener}. Generally rebalances are 
triggered when there
+     * is a change to the topics matching the provided pattern and when 
consumer group membership changes.
+     * Group rebalances only take place during an active call to {@link 
#poll(Duration)}.

Review Comment:
   This should also no longer be correct with KIP-848, which introduces this 
method.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##########
@@ -495,6 +496,16 @@ public void subscribe(Pattern pattern) {
         subscribeInternal(pattern, Optional.empty());
     }
 
+    @Override
+    public void subscribe(SubscriptionPattern pattern, 
ConsumerRebalanceListener callback) {
+        log.warn("Operation not supported in new consumer group protocol");
+    }
+
+    @Override
+    public void subscribe(SubscriptionPattern pattern) {
+        log.warn("Operation not supported in new consumer group protocol");

Review Comment:
   See above



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -86,7 +86,6 @@
 import static org.mockito.Mockito.when;
 
 public class OffsetsRequestManagerTest {
-

Review Comment:
   Could you please revert this change?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+/**
+ * A class that hold a regular expression which compatible with Google's RE2J 
engine. Visit
+ * <a href="https://github.com/google/re2j">this repository</a> for details on 
RE2J engine.

Review Comment:
   ```suggestion
    * Represents a regular expression used to subscribe to topics. The pattern
    * must be a Google RE2/J compatible pattern.
    * 
    * @see <a href="https://github.com/google/re2j">RE2/J regular expression engine</a>
   ```



##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##########
@@ -35,6 +38,8 @@
       "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
     { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null", "entityType": "topicName",
       "about": "null if it didn't change since the last heartbeat; the 
subscribed topic names otherwise." },
+    { "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", 
"nullableVersions": "1+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },

Review Comment:
   Didn't we say we want to commit this change in a separate PR? 
   https://github.com/apache/kafka/pull/15188#issuecomment-1918915086



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##########
@@ -495,6 +496,16 @@ public void subscribe(Pattern pattern) {
         subscribeInternal(pattern, Optional.empty());
     }
 
+    @Override
+    public void subscribe(SubscriptionPattern pattern, 
ConsumerRebalanceListener callback) {
+        log.warn("Operation not supported in new consumer group protocol");

Review Comment:
   I think this should throw an `IllegalStateException` or 
`IllegalArgumentException`. Otherwise the method returns successfully and users 
think they are subscribed when they actually are not.
   
   Also the message is not correct. If users configure the consumer to use the 
new consumer protocol the `AsyncKafkaConsumer` is instantiated so they would 
never call this method on the `LegacyKafkaConsumer` when they use the new 
consumer protocol. 
   
   Is this method not supported in the classic group protocol? Or do we plan to 
use the pattern also in the classic protocol consumer-side? \cc @dajac   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1730,6 +1744,21 @@ private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListen
         }
     }
 
+    private void subscribeInternal(SubscriptionPattern pattern, 
Optional<ConsumerRebalanceListener> listener) {
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            if (pattern == null || pattern.pattern().isEmpty())
+                throw new IllegalArgumentException("Topic pattern to subscribe 
to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+            throwIfNoAssignorsConfigured();

Review Comment:
   Is this needed?
   Can the new pattern subscription also be used with a consumer-side assignor? 
\cc @dajac



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -582,6 +583,23 @@ public void testPartitionRevocationOnClose() {
         assertTrue(subscriptions.assignedPartitions().isEmpty());
         assertEquals(1, listener.revokedCount);
     }
+    @Test
+    public void testSubscribeUsingSubscriptionPattern() {
+        MockRebalanceListener listener = new MockRebalanceListener();
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.NONE);
+        consumer = newConsumer(
+                mock(FetchBuffer.class),
+                mock(ConsumerInterceptors.class),
+                mock(ConsumerRebalanceListenerInvoker.class),
+                subscriptions,

Review Comment:
   You should pass in a mock here and verify that `subscribe()` is called on 
the mock with the correct parameters.
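
   The idea, sketched without Mockito so that it is self-contained: a 
hand-written recording double stands in for the mocked `SubscriptionState`. All 
names below (`SubscriptionSink`, `RecordingSubscriptionSink`, 
`ConsumerSketch`) are hypothetical, and the pattern is reduced to a plain 
string. In the real test this would be a Mockito mock plus a `verify(...)` call.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;

   // Hypothetical collaborator interface standing in for SubscriptionState.
   interface SubscriptionSink {
       void subscribe(String regex, Optional<Runnable> listener);
   }

   // Test double that records every call so the test can inspect the arguments.
   final class RecordingSubscriptionSink implements SubscriptionSink {
       final List<String> regexes = new ArrayList<>();
       final List<Optional<Runnable>> listeners = new ArrayList<>();

       @Override
       public void subscribe(final String regex, final Optional<Runnable> listener) {
           regexes.add(regex);
           listeners.add(listener);
       }
   }

   // Hypothetical consumer that delegates the subscription to its collaborator.
   final class ConsumerSketch {
       private final SubscriptionSink subscriptions;

       ConsumerSketch(final SubscriptionSink subscriptions) {
           this.subscriptions = subscriptions;
       }

       void subscribe(final String regex, final Runnable listener) {
           subscriptions.subscribe(regex, Optional.of(listener));
       }
   }
   ```

   The test then asserts on the recorded arguments instead of on the internal 
state of a real `SubscriptionState`.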



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -494,6 +501,14 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol 
groupProtocol) {
             () -> consumer.subscribe(Pattern.compile("")));
     }
 
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "CONSUMER")

Review Comment:
   You also need tests for the classic group protocol.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##########
@@ -186,6 +187,19 @@ public void 
partitionAssignmentChangeOnPatternSubscription() {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
     }
+    @Test
+    public void testSubscriptionPatternInclusionInSubscriptionState() {
+        SubscriptionPattern pattern = new SubscriptionPattern("*t");
+
+        state.subscribe(pattern, Optional.of(rebalanceListener));
+        assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
+        assertTrue(state.subscriptionPattern().equals(pattern));

Review Comment:
   You also need to verify that the Java pattern is not set. Likewise, in the 
tests that verify that the Java pattern is set, you need to verify that the 
subscription pattern is not set.
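
   Roughly, both directions of the check could look like the sketch below. 
`PatternSubscriptionSketch` is a hypothetical stand-in for `SubscriptionState`, 
the RE2/J pattern is reduced to a plain string, and it assumes that setting one 
kind of pattern clears the other.

   ```java
   import java.util.regex.Pattern;

   // Hypothetical stand-in for SubscriptionState; not the class from the PR.
   final class PatternSubscriptionSketch {
       private Pattern javaPattern;
       private String re2jPattern; // plain string in place of SubscriptionPattern

       void subscribe(final Pattern pattern) {
           this.javaPattern = pattern;
           this.re2jPattern = null; // assumption: the other pattern is cleared
       }

       void subscribeRe2j(final String pattern) {
           this.re2jPattern = pattern;
           this.javaPattern = null; // assumption: the other pattern is cleared
       }

       Pattern javaPattern() {
           return javaPattern;
       }

       String re2jPattern() {
           return re2jPattern;
       }
   }

   final class PatternSubscriptionSketchDemo {
       public static void main(final String[] args) {
           PatternSubscriptionSketch state = new PatternSubscriptionSketch();

           // Subscription pattern set, so the Java pattern must not be set.
           state.subscribeRe2j("t.*");
           if (state.javaPattern() != null) throw new AssertionError("Java pattern set");

           // Java pattern set, so the subscription pattern must not be set.
           state.subscribe(Pattern.compile("t.*"));
           if (state.re2jPattern() != null) throw new AssertionError("RE2/J pattern set");
       }
   }
   ```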



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
