philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1326800659
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * <p> + * Manages the state 
of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * </p> + * <ul> + * <li>listTopics</li> + * <li>partitionsFor</li> + * </ul> + * <p> + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. + * </p> + */ + +public class TopicMetadataRequestManager implements RequestManager { + private final boolean allowAutoTopicCreation; + private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests; + private final long retryBackoffMs; + private final long retryBackoffMaxMs; + private final Logger log; + private final LogContext logContext; + + public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { + this.logContext = logContext; + this.log = logContext.logger(this.getClass()); + this.inflightRequests = new HashMap<>(); + this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); + this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); + } + + @Override + public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream() + .map(req -> req.send(currentTimeMs)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + return requests.isEmpty() ? 
+ new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : + new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); + } + + /** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ + public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) { + if (inflightRequests.containsKey(topic)) { + return inflightRequests.get(topic).future; + } + + TopicMetadataRequestState newRequest = new TopicMetadataRequestState( + logContext, + topic, + retryBackoffMs, + retryBackoffMaxMs); + inflightRequests.put(topic, newRequest); + return newRequest.future; + } + + // Visible for testing + List<TopicMetadataRequestState> inflightRequests() { + return new ArrayList<>(inflightRequests.values()); + } + + class TopicMetadataRequestState extends RequestState { + private final Optional<String> topic; + CompletableFuture<Map<String, List<PartitionInfo>>> future; + + public TopicMetadataRequestState(final LogContext logContext, + final Optional<String> topic, + final long retryBackoffMs, + final long retryBackoffMaxMs) { + super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs, + retryBackoffMaxMs); + this.future = new CompletableFuture<>(); + this.topic = topic; + } + + /** + * prepare the metadata request and return an + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} if needed. 
+ */ + private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTimeMs) { + if (!this.canSendRequest(currentTimeMs)) { + return Optional.empty(); + } + this.onSendAttempt(currentTimeMs); + + final MetadataRequest.Builder request; + request = topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation)) + .orElseGet(MetadataRequest.Builder::allTopics); + + final NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest( + request, + Optional.empty(), + (response, exception) -> { + if (exception != null) { + this.future.completeExceptionally(new KafkaException(exception)); + inflightRequests.remove(topic); + return; + } + + try { + Map<String, List<PartitionInfo>> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody()); + future.complete(res); + inflightRequests.remove(topic); + } catch (RetriableException e) { + if (e instanceof TimeoutException) { Review Comment: Hey @junrao - sorry for writing such a long reply; I really need to refresh my memory of the network module as well as the TopicMetadataRequest... Based on the [current code](https://github.com/apache/kafka/blob/b49013b73efa25466652d8d8122974e60c927ec4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcher.java#L99-L149), I think responses should be handled as follows. For a hard failure, where the callback receives a non-null exception: we should remove the inflight request for all non-retriable exceptions and for TimeoutException, and we should continue to retry on any other RetriableException until the timer runs out. For a soft failure (the server responds with an error code), it seems like the right thing to do is to continue to retry unless the error is a non-retriable exception.
It is worth noting that if the request times out (API-level timeout), I think the callback is completed with a TimeoutException hard failure, which should cause the request to be purged from the inflight requests. To your question: if the responses include a retriable exception, I believe the right thing to do is to continue to retry until the timeout. In fact, it seems like `if (e instanceof TimeoutException)` (which I added in the last commit) is actually incorrect, because TimeoutException is a RetriableException and should therefore be retried per: ``` if (error.exception() instanceof RetriableException) shouldRetry = true; ``` It seems like what's missing here is handling of the hard-failure RetriableException and TimeoutException cases. I will fix the code and add test cases to cover the scenarios above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org