AndrewJSchofield commented on code in PR #19983:
URL: https://github.com/apache/kafka/pull/19983#discussion_r2154483769


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -487,14 +487,27 @@ public synchronized List<TopicPartition> 
fetchablePartitions(Predicate<TopicPart
         List<TopicPartition> result = new ArrayList<>();
         assignment.forEach((topicPartition, topicPartitionState) -> {
             // Cheap check is first to avoid evaluating the predicate if 
possible
-            if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) 
|| topicPartitionState.isFetchable())
+            if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) 
|| isFetchableAndSubscribed(topicPartition, topicPartitionState))

Review Comment:
   We'll revisit this in https://issues.apache.org/jira/browse/KAFKA-19410.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -880,7 +893,7 @@ public synchronized boolean isPaused(TopicPartition tp) {
 
     synchronized boolean isFetchable(TopicPartition tp) {
         TopicPartitionState assignedOrNull = assignedStateOrNull(tp);
-        return assignedOrNull != null && assignedOrNull.isFetchable();
+        return assignedOrNull != null && isFetchableAndSubscribed(tp, 
assignedOrNull);

Review Comment:
   nit: This is a bit odd. If we call `isFetchableAndSubscribed`, we know that 
`assignedOrNull` is specifically not null. I would just rename that variable to 
`tps` or such, so there's no implication of nullness.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##########
@@ -1071,4 +1122,33 @@ public void testRequestOffsetResetIfPartitionAssigned() {
 
         assertThrows(IllegalStateException.class, () -> 
state.isOffsetResetNeeded(unassignedPartition));
     }
+
+    // This test ensures the "fetchablePartitions" does not run the custom 
predicate if the partition is not fetchable
+    // This func is used in the hot path for fetching, to find fetchable 
partitions that are not in the buffer,
+    // so it should avoid evaluating the predicate if not needed.
+    @Test
+    public void testFetchablePartitionsPerformsCheapChecksFirst() {
+        // Setup fetchable partition and pause it
+        state.assignFromUser(singleton(tp0));

Review Comment:
   Another candidate for `Set.of(tp0)`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##########
@@ -113,6 +116,54 @@ public void partitionAssignmentChangeOnTopicSubscription() 
{
         assertEquals(0, state.numAssignedPartitions());
     }
 
+    @Test
+    public void testIsFetchableOnManualAssignment() {
+        state.assignFromUser(Set.of(tp0, tp1));
+        assertAssignedPartitionIsFetchable();
+    }
+
+    @Test
+    public void testIsFetchableOnAutoAssignment() {
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        state.assignFromSubscribed(Set.of(tp0, tp1));
+        assertAssignedPartitionIsFetchable();
+    }
+
+    private void assertAssignedPartitionIsFetchable() {
+        assertEquals(2, state.assignedPartitions().size());
+        assertTrue(state.assignedPartitions().contains(tp0));
+        assertTrue(state.assignedPartitions().contains(tp1));
+
+        assertFalse(state.isFetchable(tp0), "Should not be fetchable without a 
valid position");
+        assertFalse(state.isFetchable(tp1), "Should not be fetchable without a 
valid position");
+
+        state.seek(tp0, 1);
+        state.seek(tp1, 1);
+
+        assertTrue(state.isFetchable(tp0));
+        assertTrue(state.isFetchable(tp1));
+    }
+
+    @Test
+    public void testIsFetchableConsidersExplicitTopicSubscription() {
+        state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
+        state.assignFromSubscribed(singleton(t1p0));
+        state.seek(t1p0, 1);
+
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
+        assertTrue(state.isFetchable(t1p0));
+
+        // Change subscription. Assigned partitions should remain unchanged 
but not fetchable.
+        state.subscribe(singleton(topic), Optional.of(rebalanceListener));

Review Comment:
   Not that I can see. Probably ought to be `Set.of` these days.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to