junrao commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1320173913
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -68,17 +75,22 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
     private final IsolationLevel isolationLevel;
     private final Logger log;
     private final OffsetFetcherUtils offsetFetcherUtils;
+    private final SubscriptionState subscriptionState;
     private final Set<ListOffsetsRequestState> requestsToRetry;
     private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+    private final long requestTimeoutMs;
+    private final Time time;
+    private final ApiVersions apiVersions;

     public OffsetsRequestManager(final SubscriptionState subscriptionState,
-                                 final ConsumerMetadata metadata,
-                                 final IsolationLevel isolationLevel,
-                                 final Time time,
-                                 final long retryBackoffMs,
-                                 final ApiVersions apiVersions,
-                                 final LogContext logContext) {
+            final ConsumerMetadata metadata,

Review Comment:
   indentation

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -209,6 +215,34 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         return ConsumerRecords.empty();
     }

+    /**
+     * Set the fetch position to the committed position (if there is one) or reset it using the
+     * offset reset policy the user has configured.
+     *
+     * @return true if the operation completed without timing out
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
+     */
+    private boolean updateFetchPositions() {

Review Comment:
   updateFetchPositionsIfNeeded?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -154,6 +170,48 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets(
                 OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets));
     }

+    /**
+     * Reset offsets for all assigned partitions that require it. Offsets will be reset
+     * with timestamps according to the reset strategy defined for each partition. This will
+     * generate ListOffsets requests for the partitions and timestamps, and enqueue them to be sent
+     * on the next call to {@link #poll(long)}.
+     *
+     * <p/>
+     *
+     * When a response is received, positions are updated in-memory, on the subscription state. If
+     * an error is received in the response, it will be saved to be thrown on the next call to
+     * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
+     */
+    public void resetPositionsIfNeeded() {
+        Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
+
+        if (offsetResetTimestamps.isEmpty())
+            return;
+
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
+                buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
+        requestsToSend.addAll(unsentRequests);
+    }
+
+    /**
+     * Validate positions for all assigned partitions for which a leader change has been detected.
+     * This will generate OffsetsForLeaderEpoch requests for the partitions and timestamps, and

Review Comment:
   OffsetsForLeaderEpoch request doesn't depend on timestamp, right?
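The javadoc above describes a deferred-error pattern: a failure arriving on the background thread is parked and rethrown on the caller's next invocation. A minimal standalone sketch of that pattern, with illustrative names (`cachedException`, `saveError`, `maybeThrowPendingError` are not the PR's identifiers):

    import java.util.concurrent.atomic.AtomicReference;

    public class DeferredErrorExample {

        // Holds the first error seen by a response handler until the caller invokes again.
        private final AtomicReference<RuntimeException> cachedException = new AtomicReference<>();

        // Called from the async response handler when a request fails.
        void saveError(RuntimeException e) {
            cachedException.compareAndSet(null, e);
        }

        // Called at the start of the next resetPositionsIfNeeded()-style invocation;
        // rethrows (and clears) any error saved since the previous call.
        void maybeThrowPendingError() {
            RuntimeException e = cachedException.getAndSet(null);
            if (e != null)
                throw e;
        }
    }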
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another error is pending", error);
     }

+
+    void onSuccessfulResponseForValidatingPositions(
+            final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+            final OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
+        List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+        if (!offsetsResult.partitionsToRetry().isEmpty()) {
+            subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+                    time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate(false);
+        }
+
+        // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+        // for the partition. If so, it means we have experienced log truncation and need to reposition
+        // that partition's offset.
+        // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+        // its offset if reset policy is configured, or throw out of range exception.
+        offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+            SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition);
+            Optional<SubscriptionState.LogTruncation> truncationOpt =
+                    subscriptionState.maybeCompleteValidation(topicPartition, requestPosition,
+                            respEndOffset);
+            truncationOpt.ifPresent(truncations::add);
+        });
+
+        if (!truncations.isEmpty()) {
+            maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));

Review Comment:
   Passing in LogTruncationException to `maybeSetOffsetForLeaderException` seems a bit weird since it's not an OffsetForLeaderException.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * Event for resetting offsets for all assigned partitions that require it. This is an
+ * asynchronous event that generates ListOffsets requests, and completes by updating in-memory
+ * positions when responses are received.
+ */
+public class ResetPositionsApplicationEvent extends CompletableApplicationEvent<Void> {
+
+    public ResetPositionsApplicationEvent() {
+        super(Type.RESET_POSITIONS);
+    }
+
+    @Override
+    public String toString() {
+        return "ResetPositions{" +
+                "future=" + future +
+                ", type=" + type +

Review Comment:
   Could we avoid duplicating the code in `toString` here and that in `CompletableApplicationEvent`? Ditto for `ValidatePositionsApplicationEvent`.
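One way to remove the duplication flagged above is to let the parent build the shared portion of the string. A simplified, self-contained sketch (the class shapes and the helper name `toStringBase` are assumptions, not the PR's code):

    import java.util.concurrent.CompletableFuture;

    // Stand-in for CompletableApplicationEvent: owns the shared fields and
    // renders them once, so subclasses never re-list future/type themselves.
    abstract class CompletableEventSketch<T> {
        protected final CompletableFuture<T> future = new CompletableFuture<>();
        protected final String type;

        protected CompletableEventSketch(String type) {
            this.type = type;
        }

        // Subclasses embed this in their own toString().
        protected String toStringBase() {
            return "future=" + future + ", type=" + type;
        }
    }

    // Stand-in for ResetPositionsApplicationEvent (ValidatePositionsApplicationEvent
    // would do the same).
    class ResetPositionsEventSketch extends CompletableEventSketch<Void> {
        ResetPositionsEventSketch() {
            super("RESET_POSITIONS");
        }

        @Override
        public String toString() {
            return "ResetPositionsApplicationEvent{" + toStringBase() + '}';
        }
    }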
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
         return result;
     }

+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     *         {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     *         that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the partition last saw.
+     * <p/>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndValidatePositions(
+            Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+        regrouped.forEach((node, fetchPositions) -> {
+
+            if (node.isEmpty()) {
+                metadata.requestUpdate(true);
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                                "support the required protocol version (introduced in Kafka 2.3)",
+                        fetchPositions.keySet());
+                for (TopicPartition partition : fetchPositions.keySet()) {
+                    subscriptionState.completeValidation(partition);
+                }
+                return;
+            }
+
+            subscriptionState.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs);
+
+            CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> partialResult =
+                    buildOffsetsForLeaderEpochRequestToNode(node, fetchPositions, unsentRequests);

+            partialResult.whenComplete((offsetsResult, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
+                            offsetsResult);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in OffsetsForLeaderEpoch " +
+                                "request for validating positions", error);
+                    }
+                    offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
+                }
+            });
+
+        });
+
+        return unsentRequests;
+    }
+
+    /**
+     * Build OffsetsForLeaderEpoch request to send to a specific broker for the partitions and
+     * positions to fetch. This also adds the request to the list of unsentRequests.
+     **/
+    private CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> buildOffsetsForLeaderEpochRequestToNode(
+            final Node node,
+            final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+            List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
+        AbstractRequest.Builder<OffsetsForLeaderEpochRequest> builder =
+                OffsetsForLeaderEpochUtils.prepareRequest(fetchPositions);
+
+        log.debug("Creating OffsetsForLeaderEpoch request request {} to broker {}", builder, node);
+
+        NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                builder,
+                Optional.ofNullable(node));
+        unsentRequests.add(unsentRequest);
+        CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> result = new CompletableFuture<>();
+        unsentRequest.future().whenComplete((response, error) -> {
+            if (error != null) {
+                log.debug("Sending OffsetsForLeaderEpoch request {} to broker {} failed",
+                        builder,
+                        node,
+                        error);
+                result.completeExceptionally(error);
+            } else {
+                OffsetsForLeaderEpochResponse lor = (OffsetsForLeaderEpochResponse) response.responseBody();

Review Comment:
   What does lor stand for?
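Two small cleanups suggest themselves from this hunk. The Throwable-to-RuntimeException wrapping appears verbatim in both `whenComplete` handlers; a tiny helper (the class and method names below are illustrative, not the PR's) would deduplicate it:

    final class ErrorWrappingExample {
        // Replaces the repeated instanceof/else blocks in the whenComplete handlers.
        static RuntimeException wrapIfNeeded(Throwable error, String message) {
            return error instanceof RuntimeException
                    ? (RuntimeException) error
                    : new RuntimeException(message, error);
        }
    }

As for the reviewer's question, `lor` presumably abbreviates the response type; spelling it out (e.g. `offsetsForLeaderEpochResponse`) would make the cast self-documenting.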
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another error is pending", error);
     }

+
+    void onSuccessfulResponseForValidatingPositions(
+            final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+            final OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
+        List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+        if (!offsetsResult.partitionsToRetry().isEmpty()) {
+            subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+                    time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate(false);
+        }
+
+        // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+        // for the partition. If so, it means we have experienced log truncation and need to reposition
+        // that partition's offset.
+        // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+        // its offset if reset policy is configured, or throw out of range exception.
+        offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+            SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition);
+            Optional<SubscriptionState.LogTruncation> truncationOpt =
+                    subscriptionState.maybeCompleteValidation(topicPartition, requestPosition,
+                            respEndOffset);
+            truncationOpt.ifPresent(truncations::add);
+        });
+
+        if (!truncations.isEmpty()) {
+            maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));
+        }
+    }
+
+    void onFailedResponseForValidatingPositions(final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+                                                final RuntimeException error) {
+        subscriptionState.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
+        metadata.requestUpdate(false);
+
+        if (!(error instanceof RetriableException)) {
+            maybeSetOffsetForLeaderException(error);
+        }
+    }
+
+    private LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
+        Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new HashMap<>();
+        Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
+        for (SubscriptionState.LogTruncation truncation : truncations) {
+            truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
+                    divergentOffsets.put(truncation.topicPartition, divergentOffset));
+            truncatedFetchOffsets.put(truncation.topicPartition, truncation.fetchPosition.offset);
+        }
+        return new LogTruncationException("Detected truncated partitions: " + truncations,

Review Comment:
   Should we customize the `toString` method of LogTruncationException?
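A simplified, self-contained sketch of the customization the reviewer floats: override `toString` so the structured truncation data prints itself, rather than relying on the message string assembled at the call site. The class below is a stand-in, not the real `LogTruncationException`:

    import java.util.Map;

    class LogTruncationExampleException extends RuntimeException {
        // Simplified: partition name -> fetch offset at which truncation was detected.
        private final Map<String, Long> truncatedFetchOffsets;

        LogTruncationExampleException(String message, Map<String, Long> truncatedFetchOffsets) {
            super(message);
            this.truncatedFetchOffsets = truncatedFetchOffsets;
        }

        @Override
        public String toString() {
            // Print the structured data rather than whatever message the caller built.
            return getClass().getSimpleName() +
                    "{truncatedFetchOffsets=" + truncatedFetchOffsets + '}';
        }
    }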
If + * an error is received in the response, it will be saved to be thrown on the next call to + * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException}) + */ + public void resetPositionsIfNeeded() { + Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp(); + + if (offsetResetTimestamps.isEmpty()) + return; + + List<NetworkClientDelegate.UnsentRequest> unsentRequests = + buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps); + requestsToSend.addAll(unsentRequests); + } + + /** + * Validate positions for all assigned partitions for which a leader change has been detected. + * This will generate OffsetsForLeaderEpoch requests for the partitions and timestamps, and + * enqueue them to be sent on the next call to {@link #poll(long)}. + * + * <p/> + * + * When a response is received, positions are validated and, if a log truncation is + * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the + * next call to this function. + */ + public void validatePositionsIfNeeded() { + Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = Review Comment: If partitionsToValidate is empty, should we avoid sending an empty request? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ########## @@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode( return result; } + /** + * Make asynchronous ListOffsets request to fetch offsets by target times for the specified + * partitions. + * Use the retrieved offsets to reset positions in the subscription state. + * + * @param timestampsToSearch the mapping between partitions and target time + * @return A list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} + * that can be polled to obtain the corresponding timestamps and offsets. + */ + private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions( + final Map<TopicPartition, Long> timestampsToSearch) { + Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode = + groupListOffsetRequests(timestampsToSearch, Optional.empty()); + + final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>(); + + timestampsToSearchByNode.forEach((node, resetTimestamps) -> { + subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(), + time.milliseconds() + requestTimeoutMs); + + CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode( + node, + resetTimestamps, + false, + unsentRequests); + + partialResult.whenComplete((result, error) -> { + if (error == null) { + offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, + result); + } else { + RuntimeException e; + if (error instanceof RuntimeException) { + e = (RuntimeException) error; + } else { + e = new RuntimeException("Unexpected failure in ListOffsets request for " + + "resetting positions", error); + } + offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e); + } + }); + }); + return unsentRequests; + } + + /** + * For each partition that needs validation, make an asynchronous request to get the end-offsets + * for the partition with the epoch less than or equal to the epoch the partition last saw. + * <p/> + * Requests are grouped by Node for efficiency. 
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
         return result;
     }

+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     *         {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     *         that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the partition last saw.
+     * <p/>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndValidatePositions(

Review Comment:
   This generates OffsetsForLeaderEpochRequests, not ListOffsetsRequests.