philipnee commented on code in PR #14308: URL: https://github.com/apache/kafka/pull/14308#discussion_r1310633377
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ########## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.StaleMetadataException; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData; +import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListOffsetsRequestData; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; 
+import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * Manager responsible for building the following requests to retrieve partition offsets, and + * processing its responses. + * <ul> + * <li>ListOffset request</li> + * <li>OffsetForLeaderEpoch request</li> + * </ul> + * Requests are kept in-memory ready to be sent on the next call to {@link #poll(long)}. + * <br> + * Partition leadership information required to build ListOffset requests is retrieved from the + * {@link ConsumerMetadata}, so this implements {@link ClusterResourceListener} to get notified + * when the cluster metadata is updated. + */ +public class OffsetsRequestManager implements RequestManager, ClusterResourceListener { + + private final ConsumerMetadata metadata; + private final IsolationLevel isolationLevel; + private final Logger log; + private final OffsetFetcherUtils offsetFetcherUtils; + + private final Set<ListOffsetsRequestState> requestsToRetry; + private final List<NetworkClientDelegate.UnsentRequest> requestsToSend; + + public OffsetsRequestManager(final SubscriptionState subscriptionState, + final ConsumerMetadata metadata, + final IsolationLevel isolationLevel, + final Time time, + final long retryBackoffMs, + final ApiVersions apiVersions, + final LogContext logContext) { + requireNonNull(subscriptionState); + requireNonNull(metadata); + requireNonNull(isolationLevel); + requireNonNull(time); + requireNonNull(apiVersions); + requireNonNull(logContext); + + this.metadata = metadata; + this.metadata.addClusterUpdateListener(this); + this.isolationLevel = isolationLevel; + this.log = logContext.logger(getClass()); + this.requestsToRetry = new HashSet<>(); + 
this.requestsToSend = new ArrayList<>(); + this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState, + time, retryBackoffMs, apiVersions); + } + + /** + * Determine if a there are pending fetch offsets requests to be sent and build a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} + * containing it. + */ + @Override + public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + if (requestsToSend.isEmpty()) { + return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); + } + + NetworkClientDelegate.PollResult pollResult = + new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>(requestsToSend)); + this.requestsToSend.clear(); + + return pollResult; + } + + /** + * Retrieve offsets for the given partitions and timestamp. + * + * @param timestampsToSearch Partitions and target timestamps to get offsets for + * @param requireTimestamps True if this should fail with an UnsupportedVersionException if the + * broker does not support fetching precise timestamps for offsets + * @return Future containing the map of {@link TopicPartition} and {@link OffsetAndTimestamp} + * found (offset of the first message whose timestamp is greater than or equals to the target + * timestamp).The future will complete when the requests responses are received and + * processed, following a call to {@link #poll(long)} + */ + public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets( + final Map<TopicPartition, Long> timestampsToSearch, + final boolean requireTimestamps) { + if (timestampsToSearch.isEmpty()) { + return CompletableFuture.completedFuture(Collections.emptyMap()); + } + metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet())); + + ListOffsetsRequestState listOffsetsRequestState = new ListOffsetsRequestState( + timestampsToSearch, + requireTimestamps, + offsetFetcherUtils, + isolationLevel); + 
listOffsetsRequestState.globalResult.whenComplete((result, error) -> { + metadata.clearTransientTopics(); + if (error != null) { + log.error("Fetch offsets completed with error for partitions and timestamps {}.", + timestampsToSearch, error); + } else { + log.debug("Fetch offsets completed successfully for partitions and timestamps {}." + + " Result {}", timestampsToSearch, result); + } + }); + + fetchOffsetsByTimes(timestampsToSearch, requireTimestamps, listOffsetsRequestState); + + return listOffsetsRequestState.globalResult.thenApply(result -> + OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets)); + } + + /** + * Generate requests for partitions with known leaders. Update the listOffsetsRequestState by adding + * partitions with unknown leader to the listOffsetsRequestState.remainingToSearch + */ + private void fetchOffsetsByTimes(final Map<TopicPartition, Long> timestampsToSearch, + final boolean requireTimestamps, + final ListOffsetsRequestState listOffsetsRequestState) { + if (timestampsToSearch.isEmpty()) { + // Early return if empty map to avoid wrongfully raising StaleMetadataException on + // empty grouping + return; + } + try { + List<NetworkClientDelegate.UnsentRequest> unsentRequests = sendListOffsetsRequests( + timestampsToSearch, requireTimestamps, listOffsetsRequestState); + requestsToSend.addAll(unsentRequests); + } catch (StaleMetadataException e) { + requestsToRetry.add(listOffsetsRequestState); + } + } + + @Override + public void onUpdate(ClusterResource clusterResource) { + // Retry requests that were awaiting a metadata update. 
Process a copy of the list to + // avoid errors, given that the list of requestsToRetry may be modified from the + // fetchOffsetsByTimes call if any of the requests being retried fails + List<ListOffsetsRequestState> requestsToProcess = new ArrayList<>(requestsToRetry); + requestsToRetry.clear(); + requestsToProcess.forEach(requestState -> { + Map<TopicPartition, Long> timestampsToSearch = + new HashMap<>(requestState.remainingToSearch); + requestState.remainingToSearch.clear(); + fetchOffsetsByTimes(timestampsToSearch, requestState.requireTimestamps, requestState); + }); + } + + /** + * Search the offsets by target times for the specified partitions. + * + * @param partitionResetTimestamps the mapping between partitions and target time + * @param requireTimestamps true if we should fail with an UnsupportedVersionException if the broker does + * not support fetching precise timestamps for offsets + * @return A list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} + * that can be polled to obtain the corresponding timestamps and offsets. + */ + private List<NetworkClientDelegate.UnsentRequest> sendListOffsetsRequests( Review Comment: Yeah, this is probably my fault - I started using "send" in the context of the request manager. What it really means is handing these requests to the network client to get sent out. I think I've tried to use "enqueue" before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org