cadonna commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1472581740


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -753,6 +753,10 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
     public void subscribe(Pattern pattern) {
         delegate.subscribe(pattern);
     }
+    @Override
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {}

Review Comment:
   This method needs Javadoc describing what it does. Please also add an empty 
line before the method.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1724,6 +1738,21 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
         }
     }
 
+    private void subscribeInternal(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            if (pattern == null || pattern.pattern().isEmpty())
+                throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+            throwIfNoAssignorsConfigured();
+            log.info("Subscribed to pattern: '{}'", pattern);

Review Comment:
   This will probably not log the pattern itself, since you did not add a 
`toString()` method to `SubscriptionPattern`; the default `Object.toString()` 
prints the class name and a hash code instead. You should either add 
`toString()` to `SubscriptionPattern` or change this line to
   ```suggestion
               log.info("Subscribed to pattern: '{}'", pattern.pattern());
   ```
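
To illustrate the `toString()` option, here is a minimal sketch. `SubscriptionPatternSketch` is a hypothetical stand-in, not the class from the PR, and `String.format` is used here only as a stand-in for the logger's `{}` placeholder, which likewise ends up calling `toString()` on the argument.

```java
import java.util.Objects;

// Hypothetical stand-in for the PR's SubscriptionPattern; not the actual Kafka class.
final class SubscriptionPatternSketch {
    private final String pattern;

    SubscriptionPatternSketch(String pattern) {
        this.pattern = Objects.requireNonNull(pattern, "pattern");
    }

    String pattern() {
        return pattern;
    }

    // Without this override, formatting falls back to Object.toString(),
    // which yields the class name and a hash code rather than the regex.
    @Override
    public String toString() {
        return pattern;
    }
}

class ToStringSketchDemo {
    public static void main(String[] args) {
        SubscriptionPatternSketch p = new SubscriptionPatternSketch("orders-.*");
        // String.format stands in for the logging call in subscribeInternal().
        System.out.println(String.format("Subscribed to pattern: '%s'", p));
    }
}
```

With the override in place, passing either `pattern` or `pattern.pattern()` to the logger would produce the same text.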



##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -72,6 +72,10 @@ public interface Consumer<K, V> extends Closeable {
     */
     void subscribe(Pattern pattern);
 
+    void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);

Review Comment:
   This method needs a link to the javadocs in `KafkaConsumer`:
   ```suggestion
       /**
        * @see KafkaConsumer#subscribe(SubscriptionPattern, ConsumerRebalanceListener)
        */
       void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -753,6 +753,10 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
     public void subscribe(Pattern pattern) {
         delegate.subscribe(pattern);
     }
+    @Override
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {}
+    @Override
+    public void subscribe(SubscriptionPattern pattern) {}

Review Comment:
   This method needs Javadoc describing what it does. Please also add an empty 
line before the method.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -84,6 +85,9 @@ private enum SubscriptionType {
     /* the pattern user has requested */
     private Pattern subscribedPattern;
 
+    /* we should rename this to something more specific */
+    private SubscriptionPattern subscriptionPattern;

Review Comment:
   Could you please also reset this field in `unsubscribe()` and add it to the 
return value of `toString()`?
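
A rough sketch of what I mean, using a heavily simplified and hypothetical model of the class. `SubscriptionStateSketch` and `subscribeRe2j` are illustrative names only, and the new field is held as a plain `String` here instead of a `SubscriptionPattern`.

```java
import java.util.regex.Pattern;

// Hypothetical, heavily simplified model; the real SubscriptionState tracks far more state.
final class SubscriptionStateSketch {
    /* the java.util.regex pattern the user has requested */
    private Pattern subscribedPattern;

    /* stand-in for the new SubscriptionPattern field, held as a plain regex string */
    private String subscriptionPattern;

    void subscribe(Pattern pattern) {
        this.subscribedPattern = pattern;
    }

    void subscribeRe2j(String re2jPattern) {
        this.subscriptionPattern = re2jPattern;
    }

    // Reset every pattern field so that no stale subscription survives.
    void unsubscribe() {
        this.subscribedPattern = null;
        this.subscriptionPattern = null;
    }

    // Include the new field so it shows up in logs and debugging output.
    @Override
    public String toString() {
        return "SubscriptionStateSketch{"
            + "subscribedPattern=" + subscribedPattern
            + ", subscriptionPattern=" + subscriptionPattern
            + "}";
    }
}

class SubscriptionStateSketchDemo {
    public static void main(String[] args) {
        SubscriptionStateSketch state = new SubscriptionStateSketch();
        state.subscribeRe2j("orders-.*");
        System.out.println(state);
        state.unsubscribe();
        System.out.println(state);
    }
}
```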



##########
clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+public class SubscriptionPattern {

Review Comment:
   > This class would need some doc explaining what it represents.
   
   I definitely agree with @lianetm on this.
   
   > Also not sure if this is the right place for it (given that this whole 
intention with the new regex is driven by the broker, but is not implemented 
yet). So at this point is not clear to me if we would prefer to define this on 
the broker side to be used there?
   
   The KIP says this class should be added to the public API of the consumer. I 
agree with @Phuc-Hong-Tran that this class exists only to distinguish Google 
RE2/J patterns from `java.util.regex.Pattern`, as stated in the KIP.
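
To make the "differentiate" point concrete, here is a small sketch under my own assumptions. `Re2jPatternSketch` and `OverloadSketch` are hypothetical names, not part of the PR or the KIP. It shows how a dedicated wrapper type lets the two regex flavors coexist as overloads, and roughly what the class-level doc could say.

```java
import java.util.Objects;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of a wrapper whose only job is to mark a regular
 * expression as RE2/J syntax, so that it cannot be confused with
 * {@link java.util.regex.Pattern} at the call site.
 */
final class Re2jPatternSketch {
    private final String pattern;

    Re2jPatternSketch(String pattern) {
        this.pattern = Objects.requireNonNull(pattern, "pattern");
    }

    String pattern() {
        return pattern;
    }
}

// Hypothetical consumer-like type with one overload per regex flavor.
final class OverloadSketch {
    String subscribe(Pattern pattern) {
        return "java.util.regex: " + pattern.pattern();
    }

    String subscribe(Re2jPatternSketch pattern) {
        return "RE2/J: " + pattern.pattern();
    }
}

class OverloadSketchDemo {
    public static void main(String[] args) {
        OverloadSketch consumer = new OverloadSketch();
        // The static type of the argument selects the overload at compile time.
        System.out.println(consumer.subscribe(Pattern.compile("orders-.*")));
        System.out.println(consumer.subscribe(new Re2jPatternSketch("orders-.*")));
    }
}
```

A bare `String` parameter would not convey which regex syntax is intended, which is why a distinct type is useful even if it carries no behavior of its own.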
   
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -72,6 +72,10 @@ public interface Consumer<K, V> extends Closeable {
     */
     void subscribe(Pattern pattern);
 
+    void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);
+
+    void subscribe(SubscriptionPattern pattern);

Review Comment:
   This method needs a link to the javadocs in `KafkaConsumer`:
   ```suggestion
       /**
        * @see KafkaConsumer#subscribe(SubscriptionPattern)
        */
       void subscribe(SubscriptionPattern pattern);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

