kirktrue commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1868623843
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -325,4 +328,17 @@ void cleanup() {
log.debug("Closed the consumer network thread");
}
}
+
+ /**
+ * If there is a metadata error, completed all uncompleted events with the
metadata error.
+ */
+ private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
+ if (networkClientDelegate.metadataError().isPresent()) {
+ Throwable metadataError =
networkClientDelegate.metadataError().get();
+ if (!events.isEmpty()) {
+ events.forEach(event ->
event.future().completeExceptionally(metadataError));
+ networkClientDelegate.clearMetadataError();
+ }
+ }
+ }
Review Comment:
Could we perform a pseudo `getAndClear()` with the metadata error?
```suggestion
    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
        if (events.isEmpty())
            return;

        // Read and reset the metadata error in one step, then fail every event with it.
        Optional<Exception> error = networkClientDelegate.getAndClearMetadataError();
        error.ifPresent(e -> events.forEach(event -> event.future().completeExceptionally(e)));
    }
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -165,9 +167,10 @@ private void processApplicationEvents() {
for (ApplicationEvent event : events) {
try {
- if (event instanceof CompletableEvent)
+ if (event instanceof CompletableEvent) {
applicationEventReaper.add((CompletableEvent<?>) event);
-
+ maybeFailOnMetadataError(List.of((CompletableEvent<?>)
event));
Review Comment:
Could you add a comment about why we're calling `maybeFailOnMetadataError()`
here as well as at the end of `runOnce()`? I'm assuming this handles an
important edge case, but I'm not sure I understand why from just looking at the
code.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -230,6 +227,14 @@ private ClientRequest makeClientRequest(
unsent.handler
);
}
+
+ public Optional<Exception> metadataError() {
+ return metadataError;
+ }
+
+ public void clearMetadataError() {
+ metadataError = Optional.empty();
+ }
Review Comment:
Then this can be a single operation:
```suggestion
    public Optional<Exception> getAndClearMetadataError() {
        Optional<Exception> e = metadataError;
        metadataError = Optional.empty();
        return e;
    }
```
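To make the intended semantics concrete, here is a minimal standalone sketch of the get-and-clear pattern. `MetadataErrorHolder`, `setMetadataError()`, and the static helper are hypothetical stand-ins rather than the real `NetworkClientDelegate` or `ConsumerNetworkThread` code, and plain `CompletableFuture`s stand in for `CompletableEvent`s. It assumes, as the patch appears to, that the field is only touched from a single thread.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the error-tracking part of NetworkClientDelegate.
class MetadataErrorHolder {
    private Optional<Exception> metadataError = Optional.empty();

    // Hypothetical setter; the real class records the error elsewhere.
    void setMetadataError(Exception e) {
        metadataError = Optional.of(e);
    }

    // Returns the pending error (if any) and resets the field in the same call.
    Optional<Exception> getAndClearMetadataError() {
        Optional<Exception> e = metadataError;
        metadataError = Optional.empty();
        return e;
    }

    // Fails every future with the pending error. When there is nothing to fail,
    // the error is left in place so that a later call can still report it.
    static void maybeFailOnMetadataError(MetadataErrorHolder holder,
                                         List<? extends CompletableFuture<?>> futures) {
        if (futures.isEmpty())
            return;
        holder.getAndClearMetadataError()
              .ifPresent(e -> futures.forEach(f -> f.completeExceptionally(e)));
    }
}
```

The early return on an empty list keeps the behavior of the original patch, which only cleared the error after at least one event had been failed with it.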
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -155,4 +156,13 @@ public int size() {
public boolean contains(CompletableEvent<?> event) {
return event != null && tracked.contains(event);
}
+
+ public List<CompletableEvent<?>> uncompletedEvents() {
+ return tracked.stream()
+ .filter(e -> e instanceof CompletableApplicationEvent<?>)
+ .map(e -> (CompletableApplicationEvent<?>) e)
+ .filter(e -> !e.future().isDone())
+ .collect(Collectors.toList());
+ }
Review Comment:
You don't have to filter by `CompletableApplicationEvent`, because
`tracked` already holds `CompletableEvent`s.
```suggestion
    public List<CompletableEvent<?>> uncompletedEvents() {
        return tracked.stream()
            .filter(e -> !e.future().isDone())
            .collect(Collectors.toList());
    }
```
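As a rough illustration of why the single filter is enough, here is a self-contained sketch. `FutureTracker` is a hypothetical, simplified stand-in for `CompletableEventReaper`, and it tracks bare `CompletableFuture`s rather than `CompletableEvent`s. Because the element type of the tracked collection already exposes the future, no `instanceof` check or cast is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for CompletableEventReaper.
class FutureTracker {
    private final List<CompletableFuture<?>> tracked = new ArrayList<>();

    void add(CompletableFuture<?> future) {
        tracked.add(future);
    }

    // Returns only the futures that have not completed, whether normally,
    // exceptionally, or by cancellation.
    List<CompletableFuture<?>> uncompleted() {
        return tracked.stream()
                .filter(f -> !f.isDone())
                .collect(Collectors.toList());
    }
}
```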