philipnee commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1318977376
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -268,7 +326,7 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
                 .forConsumer(requireTimestamps, isolationLevel, false)
                 .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
-        log.debug("Creating ListOffsetRequest {} for broker {} to reset positions", builder,

Review Comment:
   I really think ListOffsetRequest is fine.


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another error is pending", error);
         }
 
+
+    void onSuccessfulRequestForValidatingPositions(

Review Comment:
   Should it be `onSuccessfulResponse`? Same for `onFailedResponse`. Maybe `onSuccessfulValidatePositionResponse`.


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     *         {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     *         that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,

Review Comment:
   Ditto - this should be `onSuccessfulResponse`, right?


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     *         {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     *         that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    offsetFetcherUtils.onFailedRequestForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the partition last saw.
+     * <p/>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndValidatePositions(
+            Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+        regrouped.forEach((node, fetchPositions) -> {
+
+            if (node.isEmpty()) {

Review Comment:
   Can this happen? I did see the missing leader being filtered out here:

       .filter(entry -> entry.getValue().currentLeader.leader.isPresent())
       .collect(Collectors.groupingBy(entry -> entry.getValue().currentLeader.leader.get(),
               Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
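   A minimal, self-contained sketch of the quoted filter-then-group pattern, using placeholder types (`Leader`, `Position`) instead of the real `Node` and `SubscriptionState.FetchPosition`: entries whose leader is absent are dropped by the filter before grouping, so the grouped map is only keyed by leaders that were actually present.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class RegroupByLeaderSketch {

    // Placeholder types for illustration only: "Leader" stands in for Node,
    // "Position" for SubscriptionState.FetchPosition and its currentLeader.
    record Leader(String id) { }

    record Position(Optional<Leader> currentLeader) { }

    // Mirrors the quoted stream: partitions without a known leader are
    // filtered out before grouping, so no "absent leader" key can appear.
    static Map<Leader, Map<String, Position>> regroupByLeader(Map<String, Position> positions) {
        return positions.entrySet().stream()
                .filter(entry -> entry.getValue().currentLeader().isPresent())
                .collect(Collectors.groupingBy(entry -> entry.getValue().currentLeader().get(),
                        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        Map<String, Position> positions = new HashMap<>();
        positions.put("topic-0", new Position(Optional.of(new Leader("broker-1"))));
        positions.put("topic-1", new Position(Optional.empty())); // unknown leader, dropped

        // Prints a single entry keyed by broker-1; the unknown-leader entry is gone.
        System.out.println(regroupByLeader(positions));
    }
}
```

   Whether `node.isEmpty()` can still be true for a leader that passed the filter (e.g. a `Node.noNode()` placeholder) is the open question above; the sketch only shows that absent leaders never reach the grouped map.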
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -209,6 +215,34 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         return ConsumerRecords.empty();
     }
 
+    /**
+     * Set the fetch position to the committed position (if there is one) or reset it using the
+     * offset reset policy the user has configured.
+     *
+     * @return true if the operation completed without timing out
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
+     */
+    private boolean updateFetchPositions() {
+        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
+        eventHandler.add(validatePositionsEvent);
+
+        // TODO: integrate logic for refreshing committed offsets if available

Review Comment:
   Is there a follow-up PR for this, or are we still missing the refresh of committed offsets?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org