lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1319093808


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> 
buildListOffsetRequestToNode(
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times 
for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     * that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> 
buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new 
ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = 
buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    
offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in 
ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    
offsetFetcherUtils.onFailedRequestForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request 
to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the 
partition last saw.
+     * <p/>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> 
buildListOffsetsRequestsAndValidatePositions(
+            Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate) {
+
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> 
regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new 
ArrayList<>();
+        regrouped.forEach((node, fetchPositions) -> {
+
+            if (node.isEmpty()) {

Review Comment:
   Yes, it's needed, these are 2 different things. The `if` is checking the 
Node object `isEmpty` function, but the filter is only removing the entries for 
which  the leader (`Optional<Node>`) is not present. 
   
   So we could have an Optional<Node> present, but returning a Node for which 
`isEmpty` is true (used mainly in case of errors)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to