lianetm commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1320281717
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ########## @@ -154,6 +170,48 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets( OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets)); } + /** + * Reset offsets for all assigned partitions that require it. Offsets will be reset + * with timestamps according to the reset strategy defined for each partition. This will + * generate ListOffsets requests for the partitions and timestamps, and enqueue them to be sent + * on the next call to {@link #poll(long)}. + * + * <p/> + * + * When a response is received, positions are updated in-memory, on the subscription state. If + * an error is received in the response, it will be saved to be thrown on the next call to + * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException}) + */ + public void resetPositionsIfNeeded() { + Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp(); + + if (offsetResetTimestamps.isEmpty()) + return; + + List<NetworkClientDelegate.UnsentRequest> unsentRequests = + buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps); + requestsToSend.addAll(unsentRequests); + } + + /** + * Validate positions for all assigned partitions for which a leader change has been detected. + * This will generate OffsetsForLeaderEpoch requests for the partitions and timestamps, and Review Comment: Right, it only involves partitions and epochs in the request. Comment updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org