lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1858732555
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -153,7 +154,16 @@ void runOnce() {
.map(rm -> rm.maximumTimeToWait(currentTimeMs))
.reduce(Long.MAX_VALUE, Math::min);
- reapExpiredApplicationEvents(currentTimeMs);
+ List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs);
+
+ // If there is a metadata error, complete the completable events which are not passed by error event with the metadata error
+ if (networkClientDelegate.metadataError().isPresent()) {
+ Throwable metadataError = networkClientDelegate.metadataError().get();
+ completableEvents.stream()
+ .filter(event -> !(event instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
+ .forEach(event -> event.future().completeExceptionally(metadataError));
+ networkClientDelegate.clearMetadataError();
Review Comment:
The trick is that events may complete right away on the call to process, so
they will never fail with the metadata error, because we would call
completeExceptionally on a future that is already complete, right? I wonder if
this gap is the reason why you still need the `ErrorEvent` for metadata. If we
remove the error event propagation on ln 156 and fill this gap, would it work?
It would be nice because there would be one single consistent way of
propagating metadata errors: via the calling API event. E.g., we would get
TopicAuthException on poll and on positions in the same way, just because the
CheckAndUpdatePositions fails. Makes sense? Just suggesting to try it out; I
could still be missing something, but I can see the gap in how we
completeExceptionally with metadata errors.
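
To illustrate the gap, here is a minimal standalone sketch using only
`java.util.concurrent.CompletableFuture`. It is not the consumer code: the
class name, the `failsWith` helper and the simulated error are hypothetical and
exist only for illustration. It shows that `completeExceptionally` is a no-op
(and returns `false`) on a future that has already completed, which is what
would happen to an event that completes on the call to process.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the Kafka code base.
public class AlreadyCompletedFutureDemo {

    // Hypothetical helper: tries to fail the future with the given error and
    // reports whether this call actually transitioned it to a failed state.
    static boolean failsWith(CompletableFuture<String> future, Throwable error) {
        // Returns false and leaves the result untouched if the future is already complete.
        return future.completeExceptionally(error);
    }

    public static void main(String[] args) {
        // Stand-in for a metadata error; the real exception type is not assumed here.
        RuntimeException metadataError = new RuntimeException("simulated metadata error");

        // Models an event whose future completed right away during processing.
        CompletableFuture<String> completedOnProcess = CompletableFuture.completedFuture("done");
        // Models an event whose future is still pending when the error is seen.
        CompletableFuture<String> stillPending = new CompletableFuture<>();

        boolean firstFailed = failsWith(completedOnProcess, metadataError);
        boolean secondFailed = failsWith(stillPending, metadataError);

        System.out.println("already complete -> failed by this call: " + firstFailed
                + ", completed exceptionally: " + completedOnProcess.isCompletedExceptionally());
        System.out.println("still pending    -> failed by this call: " + secondFailed
                + ", completed exceptionally: " + stillPending.isCompletedExceptionally());
    }
}
```

Only the pending future ends up carrying the error; the caller of the already
completed one never sees it. Filling the gap would therefore need a check for
the metadata error before (or as part of) completing the event, not only after.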