philipnee commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1318977376


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -268,7 +326,7 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
                 .forConsumer(requireTimestamps, isolationLevel, false)
                 .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
 
-        log.debug("Creating ListOffsetRequest {} for broker {} to reset positions", builder,

Review Comment:
   I really think ListOffsetRequest is fine.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another error is pending", error);
     }
 
+
+    void onSuccessfulRequestForValidatingPositions(

Review Comment:
   Should this be `onSuccessfulResponse`, and likewise `onFailedResponse`? Maybe
   `onSuccessfulValidatePositionResponse`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     * that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,

Review Comment:
   Ditto: this should be `onSuccessfulResponse`, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     * that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    offsetFetcherUtils.onFailedRequestForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the partition last saw.
+     * <p/>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndValidatePositions(
+            Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+        regrouped.forEach((node, fetchPositions) -> {
+
+            if (node.isEmpty()) {

Review Comment:
   Can this happen? I saw that entries with a missing leader are already filtered out here:
   
   .filter(entry -> entry.getValue().currentLeader.leader.isPresent())
                   .collect(Collectors.groupingBy(entry -> entry.getValue().currentLeader.leader.get(),
                           Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
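
   To illustrate the point, here is a minimal standalone sketch of that filter-then-group pattern. The `Position` class and the `String` leader ids are hypothetical stand-ins (not the real `SubscriptionState.FetchPosition` or `Node`), and `regroupByLeader` is only an assumed approximation of what `regroupFetchPositionsByLeader` does. It shows that an entry whose leader `Optional` is absent never becomes a key of the grouped map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class LeaderGroupingSketch {

    // Hypothetical stand-in for a fetch position; only the optional leader matters here.
    static final class Position {
        final long offset;
        final Optional<String> leader;

        Position(long offset, Optional<String> leader) {
            this.offset = offset;
            this.leader = leader;
        }
    }

    // Drop entries without a leader, then group the remaining entries by leader id.
    static Map<String, Map<String, Position>> regroupByLeader(Map<String, Position> positions) {
        return positions.entrySet().stream()
                .filter(entry -> entry.getValue().leader.isPresent())
                .collect(Collectors.groupingBy(
                        entry -> entry.getValue().leader.get(),
                        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        Map<String, Position> positions = new HashMap<>();
        positions.put("topic-0", new Position(10L, Optional.of("broker-1")));
        positions.put("topic-1", new Position(20L, Optional.of("broker-1")));
        positions.put("topic-2", new Position(30L, Optional.empty()));

        Map<String, Map<String, Position>> grouped = regroupByLeader(positions);
        // Only leaders that were present appear as keys; topic-2 is dropped entirely.
        System.out.println(grouped.keySet());
        System.out.println(grouped.get("broker-1").keySet());
    }
}
```

   Note that this only covers an absent `Optional`. Whether a leader that is present could still make `node.isEmpty()` return true is a separate question that the sketch does not answer.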



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -209,6 +215,34 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         return ConsumerRecords.empty();
     }
 
+    /**
+     * Set the fetch position to the committed position (if there is one) or reset it using the
+     * offset reset policy the user has configured.
+     *
+     * @return true if the operation completed without timing out
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException                          If no offset is stored for a given partition and no offset reset policy is
+     *                                                                defined
+     */
+    private boolean updateFetchPositions() {
+        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
+        eventHandler.add(validatePositionsEvent);
+
+        // TODO: integrate logic for refreshing committed offsets if available

Review Comment:
   Is there a follow-up PR for this, or are we still missing the refresh of
   committed offsets?


