hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r752683020



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -638,15 +636,15 @@ public void onFailure(RuntimeException e) {
     /**
      * Return the fetched records, empty the record buffer and update the 
consumed position.
      *
-     * NOTE: returning empty records guarantees the consumed position are NOT 
updated.
+     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the 
consumed position is not updated.

Review comment:
       I was considering whether it would be possible to do something a little 
simpler without caring as much about the case of aborted data. In the future we 
may have more instances of control records which would probably need similar 
logic. The case of an empty committed transaction mentioned in the other 
comment is an example of this.
   
   I think the main thing we're interested in is when the position advances. We 
want to ensure that we return from poll() whenever this happens. In the code, 
this happens here: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L744.
 Could we structure this logic so that `Fetch` is tracking all the positions 
which have advanced? Then the condition for returning would only consider 
positions which have advanced.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to