AndrewJSchofield commented on code in PR #19983:
URL: https://github.com/apache/kafka/pull/19983#discussion_r2155122157


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -487,14 +487,27 @@ public synchronized List<TopicPartition> 
fetchablePartitions(Predicate<TopicPart
         List<TopicPartition> result = new ArrayList<>();
         assignment.forEach((topicPartition, topicPartitionState) -> {
             // Cheap check is first to avoid evaluating the predicate if 
possible
-            if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) 
|| topicPartitionState.isFetchable())
+            if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) 
|| isFetchableAndSubscribed(topicPartition, topicPartitionState))
                     && isAvailable.test(topicPartition)) {
                 result.add(topicPartition);
             }
         });
         return result;
     }
 
+    /**
+     * Check if the partition is fetchable.
+     * If the consumer has explicitly subscribed to a list of topic names,
+     * this will also check that the partition is contained in the 
subscription.
+     */
+    private synchronized boolean isFetchableAndSubscribed(TopicPartition 
topicPartition, TopicPartitionState topicPartitionState) {
+        if (subscriptionType.equals(SubscriptionType.AUTO_TOPICS) && 
!subscription.contains(topicPartition.topic())) {
+            log.debug("Assigned partition {} is not in the subscription {} so 
will be considered not fetchable.", topicPartition, subscription);
+            return false;
+        }

Review Comment:
   There is the possibility of logging this on every fetch loop for a topic 
which has been unsubscribed and not yet removed from the assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to