AndrewJSchofield commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1459040148


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1134,9 +1134,22 @@ private CompletableFuture<Void> assignPartitions(
         // Make assignment effective on the client by updating the subscription state.
         updateSubscription(assignedPartitions, false);
 
+        // Pause partitions to ensure that fetch does not start until the callback completes.
+        assignedPartitions.forEach(tp -> subscriptions.pause(tp.topicPartition()));

Review Comment:
   I agree with this comment. There is a `KafkaConsumer.paused()` method, which 
would report the partitions that we had paused internally, and that seems 
incorrect in this case. The partitions are not logically paused; in practice 
we are just not fetching records from them until the callback has completed.
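
A minimal sketch of the distinction, assuming a toy model rather than the real client code: `ToySubscriptionState`, `awaitingCallback`, `callbackCompleted` and `PendingCallbackDemo` are hypothetical names invented for illustration and are not part of Kafka. It keeps "paused by the user" and "waiting for the callback" as separate states, so that fetching is blocked in both cases but only user-initiated pauses are reported by `paused()`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model for illustration only; this is NOT Kafka's SubscriptionState.
final class ToySubscriptionState {
    private final Set<String> assigned = new HashSet<>();
    // Partitions the application paused explicitly.
    private final Set<String> userPaused = new HashSet<>();
    // Partitions whose "assigned" callback has not completed yet (assumed internal flag).
    private final Set<String> awaitingCallback = new HashSet<>();

    // A newly assigned partition is not fetchable until its callback completes.
    void assign(String partition) {
        assigned.add(partition);
        awaitingCallback.add(partition);
    }

    void callbackCompleted(String partition) {
        awaitingCallback.remove(partition);
    }

    void pause(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("Not assigned: " + partition);
        }
        userPaused.add(partition);
    }

    void resume(String partition) {
        userPaused.remove(partition);
    }

    // What the application sees: only pauses it requested itself.
    Set<String> paused() {
        return new TreeSet<>(userPaused);
    }

    // What the fetcher consults: blocked by either a user pause or a pending callback.
    boolean isFetchable(String partition) {
        return assigned.contains(partition)
                && !userPaused.contains(partition)
                && !awaitingCallback.contains(partition);
    }
}

final class PendingCallbackDemo {
    public static void main(String[] args) {
        ToySubscriptionState state = new ToySubscriptionState();
        state.assign("topic-0");
        System.out.println("fetchable before callback: " + state.isFetchable("topic-0"));
        System.out.println("paused() before callback:  " + state.paused());
        state.callbackCompleted("topic-0");
        System.out.println("fetchable after callback:  " + state.isFetchable("topic-0"));
    }
}
```

With a separate internal flag like this, the application-visible paused set stays empty while the callback is pending, which is the behaviour the comment argues for. Whether the real change should take this shape is a question for the PR itself.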



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.