junrao commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1326612426


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * </p>
+ * <ul>
+ * <li>listOffsets</li>
+ * <li>partitionsFor</li>

Review Comment:
   Yes, this is fine.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TopicMetadataRequestManagerTest {
+    private MockTime time;
+    private TopicMetadataRequestManager topicMetadataRequestManager;
+
+    private Properties props;
+
+    @BeforeEach
+    public void setup() {
+        this.time = new MockTime();
+        this.props = new Properties();
+        this.props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
+        this.props.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+        this.props.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        this.topicMetadataRequestManager = new TopicMetadataRequestManager(
+            new LogContext(),
+            new ConsumerConfig(props));
+    }
+
+    @ParameterizedTest
+    @MethodSource("topicsProvider")
+    public void testPoll_SuccessfulRequestTopicMetadata(Optional<String> 
topic) {
+        this.topicMetadataRequestManager.requestTopicMetadata(topic);
+        this.time.sleep(100);
+        NetworkClientDelegate.PollResult res = 
this.topicMetadataRequestManager.poll(this.time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+    }
+
+    @ParameterizedTest
+    @MethodSource("exceptionProvider")
+    public void testExceptionAndInflightRequests(final Errors error, final 
boolean shouldRetry) {
+        String topic = "hello";
+        
this.topicMetadataRequestManager.requestTopicMetadata(Optional.of("hello"));
+        this.time.sleep(100);
+        NetworkClientDelegate.PollResult res = 
this.topicMetadataRequestManager.poll(this.time.milliseconds());
+        
res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse(
+            res.unsentRequests.get(0),
+            Optional.of(topic),
+            error));
+        List<TopicMetadataRequestManager.TopicMetadataRequestState> inflights 
= this.topicMetadataRequestManager.inflightRequests();
+
+        if (shouldRetry) {
+            assertEquals(1, inflights.size());
+            assertEquals(topic, inflights.get(0).topic().orElse(null));
+        } else {
+            assertEquals(0, inflights.size());
+        }
+    }
+
+    @Test
+    public void testEnsureRequestRemovedFromInflightsOnErrorResponse() {
+        
this.topicMetadataRequestManager.requestTopicMetadata(Optional.of("hello"));
+        this.time.sleep(100);
+        NetworkClientDelegate.PollResult res = 
this.topicMetadataRequestManager.poll(this.time.milliseconds());
+        res.unsentRequests.get(0).future().completeExceptionally(new 
KafkaException("some error"));
+
+        List<TopicMetadataRequestManager.TopicMetadataRequestState> inflights 
= this.topicMetadataRequestManager.inflightRequests();
+        assertTrue(inflights.isEmpty());
+    }
+
+    @ParameterizedTest
+    @MethodSource("topicsProvider")
+    public void testSendingTheSameRequest(Optional<String> topic) {
+        CompletableFuture<Map<String, List<PartitionInfo>>> future = 
this.topicMetadataRequestManager.requestTopicMetadata(topic);
+        CompletableFuture<Map<String, List<PartitionInfo>>> future2 =
+            this.topicMetadataRequestManager.requestTopicMetadata(topic);
+        this.time.sleep(100);
+        NetworkClientDelegate.PollResult res = 
this.topicMetadataRequestManager.poll(this.time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+
+        
res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse(
+            res.unsentRequests.get(0),
+            topic,
+            Errors.NONE));
+
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        try {
+            future.get();
+        } catch (Throwable e) {
+            System.out.println(e.getMessage());

Review Comment:
   In general, we should use logging instead of print.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##########
@@ -287,14 +298,13 @@ public void testAssignmentChangeEvent() {
         
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
         
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
 
         backgroundThread.runOnce();
         
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class));
         verify(networkClient, times(1)).poll(anyLong(), anyLong());
         verify(commitManager, times(1)).updateAutoCommitTimer(currentTimeMs);
         verify(commitManager, times(1)).maybeAutoCommit(offset);
-
-        backgroundThread.close();

Review Comment:
   It seems that we still close backgroundThread in the other test cases?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * </p>
+ * <ul>
+ * <li>listTopics</li>
+ * <li>partitionsFor</li>
+ * </ul>
+ * <p>
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memorized by topic name. If all topics are 
requested, then we use {@code Optional
+ * .empty()} as the key.
+ * Once a request is completed successfully, its corresponding entry is 
removed.
+ * </p>
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+    private final boolean allowAutoTopicCreation;
+    private final Map<Optional<String>, TopicMetadataRequestState> 
inflightRequests;
+    private final long retryBackoffMs;
+    private final long retryBackoffMaxMs;
+    private final Logger log;
+    private final LogContext logContext;
+
+    public TopicMetadataRequestManager(final LogContext logContext, final 
ConsumerConfig config) {
+        this.logContext = logContext;
+        this.log = logContext.logger(this.getClass());
+        this.inflightRequests = new HashMap<>();
+        this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        List<NetworkClientDelegate.UnsentRequest> requests = 
inflightRequests.values().stream()
+            .map(req -> req.send(currentTimeMs))
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .collect(Collectors.toList());
+        return requests.isEmpty() ?
+            new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>()) :
+            new NetworkClientDelegate.PollResult(0, 
Collections.unmodifiableList(requests));
+    }
+
+    /**
+     * return the future of the metadata request. Return the existing future 
if a request for the same topic is already
+     * inflight.
+     *
+     * @param topic to be requested. If empty, return the metadata for all 
topics.
+     * @return the future of the metadata request.
+     */
+    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestTopicMetadata(final Optional<String> topic) {
+        if (inflightRequests.containsKey(topic)) {
+            return inflightRequests.get(topic).future;
+        }
+
+        TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+                logContext,
+                topic,
+                retryBackoffMs,
+                retryBackoffMaxMs);
+        inflightRequests.put(topic, newRequest);
+        return newRequest.future;
+    }
+
+    // Visible for testing
+    List<TopicMetadataRequestState> inflightRequests() {
+        return new ArrayList<>(inflightRequests.values());
+    }
+
+    class TopicMetadataRequestState extends RequestState {
+        private final Optional<String> topic;
+        CompletableFuture<Map<String, List<PartitionInfo>>> future;
+
+        public TopicMetadataRequestState(final LogContext logContext,
+                                    final Optional<String> topic,
+                                    final long retryBackoffMs,
+                                    final long retryBackoffMaxMs) {
+            super(logContext, TopicMetadataRequestState.class.getSimpleName(), 
retryBackoffMs,
+                retryBackoffMaxMs);
+            this.future = new CompletableFuture<>();
+            this.topic = topic;
+        }
+
+        /**
+         * prepare the metadata request and return an
+         * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
 if needed.
+         */
+        private Optional<NetworkClientDelegate.UnsentRequest> send(final long 
currentTimeMs) {
+            if (!this.canSendRequest(currentTimeMs)) {
+                return Optional.empty();
+            }
+            this.onSendAttempt(currentTimeMs);
+
+            final MetadataRequest.Builder request;
+            request = topic.map(t -> new 
MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation))
+                .orElseGet(MetadataRequest.Builder::allTopics);
+
+            final NetworkClientDelegate.UnsentRequest unsent = new 
NetworkClientDelegate.UnsentRequest(
+                request,
+                Optional.empty(),
+                (response, exception) -> {
+                    if (exception != null) {
+                        this.future.completeExceptionally(new 
KafkaException(exception));
+                        inflightRequests.remove(topic);
+                        return;
+                    }
+
+                    try {
+                        Map<String, List<PartitionInfo>> res = 
handleTopicMetadataResponse((MetadataResponse) response.responseBody());
+                        future.complete(res);
+                        inflightRequests.remove(topic);
+                    } catch (RetriableException e) {
+                        if (e instanceof TimeoutException) {

Review Comment:
   This captures the case where a single request takes too long on the broker and 
times out. However, a separate case is when an individual request returns on time 
but with some other RetriableException. In that case, TopicMetadataRequestManager 
will not clean up inflightRequests even after the client request has timed out.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java:
##########
@@ -19,22 +19,23 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-public class CommitApplicationEvent extends ApplicationEvent {
-    final private CompletableFuture<Void> future;
+public class CommitApplicationEvent extends CompletableApplicationEvent<Void> {
     final private Map<TopicPartition, OffsetAndMetadata> offsets;
 
     public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
         super(Type.COMMIT);
-        this.offsets = offsets;
-        Optional<Exception> exception = isValid(offsets);
-        if (exception.isPresent()) {
-            throw new RuntimeException(exception.get());
+        this.offsets = Collections.unmodifiableMap(offsets);
+
+        for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {

Review Comment:
   Yes, it's fine to clean this up separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to