[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
lianetm commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1330551023 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -395,27 +386,28 @@ class PendingRequests { List unsentOffsetFetches = new ArrayList<>(); List inflightOffsetFetches = new ArrayList<>(); -public boolean hasUnsentRequests() { +// Visible for teseting +boolean hasUnsentRequests() { return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty(); } -public CompletableFuture addOffsetCommitRequest(final Map offsets) { +CompletableFuture addOffsetCommitRequest(final Map offsets) { // TODO: Dedupe committing the same offsets to the same partitions OffsetCommitRequestState request = new OffsetCommitRequestState( offsets, groupState.groupId, groupState.groupInstanceId.orElse(null), groupState.generation); unsentOffsetCommits.add(request); -return request.future(); +return request.future; } /** - * Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future - * to the existing one. + * Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future + * to the existing one. * - * If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches} - * upon completion. + * If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches} + * upon completion. Review Comment: nit: wrong tag and other unclosed ones above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
lianetm commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327799007 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +
[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
lianetm commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327524919 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -141,35 +142,57 @@ private Optional send(final long currentTim final MetadataRequest.Builder request; request = topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation)) -.orElseGet(MetadataRequest.Builder::allTopics); - -final NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest( -request, -Optional.empty(), -(response, exception) -> { -if (exception != null) { -this.future.completeExceptionally(new KafkaException(exception)); -inflightRequests.remove(topic); -return; -} - -try { -Map> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody()); -future.complete(res); -inflightRequests.remove(topic); -} catch (RetriableException e) { -if (e instanceof TimeoutException) { -inflightRequests.remove(topic); -} -this.onFailedAttempt(currentTimeMs); -} catch (Exception t) { -this.future.completeExceptionally(t); -inflightRequests.remove(topic); -} -}); +.orElseGet(MetadataRequest.Builder::allTopics); + +final NetworkClientDelegate.UnsentRequest unsent = createUnsentRequest(request); return Optional.of(unsent); } +private NetworkClientDelegate.UnsentRequest createUnsentRequest( +final MetadataRequest.Builder request) { +return new NetworkClientDelegate.UnsentRequest( +request, +Optional.empty(), +this::processResponseOrException +); +} + +private void processResponseOrException(final ClientResponse response, +final Throwable exception) { +long responseTimeMs = System.currentTimeMillis(); +if (exception != null) { +handleException(exception, responseTimeMs); +return; +} +handleResponse(response, responseTimeMs); Review Comment: I think it would be clearer and better to maintain/troubleshoot if error handling is in one place. 
Maybe having a single `handleException(final Throwable exception, final long responseTimeMs, boolean retryOnTimeout)`, called in both cases: ``` if (exception != null) { // Handle hard errors handleException(exception, responseTimeMs, false); ... } ... catch (Exception e) { // Handle soft errors handleException(e, responseTimeMs, true); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
lianetm commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327372398 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -141,35 +142,57 @@ private Optional send(final long currentTim final MetadataRequest.Builder request; request = topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation)) -.orElseGet(MetadataRequest.Builder::allTopics); - -final NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest( -request, -Optional.empty(), -(response, exception) -> { -if (exception != null) { -this.future.completeExceptionally(new KafkaException(exception)); -inflightRequests.remove(topic); -return; -} - -try { -Map> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody()); -future.complete(res); -inflightRequests.remove(topic); -} catch (RetriableException e) { -if (e instanceof TimeoutException) { -inflightRequests.remove(topic); -} -this.onFailedAttempt(currentTimeMs); -} catch (Exception t) { -this.future.completeExceptionally(t); -inflightRequests.remove(topic); -} -}); +.orElseGet(MetadataRequest.Builder::allTopics); + +final NetworkClientDelegate.UnsentRequest unsent = createUnsentRequest(request); return Optional.of(unsent); } +private NetworkClientDelegate.UnsentRequest createUnsentRequest( +final MetadataRequest.Builder request) { +return new NetworkClientDelegate.UnsentRequest( +request, +Optional.empty(), +this::processResponseOrException +); +} + +private void processResponseOrException(final ClientResponse response, +final Throwable exception) { +long responseTimeMs = System.currentTimeMillis(); +if (exception != null) { +handleException(exception, responseTimeMs); +return; +} +handleResponse(response, responseTimeMs); Review Comment: It seems that the exception handling logic here ends up in 2 places. 
Apart from the initial `handleException`, this `handleResponse` could also end up handling exceptions in a very similar way (the catch block in `handleResponse` vs. `handleException`). If we prefer to keep them separate because of the `retryOnTimeout` part, which seems to be what makes them different, it would be helpful to add a comment describing that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
lianetm commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327372398 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -141,35 +142,57 @@ private Optional send(final long currentTim final MetadataRequest.Builder request; request = topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation)) -.orElseGet(MetadataRequest.Builder::allTopics); - -final NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest( -request, -Optional.empty(), -(response, exception) -> { -if (exception != null) { -this.future.completeExceptionally(new KafkaException(exception)); -inflightRequests.remove(topic); -return; -} - -try { -Map> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody()); -future.complete(res); -inflightRequests.remove(topic); -} catch (RetriableException e) { -if (e instanceof TimeoutException) { -inflightRequests.remove(topic); -} -this.onFailedAttempt(currentTimeMs); -} catch (Exception t) { -this.future.completeExceptionally(t); -inflightRequests.remove(topic); -} -}); +.orElseGet(MetadataRequest.Builder::allTopics); + +final NetworkClientDelegate.UnsentRequest unsent = createUnsentRequest(request); return Optional.of(unsent); } +private NetworkClientDelegate.UnsentRequest createUnsentRequest( +final MetadataRequest.Builder request) { +return new NetworkClientDelegate.UnsentRequest( +request, +Optional.empty(), +this::processResponseOrException +); +} + +private void processResponseOrException(final ClientResponse response, +final Throwable exception) { +long responseTimeMs = System.currentTimeMillis(); +if (exception != null) { +handleException(exception, responseTimeMs); +return; +} +handleResponse(response, responseTimeMs); Review Comment: It seems that the exception handling logic here ends up in 2 places. 
Apart from the initial `handleException`, this `handleResponse` could also end up handling exceptions in a very similar way (the catch block in `handleResponse` vs. `handleException`). If we prefer to keep them separate because of the timeout handling, which seems to be what makes them different, it would be helpful to add a comment describing that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
lianetm commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1326449328 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java: ## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TopicMetadataRequestManagerTest { +private MockTime time; +private 
TopicMetadataRequestManager topicMetadataRequestManager; + +private Properties props; Review Comment: This is only used in the setup so no need to keep it at the class level -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
lianetm commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1326441512 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +topic, +retryBackoffMs