[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-18 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1329455102


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * </p>
+ * <ul>
+ * <li>listTopics</li>
+ * <li>partitionsFor</li>
+ * </ul>
+ * <p>
+ * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests.
+ * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional
+ * .empty()} as the key.
+ * Once a request is completed successfully, its corresponding entry is removed.
+ * </p>
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+    private final boolean allowAutoTopicCreation;
+    private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
+    private final long retryBackoffMs;
+    private final long retryBackoffMaxMs;
+    private final Logger log;
+    private final LogContext logContext;
+
+    public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) {
+        this.logContext = logContext;
+        this.log = logContext.logger(this.getClass());
+        this.inflightRequests = new HashMap<>();
+        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream()
+            .map(req -> req.send(currentTimeMs))
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .collect(Collectors.toList());
+        return requests.isEmpty() ?
+            new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) :
+            new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests));
+    }
+
+    /**
+     * return the future of the metadata request. Return the existing future if a request for the same topic is already
+     * inflight.
+     *
+     * @param topic to be requested. If empty, return the metadata for all topics.
+     * @return the future of the metadata request.
+     */
+    public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) {
+        if (inflightRequests.containsKey(topic)) {
+            return inflightRequests.get(topic).future;
+        }
+
+        TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+            logContext,
+   
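
The deduplication described in the Javadoc above (one in-flight future per topic, with `Optional.empty()` standing for "all topics") can be sketched roughly as follows. This is a minimal, single-threaded illustration and not the code from the PR: `InflightMetadataSketch`, `request`, and `inflightCount` are hypothetical names, partitions are plain integers instead of `PartitionInfo`, and the entry is dropped on any completion to keep the sketch short.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the manager in the diff; not the Kafka class.
final class InflightMetadataSketch {
    // Keyed by topic name; Optional.empty() stands for "all topics".
    private final Map<Optional<String>, CompletableFuture<Map<String, List<Integer>>>> inflight =
        new HashMap<>();

    // Returns the existing future when a request for the same key is already in flight.
    CompletableFuture<Map<String, List<Integer>>> request(final Optional<String> topic) {
        CompletableFuture<Map<String, List<Integer>>> existing = inflight.get(topic);
        if (existing != null) {
            return existing;
        }
        CompletableFuture<Map<String, List<Integer>>> created = new CompletableFuture<>();
        // Simplification: remove the entry on any completion so later calls start fresh.
        created.whenComplete((result, error) -> inflight.remove(topic));
        inflight.put(topic, created);
        return created;
    }

    // Number of requests currently tracked (assumed helper, for illustration only).
    int inflightCount() {
        return inflight.size();
    }
}
```

Two callers asking for the same topic share one future, so only one request per key would need to go to the broker. The sketch assumes all calls happen on a single thread, since `HashMap` is not thread-safe.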

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-18 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1329215296


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -38,8 +38,8 @@
  * Whether there is an existing coordinator.
  * Whether there is an inflight request.
  * Whether the backoff timer has expired.
- * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer

Review Comment:
   just cleaning up long references



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-18 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1329213351


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -298,8 +289,8 @@ private void onFailure(final long currentTimeMs,
 }
 
 private void retry(final long currentTimeMs) {
-onFailedAttempt(currentTimeMs);
-onSendAttempt(currentTimeMs);
+this.onFailedAttempt(currentTimeMs);

Review Comment:
   Addressed, see line #259 
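
For context, the `onFailedAttempt` and `onSendAttempt` hooks in the hunk above record timestamps that drive retry backoff. A rough sketch of that kind of bookkeeping, assuming a plain doubling backoff without jitter, is below. `BackoffStateSketch`, its fields, and `canSendRequest` are hypothetical and are not Kafka's request state class; only the two hook names are taken from the diff.

```java
// Hypothetical illustration of send/failure bookkeeping with exponential backoff.
final class BackoffStateSketch {
    private final long retryBackoffMs;
    private final long retryBackoffMaxMs;
    private long lastSentMs = -1;
    private long lastReceivedMs = -1;
    private long backoffMs = 0;
    private int failedAttempts = 0;

    BackoffStateSketch(final long retryBackoffMs, final long retryBackoffMaxMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    // A request may be sent if none is in flight and the backoff has elapsed.
    boolean canSendRequest(final long currentTimeMs) {
        if (lastSentMs == -1) {
            return true; // nothing has been sent yet
        }
        if (lastReceivedMs < lastSentMs) {
            return false; // still waiting for a response
        }
        return currentTimeMs - lastReceivedMs >= backoffMs;
    }

    // Record that a request went out at the given time.
    void onSendAttempt(final long currentTimeMs) {
        lastSentMs = currentTimeMs;
    }

    // Record a failure and double the backoff, capped at the configured maximum.
    void onFailedAttempt(final long currentTimeMs) {
        lastReceivedMs = currentTimeMs;
        backoffMs = Math.min(retryBackoffMaxMs, retryBackoffMs << Math.min(failedAttempts, 20));
        failedAttempts++;
    }
}
```

With a base of 100 ms and a cap of 1000 ms, successive failures in this sketch wait 100, 200, 400, 800, and then 1000 ms before the next send is allowed.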






[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-17 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1328231607


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -298,8 +289,8 @@ private void onFailure(final long currentTimeMs,
 }
 
 private void retry(final long currentTimeMs) {
-onFailedAttempt(currentTimeMs);
-onSendAttempt(currentTimeMs);
+this.onFailedAttempt(currentTimeMs);

Review Comment:
   Thanks for catching this.






[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-15 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1327781850


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,238 @@

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-15 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1327735512


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * </p>
+ * <ul>
+ * <li>listTopics</li>
+ * <li>partitionsFor</li>
+ * </ul>
+ * <p>
+ * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests.
+ * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional
+ * .empty()} as the key.
+ * Once a request is completed successfully, its corresponding entry is removed.
+ * </p>
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+    private final boolean allowAutoTopicCreation;
+    private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
+    private final long retryBackoffMs;
+    private final long retryBackoffMaxMs;
+    private final Logger log;
+    private final LogContext logContext;
+
+    public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) {
+        this.logContext = logContext;

Review Comment:
   sorry - got into the habit of using `this`.  



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-15 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1327733352


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,238 @@

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-15 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1326800659


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * </p>
+ * <ul>
+ * <li>listTopics</li>
+ * <li>partitionsFor</li>
+ * </ul>
+ * <p>
+ * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests.
+ * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional
+ * .empty()} as the key.
+ * Once a request is completed successfully, its corresponding entry is removed.
+ * </p>
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+    private final boolean allowAutoTopicCreation;
+    private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
+    private final long retryBackoffMs;
+    private final long retryBackoffMaxMs;
+    private final Logger log;
+    private final LogContext logContext;
+
+    public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) {
+        this.logContext = logContext;
+        this.log = logContext.logger(this.getClass());
+        this.inflightRequests = new HashMap<>();
+        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream()
+            .map(req -> req.send(currentTimeMs))
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .collect(Collectors.toList());
+        return requests.isEmpty() ?
+            new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) :
+            new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests));
+    }
+
+    /**
+     * return the future of the metadata request. Return the existing future if a request for the same topic is already
+     * inflight.
+     *
+     * @param topic to be requested. If empty, return the metadata for all topics.
+     * @return the future of the metadata request.
+     */
+    public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) {
+        if (inflightRequests.containsKey(topic)) {
+            return inflightRequests.get(topic).future;
+        }
+
+        TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+            logContext,
+            topic,
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-15 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1327515651


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java:
##
@@ -19,22 +19,23 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-public class CommitApplicationEvent extends ApplicationEvent {
-final private CompletableFuture future;
+public class CommitApplicationEvent extends CompletableApplicationEvent {
 final private Map<TopicPartition, OffsetAndMetadata> offsets;
 
 public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) {
 super(Type.COMMIT);
-this.offsets = offsets;
-Optional exception = isValid(offsets);
-if (exception.isPresent()) {
-throw new RuntimeException(exception.get());
+this.offsets = Collections.unmodifiableMap(offsets);
+
+for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {

Review Comment:
   @junrao - per your request.  Here's the patch to address your comment: 
https://github.com/apache/kafka/pull/14391



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-15 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1327470102


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -141,35 +142,57 @@ private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTim
 
 final MetadataRequest.Builder request;
 request = topic.map(t -> new 
MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation))
-.orElseGet(MetadataRequest.Builder::allTopics);
-
-final NetworkClientDelegate.UnsentRequest unsent = new 
NetworkClientDelegate.UnsentRequest(
-request,
-Optional.empty(),
-(response, exception) -> {
-if (exception != null) {
-this.future.completeExceptionally(new 
KafkaException(exception));
-inflightRequests.remove(topic);
-return;
-}
-
-try {
-Map<String, List<PartitionInfo>> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody());
-future.complete(res);
-inflightRequests.remove(topic);
-} catch (RetriableException e) {
-if (e instanceof TimeoutException) {
-inflightRequests.remove(topic);
-}
-this.onFailedAttempt(currentTimeMs);
-} catch (Exception t) {
-this.future.completeExceptionally(t);
-inflightRequests.remove(topic);
-}
-});
+.orElseGet(MetadataRequest.Builder::allTopics);
+
+final NetworkClientDelegate.UnsentRequest unsent = 
createUnsentRequest(request);
 return Optional.of(unsent);
 }
 
+private NetworkClientDelegate.UnsentRequest createUnsentRequest(
+final MetadataRequest.Builder request) {
+return new NetworkClientDelegate.UnsentRequest(
+request,
+Optional.empty(),
+this::processResponseOrException
+);
+}
+
+private void processResponseOrException(final ClientResponse response,
+final Throwable exception) {
+long responseTimeMs = System.currentTimeMillis();
+if (exception != null) {
+handleException(exception, responseTimeMs);
+return;
+}
+handleResponse(response, responseTimeMs);

Review Comment:
   Thanks, @lianetm. Do you think it would be clearer to keep them in one place? The intention was to handle hard failures (the request fails with an exception) in handleException and soft failures (i.e. the response returns an error code) in handleResponse.
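
   The hard/soft split described in this comment can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in the PR: `SketchResponse`, `ResponseRoutingSketch`, the `"RETRIABLE"` error string, and the `retries` counter are hypothetical stand-ins. The real handlers in the diff take a `ClientResponse` plus a response timestamp, and their retry rules may differ.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a broker response; not a Kafka class.
final class SketchResponse {
    final Optional<String> errorCode; // empty means the broker reported success
    final String body;

    SketchResponse(Optional<String> errorCode, String body) {
        this.errorCode = errorCode;
        this.body = body;
    }
}

final class ResponseRoutingSketch {
    final CompletableFuture<String> future = new CompletableFuture<>();
    int retries = 0;

    // Single entry point, mirroring the shape of processResponseOrException in the diff.
    void processResponseOrException(SketchResponse response, Throwable exception) {
        if (exception != null) {
            handleException(exception);
            return;
        }
        handleResponse(response);
    }

    // Hard failure: no usable response arrived, so the caller's future fails.
    private void handleException(Throwable exception) {
        future.completeExceptionally(exception);
    }

    // Soft failure: a response arrived but may carry an error code.
    private void handleResponse(SketchResponse response) {
        if (!response.errorCode.isPresent()) {
            future.complete(response.body);
        } else if (isRetriable(response.errorCode.get())) {
            retries++; // leave the future pending so a later poll can retry
        } else {
            future.completeExceptionally(new IllegalStateException(response.errorCode.get()));
        }
    }

    // Assumed classification, for illustration only.
    private static boolean isRetriable(String errorCode) {
        return "RETRIABLE".equals(errorCode);
    }

    public static void main(String[] args) {
        ResponseRoutingSketch sketch = new ResponseRoutingSketch();
        sketch.processResponseOrException(new SketchResponse(Optional.of("RETRIABLE"), null), null);
        System.out.println("done after soft failure: " + sketch.future.isDone());
        sketch.processResponseOrException(new SketchResponse(Optional.empty(), "metadata"), null);
        System.out.println("result: " + sketch.future.join());
    }
}
```

   Keeping one entry point that only routes, with the two handlers owning their own completion logic, is one way to make the hard/soft distinction visible without duplicating the null check at each call site.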






[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-14 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1326800659


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listTopics
+ * partitionsFor
+ * 
+ * 
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memorized by topic name. If all topics are 
requested, then we use {@code Optional
+ * .empty()} as the key.
+ * Once a request is completed successfully, its corresponding entry is 
removed.
+ * 
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+private final boolean allowAutoTopicCreation;
+private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
+private final long retryBackoffMs;
+private final long retryBackoffMaxMs;
+private final Logger log;
+private final LogContext logContext;
+
+public TopicMetadataRequestManager(final LogContext logContext, final 
ConsumerConfig config) {
+this.logContext = logContext;
+this.log = logContext.logger(this.getClass());
+this.inflightRequests = new HashMap<>();
+this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream()
+.map(req -> req.send(currentTimeMs))
+.filter(Optional::isPresent)
+.map(Optional::get)
+.collect(Collectors.toList());
+return requests.isEmpty() ?
+new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>()) :
+new NetworkClientDelegate.PollResult(0, 
Collections.unmodifiableList(requests));
+}
+
+/**
+ * return the future of the metadata request. Return the existing future 
if a request for the same topic is already
+ * inflight.
+ *
+ * @param topic to be requested. If empty, return the metadata for all 
topics.
+ * @return the future of the metadata request.
+ */
+public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) {
+if (inflightRequests.containsKey(topic)) {
+return inflightRequests.get(topic).future;
+}
+
+TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+logContext,
+topic,
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325364414


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor
+ * 
+ * 
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memoized by topic name. If all topics are 
requested, then {@code null} is used as the key.
+ * Once a request is completed successfully, its corresponding entry is 
removed.
+ * 
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+private final boolean allowAutoTopicCreation;
+private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
+private final long retryBackoffMs;
+private final long retryBackoffMaxMs;
+private final Logger log;
+private final LogContext logContext;
+
+public TopicMetadataRequestManager(final LogContext logContext, final 
ConsumerConfig config) {
+this.logContext = logContext;
+this.log = logContext.logger(this.getClass());
+this.inflightRequests = new HashMap<>();
+this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream()
+.map(req -> req.send(currentTimeMs))
+.filter(Optional::isPresent)
+.map(Optional::get)
+.collect(Collectors.toList());
+return requests.isEmpty() ?
+new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>()) :
+new NetworkClientDelegate.PollResult(0, 
Collections.unmodifiableList(requests));
+}
+
+/**
+ * return the future of the metadata request. Return the existing future 
if a request for the same topic is already
+ * inflight.
+ *
+ * @param topic to be requested. If empty, return the metadata for all 
topics.
+ * @return the future of the metadata request.
+ */
+public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) {
+if (inflightRequests.containsKey(topic)) {
+return inflightRequests.get(topic).future;
+}
+
+TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+logContext,
+topic,
+retryBackoffMs,
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325355774


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor

Review Comment:
   It handles listTopics() as well; that is addressed in a separate PR. Would you be OK with splitting this into multiple PRs? If you think we should remove the documentation because it is inaccurate, I'm willing to do so.
   
   The PR's commit is c71a18c95937dd18171f60afb4fd263ea39e9a1b - I anticipate @kirktrue will pick this into trunk soon.






[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325353270


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -203,16 +204,17 @@ public static class UnsentRequest {
 private Timer timer;
 
 public UnsentRequest(final AbstractRequest.Builder requestBuilder, 
final Optional node) {
-this(requestBuilder, node, new FutureCompletionHandler());
+Objects.requireNonNull(requestBuilder);
+this.requestBuilder = requestBuilder;
+this.node = node;
+this.handler = new FutureCompletionHandler();
 }
 
 public UnsentRequest(final AbstractRequest.Builder requestBuilder,
  final Optional node,
- final FutureCompletionHandler handler) {
-Objects.requireNonNull(requestBuilder);
-this.requestBuilder = requestBuilder;
-this.node = node;
-this.handler = handler;
+ final BiConsumer 
callback) {

Review Comment:
   According to my IDE, it is. It is used in CommitRequestManager (lines 219 and 256) and TopicMetadataRequestManager (line 145).






[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325352296


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -381,41 +377,41 @@ public String toString() {
 }
 
 /**
- * This is used to stage the unsent {@link OffsetCommitRequestState} 
and {@link OffsetFetchRequestState}.
+ * This is used to stage the unsent {@link OffsetCommitRequest} and 
{@link OffsetFetchRequestState}.
  * unsentOffsetCommits holds the offset commit requests that have not 
been sent out
  * unsentOffsetFetches holds the offset fetch requests that have not 
been sent out
- * inflightOffsetFetches holds the offset fetch requests that have 
been sent out but incompleted.
- *
+ * inflightOffsetFetches holds the offset fetch requests that have 
been sent out but not completed.
+ * 
  * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the 
same requests.
  */
 
 class PendingRequests {
 // Queue is used to ensure the sequence of commit
-Queue unsentOffsetCommits = new 
LinkedList<>();
+Queue unsentOffsetCommits = new LinkedList<>();
 List unsentOffsetFetches = new ArrayList<>();
 List inflightOffsetFetches = new 
ArrayList<>();
 
-public boolean hasUnsentRequests() {
+boolean hasUnsentRequests() {

Review Comment:
   Sorry - I'm leaving this protected, as it is used in one of the tests to verify that there are no more unsent requests.






[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325349813


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -106,6 +104,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
 }
 
+pendingRequests.inflightOffsetFetches.forEach(System.out::println);

Review Comment:
   🤦









[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325349677


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java:
##
@@ -19,22 +19,23 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-public class CommitApplicationEvent extends ApplicationEvent {
-final private CompletableFuture future;
+public class CommitApplicationEvent extends CompletableApplicationEvent {
 final private Map<TopicPartition, OffsetAndMetadata> offsets;
 
 public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) {
 super(Type.COMMIT);
-this.offsets = offsets;
-Optional exception = isValid(offsets);
-if (exception.isPresent()) {
-throw new RuntimeException(exception.get());
+this.offsets = Collections.unmodifiableMap(offsets);
+
+for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {

Review Comment:
   Quite a few events would need to be refactored if we do it this way. Would it be OK to skip this for now? I will post a MINOR patch to address it after this PR is closed.






[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325345190


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor
+ * 
+ * 
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memoized by topic name. If all topics are 
requested, then {@code null} is used as the key.
+ * Once a request is completed successfully, its corresponding entry is 
removed.
+ * 
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+private final boolean allowAutoTopicCreation;
+private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
+private final long retryBackoffMs;
+private final long retryBackoffMaxMs;
+private final Logger log;
+private final LogContext logContext;
+
+public TopicMetadataRequestManager(final LogContext logContext, final 
ConsumerConfig config) {
+this.logContext = logContext;
+this.log = logContext.logger(this.getClass());
+this.inflightRequests = new HashMap<>();
+this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream()
+.map(req -> req.send(currentTimeMs))
+.filter(Optional::isPresent)
+.map(Optional::get)
+.collect(Collectors.toList());
+return requests.isEmpty() ?
+new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>()) :
+new NetworkClientDelegate.PollResult(0, 
Collections.unmodifiableList(requests));
+}
+
+/**
+ * return the future of the metadata request. Return the existing future 
if a request for the same topic is already
+ * inflight.
+ *
+ * @param topic to be requested. If empty, return the metadata for all 
topics.
+ * @return the future of the metadata request.
+ */
+public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) {
+if (inflightRequests.containsKey(topic)) {
+return inflightRequests.get(topic).future;
+}
+
+TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+logContext,
+topic,
+retryBackoffMs,
+

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325342579


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor
+ * 
+ * 
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memoized by topic name. If all topics are 
requested, then {@code null} is used as the key.

Review Comment:
   Thanks - I apologize for neglecting to update the doc.
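
   For context on the doc line in question: the quoted class keys `inflightRequests` by an `Optional` topic, so an all-topics request can use `Optional.empty()` as its key and no `null` key is needed. The sketch below illustrates that keying and the reuse of an inflight future. It is an assumption-laden illustration, not the PR's code: `InflightDedupSketch`, `request`, `complete`, and `inflightCount` are hypothetical names, and a plain `CompletableFuture<String>` stands in for the real request state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class InflightDedupSketch {
    // Optional.empty() is the key for an "all topics" request.
    private final Map<Optional<String>, CompletableFuture<String>> inflight = new HashMap<>();

    // Returns the existing future when a request for the same key is already inflight.
    CompletableFuture<String> request(Optional<String> topic) {
        return inflight.computeIfAbsent(topic, t -> new CompletableFuture<>());
    }

    // Completes the request and removes its entry so that a later call starts a fresh one.
    void complete(Optional<String> topic, String result) {
        CompletableFuture<String> future = inflight.remove(topic);
        if (future != null) {
            future.complete(result);
        }
    }

    int inflightCount() {
        return inflight.size();
    }

    public static void main(String[] args) {
        InflightDedupSketch manager = new InflightDedupSketch();
        CompletableFuture<String> first = manager.request(Optional.of("orders"));
        CompletableFuture<String> second = manager.request(Optional.of("orders"));
        CompletableFuture<String> allTopics = manager.request(Optional.empty());
        System.out.println("same future reused: " + (first == second));
        System.out.println("all-topics request is separate: " + (allTopics != first));
        System.out.println("inflight entries: " + manager.inflightCount());
    }
}
```

   `Optional` implements `equals` and `hashCode` on its contained value, which is what makes it usable as a `HashMap` key here.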






[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325341358


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets

Review Comment:
   This should be listTopics() instead.









[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325175686


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -381,41 +377,41 @@ public String toString() {
 }
 
 /**
- * This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}.
+ * This is used to stage the unsent {@link OffsetCommitRequest} and {@link OffsetFetchRequestState}.
  * unsentOffsetCommits holds the offset commit requests that have not been sent out
  * unsentOffsetFetches holds the offset fetch requests that have not been sent out
- * inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted.
- *
+ * inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed.
+ * 
  * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests.
  */
 
 class PendingRequests {
 // Queue is used to ensure the sequence of commit
-Queue unsentOffsetCommits = new LinkedList<>();
+Queue unsentOffsetCommits = new LinkedList<>();
 List unsentOffsetFetches = new ArrayList<>();
 List inflightOffsetFetches = new ArrayList<>();
 
-public boolean hasUnsentRequests() {
+boolean hasUnsentRequests() {

Review Comment:
   Sorry - I left out commit.  Fixing those obvious mistakes right away.
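
As a rough illustration of the staging described in the `PendingRequests` Javadoc above (unsent commits, unsent fetches, inflight fetches, and deduplication of fetch requests), a minimal sketch might look like the following. It is not the `CommitRequestManager` code from the PR: the class `PendingRequestsSketch`, the nested `FetchRequest` type, the method names `addOffsetFetch` and `drainFetches`, and the choice to compare requests by their partition set are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Illustrative only: not the PendingRequests class from the PR. */
public class PendingRequestsSketch {
    /** Hypothetical stand-in for an offset fetch request, identified by the partitions it asks for. */
    public static final class FetchRequest {
        public final Set<String> partitions;

        FetchRequest(Set<String> partitions) {
            this.partitions = partitions;
        }
    }

    // A queue preserves the order in which commits were requested.
    private final Queue<String> unsentCommits = new ArrayDeque<>();
    private final List<FetchRequest> unsentFetches = new ArrayList<>();
    private final List<FetchRequest> inflightFetches = new ArrayList<>();

    public boolean hasUnsentRequests() {
        return !unsentCommits.isEmpty() || !unsentFetches.isEmpty();
    }

    public void addOffsetCommit(String commit) {
        unsentCommits.add(commit);
    }

    /** Reuses an equivalent unsent or inflight fetch instead of staging a duplicate. */
    public FetchRequest addOffsetFetch(Set<String> partitions) {
        for (FetchRequest r : unsentFetches) {
            if (r.partitions.equals(partitions)) return r;
        }
        for (FetchRequest r : inflightFetches) {
            if (r.partitions.equals(partitions)) return r;
        }
        FetchRequest request = new FetchRequest(partitions);
        unsentFetches.add(request);
        return request;
    }

    /** Moves every staged fetch to the inflight list and returns the requests to send. */
    public List<FetchRequest> drainFetches() {
        List<FetchRequest> toSend = new ArrayList<>(unsentFetches);
        inflightFetches.addAll(toSend);
        unsentFetches.clear();
        return toSend;
    }

    public int unsentFetchCount() {
        return unsentFetches.size();
    }

    public int inflightFetchCount() {
        return inflightFetches.size();
    }
}
```

A second fetch for the same partitions returns the already staged (or already inflight) request, so the same request is not sent twice. Removing completed requests from the inflight list is left out of the sketch.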


