lianetm commented on code in PR #19983:
URL: https://github.com/apache/kafka/pull/19983#discussion_r2155243860


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##########
@@ -113,6 +116,54 @@ public void partitionAssignmentChangeOnTopicSubscription() {
         assertEquals(0, state.numAssignedPartitions());
     }
 
+    @Test
+    public void testIsFetchableOnManualAssignment() {
+        state.assignFromUser(Set.of(tp0, tp1));
+        assertAssignedPartitionIsFetchable();
+    }
+
+    @Test
+    public void testIsFetchableOnAutoAssignment() {
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        state.assignFromSubscribed(Set.of(tp0, tp1));
+        assertAssignedPartitionIsFetchable();
+    }
+
+    private void assertAssignedPartitionIsFetchable() {
+        assertEquals(2, state.assignedPartitions().size());
+        assertTrue(state.assignedPartitions().contains(tp0));
+        assertTrue(state.assignedPartitions().contains(tp1));
+
+        assertFalse(state.isFetchable(tp0), "Should not be fetchable without a valid position");
+        assertFalse(state.isFetchable(tp1), "Should not be fetchable without a valid position");
+
+        state.seek(tp0, 1);
+        state.seek(tp1, 1);
+
+        assertTrue(state.isFetchable(tp0));
+        assertTrue(state.isFetchable(tp1));
+    }
+
+    @Test
+    public void testIsFetchableConsidersExplicitTopicSubscription() {
+        state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
+        state.assignFromSubscribed(singleton(t1p0));
+        state.seek(t1p0, 1);
+
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
+        assertTrue(state.isFetchable(t1p0));
+
+        // Change subscription. Assigned partitions should remain unchanged but not fetchable.
+        state.subscribe(singleton(topic), Optional.of(rebalanceListener));

Review Comment:
   Yep, just moving to the new way of doing it (we've had several PRs doing so already, btw, so I'm just using it whenever I add new code).
   I find Set.of/List.of... more readable, and another difference is that Set.of won't allow nulls.
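The null-handling difference mentioned above can be seen in a small standalone program. This is an illustrative sketch: the class and method names are made up for the example and are not from the PR. It relies only on the JDK factories `Collections.singleton`, `Set.of` and `List.of` (Java 9+).

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Illustrative only: compares null handling of the legacy Collections factory
// with the Java 9+ immutable collection factories.
public class FactoryNullHandling {

    // Collections.singleton wraps a null element without complaint.
    static boolean singletonAcceptsNull() {
        Set<String> legacy = Collections.singleton(null);
        return legacy.contains(null);
    }

    // Set.of rejects a null element at construction time.
    static boolean setOfRejectsNull() {
        try {
            Set.of((String) null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // List.of behaves the same way for any null element.
    static boolean listOfRejectsNull() {
        try {
            List.of("topic", null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Collections.singleton(null) accepted: " + singletonAcceptsNull()); // true
        System.out.println("Set.of(null) rejected: " + setOfRejectsNull());                     // true
        System.out.println("List.of(..., null) rejected: " + listOfRejectsNull());              // true
    }
}
```

Because the rejection happens when the collection is built, a null slipping into a test fixture fails fast at the point where the fixture is created.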
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -155,7 +155,10 @@ private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRe
             log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
         } else if (!subscriptions.isFetchable(tp)) {
             // this can happen when a partition is paused before fetched records are returned to the consumer's
-            // poll call or if the offset is being reset
+            // poll call or if the offset is being reset.
+            // It can also happen under the Consumer rebalance protocol, when the consumer changes its subscription.
+            // Until the consumer receives an updated assignment from the coordinator, it can hold assigned partitions
+            // that are not in the subscription anymore, so we make them not fetchable.

Review Comment:
   Not really at the moment (with the explicit subscription). You're right about that point, but it came up when brainstorming about async assignment (e.g. broker-side regex, intentionally left out of scope here).
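A minimal sketch of the guard shown in the `fetchRecords` hunk, under simplifying assumptions: partitions and records are plain strings, and the assignment and fetchability checks are passed in as predicates. `FetchGuardSketch` and its signature are hypothetical, not Kafka's `FetchCollector` API. It mirrors the two branches in the diff: records buffered for a partition that is no longer assigned, or no longer fetchable, are not handed back to `poll`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the guard in the FetchCollector diff; not Kafka's API.
// Partitions and records are plain strings to keep the example self-contained.
public class FetchGuardSketch {

    /**
     * Returns the buffered records for a partition only if it is still assigned and
     * fetchable. Otherwise the buffer is drained and nothing is returned; since nothing
     * was consumed, the position is unchanged in this model and the data can be
     * fetched again later.
     */
    public static List<String> fetchRecords(String partition,
                                            List<String> buffered,
                                            Predicate<String> isAssigned,
                                            Predicate<String> isFetchable) {
        if (!isAssigned.test(partition)) {
            // e.g. the partition was revoked after the fetch request was sent
            buffered.clear();
            return List.of();
        } else if (!isFetchable.test(partition)) {
            // e.g. paused, offset being reset, or (Consumer protocol) still assigned
            // but no longer in the explicit topic subscription
            buffered.clear();
            return List.of();
        }
        List<String> returned = new ArrayList<>(buffered);
        buffered.clear();
        return returned;
    }

    public static void main(String[] args) {
        List<String> buffered = new ArrayList<>(List.of("r1", "r2"));
        // Partition paused after the fetch was sent: still assigned, but not fetchable.
        List<String> returned = fetchRecords("topic-0", buffered, tp -> true, tp -> false);
        System.out.println(returned.isEmpty() + " " + buffered.isEmpty()); // true true
    }
}
```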
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -487,14 +487,27 @@ public synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPart
         List<TopicPartition> result = new ArrayList<>();
         assignment.forEach((topicPartition, topicPartitionState) -> {
             // Cheap check is first to avoid evaluating the predicate if possible
-            if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || topicPartitionState.isFetchable())
+            if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || isFetchableAndSubscribed(topicPartition, topicPartitionState))
                     && isAvailable.test(topicPartition)) {
                 result.add(topicPartition);
             }
         });
         return result;
     }
 
+    /**
+     * Check if the partition is fetchable.
+     * If the consumer has explicitly subscribed to a list of topic names,
+     * this will also check that the partition is contained in the subscription.
+     */
+    private synchronized boolean isFetchableAndSubscribed(TopicPartition topicPartition, TopicPartitionState topicPartitionState) {
+        if (subscriptionType.equals(SubscriptionType.AUTO_TOPICS) && !subscription.contains(topicPartition.topic())) {
+            log.debug("Assigned partition {} is not in the subscription {} so will be considered not fetchable.", topicPartition, subscription);
+            return false;
+        }

Review Comment:
   > Did we explore the option of removing or flagging the errant partition?
   
   Yes, but both seemed more complex.
   - "Removing the partitions" would require the client to proactively do the full thing (triggering callbacks, updating assignments), which would mean having the client drive assignments along with the coordinator. This is exactly what we intentionally got rid of with the Consumer protocol, where clients let the coordinator drive assignments (so we want to avoid bringing those complexities back into the client).
   - "Flagging the partitions" is really the goal of this approach, but without an explicit flag (e.g. not using "pause"), simply because an explicit flag would be much more complicated to maintain. Checking assignment vs. subscription on fetch seems the simplest way to dynamically "flag" the partition as non-fetchable. Makes sense?
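To illustrate the "dynamic flag" idea (comparing assignment against subscription whenever fetchability is evaluated, instead of storing a flag or removing partitions on the client), here is a minimal, self-contained model. It is a hypothetical simplification, not Kafka's `SubscriptionState`: `SubscriptionModel`, `Partition` and the reduced fetchability rule (only "has a valid position") are assumptions made for the example. The `main` method mirrors the scenario in `testIsFetchableConsidersExplicitTopicSubscription` from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model; NOT Kafka's SubscriptionState.
// "Has a valid position" stands in for the real fetchability rules.
public class SubscriptionModel {

    public record Partition(String topic, int partition) { }

    private enum SubscriptionType { NONE, AUTO_TOPICS, USER_ASSIGNED }

    private SubscriptionType type = SubscriptionType.NONE;
    private final Set<String> subscription = new HashSet<>();
    // Assigned partition -> fetch position (null means no valid position yet).
    private final Map<Partition, Long> assignment = new HashMap<>();

    // Explicit topic subscription. The assignment is deliberately left untouched:
    // the coordinator, not the client, drives assignment changes.
    public synchronized void subscribe(Set<String> topics) {
        type = SubscriptionType.AUTO_TOPICS;
        subscription.clear();
        subscription.addAll(topics);
    }

    // Assignment received from the coordinator for the current subscription.
    public synchronized void assignFromSubscribed(Set<Partition> partitions) {
        assignment.clear();
        partitions.forEach(p -> assignment.put(p, null));
    }

    // Manual assignment: there is no topic subscription to check against.
    public synchronized void assignFromUser(Set<Partition> partitions) {
        type = SubscriptionType.USER_ASSIGNED;
        subscription.clear();
        assignment.clear();
        partitions.forEach(p -> assignment.put(p, null));
    }

    public synchronized void seek(Partition p, long offset) {
        if (!assignment.containsKey(p)) {
            throw new IllegalStateException("No current assignment for " + p);
        }
        assignment.put(p, offset);
    }

    public synchronized Set<Partition> assignedPartitions() {
        return new HashSet<>(assignment.keySet());
    }

    // The "dynamic flag": nothing is stored per partition; the assignment is
    // checked against the subscription every time fetchability is evaluated.
    public synchronized boolean isFetchable(Partition p) {
        if (!assignment.containsKey(p)) {
            return false;
        }
        if (type == SubscriptionType.AUTO_TOPICS && !subscription.contains(p.topic())) {
            return false; // assigned but no longer subscribed: wait for the new assignment
        }
        return assignment.get(p) != null;
    }

    public synchronized List<Partition> fetchablePartitions() {
        List<Partition> result = new ArrayList<>();
        for (Partition p : assignment.keySet()) {
            if (isFetchable(p)) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SubscriptionModel state = new SubscriptionModel();
        Partition t1p0 = new Partition("topic1", 0);
        state.subscribe(Set.of("topic1"));
        state.assignFromSubscribed(Set.of(t1p0));
        state.seek(t1p0, 1);
        System.out.println(state.isFetchable(t1p0)); // true
        // Subscription changes; the coordinator has not sent a new assignment yet.
        state.subscribe(Set.of("topic"));
        System.out.println(state.assignedPartitions().contains(t1p0)); // true
        System.out.println(state.isFetchable(t1p0));                   // false
    }
}
```

Because no per-partition state is mutated when the subscription changes, in this model a partition becomes fetchable again on its own if its topic is re-subscribed before the coordinator sends a new assignment; an explicit flag would also have to be cleared on that path.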
   
   > There is the possibility of logging this on every fetch loop for a topic which has been unsubscribed and not yet removed from the assignment
   
   Yep, totally. Changed it to trace level, which is better.



-- 
This is an automated message from the Apache Git Service.