lianetm commented on code in PR #18089:
URL: https://github.com/apache/kafka/pull/18089#discussion_r1878512850


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1175,21 +1175,8 @@ private CompletableFuture<Void> assignPartitions(
 
         // Invoke user call back.
         CompletableFuture<Void> result = signalPartitionsAssigned(addedPartitions);
-        result.whenComplete((__, exception) -> {
-            if (exception == null) {
-                // Enable newly added partitions to start fetching and updating positions for them.
-                subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
-            } else {
-                // Keeping newly added partitions as non-fetchable after the callback failure.
-                // They will be retried on the next reconciliation loop, until it succeeds or the
-                // broker removes them from the assignment.
-                if (!addedPartitions.isEmpty()) {
-                    log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not " +
-                            "requiring initializing positions after onPartitionsAssigned callback failed.",
-                        addedPartitions, exception);
-                }
-            }
-        });
+        // Enable newly added partitions to start fetching and updating positions for them.
+        result.whenComplete((__, exception) -> subscriptions.enablePartitionsAwaitingCallback(addedPartitions));

Review Comment:
   Hey Chia, interesting finding. I do believe we have a bug here, but I'm not sure it's the one we're describing:
   1. Regarding 
   > the classic consumer continues to send fetch requests for those partitions even if the listener throws an exception
   
   I may be missing something, but I don't expect that to be the case: if the callbacks fail, the classic consumer detects that when polling the coordinator here and propagates the exception, so it never reaches the fetching part of consumer.poll:
   
https://github.com/apache/kafka/blob/f57fd2d9fd1c8a240d6c15ad35e83fd9283a958f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L641
   We intentionally introduced the awaitingCallback status for partitions to achieve the same with the new consumer: we do not allow fetching if the onAssigned callback hasn't completed successfully (users may be seeking on those partitions, for instance, so we really shouldn't fetch until it's done).
   
   2. The test you shared does reveal an inconsistency, but I would describe the bug as: the new consumer does not recover to fetch after a failed onAssigned callback. (That would explain why in your test the classic consumer returns records and the async one does not: the difference is in recovery.) For that bug, I wonder if the gap is that once the callback fails the first time, the new consumer considers the partition assigned (but awaitingCallback). So on the next reconciliation attempt, where we expect the callback to be triggered again, I think it may not be triggered because we only consider "added" partitions (and this one is not added in the second round):
   
https://github.com/apache/kafka/blob/f57fd2d9fd1c8a240d6c15ad35e83fd9283a958f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java#L1177
  
   
   Is that maybe the bug? Thoughts?
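
To make the hypothesis in point 2 concrete, here is a toy model of it. This is not the AbstractMembershipManager code: the class `ReconciliationSketch`, its fields, and the `reconcile` and `isFetchable` methods are all made up for illustration. It assumes that a partition is recorded as assigned before the callback runs, and that each round invokes the callback only for partitions not yet recorded as assigned.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Toy model of the suspected gap. Hypothetical names; not the Kafka implementation. */
public class ReconciliationSketch {
    // Partitions the member currently considers assigned.
    final Set<String> assigned = new HashSet<>();
    // Assigned partitions that must not be fetched until the callback succeeds.
    final Set<String> awaitingCallback = new HashSet<>();
    int callbackInvocations = 0;

    /** One reconciliation round: the callback only sees partitions that are new this round. */
    void reconcile(Set<String> target, Consumer<Set<String>> onAssigned) {
        Set<String> added = new HashSet<>(target);
        added.removeAll(assigned);

        // Assumption: partitions are marked assigned (but awaiting callback) up front.
        assigned.addAll(added);
        awaitingCallback.addAll(added);

        if (added.isEmpty()) {
            return; // nothing "added", so the callback is not invoked
        }
        callbackInvocations++;
        try {
            onAssigned.accept(added);
            awaitingCallback.removeAll(added); // success: partitions become fetchable
        } catch (RuntimeException e) {
            // Failure: partitions stay assigned but non-fetchable.
        }
    }

    boolean isFetchable(String partition) {
        return assigned.contains(partition) && !awaitingCallback.contains(partition);
    }

    public static void main(String[] args) {
        ReconciliationSketch member = new ReconciliationSketch();
        Set<String> target = Collections.singleton("topic-0");

        // The listener fails on its first invocation and would succeed afterwards.
        int[] calls = {0};
        Consumer<Set<String>> flakyListener = partitions -> {
            if (calls[0]++ == 0) {
                throw new RuntimeException("listener failed");
            }
        };

        member.reconcile(target, flakyListener); // round 1: callback fails
        member.reconcile(target, flakyListener); // round 2: nothing is "added"

        System.out.println("callback invocations: " + member.callbackInvocations);
        System.out.println("topic-0 fetchable: " + member.isFetchable("topic-0"));
    }
}
```

Under those assumptions the callback runs only once and `topic-0` stays non-fetchable after the second round, which is the "does not recover" behaviour described above. Whether the real reconciliation follows this shape is exactly the open question.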



