kirktrue commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1859304634
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -168,6 +171,8 @@ private void processApplicationEvents() {
if (event instanceof CompletableEvent)
applicationEventReaper.add((CompletableEvent<?>) event);
+ List<CompletableEvent<?>> completableEvents =
reapExpiredApplicationEvents(currentTimeMs);
Review Comment:
Do we need to reap the events on every loop?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java:
##########
@@ -32,4 +32,8 @@ public class CheckAndUpdatePositionsEvent extends
CompletableApplicationEvent<Bo
public CheckAndUpdatePositionsEvent(long deadlineMs) {
super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
}
+
+ public CheckAndUpdatePositionsEvent(long deadlineMs, boolean
isPassedByErrorEvent) {
+ super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs,
isPassedByErrorEvent);
+ }
Review Comment:
At this point it looks like `CheckAndUpdatePositionsEvent` is the only event
that uses `isPassedByErrorEvent`. Any reason not to define it in this event as
opposed to the superclass?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -102,6 +102,8 @@ public void reap(long currentTimeMs) {
// Second, remove any events that are already complete, just to make
sure we don't hold references. This will
// include any events that finished successfully as well as any events
we just completed exceptionally above.
tracked.removeIf(e -> e.future().isDone());
+
+ return tracked;
Review Comment:
I'm not sure I understand this pattern. Can we add a new method that returns
the tracked events instead of overloading the purpose of `reap()`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -325,4 +330,23 @@ void cleanup() {
log.debug("Closed the consumer network thread");
}
}
+
+ /**
+ * If there is a metadata error, complete the completable events which are
not passed by
+ * error event with the metadata error
+ * @param events the completable events
+ */
+ private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
+ if (networkClientDelegate.metadataError().isPresent()) {
+ Throwable metadataError =
networkClientDelegate.metadataError().get();
+ List<CompletableEvent<?>> completableApplicationEvent =
events.stream()
+ .filter(event -> !(event instanceof
CompletableApplicationEvent &&
+ ((CompletableApplicationEvent<?>)
event).isPassedByErrorEvent()))
+ .collect(Collectors.toList());
+ if (!completableApplicationEvent.isEmpty()) {
+ completableApplicationEvent.forEach(event ->
event.future().completeExceptionally(metadataError));
+ networkClientDelegate.clearMetadataError();
+ }
+ }
+ }
Review Comment:
Do we only want to clear the error if an event "consumed" the error?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -325,4 +330,23 @@ void cleanup() {
log.debug("Closed the consumer network thread");
}
}
+
+ /**
+ * If there is a metadata error, complete the completable events which are
not passed by
+ * error event with the metadata error
+ * @param events the completable events
+ */
+ private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
+ if (networkClientDelegate.metadataError().isPresent()) {
+ Throwable metadataError =
networkClientDelegate.metadataError().get();
+ List<CompletableEvent<?>> completableApplicationEvent =
events.stream()
+ .filter(event -> !(event instanceof
CompletableApplicationEvent &&
+ ((CompletableApplicationEvent<?>)
event).isPassedByErrorEvent()))
+ .collect(Collectors.toList());
+ if (!completableApplicationEvent.isEmpty()) {
+ completableApplicationEvent.forEach(event ->
event.future().completeExceptionally(metadataError));
+ networkClientDelegate.clearMetadataError();
+ }
+ }
+ }
Review Comment:
It seems like this could be a little more succinct:
```suggestion
private void maybeFailOnMetadataError() {
networkClientDelegate.metadataError().ifPresent(metadataError -> {
applicationEventReaper.tracked().stream()
.filter(e -> e instanceof CompletableApplicationEvent<?>)
.map(e -> (CompletableApplicationEvent<?>) e)
.filter(e -> e.isPassedByErrorEvent())
.filter(e -> !e.future().isDone())
.forEach(e ->
e.future().completeExceptionally(metadataError));
networkClientDelegate.clearMetadataError();
}));
}
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -168,6 +171,8 @@ private void processApplicationEvents() {
if (event instanceof CompletableEvent)
applicationEventReaper.add((CompletableEvent<?>) event);
+ List<CompletableEvent<?>> completableEvents =
reapExpiredApplicationEvents(currentTimeMs);
Review Comment:
It seems like we should remove the reaping from this method entirely and
have `runOnce()` call `maybeFailOnMetadataError()` directly.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##########
@@ -38,6 +43,13 @@ protected CompletableApplicationEvent(final Type type, final
long deadlineMs) {
this.deadlineMs = deadlineMs;
}
+ protected CompletableApplicationEvent(final Type type, final long
deadlineMs, final boolean isPassedByErrorEvent) {
+ super(type);
+ this.future = new CompletableFuture<>();
+ this.deadlineMs = deadlineMs;
+ this.isPassedByErrorEvent = isPassedByErrorEvent;
+ }
Review Comment:
I'd prefer to see the other constructor call this one so that we have as few
paths as possible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]