m1a2st commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1850181200
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -206,12 +213,20 @@ public CompletableFuture<Map<TopicPartition,
OffsetAndTimestampInternal>> fetchO
" Result {}", timestampsToSearch, result);
}
});
-
+
prepareFetchOffsetsRequests(timestampsToSearch, requireTimestamps,
listOffsetsRequestState);
- return listOffsetsRequestState.globalResult.thenApply(
+
+
topicToOffsetsResult.set(listOffsetsRequestState.globalResult.thenApply(
result -> OffsetFetcherUtils.buildOffsetsForTimeInternalResult(
timestampsToSearch,
- result.fetchedOffsets));
+ result.fetchedOffsets)));
+
+ new ArrayList<>(metadataErrors).forEach(metadataError ->
metadataError.whenComplete((__, error) -> {
+ topicToOffsetsResult.get().completeExceptionally(error);
Review Comment:
In the current version, there are two places where CompletedFuture is used
to propagate errors:
1. `NetworkClientDelegate#maybePropagateMetadataError`
2. `CoordinatorRequestManager`
As a result, we need a list to store the different errors coming from these
two sources.
> Just with the goal of simplifying here, is a future really needed or just
keeping the error could do? The main difference with the metadata errors is
that the CommitRequestManager.poll called regularly will have the pending
requests in hand, so we could fail them if there is a fatal error in the
coordinator manager.
But if we can pass the `CoordinatorRequestManager` fatal error to use
exception, we should not use List to store these CompletedFuture.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]