junrao commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1320173913


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -68,17 +75,22 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
     private final IsolationLevel isolationLevel;
     private final Logger log;
     private final OffsetFetcherUtils offsetFetcherUtils;
+    private final SubscriptionState subscriptionState;
 
     private final Set<ListOffsetsRequestState> requestsToRetry;
     private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+    private final long requestTimeoutMs;
+    private final Time time;
+    private final ApiVersions apiVersions;
 
     public OffsetsRequestManager(final SubscriptionState subscriptionState,
-                                 final ConsumerMetadata metadata,
-                                 final IsolationLevel isolationLevel,
-                                 final Time time,
-                                 final long retryBackoffMs,
-                                 final ApiVersions apiVersions,
-                                 final LogContext logContext) {
+                                     final ConsumerMetadata metadata,

Review Comment:
   Indentation: the continuation parameters are no longer aligned with the first parameter.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -209,6 +215,34 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         return ConsumerRecords.empty();
     }
 
+    /**
+     * Set the fetch position to the committed position (if there is one) or reset it using the
+     * offset reset policy the user has configured.
+     *
+     * @return true if the operation completed without timing out
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException                          If no offset is stored for a given partition and no offset reset policy is
+     *                                                                defined
+     */
+    private boolean updateFetchPositions() {

Review Comment:
   Should this be named `updateFetchPositionsIfNeeded`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -154,6 +170,48 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets(
                 OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets));
     }
 
+    /**
+     * Reset offsets for all assigned partitions that require it. Offsets will be reset
+     * with timestamps according to the reset strategy defined for each partition. This will
+     * generate ListOffsets requests for the partitions and timestamps, and enqueue them to be sent
+     * on the next call to {@link #poll(long)}.
+     *
+     * <p/>
+     *
+     * When a response is received, positions are updated in-memory, on the subscription state. If
+     * an error is received in the response, it will be saved to be thrown on the next call to
+     * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
+     */
+    public void resetPositionsIfNeeded() {
+        Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
+
+        if (offsetResetTimestamps.isEmpty())
+            return;
+
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
+                buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
+        requestsToSend.addAll(unsentRequests);
+    }
+
+    /**
+     * Validate positions for all assigned partitions for which a leader change has been detected.
+     * This will generate OffsetsForLeaderEpoch requests for the partitions and timestamps, and

Review Comment:
   The OffsetsForLeaderEpoch request doesn't depend on timestamps, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another error is pending", error);
     }
 
+
+    void onSuccessfulResponseForValidatingPositions(
+            final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+            final OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
+        List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+        if (!offsetsResult.partitionsToRetry().isEmpty()) {
+            subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+                    time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate(false);
+        }
+
+        // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+        // for the partition. If so, it means we have experienced log truncation and need to reposition
+        // that partition's offset.
+        // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+        // its offset if reset policy is configured, or throw out of range exception.
+        offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+            SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition);
+            Optional<SubscriptionState.LogTruncation> truncationOpt =
+                    subscriptionState.maybeCompleteValidation(topicPartition, requestPosition,
+                            respEndOffset);
+            truncationOpt.ifPresent(truncations::add);
+        });
+
+        if (!truncations.isEmpty()) {
+            maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));

Review Comment:
   Passing in LogTruncationException to `maybeSetOffsetForLeaderException` seems a bit weird since it's not an OffsetForLeaderException.
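
   For illustration only, a minimal sketch of the shape I have in mind, assuming the cached error is a single "first error wins" slot. `PendingErrorSketch`, `maybeSetPendingError` and `throwIfPending` are hypothetical names, not existing Kafka APIs:

```java
import java.util.concurrent.atomic.AtomicReference;

public class PendingErrorSketch {
    // Holds the first error seen until the caller rethrows it.
    private final AtomicReference<RuntimeException> pendingError = new AtomicReference<>();

    // Hypothetical neutral name: accepts any RuntimeException, so a truncation
    // error and a failed-request error can both be stored here.
    // Returns false when another error is already pending and this one is dropped.
    boolean maybeSetPendingError(RuntimeException e) {
        return pendingError.compareAndSet(null, e);
    }

    // Rethrows and clears the pending error, if there is one.
    void throwIfPending() {
        RuntimeException e = pendingError.getAndSet(null);
        if (e != null)
            throw e;
    }

    public static void main(String[] args) {
        PendingErrorSketch holder = new PendingErrorSketch();
        holder.maybeSetPendingError(new IllegalStateException("truncation detected"));
        // A second error is discarded while the first is still pending.
        holder.maybeSetPendingError(new RuntimeException("request failed"));
        try {
            holder.throwIfPending();
        } catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

   With a neutral name like this, the method no longer implies a specific exception type.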



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * Event for resetting offsets for all assigned partitions that require it. This is an
+ * asynchronous event that generates ListOffsets requests, and completes by updating in-memory
+ * positions when responses are received.
+ */
+public class ResetPositionsApplicationEvent extends CompletableApplicationEvent<Void> {
+
+    public ResetPositionsApplicationEvent() {
+        super(Type.RESET_POSITIONS);
+    }
+
+    @Override
+    public String toString() {
+        return "ResetPositions{" +
+                "future=" + future +
+                ", type=" + type +

Review Comment:
   Could we avoid duplicating the code in `toString` here and that in `CompletableApplicationEvent`? Ditto for `ValidatePositionsApplicationEvent`.
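
   One possible way to do this, as a rough sketch rather than a concrete proposal: let the base class render the shared fields once and have subclasses inherit it. The classes below are simplified stand-ins, and `toStringBase` is a hypothetical helper name, not something that exists in the PR:

```java
import java.util.concurrent.CompletableFuture;

public class ToStringSketch {
    // Simplified stand-in for CompletableApplicationEvent; not the real class.
    static abstract class CompletableEvent<T> {
        enum Type { RESET_POSITIONS, VALIDATE_POSITIONS }

        protected final Type type;
        protected final CompletableFuture<T> future = new CompletableFuture<>();

        protected CompletableEvent(Type type) {
            this.type = type;
        }

        // Hypothetical helper: renders the fields shared by every subclass.
        protected String toStringBase() {
            return "future=" + future + ", type=" + type;
        }

        // Subclasses inherit this and only contribute their class name.
        @Override
        public String toString() {
            return getClass().getSimpleName() + "{" + toStringBase() + "}";
        }
    }

    static class ResetPositionsEvent extends CompletableEvent<Void> {
        ResetPositionsEvent() {
            super(Type.RESET_POSITIONS);
        }
    }

    static class ValidatePositionsEvent extends CompletableEvent<Void> {
        ValidatePositionsEvent() {
            super(Type.VALIDATE_POSITIONS);
        }
    }

    public static void main(String[] args) {
        System.out.println(new ResetPositionsEvent());
        System.out.println(new ValidatePositionsEvent());
    }
}
```

   A subclass with extra fields could still override `toString` and append its own fields after `toStringBase()`.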



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     * that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the partition last saw.
+     * <p/>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndValidatePositions(
+            Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+        regrouped.forEach((node, fetchPositions) -> {
+
+            if (node.isEmpty()) {
+                metadata.requestUpdate(true);
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                                "support the required protocol version (introduced in Kafka 2.3)",
+                        fetchPositions.keySet());
+                for (TopicPartition partition : fetchPositions.keySet()) {
+                    subscriptionState.completeValidation(partition);
+                }
+                return;
+            }
+
+            subscriptionState.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs);
+
+            CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> partialResult =
+                    buildOffsetsForLeaderEpochRequestToNode(node, fetchPositions, unsentRequests);
+
+            partialResult.whenComplete((offsetsResult, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
+                            offsetsResult);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in OffsetsForLeaderEpoch " +
+                                "request for validating positions", error);
+                    }
+                    offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
+                }
+            });
+
+        });
+
+        return unsentRequests;
+    }
+
+    /**
+     * Build OffsetsForLeaderEpoch request to send to a specific broker for the partitions and
+     * positions to fetch. This also adds the request to the list of unsentRequests.
+     **/
+    private CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> buildOffsetsForLeaderEpochRequestToNode(
+            final Node node,
+            final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+            List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
+        AbstractRequest.Builder<OffsetsForLeaderEpochRequest> builder =
+                OffsetsForLeaderEpochUtils.prepareRequest(fetchPositions);
+
+        log.debug("Creating OffsetsForLeaderEpoch request request {} to broker {}", builder, node);
+
+        NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                builder,
+                Optional.ofNullable(node));
+        unsentRequests.add(unsentRequest);
+        CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> result = new CompletableFuture<>();
+        unsentRequest.future().whenComplete((response, error) -> {
+            if (error != null) {
+                log.debug("Sending OffsetsForLeaderEpoch request {} to broker {} failed",
+                        builder,
+                        node,
+                        error);
+                result.completeExceptionally(error);
+            } else {
+                OffsetsForLeaderEpochResponse lor = (OffsetsForLeaderEpochResponse) response.responseBody();

Review Comment:
   What does `lor` stand for?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another error is pending", error);
     }
 
+
+    void onSuccessfulResponseForValidatingPositions(
+            final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+            final OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
+        List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+        if (!offsetsResult.partitionsToRetry().isEmpty()) {
+            subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+                    time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate(false);
+        }
+
+        // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+        // for the partition. If so, it means we have experienced log truncation and need to reposition
+        // that partition's offset.
+        // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+        // its offset if reset policy is configured, or throw out of range exception.
+        offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+            SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition);
+            Optional<SubscriptionState.LogTruncation> truncationOpt =
+                    subscriptionState.maybeCompleteValidation(topicPartition, requestPosition,
+                            respEndOffset);
+            truncationOpt.ifPresent(truncations::add);
+        });
+
+        if (!truncations.isEmpty()) {
+            maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));
+        }
+    }
+
+    void onFailedResponseForValidatingPositions(final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+                                                final RuntimeException error) {
+        subscriptionState.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
+        metadata.requestUpdate(false);
+
+        if (!(error instanceof RetriableException)) {
+            maybeSetOffsetForLeaderException(error);
+        }
+    }
+
+    private LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
+        Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new HashMap<>();
+        Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
+        for (SubscriptionState.LogTruncation truncation : truncations) {
+            truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
+                    divergentOffsets.put(truncation.topicPartition, divergentOffset));
+            truncatedFetchOffsets.put(truncation.topicPartition, truncation.fetchPosition.offset);
+        }
+        return new LogTruncationException("Detected truncated partitions: " + truncations,

Review Comment:
   Should we customize the `toString` method of LogTruncationException?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -154,6 +170,48 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets(
                 OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets));
     }
 
+    /**
+     * Reset offsets for all assigned partitions that require it. Offsets will be reset
+     * with timestamps according to the reset strategy defined for each partition. This will
+     * generate ListOffsets requests for the partitions and timestamps, and enqueue them to be sent
+     * on the next call to {@link #poll(long)}.
+     *
+     * <p/>
+     *
+     * When a response is received, positions are updated in-memory, on the subscription state. If
+     * an error is received in the response, it will be saved to be thrown on the next call to
+     * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
+     */
+    public void resetPositionsIfNeeded() {
+        Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
+
+        if (offsetResetTimestamps.isEmpty())
+            return;
+
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
+                buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
+        requestsToSend.addAll(unsentRequests);
+    }
+
+    /**
+     * Validate positions for all assigned partitions for which a leader change has been detected.
+     * This will generate OffsetsForLeaderEpoch requests for the partitions and timestamps, and
+     * enqueue them to be sent on the next call to {@link #poll(long)}.
+     *
+     * <p/>
+     *
+     * When a response is received, positions are validated and, if a log truncation is
+     * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the
+     * next call to this function.
+     */
+    public void validatePositionsIfNeeded() {
+        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =

Review Comment:
   If `partitionsToValidate` is empty, should we return early and avoid sending an empty request?
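
   Something along the lines of the `isEmpty()` check that `resetPositionsIfNeeded` already does above. A minimal, self-contained sketch of the guard, where strings stand in for `UnsentRequest` and `buildRequests` is a hypothetical placeholder for the real request-building method:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ValidateGuardSketch {
    // Stand-in for the manager's requestsToSend list; strings replace UnsentRequest.
    final List<String> requestsToSend = new ArrayList<>();

    // Hypothetical placeholder for building one request per partition.
    private List<String> buildRequests(Map<String, Long> partitionsToValidate) {
        List<String> requests = new ArrayList<>();
        partitionsToValidate.forEach((tp, offset) -> requests.add(tp + "@" + offset));
        return requests;
    }

    public void validatePositionsIfNeeded(Map<String, Long> partitionsToValidate) {
        // Nothing to validate: return before building or enqueuing anything.
        if (partitionsToValidate.isEmpty())
            return;
        requestsToSend.addAll(buildRequests(partitionsToValidate));
    }

    public static void main(String[] args) {
        ValidateGuardSketch manager = new ValidateGuardSketch();
        manager.validatePositionsIfNeeded(Collections.emptyMap());
        System.out.println(manager.requestsToSend.size()); // prints 0
        manager.validatePositionsIfNeeded(Collections.singletonMap("topic-0", 42L));
        System.out.println(manager.requestsToSend); // prints [topic-0@42]
    }
}
```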



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     * that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the partition last saw.
+     * <p/>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndValidatePositions(

Review Comment:
   This generates OffsetsForLeaderEpochRequests, not ListOffsetsRequests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
