lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1857306123
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -153,7 +154,16 @@ void runOnce() {
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
-        reapExpiredApplicationEvents(currentTimeMs);
+        List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs);
+
+        // If there is a metadata error, complete the completable events which are not passed by error event with the metadata error
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = networkClientDelegate.metadataError().get();
+            completableEvents.stream()
+                    .filter(event -> !(event instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
+                    .forEach(event -> event.future().completeExceptionally(metadataError));
+            networkClientDelegate.clearMetadataError();
Review Comment:
If there were no events to propagate the error to, this line will still clear it. Shouldn't we clear the error only if we were able to propagate it?
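Roughly what I had in mind (just a sketch, not tested; the names mirror the ones in the diff above):
```java
// Sketch only: clear the pending metadata error only when at least one event received it.
if (networkClientDelegate.metadataError().isPresent()) {
    Throwable metadataError = networkClientDelegate.metadataError().get();
    boolean propagated = false;
    for (CompletableEvent<?> event : completableEvents) {
        if (!(event instanceof CompletableApplicationEvent
                && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent())) {
            event.future().completeExceptionally(metadataError);
            propagated = true;
        }
    }
    if (propagated) {
        // Keep the error around if nobody was there to receive it yet.
        networkClientDelegate.clearMetadataError();
    }
}
```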
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -150,6 +152,7 @@ private void maybePropagateMetadataError() {
         try {
             metadata.maybeThrowAnyException();
         } catch (Exception e) {
+            metadataError = Optional.of(e);
             backgroundEventHandler.add(new ErrorEvent(e));
Review Comment:
Seems to me you're really close to being able to remove this duplicated way of propagating metadata errors (and to simplify `isPassedByErrorEvent`).
We now have a way to propagate metadata errors based on the triggering event, to fill the gap of letting calls like position know about them. So why exactly do we still need to propagate them via ErrorEvents too? It seems to be only for the consumer.poll case, but I would expect that (once the gap from the comment above is addressed) the CheckAndUpdatePositions event should fail with subscription metadata errors, no matter whether it is called from consumer.position or from consumer.poll. Makes sense?
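In other words, something roughly like this in NetworkClientDelegate (sketch only, assuming the event-based propagation ends up covering the poll path too):
```java
// Sketch only: keep the metadata error for the events that depend on subscription
// metadata, instead of also funnelling it to the app thread through an ErrorEvent.
private void maybePropagateMetadataError() {
    try {
        metadata.maybeThrowAnyException();
    } catch (Exception e) {
        // The network thread fails the relevant CompletableEvents with this error,
        // so callers (position, poll, ...) see it through the event future.
        metadataError = Optional.of(e);
    }
}
```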
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -153,7 +154,16 @@ void runOnce() {
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
-        reapExpiredApplicationEvents(currentTimeMs);
+        List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs);
+
+        // If there is a metadata error, complete the completable events which are not passed by error event with the metadata error
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = networkClientDelegate.metadataError().get();
+            completableEvents.stream()
+                    .filter(event -> !(event instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
+                    .forEach(event -> event.future().completeExceptionally(metadataError));
+            networkClientDelegate.clearMetadataError();
Review Comment:
Should we encapsulate this in a kind of `maybeFailOnMetadataError` that would take the events?
Then I wonder if we also need to do the same right before actually processing each event. Events that do require subscription metadata (e.g. CheckAndUpdatePositions) may be processed, not generate any request, and complete right away, so they would never learn about the metadata error, right?
Right before:
https://github.com/apache/kafka/blob/57c4c386796028dc4144dfb50a2a786dac0a51a3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L181
do we need a `maybeFailOnMetadataError(event)` to cover this gap?
Note that it would not be a replacement for the `maybeFailOnMetadataError` on awaiting events; I wonder if we need them both (to cover events that complete right away and never make it to the "awaiting" list, and also the ones that are left waiting).
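Something along these lines is what I'm picturing (just a sketch, names are hypothetical):
```java
// Sketch only: fail the given events with the pending metadata error, if any.
// Could be called both for the reaped/awaiting events and for a single event
// right before it is processed.
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
    networkClientDelegate.metadataError().ifPresent(metadataError ->
        events.stream()
              .filter(event -> !(event instanceof CompletableApplicationEvent
                      && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
              .forEach(event -> event.future().completeExceptionally(metadataError))
    );
}
```
(plus the clearMetadataError handling wherever we decide it belongs, per the comment above)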
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1683,10 +1683,10 @@ private Fetch<K, V> collectFetch() {
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *                                       defined
      */
-    private boolean updateFetchPositions(final Timer timer) {
+    private boolean updateFetchPositions(final Timer timer, final boolean isPassedByErrorEvent) {
Review Comment:
If the suggestion below works and we can get rid of the ErrorEvent for metadata errors, I hope we could end up removing this param and instead have a kind of requiresSubscriptionMetadata flag defined in events, which the CheckAndUpdatePositions event would set to true. The network client would then know that it needs to completeExceptionally the future for that event (and not for others that are not related to subscription metadata)... just an idea that depends on whether we can simplify the metadata error event.
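i.e. with a hypothetical requiresSubscriptionMetadata() on CompletableApplicationEvent, the filter in the network thread could become something like (sketch only):
```java
// Sketch only: fail only the events that declared they depend on subscription metadata.
completableEvents.stream()
        .filter(event -> event instanceof CompletableApplicationEvent
                && ((CompletableApplicationEvent<?>) event).requiresSubscriptionMetadata())
        .forEach(event -> event.future().completeExceptionally(metadataError));
```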