lianetm commented on code in PR #18089:
URL: https://github.com/apache/kafka/pull/18089#discussion_r1878512850
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1175,21 +1175,8 @@ private CompletableFuture<Void> assignPartitions(
// Invoke user call back.
CompletableFuture<Void> result =
signalPartitionsAssigned(addedPartitions);
- result.whenComplete((__, exception) -> {
-     if (exception == null) {
-         // Enable newly added partitions to start fetching and updating positions for them.
-         subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
-     } else {
-         // Keeping newly added partitions as non-fetchable after the callback failure.
-         // They will be retried on the next reconciliation loop, until it succeeds or the
-         // broker removes them from the assignment.
-         if (!addedPartitions.isEmpty()) {
-             log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not " +
-                 "requiring initializing positions after onPartitionsAssigned callback failed.",
-                 addedPartitions, exception);
-         }
-     }
- });
+ // Enable newly added partitions to start fetching and updating positions for them.
+ result.whenComplete((__, exception) -> subscriptions.enablePartitionsAwaitingCallback(addedPartitions));
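The behavioral difference between the removed and added lines comes down to CompletableFuture.whenComplete running its action whether the stage completed normally or exceptionally. A minimal sketch, using a plain Set as a hypothetical stand-in for the subscription state (this is not the real SubscriptionState API), and assuming a callback future that has already failed:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class WhenCompleteSketch {

    // Removed behavior: enable the partitions only if the callback future succeeded.
    static Set<String> enableIfSucceeded(CompletableFuture<Void> result, Set<String> added) {
        Set<String> enabled = new HashSet<>();
        // The future is already completed here, so whenComplete runs the action
        // synchronously before returning.
        result.whenComplete((__, exception) -> {
            if (exception == null) {
                enabled.addAll(added);
            }
        });
        return enabled;
    }

    // Added behavior: whenComplete runs on success AND failure, so partitions are always enabled.
    static Set<String> enableUnconditionally(CompletableFuture<Void> result, Set<String> added) {
        Set<String> enabled = new HashSet<>();
        result.whenComplete((__, exception) -> enabled.addAll(added));
        return enabled;
    }

    public static void main(String[] args) {
        // Simulate an onPartitionsAssigned callback that threw.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("callback failed"));
        Set<String> added = Set.of("topic-0");

        System.out.println("removed lines: " + enableIfSucceeded(failed, added));     // removed lines: []
        System.out.println("added lines: " + enableUnconditionally(failed, added));   // added lines: [topic-0]
    }
}
```

With the added lines, a failed callback still leaves the partitions fetchable, which is exactly the behavior the comment below questions.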
Review Comment:
Hey Chia, interesting finding. I do believe we have a bug here, but I'm not sure it's the one we're describing:
1. Regarding:
> the classic consumer continues to send fetch requests for those partitions even if the listener throws an exception
I may be missing something, but I expect that's not the case: if callbacks fail, the classic consumer detects it when polling the coordinator (here) and propagates the exception, so it never reaches the fetching part of consumer.poll:
https://github.com/apache/kafka/blob/f57fd2d9fd1c8a240d6c15ad35e83fd9283a958f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L641
We intentionally introduced the awaitingCallback state for partitions to achieve the same behavior in the new consumer: we don't allow fetching if the onAssigned callback hasn't completed successfully (users may be seeking on those partitions, for instance, so we really shouldn't fetch until it's done).
2. The test you shared does reveal an inconsistency, but I would describe the bug as: the new consumer does not recover to fetch after a failed onAssigned callback. (This would explain why, in your test, the classic consumer returns records and the async one does not, the difference being recovery.) For that bug, I wonder if the gap is that once the callback fails the first time, the new consumer considers the partition assigned (but awaitingCallback). So on the next reconciliation attempt, where we expect the callback to be triggered again, I think it may not be, because we consider only "added" partitions (and this one is not added in the second round):
https://github.com/apache/kafka/blob/f57fd2d9fd1c8a240d6c15ad35e83fd9283a958f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java#L1177
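To make the suspected gap concrete, here is a heavily simplified model of two reconciliation rounds, assuming (as described above) that the partitions passed to the callback are computed as the target assignment minus the partitions already owned. Every class, field, and method name is hypothetical and does not mirror the real AbstractMembershipManager; the retryAwaiting branch is just one possible direction to illustrate the idea, not a proposed fix.

```java
import java.util.HashSet;
import java.util.Set;

public class ReconciliationSketch {
    // Hypothetical, heavily simplified model; names do not match the real AbstractMembershipManager.
    final Set<String> assigned = new HashSet<>();         // partitions the member considers owned
    final Set<String> awaitingCallback = new HashSet<>(); // owned, but not fetchable yet
    int failuresLeft;                                     // how many callback invocations will fail

    ReconciliationSketch(int failuresLeft) {
        this.failuresLeft = failuresLeft;
    }

    // Stand-in for onPartitionsAssigned: fails the first `failuresLeft` times, then succeeds.
    boolean onPartitionsAssigned(Set<String> partitions) {
        if (failuresLeft > 0) {
            failuresLeft--;
            return false;
        }
        return true;
    }

    // One reconciliation round against the target assignment (revocation ignored for brevity).
    void reconcile(Set<String> target, boolean retryAwaiting) {
        // Suspected gap: only partitions not already owned count as "added".
        Set<String> toSignal = new HashSet<>(target);
        toSignal.removeAll(assigned);
        if (retryAwaiting) {
            // Hypothetical alternative: also re-signal owned partitions whose callback failed.
            for (String p : awaitingCallback) {
                if (target.contains(p)) {
                    toSignal.add(p);
                }
            }
        }
        assigned.addAll(target);
        awaitingCallback.addAll(toSignal);
        if (toSignal.isEmpty()) {
            return; // Nothing to signal: the callback is not invoked this round.
        }
        if (onPartitionsAssigned(toSignal)) {
            awaitingCallback.removeAll(toSignal); // Success: partitions become fetchable.
        }
    }

    boolean fetchable(String partition) {
        return assigned.contains(partition) && !awaitingCallback.contains(partition);
    }

    public static void main(String[] args) {
        Set<String> target = Set.of("topic-0");
        for (boolean retry : new boolean[] {false, true}) {
            ReconciliationSketch member = new ReconciliationSketch(1); // first callback fails
            member.reconcile(target, retry); // round 1: callback fails
            member.reconcile(target, retry); // round 2: same target assignment
            System.out.println("retryAwaiting=" + retry + " fetchable=" + member.fetchable("topic-0"));
        }
    }
}
```

Running main prints fetchable=false for the "added only" computation and fetchable=true when partitions still awaiting the callback are re-signalled, which matches the "does not recover" symptom.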
Is that maybe the bug? Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.