philipnee commented on code in PR #14638:
URL: https://github.com/apache/kafka/pull/14638#discussion_r1372112998


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1081,4 +1026,74 @@ boolean updateAssignmentMetadataIfNeeded(Timer timer) {
         // logic
         return updateFetchPositions(timer);
     }
+
+    @Override
+    public void subscribe(Collection<String> topics) {
+        subscribeInternal(topics, Optional.empty());
+    }
+
+    @Override
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener 
listener) {
+        if (listener == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be 
null");
+
+        subscribeInternal(topics, Optional.of(listener));
+    }
+
+    @Override
+    public void subscribe(Pattern pattern) {
+        subscribeInternal(pattern, Optional.empty());
+    }
+
+    @Override
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+        if (listener == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be 
null");
+
+        subscribeInternal(pattern, Optional.of(listener));
+    }
+
+    private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListener> listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().isEmpty())
+            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        subscriptions.subscribe(pattern, listener);
+        updatePatternSubscription(metadata.fetch());
+        metadata.requestUpdateForNewTopics();
+    }
+
+    private void subscribeInternal(Collection<String> topics, 
Optional<ConsumerRebalanceListener> listener) {
+        maybeThrowInvalidGroupIdException();
+        if (topics == null)
+            throw new IllegalArgumentException("Topic collection to subscribe 
to cannot be null");
+        if (topics.isEmpty()) {
+            // treat subscribing to empty topic list as the same as 
unsubscribing
+            unsubscribe();
+        } else {
+            for (String topic : topics) {
+                if (isBlank(topic))
+                    throw new IllegalArgumentException("Topic collection to 
subscribe to cannot contain null or empty topic");
+            }
+
+            throwIfNoAssignorsConfigured();
+
+            // Clear the buffered data which are not a part of newly assigned 
topics
+            final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+            for (TopicPartition tp : subscriptions.assignedPartitions()) {
+                if (topics.contains(tp.topic()))

Review Comment:
   ah, seems like this is done for my comment above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to