lianetm commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1319093808
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ########## @@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode( return result; } + /** + * Make asynchronous ListOffsets request to fetch offsets by target times for the specified + * partitions. + * Use the retrieved offsets to reset positions in the subscription state. + * + * @param timestampsToSearch the mapping between partitions and target time + * @return A list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} + * that can be polled to obtain the corresponding timestamps and offsets. + */ + private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions( + final Map<TopicPartition, Long> timestampsToSearch) { + Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode = + groupListOffsetRequests(timestampsToSearch, Optional.empty()); + + final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>(); + + timestampsToSearchByNode.forEach((node, resetTimestamps) -> { + subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(), + time.milliseconds() + requestTimeoutMs); + + CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode( + node, + resetTimestamps, + false, + unsentRequests); + + partialResult.whenComplete((result, error) -> { + if (error == null) { + offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps, + result); + } else { + RuntimeException e; + if (error instanceof RuntimeException) { + e = (RuntimeException) error; + } else { + e = new RuntimeException("Unexpected failure in ListOffsets request for " + + "resetting positions", error); + } + offsetFetcherUtils.onFailedRequestForResettingPositions(resetTimestamps, e); + } + }); + }); + return unsentRequests; + } + + /** + * For each partition that needs validation, 
make an asynchronous request to get the end-offsets + * for the partition with the epoch less than or equal to the epoch the partition last saw. + * <p/> + * Requests are grouped by Node for efficiency. + */ + private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndValidatePositions( + Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) { + + final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped = + regroupFetchPositionsByLeader(partitionsToValidate); + + long nextResetTimeMs = time.milliseconds() + requestTimeoutMs; + final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>(); + regrouped.forEach((node, fetchPositions) -> { + + if (node.isEmpty()) { Review Comment: Yes, it's needed; these are two different things. The `if` checks the `Node` object's `isEmpty()` method, whereas the filter only removes the entries for which the leader (`Optional<Node>`) is not present. So we could have an `Optional<Node>` that is present but holds a `Node` for which `isEmpty()` returns true (used mainly in case of errors). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org