junrao commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325040397
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -91,7 +89,7 @@ public CommitRequestManager( } /** - * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} request if there's any. The function will + * Poll for the {@link OffsetFetchRequest} and {@link org.apache.kafka.common.requests.OffsetCommitRequest} request if there's any. The function will Review Comment: Do we need to spell out the package name for OffsetCommitRequest? It doesn't seem this is done consistently. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * <p> + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * </p> + * <ul> + * <li>listOffsets</li> + * <li>partitionsFor</li> Review Comment: It doesn't seem that we have wired `TopicMetadataRequestManager` to the `partitionsFor` call? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -381,41 +377,41 @@ public String toString() { } /** - * <p>This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}. + * <p>This is used to stage the unsent {@link OffsetCommitRequest} and {@link OffsetFetchRequestState}. 
* <li>unsentOffsetCommits holds the offset commit requests that have not been sent out</> * <li>unsentOffsetFetches holds the offset fetch requests that have not been sent out</li> - * <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted</>. - * + * <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed</>. + * <p> * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests. */ class PendingRequests { // Queue is used to ensure the sequence of commit - Queue<OffsetCommitRequestState> unsentOffsetCommits = new LinkedList<>(); + Queue<OffsetCommitRequest> unsentOffsetCommits = new LinkedList<>(); List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<>(); List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<>(); - public boolean hasUnsentRequests() { + boolean hasUnsentRequests() { Review Comment: Could this be private? Ditto below. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.PartitionInfo; + +import java.util.List; +import java.util.Map; + +public class TopicMetadataApplicationEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> { + private final String topic; + public TopicMetadataApplicationEvent(final String topic) { + super(Type.TOPIC_METADATA); + this.topic = topic; + } + + public String topic() { + return topic; + } Review Comment: Do we need to add `toString`, `equals` and `hashCode`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * <p> + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * </p> + * <ul> + * <li>listOffsets</li> Review Comment: Is this true? `OffsetsRequestManager` says it handles listOffsets. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -220,7 +214,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { requestTopicDataMap.put(topicPartition.topic(), topic); } - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( + org.apache.kafka.common.requests.OffsetCommitRequest.Builder builder = new org.apache.kafka.common.requests.OffsetCommitRequest.Builder( Review Comment: Could we import the package? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * <p> + * Manages the state of topic metadata requests. 
This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * </p> + * <ul> + * <li>listOffsets</li> + * <li>partitionsFor</li> + * </ul> + * <p> + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. Review Comment: memoized => memorized null => Empty? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * <p> + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to Review Comment: Do we need to spell out the package name? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * <p> + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * </p> + * <ul> + * <li>listOffsets</li> + * <li>partitionsFor</li> + * </ul> + * <p> + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. 
+ * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. + * Once a request is completed successfully, its corresponding entry is removed. + * </p> + */ + +public class TopicMetadataRequestManager implements RequestManager { + private final boolean allowAutoTopicCreation; + private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests; + private final long retryBackoffMs; + private final long retryBackoffMaxMs; + private final Logger log; + private final LogContext logContext; + + public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { + this.logContext = logContext; + this.log = logContext.logger(this.getClass()); + this.inflightRequests = new HashMap<>(); + this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); + this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); + } + + @Override + public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream() + .map(req -> req.send(currentTimeMs)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + return requests.isEmpty() ? + new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : + new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); + } + + /** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. 
+ */ + public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) { + if (inflightRequests.containsKey(topic)) { + return inflightRequests.get(topic).future; + } + + TopicMetadataRequestState newRequest = new TopicMetadataRequestState( + logContext, + topic, + retryBackoffMs, + retryBackoffMaxMs); + inflightRequests.put(topic, newRequest); + return newRequest.future; + } + + // Visible for testing + List<TopicMetadataRequestState> inflightRequests() { + return new ArrayList<>(inflightRequests.values()); + } + + class TopicMetadataRequestState extends RequestState { + private final Optional<String> topic; + CompletableFuture<Map<String, List<PartitionInfo>>> future; + + public TopicMetadataRequestState(final LogContext logContext, + final Optional<String> topic, + final long retryBackoffMs, + final long retryBackoffMaxMs) { + super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs, + retryBackoffMaxMs); + this.future = new CompletableFuture<>(); + this.topic = topic; + } + + /** + * prepare the metadata request and return an + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} if needed. 
+ */ + private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTimeMs) { + if (!this.canSendRequest(currentTimeMs)) { + return Optional.empty(); + } + this.onSendAttempt(currentTimeMs); + + final MetadataRequest.Builder request; + request = topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation)) + .orElseGet(MetadataRequest.Builder::allTopics); + + final NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest( + request, + Optional.empty(), + (response, exception) -> { + if (exception != null) { + this.future.completeExceptionally(new KafkaException(exception)); + inflightRequests.remove(topic); + return; + } + + try { + Map<String, List<PartitionInfo>> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody()); + future.complete(res); + inflightRequests.remove(topic); + } catch (RetriableException e) { + this.onFailedAttempt(currentTimeMs); Review Comment: This may be fine. But if a pending request is in a retry loop, but is already timed out, do we need logic to remove it from inflightRequests preemptively? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java: ########## @@ -19,22 +19,23 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -public class CommitApplicationEvent extends ApplicationEvent { - final private CompletableFuture<Void> future; +public class CommitApplicationEvent extends CompletableApplicationEvent<Void> { final private Map<TopicPartition, OffsetAndMetadata> offsets; public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) { super(Type.COMMIT); - this.offsets = offsets; - Optional<Exception> exception = isValid(offsets); Review Comment: `isValid` is no longer used. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * <p> + * Manages the state of topic metadata requests. 
This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * </p> + * <ul> + * <li>listOffsets</li> + * <li>partitionsFor</li> + * </ul> + * <p> + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. + * Once a request is completed successfully, its corresponding entry is removed. + * </p> + */ + +public class TopicMetadataRequestManager implements RequestManager { + private final boolean allowAutoTopicCreation; + private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests; + private final long retryBackoffMs; + private final long retryBackoffMaxMs; + private final Logger log; + private final LogContext logContext; + + public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { + this.logContext = logContext; + this.log = logContext.logger(this.getClass()); + this.inflightRequests = new HashMap<>(); + this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); + this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); + } + + @Override + public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream() + .map(req -> req.send(currentTimeMs)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + return requests.isEmpty() ? 
+ new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : + new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); + } + + /** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ + public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) { + if (inflightRequests.containsKey(topic)) { + return inflightRequests.get(topic).future; + } + + TopicMetadataRequestState newRequest = new TopicMetadataRequestState( + logContext, + topic, + retryBackoffMs, + retryBackoffMaxMs); + inflightRequests.put(topic, newRequest); + return newRequest.future; + } + + // Visible for testing + List<TopicMetadataRequestState> inflightRequests() { + return new ArrayList<>(inflightRequests.values()); + } + + class TopicMetadataRequestState extends RequestState { + private final Optional<String> topic; + CompletableFuture<Map<String, List<PartitionInfo>>> future; + + public TopicMetadataRequestState(final LogContext logContext, + final Optional<String> topic, + final long retryBackoffMs, + final long retryBackoffMaxMs) { + super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs, + retryBackoffMaxMs); + this.future = new CompletableFuture<>(); + this.topic = topic; + } + + /** + * prepare the metadata request and return an + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} if needed. + */ + private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTimeMs) { + if (!this.canSendRequest(currentTimeMs)) { Review Comment: RequestState only allows one outstanding request at a time. Does the old consumer do the same? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -203,16 +204,17 @@ public static class UnsentRequest { private Timer timer; public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder, final Optional<Node> node) { - this(requestBuilder, node, new FutureCompletionHandler()); + Objects.requireNonNull(requestBuilder); + this.requestBuilder = requestBuilder; + this.node = node; + this.handler = new FutureCompletionHandler(); } public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder, final Optional<Node> node, - final FutureCompletionHandler handler) { - Objects.requireNonNull(requestBuilder); - this.requestBuilder = requestBuilder; - this.node = node; - this.handler = handler; + final BiConsumer<ClientResponse, Throwable> callback) { Review Comment: This constructor seems unused? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -106,6 +104,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); } + pendingRequests.inflightOffsetFetches.forEach(System.out::println); Review Comment: Is this intended? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java: ########## @@ -19,22 +19,23 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -public class CommitApplicationEvent extends ApplicationEvent { - final private CompletableFuture<Void> future; +public class CommitApplicationEvent extends CompletableApplicationEvent<Void> { final private Map<TopicPartition, OffsetAndMetadata> offsets; public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) { super(Type.COMMIT); - this.offsets = offsets; - Optional<Exception> exception = isValid(offsets); - if (exception.isPresent()) { - throw new RuntimeException(exception.get()); + this.offsets = Collections.unmodifiableMap(offsets); + + for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { Review Comment: This is an existing issue. Could we change `toString` to include the fields in the parent class? Do we need to add `equals` and `hashCode` for consistency? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -230,15 +224,20 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { return new NetworkClientDelegate.UnsentRequest( builder, coordinatorRequestManager.coordinator(), - future); + (response, throwable) -> { + if (throwable == null) { + this.future.complete(null); + } else { + this.future.completeExceptionally(throwable); + } + }); } } private class OffsetFetchRequestState extends RequestState { public final Set<TopicPartition> requestedPartitions; public final GroupState.Generation requestedGeneration; - public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; - + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; Review Comment: Could this be private? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org