kirktrue commented on code in PR #14308:
URL: https://github.com/apache/kafka/pull/14308#discussion_r1310978474


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##########
@@ -129,6 +132,14 @@ public boolean add(final ApplicationEvent event) {
         return applicationEventQueue.add(event);
     }
 
+    @Override

Review Comment:
   Was the `addAndGet` method addition here taken from a different commit?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Manager responsible for building the following requests to retrieve 
partition offsets, and
+ * processing its responses.
+ * <ul>
+ *      <li>ListOffset request</li>
+ *      <li>OffsetForLeaderEpoch request</li>
+ * </ul>
+ * Requests are kept in-memory ready to be sent on the next call to {@link 
#poll(long)}.
+ * <br>
+ * Partition leadership information required to build ListOffset requests is 
retrieved from the
+ * {@link ConsumerMetadata}, so this implements {@link 
ClusterResourceListener} to get notified
+ * when the cluster metadata is updated.
+ */
+public class OffsetsRequestManager implements RequestManager, 
ClusterResourceListener {
+
+    private final ConsumerMetadata metadata;
+    private final IsolationLevel isolationLevel;
+    private final Logger log;
+    private final OffsetFetcherUtils offsetFetcherUtils;
+
+    private final Set<ListOffsetsRequestState> requestsToRetry;
+    private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+
+    public OffsetsRequestManager(final SubscriptionState subscriptionState,
+                                 final ConsumerMetadata metadata,
+                                 final IsolationLevel isolationLevel,
+                                 final Time time,
+                                 final long retryBackoffMs,
+                                 final ApiVersions apiVersions,
+                                 final LogContext logContext) {
+        requireNonNull(subscriptionState);
+        requireNonNull(metadata);
+        requireNonNull(isolationLevel);
+        requireNonNull(time);
+        requireNonNull(apiVersions);
+        requireNonNull(logContext);
+
+        this.metadata = metadata;
+        this.metadata.addClusterUpdateListener(this);
+        this.isolationLevel = isolationLevel;
+        this.log = logContext.logger(getClass());
+        this.requestsToRetry = new HashSet<>();
+        this.requestsToSend = new ArrayList<>();
+        this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, 
subscriptionState,
+                time, retryBackoffMs, apiVersions);
+    }
+
+    /**
+     * Determine if a there are pending fetch offsets requests to be sent and 
build a
+     * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
+     * containing it.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (requestsToSend.isEmpty()) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        NetworkClientDelegate.PollResult pollResult =
+                new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>(requestsToSend));
+        this.requestsToSend.clear();
+
+        return pollResult;
+    }
+
+    /**
+     * Retrieve offsets for the given partitions and timestamp.
+     *
+     * @param timestampsToSearch Partitions and target timestamps to get 
offsets for
+     * @param requireTimestamps  True if this should fail with an 
UnsupportedVersionException if the
+     *                           broker does not support fetching precise 
timestamps for offsets
+     * @return Future containing the map of {@link TopicPartition} and {@link 
OffsetAndTimestamp}
+     * found (offset of the first message whose timestamp is greater than or 
equals to the target
+     * timestamp).The future will complete when the requests responses are 
received and
+     * processed, following a call to {@link #poll(long)}
+     */
+    public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsets(
+            final Map<TopicPartition, Long> timestampsToSearch,
+            final boolean requireTimestamps) {
+        if (timestampsToSearch.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
+        
metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet()));

Review Comment:
   Truth be told: I still don't know what `addTransientTopics` does 🤷‍♂️ 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Manager responsible for building the following requests to retrieve 
partition offsets, and
+ * processing its responses.
+ * <ul>
+ *      <li>ListOffset request</li>
+ *      <li>OffsetForLeaderEpoch request</li>
+ * </ul>
+ * Requests are kept in-memory ready to be sent on the next call to {@link 
#poll(long)}.
+ * <br>
+ * Partition leadership information required to build ListOffset requests is 
retrieved from the
+ * {@link ConsumerMetadata}, so this implements {@link 
ClusterResourceListener} to get notified
+ * when the cluster metadata is updated.
+ */
+public class OffsetsRequestManager implements RequestManager, 
ClusterResourceListener {
+
+    private final ConsumerMetadata metadata;
+    private final IsolationLevel isolationLevel;
+    private final Logger log;
+    private final OffsetFetcherUtils offsetFetcherUtils;
+
+    private final Set<ListOffsetsRequestState> requestsToRetry;
+    private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+
+    public OffsetsRequestManager(final SubscriptionState subscriptionState,
+                                 final ConsumerMetadata metadata,
+                                 final IsolationLevel isolationLevel,
+                                 final Time time,
+                                 final long retryBackoffMs,
+                                 final ApiVersions apiVersions,
+                                 final LogContext logContext) {
+        requireNonNull(subscriptionState);
+        requireNonNull(metadata);
+        requireNonNull(isolationLevel);
+        requireNonNull(time);
+        requireNonNull(apiVersions);
+        requireNonNull(logContext);
+
+        this.metadata = metadata;
+        this.metadata.addClusterUpdateListener(this);

Review Comment:
   I believe it's a non-issue because the callback can only be invoked by our 
network thread which hasn't started executing yet. Perhaps a comment will 
suffice here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Manager responsible for building the following requests to retrieve 
partition offsets, and
+ * processing its responses.
+ * <ul>
+ *      <li>ListOffset request</li>
+ *      <li>OffsetForLeaderEpoch request</li>
+ * </ul>
+ * Requests are kept in-memory ready to be sent on the next call to {@link 
#poll(long)}.
+ * <br>
+ * Partition leadership information required to build ListOffset requests is 
retrieved from the
+ * {@link ConsumerMetadata}, so this implements {@link 
ClusterResourceListener} to get notified
+ * when the cluster metadata is updated.
+ */
+public class OffsetsRequestManager implements RequestManager, 
ClusterResourceListener {
+
+    private final ConsumerMetadata metadata;
+    private final IsolationLevel isolationLevel;
+    private final Logger log;
+    private final OffsetFetcherUtils offsetFetcherUtils;
+
+    private final Set<ListOffsetsRequestState> requestsToRetry;
+    private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+
+    public OffsetsRequestManager(final SubscriptionState subscriptionState,
+                                 final ConsumerMetadata metadata,
+                                 final IsolationLevel isolationLevel,
+                                 final Time time,
+                                 final long retryBackoffMs,
+                                 final ApiVersions apiVersions,
+                                 final LogContext logContext) {
+        requireNonNull(subscriptionState);
+        requireNonNull(metadata);
+        requireNonNull(isolationLevel);
+        requireNonNull(time);
+        requireNonNull(apiVersions);
+        requireNonNull(logContext);
+
+        this.metadata = metadata;
+        this.metadata.addClusterUpdateListener(this);
+        this.isolationLevel = isolationLevel;
+        this.log = logContext.logger(getClass());
+        this.requestsToRetry = new HashSet<>();
+        this.requestsToSend = new ArrayList<>();
+        this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, 
subscriptionState,
+                time, retryBackoffMs, apiVersions);
+    }
+
+    /**
+     * Determine if a there are pending fetch offsets requests to be sent and 
build a
+     * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
+     * containing it.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (requestsToSend.isEmpty()) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        NetworkClientDelegate.PollResult pollResult =
+                new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>(requestsToSend));
+        this.requestsToSend.clear();
+
+        return pollResult;
+    }
+
+    /**
+     * Retrieve offsets for the given partitions and timestamp.
+     *
+     * @param timestampsToSearch Partitions and target timestamps to get 
offsets for
+     * @param requireTimestamps  True if this should fail with an 
UnsupportedVersionException if the
+     *                           broker does not support fetching precise 
timestamps for offsets
+     * @return Future containing the map of {@link TopicPartition} and {@link 
OffsetAndTimestamp}
+     * found (offset of the first message whose timestamp is greater than or 
equals to the target

Review Comment:
   Nitpicky/style: the comment for the `@return` annotation is great. Perhaps 
move it up to the main method-level documentation so it's not tucked away in 
the `@return` documentation?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Manager responsible for building the following requests to retrieve 
partition offsets, and
+ * processing its responses.
+ * <ul>
+ *      <li>ListOffset request</li>
+ *      <li>OffsetForLeaderEpoch request</li>
+ * </ul>
+ * Requests are kept in-memory ready to be sent on the next call to {@link 
#poll(long)}.
+ * <br>
+ * Partition leadership information required to build ListOffset requests is 
retrieved from the
+ * {@link ConsumerMetadata}, so this implements {@link 
ClusterResourceListener} to get notified
+ * when the cluster metadata is updated.
+ */
+public class OffsetsRequestManager implements RequestManager, 
ClusterResourceListener {
+
+    private final ConsumerMetadata metadata;
+    private final IsolationLevel isolationLevel;
+    private final Logger log;
+    private final OffsetFetcherUtils offsetFetcherUtils;
+
+    private final Set<ListOffsetsRequestState> requestsToRetry;
+    private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+
+    public OffsetsRequestManager(final SubscriptionState subscriptionState,
+                                 final ConsumerMetadata metadata,
+                                 final IsolationLevel isolationLevel,
+                                 final Time time,
+                                 final long retryBackoffMs,
+                                 final ApiVersions apiVersions,
+                                 final LogContext logContext) {
+        requireNonNull(subscriptionState);
+        requireNonNull(metadata);
+        requireNonNull(isolationLevel);
+        requireNonNull(time);
+        requireNonNull(apiVersions);
+        requireNonNull(logContext);
+
+        this.metadata = metadata;
+        this.metadata.addClusterUpdateListener(this);

Review Comment:
   It isn't good practice for a constructor to pass a reference to `this` to 
another object. It's possible that the `ClusterResourceListener` callback 
could get invoked before this object has finished initialization.
   
   I don't have any good suggestions just yet, but I'll noodle on it.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Manager responsible for building the following requests to retrieve 
partition offsets, and
+ * processing its responses.
+ * <ul>
+ *      <li>ListOffset request</li>
+ *      <li>OffsetForLeaderEpoch request</li>
+ * </ul>
+ * Requests are kept in-memory ready to be sent on the next call to {@link 
#poll(long)}.
+ * <br>
+ * Partition leadership information required to build ListOffset requests is 
retrieved from the
+ * {@link ConsumerMetadata}, so this implements {@link 
ClusterResourceListener} to get notified
+ * when the cluster metadata is updated.
+ */
+public class OffsetsRequestManager implements RequestManager, 
ClusterResourceListener {
+
+    private final ConsumerMetadata metadata;
+    private final IsolationLevel isolationLevel;
+    private final Logger log;
+    private final OffsetFetcherUtils offsetFetcherUtils;
+
+    private final Set<ListOffsetsRequestState> requestsToRetry;
+    private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+
+    public OffsetsRequestManager(final SubscriptionState subscriptionState,
+                                 final ConsumerMetadata metadata,
+                                 final IsolationLevel isolationLevel,
+                                 final Time time,
+                                 final long retryBackoffMs,
+                                 final ApiVersions apiVersions,
+                                 final LogContext logContext) {
+        requireNonNull(subscriptionState);
+        requireNonNull(metadata);
+        requireNonNull(isolationLevel);
+        requireNonNull(time);
+        requireNonNull(apiVersions);
+        requireNonNull(logContext);
+
+        this.metadata = metadata;
+        this.metadata.addClusterUpdateListener(this);
+        this.isolationLevel = isolationLevel;
+        this.log = logContext.logger(getClass());
+        this.requestsToRetry = new HashSet<>();
+        this.requestsToSend = new ArrayList<>();
+        this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, 
subscriptionState,
+                time, retryBackoffMs, apiVersions);
+    }
+
+    /**
+     * Determine if a there are pending fetch offsets requests to be sent and 
build a
+     * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
+     * containing it.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (requestsToSend.isEmpty()) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());

Review Comment:
   I really want an idiomatic way for the `poll()` method to return a 
no-op. Maybe we should change the signature of `poll()` to return an 
`Optional`?
   
   Anyway, that's a change we'd need to make elsewhere, so no need to deal with 
that here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
+import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Manager responsible for building the following requests to retrieve 
partition offsets, and
+ * processing its responses.
+ * <ul>
+ *      <li>ListOffset request</li>
+ *      <li>OffsetForLeaderEpoch request</li>
+ * </ul>
+ * Requests are kept in-memory ready to be sent on the next call to {@link 
#poll(long)}.
+ * <br>
+ * Partition leadership information required to build ListOffset requests is 
retrieved from the
+ * {@link ConsumerMetadata}, so this implements {@link 
ClusterResourceListener} to get notified
+ * when the cluster metadata is updated.
+ */
+public class OffsetsRequestManager implements RequestManager, 
ClusterResourceListener {
+
+    private final ConsumerMetadata metadata;
+    private final IsolationLevel isolationLevel;
+    private final Logger log;
+    private final OffsetFetcherUtils offsetFetcherUtils;
+
+    private final Set<ListOffsetsRequestState> requestsToRetry;
+    private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
+
+    public OffsetsRequestManager(final SubscriptionState subscriptionState,
+                                 final ConsumerMetadata metadata,
+                                 final IsolationLevel isolationLevel,
+                                 final Time time,
+                                 final long retryBackoffMs,
+                                 final ApiVersions apiVersions,
+                                 final LogContext logContext) {
+        requireNonNull(subscriptionState);
+        requireNonNull(metadata);
+        requireNonNull(isolationLevel);
+        requireNonNull(time);
+        requireNonNull(apiVersions);
+        requireNonNull(logContext);
+
+        this.metadata = metadata;
+        this.metadata.addClusterUpdateListener(this);
+        this.isolationLevel = isolationLevel;
+        this.log = logContext.logger(getClass());
+        this.requestsToRetry = new HashSet<>();
+        this.requestsToSend = new ArrayList<>();
+        this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, 
subscriptionState,
+                time, retryBackoffMs, apiVersions);
+    }
+
+    /**
+     * Determine if a there are pending fetch offsets requests to be sent and 
build a
+     * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
+     * containing it.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (requestsToSend.isEmpty()) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        NetworkClientDelegate.PollResult pollResult =
+                new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>(requestsToSend));
+        this.requestsToSend.clear();
+
+        return pollResult;
+    }
+
+    /**
+     * Retrieve offsets for the given partitions and timestamp.
+     *
+     * @param timestampsToSearch Partitions and target timestamps to get 
offsets for
+     * @param requireTimestamps  True if this should fail with an 
UnsupportedVersionException if the
+     *                           broker does not support fetching precise 
timestamps for offsets
+     * @return Future containing the map of {@link TopicPartition} and {@link 
OffsetAndTimestamp}
+     * found (offset of the first message whose timestamp is greater than or 
equals to the target
+     * timestamp).The future will complete when the requests responses are 
received and
+     * processed, following a call to {@link #poll(long)}
+     */
+    public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsets(
+            final Map<TopicPartition, Long> timestampsToSearch,
+            final boolean requireTimestamps) {
+        if (timestampsToSearch.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
+        
metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet()));

Review Comment:
   Truth be told: I still don't know what `addTransientTopics` does 🤷‍♂️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to