[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1329455102 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1329215296 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -38,8 +38,8 @@ * Whether there is an existing coordinator. * Whether there is an inflight request. * Whether the backoff timer has expired. - * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer Review Comment: Just cleaning up the long, fully qualified {@link} references. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1329213351 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -298,8 +289,8 @@ private void onFailure(final long currentTimeMs, } private void retry(final long currentTimeMs) { -onFailedAttempt(currentTimeMs); -onSendAttempt(currentTimeMs); +this.onFailedAttempt(currentTimeMs); Review Comment: Addressed, see line #259 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1328231607 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -298,8 +289,8 @@ private void onFailure(final long currentTimeMs, } private void retry(final long currentTimeMs) { -onFailedAttempt(currentTimeMs); -onSendAttempt(currentTimeMs); +this.onFailedAttempt(currentTimeMs); Review Comment: Thanks for catching this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327781850 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327735512 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; Review Comment: sorry - got into the habit of using `this`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException;
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327733352 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1326800659 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +topic, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1326800659 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +topic, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327515651 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java: ## @@ -19,22 +19,23 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -public class CommitApplicationEvent extends ApplicationEvent { -final private CompletableFuture future; +public class CommitApplicationEvent extends CompletableApplicationEvent { final private Map offsets; public CommitApplicationEvent(final Map offsets) { super(Type.COMMIT); -this.offsets = offsets; -Optional exception = isValid(offsets); -if (exception.isPresent()) { -throw new RuntimeException(exception.get()); +this.offsets = Collections.unmodifiableMap(offsets); + +for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { Review Comment: @junrao - per your request. Here's the patch to address your comment: https://github.com/apache/kafka/pull/14391 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1327470102 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -141,35 +142,57 @@ private Optional send(final long currentTim final MetadataRequest.Builder request; request = topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation)) -.orElseGet(MetadataRequest.Builder::allTopics); - -final NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest( -request, -Optional.empty(), -(response, exception) -> { -if (exception != null) { -this.future.completeExceptionally(new KafkaException(exception)); -inflightRequests.remove(topic); -return; -} - -try { -Map> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody()); -future.complete(res); -inflightRequests.remove(topic); -} catch (RetriableException e) { -if (e instanceof TimeoutException) { -inflightRequests.remove(topic); -} -this.onFailedAttempt(currentTimeMs); -} catch (Exception t) { -this.future.completeExceptionally(t); -inflightRequests.remove(topic); -} -}); +.orElseGet(MetadataRequest.Builder::allTopics); + +final NetworkClientDelegate.UnsentRequest unsent = createUnsentRequest(request); return Optional.of(unsent); } +private NetworkClientDelegate.UnsentRequest createUnsentRequest( +final MetadataRequest.Builder request) { +return new NetworkClientDelegate.UnsentRequest( +request, +Optional.empty(), +this::processResponseOrException +); +} + +private void processResponseOrException(final ClientResponse response, +final Throwable exception) { +long responseTimeMs = System.currentTimeMillis(); +if (exception != null) { +handleException(exception, responseTimeMs); +return; +} +handleResponse(response, responseTimeMs); Review Comment: thanks, @lianetm - Do you think it would be clearer to keep them in 1 place? 
The intention was to handle hard failures in handleException and soft failures (i.e. returning an error code) in handleResponse. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1326800659 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listTopics + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional + * .empty()} as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +topic, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325364414 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +topic, +retryBackoffMs, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325355774 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor Review Comment: As well as listTopics() - this is addressed in a separate PR: Would you be ok if we split this into multiple PRs? If you think we should remove the documentation due to inaccurate information, I'm willing to do so. The PR's commit is c71a18c95937dd18171f60afb4fd263ea39e9a1b - I anticipate @kirktrue will pick this into trunk soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325353270 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -203,16 +204,17 @@ public static class UnsentRequest { private Timer timer; public UnsentRequest(final AbstractRequest.Builder requestBuilder, final Optional node) { -this(requestBuilder, node, new FutureCompletionHandler()); +Objects.requireNonNull(requestBuilder); +this.requestBuilder = requestBuilder; +this.node = node; +this.handler = new FutureCompletionHandler(); } public UnsentRequest(final AbstractRequest.Builder requestBuilder, final Optional node, - final FutureCompletionHandler handler) { -Objects.requireNonNull(requestBuilder); -this.requestBuilder = requestBuilder; -this.node = node; -this.handler = handler; + final BiConsumer callback) { Review Comment: I think it is, according to my IDE. I think it is used in CommitRequestManager (lines 219 and 256) and TopicMetadataRequestManager (line 145). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325352296 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -381,41 +377,41 @@ public String toString() { } /** - * This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}. + * This is used to stage the unsent {@link OffsetCommitRequest} and {@link OffsetFetchRequestState}. * unsentOffsetCommits holds the offset commit requests that have not been sent out * unsentOffsetFetches holds the offset fetch requests that have not been sent out - * inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted. - * + * inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed. + * * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests. */ class PendingRequests { // Queue is used to ensure the sequence of commit -Queue unsentOffsetCommits = new LinkedList<>(); +Queue unsentOffsetCommits = new LinkedList<>(); List unsentOffsetFetches = new ArrayList<>(); List inflightOffsetFetches = new ArrayList<>(); -public boolean hasUnsentRequests() { +boolean hasUnsentRequests() { Review Comment: Sorry - leaving this protected, as it is used in one of the tests to verify that there are no more unsent requests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325349813 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -106,6 +104,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); } +pendingRequests.inflightOffsetFetches.forEach(System.out::println); Review Comment: 🤦 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -106,6 +104,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); } +pendingRequests.inflightOffsetFetches.forEach(System.out::println); Review Comment: 🤦 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325349677 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java: ## @@ -19,22 +19,23 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -public class CommitApplicationEvent extends ApplicationEvent { -final private CompletableFuture future; +public class CommitApplicationEvent extends CompletableApplicationEvent { final private Map offsets; public CommitApplicationEvent(final Map offsets) { super(Type.COMMIT); -this.offsets = offsets; -Optional exception = isValid(offsets); -if (exception.isPresent()) { -throw new RuntimeException(exception.get()); +this.offsets = Collections.unmodifiableMap(offsets); + +for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { Review Comment: There are quite a few events out there that would need to be refactored if we do it this way. Would it be OK with you to skip this for now? I will post a MINOR patch to address it after this is closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325345190 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +topic, +retryBackoffMs, +
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325342579 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. Review Comment: thanks - apologize for neglecting to update the doc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325341358 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets Review Comment: 🤦 - this should be listTopics() instead. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets Review Comment: 🤦 - listTopics() instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325175686 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -381,41 +377,41 @@ public String toString() { } /** - * This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}. + * This is used to stage the unsent {@link OffsetCommitRequest} and {@link OffsetFetchRequestState}. * unsentOffsetCommits holds the offset commit requests that have not been sent out * unsentOffsetFetches holds the offset fetch requests that have not been sent out - * inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted. - * + * inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed. + * * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests. */ class PendingRequests { // Queue is used to ensure the sequence of commit -Queue unsentOffsetCommits = new LinkedList<>(); +Queue unsentOffsetCommits = new LinkedList<>(); List unsentOffsetFetches = new ArrayList<>(); List inflightOffsetFetches = new ArrayList<>(); -public boolean hasUnsentRequests() { +boolean hasUnsentRequests() { Review Comment: Sorry - I left out commit. Fixing those obvious mistakes right away. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org