m1a2st commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1835289473
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1574,7 +1583,11 @@ private boolean updateFetchPositions(final Timer timer) {
try {
        CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
        wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
-       cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+       applicationEventHandler.add(checkAndUpdatePositionsEvent);
+       cachedSubscriptionHasAllFetchPositions = processBackgroundEvents(
Review Comment:
Hello @lianetm
Let me first explain my understanding of the current issue and the approach
I’m using.
When `consumer#position()` is called and reaches the line
`applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);`, it gets
blocked because `addAndGet` requires a response to proceed. However, in the
current setup, the background thread converts errors into Error Events. This
means the application thread won’t receive any errors if it hasn’t executed
`processBackgroundEvents`.
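
To illustrate the blocking behavior, here is a minimal sketch (class and method names are hypothetical, not the actual `AsyncKafkaConsumer` internals) of how a caller blocked in `addAndGet` can time out while the error sits unread in the background-event queue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the background thread reports errors only by
// enqueueing them, so the future the application thread waits on is
// never completed and the error is never seen.
class AddAndGetSketch {
    // Errors the background thread converted into "Error Events".
    static final BlockingQueue<Exception> backgroundEvents = new LinkedBlockingQueue<>();

    // Blocks on the event's future, like addAndGet: if the background
    // thread only enqueued the error instead of completing this future,
    // the caller waits until the timer expires without seeing the error.
    static boolean addAndGet(CompletableFuture<Boolean> eventFuture, long timeoutMs) throws Exception {
        return eventFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```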
> we have a “direct” way of communicating the 2 threads
To address this, I’ve implemented a `CompletableFuture<Exception>
metadataException` within the `NetworkClientDelegate` class, specifically for
storing errors that occur in the background thread. In
`NetworkClientDelegate#maybePropagateMetadataError`, if no error is present, I
refresh the future; if there is an error, I call
`metadataException.completeExceptionally(e);`.
The reason for this design is that when
`ConsumerNetworkThread#processApplicationEvents` executes the
`checkAndUpdatePositionsEvent`, a `TopicAuthorizationException` doesn’t
appear immediately; it may take several iterations of `runOnce` before the
error shows up. Without the future-based approach, the error can’t be
propagated from the background thread to the `OffsetsRequestManager`.
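
The refresh/complete logic I have in mind can be sketched like this (illustrative names only, not the actual `NetworkClientDelegate` code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "shared future" error channel described
// above: the background thread either refreshes the future (no error)
// or completes it exceptionally (error), so observers can react.
class MetadataErrorChannel {
    private final AtomicReference<CompletableFuture<Exception>> metadataError =
            new AtomicReference<>(new CompletableFuture<>());

    // Called from the background thread on each iteration.
    void maybePropagateMetadataError(Exception maybeError) {
        if (maybeError == null) {
            // No error this iteration: swap in a fresh future so a stale
            // completion from an earlier iteration is not re-delivered.
            metadataError.set(new CompletableFuture<>());
        } else {
            metadataError.get().completeExceptionally(maybeError);
        }
    }

    // Request managers (e.g. an offsets manager) can observe the current
    // future and fail their own pending work when it completes with an error.
    CompletableFuture<Exception> current() {
        return metadataError.get();
    }
}
```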
> Catch metadata errors on ConsumerNetworkThread.runOnce and
completeExceptionally all the requests it has (requests returned by the
managers poll here
I also catch the metadata error here; I think we can catch it at the same
point where it is added as an Error Event:
https://github.com/m1a2st/kafka/blob/51788cfa92881f29d95bf0ee197c6e5c8cd21ea6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java#L156
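
In spirit, failing the requests the delegate still holds when a metadata error appears could look like this (hypothetical sketch, not the linked code):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: on a metadata error, completeExceptionally every
// request future the network delegate is still holding, so their callers
// fail fast instead of hanging until their deadlines.
class FailPendingRequestsSketch {
    static void failAll(List<CompletableFuture<?>> pendingRequests, Exception metadataError) {
        for (CompletableFuture<?> f : pendingRequests) {
            f.completeExceptionally(metadataError);
        }
        pendingRequests.clear();
    }
}
```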
> Notify managers of the metadata error too (ie. onMetadataERror), in case
they have requests generated that did not make it to point 1, but still need to
fail (ie. Commit request manager may have
unsentOffsetCommitRequests/unsentOffsetFetchRequests. I don't think any other
keeps requests without sending them to the network client but let's double
check)
I also pass this error along to `OffsetsRequestManager#updateFetchPositions`
when the event is actually processed. The exception is handed to the
`OffsetsRequestManager`, so the application thread learns that the
`addAndGet` call has failed:
https://github.com/m1a2st/kafka/blob/51788cfa92881f29d95bf0ee197c6e5c8cd21ea6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java#L261
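
The hand-off at that point can be sketched roughly as follows (illustrative names, not the linked `OffsetsRequestManager` code): if a metadata error is already pending when the event is processed, the event's result future is failed immediately, which unblocks the application thread waiting in `addAndGet` with the exception rather than a timeout.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: fail the event's future with the pending
// background-thread error so the blocked application thread sees it.
class UpdateFetchPositionsSketch {
    static CompletableFuture<Boolean> updateFetchPositions(Exception pendingMetadataError) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        if (pendingMetadataError != null) {
            // Propagate the background-thread error to the application thread.
            result.completeExceptionally(pendingMetadataError);
            return result;
        }
        // ...otherwise proceed with the normal position-update path...
        result.complete(true);
        return result;
    }
}
```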
However, some tests still fail, likely due to behavioral differences from
the classic consumer, and I'm not sure what I'm missing about this problem.
Please let me know what I've missed. Thanks for your review and suggestions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]