m1a2st commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1835289473


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1574,7 +1583,11 @@ private boolean updateFetchPositions(final Timer timer) {
         try {
             CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
             wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
-            cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+            applicationEventHandler.add(checkAndUpdatePositionsEvent);
+            cachedSubscriptionHasAllFetchPositions = processBackgroundEvents(

Review Comment:
   Hello @lianetm 
   Let me first explain my understanding of the current issue and the approach 
I’m using.
   
   When `consumer#position()` is called and reaches the line `applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);`, the application thread blocks, because `addAndGet` waits for the event's result before it returns. However, in the current setup the background thread converts errors into Error Events. This means the application thread never sees those errors unless it runs `processBackgroundEvents`.
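   
   To make the blocking concrete, here is a minimal, self-contained sketch. It is not Kafka code: `BlockedCallerDemo`, `backgroundEvents`, and `eventFuture` are hypothetical stand-ins for the background event queue and the future that `addAndGet` waits on, and the 100 ms timeout is arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names exist in Kafka.
final class BlockedCallerDemo {
    static String run() {
        // Stand-in for the queue of Error Events produced by the background thread.
        BlockingQueue<Exception> backgroundEvents = new LinkedBlockingQueue<>();
        // Stand-in for the future the application thread waits on.
        CompletableFuture<Boolean> eventFuture = new CompletableFuture<>();

        // The "background thread" reports the error only through the queue;
        // the future itself is never completed.
        backgroundEvents.add(new IllegalStateException("not authorized"));

        try {
            // The caller waits on the future and never looks at the queue.
            eventFuture.get(100, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            return "timed out with " + backgroundEvents.size() + " unread error event(s)";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

   The caller only learns about the error if something drains the queue or completes the future exceptionally.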
   
   >  we have a “direct” way of communicating the 2 threads
   
   To address this, I’ve added a `CompletableFuture<Exception> metadataException` field to the `NetworkClientDelegate` class, specifically for storing errors that occur in the background thread. In `NetworkClientDelegate#maybePropagateMetadataError`, if no error is present, I reset the future. If there is an error, I call `metadataException.completeExceptionally(e);`.
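   
   Roughly, the idea looks like the sketch below. This is a simplified illustration, not the code in the PR: `MetadataErrorHolder`, `maybePropagate`, and `current` are hypothetical names, and I assume "no error" means replacing an already-failed future with a fresh one.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder; it only mirrors the shape of the metadataException field.
final class MetadataErrorHolder {
    private final AtomicReference<CompletableFuture<Exception>> metadataException =
            new AtomicReference<>(new CompletableFuture<>());

    // Called once per background-loop iteration with that iteration's metadata error, if any.
    void maybePropagate(Optional<Exception> error) {
        if (error.isPresent()) {
            // Anyone holding the current future now observes the failure.
            metadataException.get().completeExceptionally(error.get());
        } else if (metadataException.get().isCompletedExceptionally()) {
            // Assumption: once the error is gone, start over with a fresh future.
            metadataException.set(new CompletableFuture<>());
        }
    }

    CompletableFuture<Exception> current() {
        return metadataException.get();
    }
}
```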
   
   The reason for this design is that when 
`ConsumerNetworkThread#processApplicationEvents` executes 
`checkAndUpdatePositionsEvent`, `TopicAuthorizationException` doesn’t 
immediately appear. Instead, it may take several iterations of `runOnce` before 
this error shows up. Without the future approach, this error can’t be 
propagated from the background thread to the OffsetsRequestManager. 
   
   > Catch metadata errors on ConsumerNetworkThread.runOnce and 
completeExceptionally all the requests it has (requests returned by the 
managers poll here
   
   I also catch the metadata error at this point. I think we can catch it at the same place where it is added as an Error Event.
   
https://github.com/m1a2st/kafka/blob/51788cfa92881f29d95bf0ee197c6e5c8cd21ea6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java#L156
   
   > Notify managers of the metadata error too (ie. onMetadataERror), in case 
they have requests generated that did not make it to point 1, but still need to 
fail (ie. Commit request manager may have 
unsentOffsetCommitRequests/unsentOffsetFetchRequests. I don't think any other 
keeps requests without sending them to the network client but let's double 
check)
   
   I also pass this error to `OffsetsRequestManager#updateFetchPositions` when the event is actually processed. The exception is handed to the `OffsetsRequestManager`, so the application thread sees the failure from its `addAndGet` call.
   
https://github.com/m1a2st/kafka/blob/51788cfa92881f29d95bf0ee197c6e5c8cd21ea6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java#L261
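   
   As a rough, single-threaded sketch of that flow (the error only shows up after a few iterations, and the pending future is then failed so the waiting caller sees it): `BackgroundLoopSketch`, `submit`, and this `runOnce` signature are hypothetical and much simpler than the real `ConsumerNetworkThread` and `OffsetsRequestManager`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the background loop; names are illustrative only.
final class BackgroundLoopSketch {
    private final List<CompletableFuture<Boolean>> pending = new ArrayList<>();

    // The application thread would block on the returned future.
    CompletableFuture<Boolean> submit() {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        pending.add(result);
        return result;
    }

    // One loop iteration: if a metadata error surfaced, fail everything still pending.
    void runOnce(Optional<Exception> metadataError) {
        metadataError.ifPresent(e -> {
            pending.forEach(f -> f.completeExceptionally(e));
            pending.clear();
        });
    }
}
```

   Calling `get()` on a future failed this way throws an `ExecutionException` that wraps the original error.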
   
   However, some tests still fail, likely due to behavioral differences from the classic consumer. I'm not sure whether I'm missing something about this problem. Please let me know what I'm missing. Thanks for your review and suggestions.


