kirktrue commented on code in PR #19983: URL: https://github.com/apache/kafka/pull/19983#discussion_r2153244230
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java: ########## @@ -155,7 +155,10 @@ private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRe log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp); } else if (!subscriptions.isFetchable(tp)) { // this can happen when a partition is paused before fetched records are returned to the consumer's - // poll call or if the offset is being reset + // poll call or if the offset is being reset. + // It can also happen under the Consumer rebalance protocol, when the consumer changes its subscription. + // Until the consumer receives an updated assignment from the coordinator, it can hold assigned partitions + // that are not in the subscription anymore, so we make them not fetchable. Review Comment: Wasn't one of the cases that was discovered where the coordinator (accidentally) assigned partitions for topics that the consumer wasn't subscribed to? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ########## @@ -487,14 +487,27 @@ public synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPart List<TopicPartition> result = new ArrayList<>(); assignment.forEach((topicPartition, topicPartitionState) -> { // Cheap check is first to avoid evaluating the predicate if possible - if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || topicPartitionState.isFetchable()) + if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || isFetchableAndSubscribed(topicPartition, topicPartitionState)) && isAvailable.test(topicPartition)) { result.add(topicPartition); } }); return result; } + /** + * Check if the partition is fetchable. + * If the consumer has explicitly subscribed to a list of topic names, + * this will also check that the partition is contained in the subscription. + */ + private synchronized boolean isFetchableAndSubscribed(TopicPartition topicPartition, TopicPartitionState topicPartitionState) { + if (subscriptionType.equals(SubscriptionType.AUTO_TOPICS) && !subscription.contains(topicPartition.topic())) { + log.debug("Assigned partition {} is not in the subscription {} so will be considered not fetchable.", topicPartition, subscription); + return false; + } Review Comment: Did we explore the option of removing or flagging the errant partition? This debug line has the possibility of being output on every fetch loop, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org