This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new abcbef6a4c8 KAFKA-20131: ClassicKafkaConsumer does not clear 
endOffsetRequested flag on failed LIST_OFFSETS calls (#21457)
abcbef6a4c8 is described below

commit abcbef6a4c825a17372c4c26aeef0102438c792c
Author: Kirk True <[email protected]>
AuthorDate: Fri Feb 20 13:24:05 2026 -0800

    KAFKA-20131: ClassicKafkaConsumer does not clear endOffsetRequested flag on 
failed LIST_OFFSETS calls (#21457)
    
    Updates the `ClassicKafkaConsumer` to clear out the `SubscriptionState`
    `endOffsetRequested` flag if the `LIST_OFFSETS` call fails.
    
    Reviewers: Viktor Somogyi-Vass <[email protected]>, Lianet Magrans
    <[email protected]>, Andrew Schofield <[email protected]>
---
 .../consumer/internals/ClassicKafkaConsumer.java   |  20 +--
 .../clients/consumer/internals/OffsetFetcher.java  |  46 ++++-
 .../consumer/internals/OffsetFetcherUtils.java     |  41 +++++
 .../consumer/internals/SubscriptionState.java      |  15 ++
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 193 +++++++++++++++++++--
 5 files changed, 274 insertions(+), 41 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index c227a9511b7..5b54a759a98 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -1049,25 +1049,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     public OptionalLong currentLag(TopicPartition topicPartition) {
         acquireAndEnsureOpen();
         try {
-            final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);
-
-            // if the log end offset is not known and hence cannot return lag 
and there is
-            // no in-flight list offset requested yet,
-            // issue a list offset request for that partition so that next time
-            // we may get the answer; we do not need to wait for the return 
value
-            // since we would not try to poll the network client synchronously
-            if (lag == null) {
-                if (subscriptions.partitionEndOffset(topicPartition, 
isolationLevel) == null &&
-                        
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
-                    log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
-                    subscriptions.requestPartitionEndOffset(topicPartition);
-                    
offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
-                }
-
-                return OptionalLong.empty();
-            }
-
-            return OptionalLong.of(lag);
+            return offsetFetcher.currentLag(topicPartition);
         } finally {
             release();
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index f9cca5c1339..446f39686de 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -44,6 +44,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -126,7 +127,7 @@ public class OffsetFetcher {
 
         try {
             Map<TopicPartition, ListOffsetData> fetchedOffsets = 
fetchOffsetsByTimes(timestampsToSearch,
-                    timer, true).fetchedOffsets;
+                    timer, true, false).fetchedOffsets;
 
             return buildOffsetsForTimesResult(timestampsToSearch, 
fetchedOffsets);
         } finally {
@@ -136,7 +137,8 @@ public class OffsetFetcher {
 
     private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> 
timestampsToSearch,
                                                  Timer timer,
-                                                 boolean requireTimestamps) {
+                                                 boolean requireTimestamps,
+                                                 boolean 
updatePartitionEndOffsetsFlag) {
         ListOffsetResult result = new ListOffsetResult();
         if (timestampsToSearch.isEmpty())
             return result;
@@ -153,11 +155,17 @@ public class OffsetFetcher {
                         
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
 
                         
offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, 
isolationLevel);
+
+                        if (updatePartitionEndOffsetsFlag)
+                            
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
                     }
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
+                    if (updatePartitionEndOffsetsFlag)
+                        
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
+
                     if (!(e instanceof RetriableException)) {
                         throw future.exception();
                     }
@@ -185,23 +193,49 @@ public class OffsetFetcher {
     }
 
     public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions, Timer timer) {
-        return beginningOrEndOffset(partitions, 
ListOffsetsRequest.EARLIEST_TIMESTAMP, timer);
+        return beginningOrEndOffset(partitions, 
ListOffsetsRequest.EARLIEST_TIMESTAMP, timer, false);
     }
 
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions, Timer timer) {
-        return beginningOrEndOffset(partitions, 
ListOffsetsRequest.LATEST_TIMESTAMP, timer);
+        return beginningOrEndOffset(partitions, 
ListOffsetsRequest.LATEST_TIMESTAMP, timer, false);
+    }
+
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);
+
+        // if the log end offset is not known and hence cannot return lag and 
there is
+        // no in-flight list offset requested yet,
+        // issue a list offset request for that partition so that next time
+        // we may get the answer; we do not need to wait for the return value
+        // since we would not try to poll the network client synchronously
+        if (lag == null) {
+            if (subscriptions.partitionEndOffset(topicPartition, 
isolationLevel) == null &&
+                    
offsetFetcherUtils.maybeSetPartitionEndOffsetRequest(topicPartition)) {
+                beginningOrEndOffset(
+                    Set.of(topicPartition),
+                    ListOffsetsRequest.LATEST_TIMESTAMP,
+                    time.timer(0L),
+                    true
+                );
+            }
+
+            return OptionalLong.empty();
+        }
+
+        return OptionalLong.of(lag);
     }
 
     private Map<TopicPartition, Long> 
beginningOrEndOffset(Collection<TopicPartition> partitions,
                                                            long timestamp,
-                                                           Timer timer) {
+                                                           Timer timer,
+                                                           boolean 
updatePartitionEndOffsetsFlag) {
         metadata.addTransientTopics(topicsForPartitions(partitions));
         try {
             Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
                     .distinct()
                     .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
 
-            ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, 
timer, false);
+            ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, 
timer, false, updatePartitionEndOffsetsFlag);
 
             return result.fetchedOffsets.entrySet().stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().offset));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index a92237d0fc9..f6f324be070 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -277,6 +277,47 @@ class OffsetFetcherUtils {
                     log.trace("Updating high watermark for partition {} to 
{}", partition, offset);
                     subscriptionState.updateHighWatermark(partition, offset);
                 }
+            } else {
+                if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+                    log.warn("Not updating last stable offset for partition {} 
as it is no longer assigned", partition);
+                } else {
+                    log.warn("Not updating high watermark for partition {} as 
it is no longer assigned", partition);
+                }
+            }
+        }
+    }
+
+    /**
+     * The {@code LIST_OFFSETS} lag lookup is serialized, so if there's an 
in-flight request it must finish before
+     * another request can be issued. This serialization mechanism is 
controlled by the 'end offset requested'
+     * flag in {@link SubscriptionState}.
+     *
+     * @return {@code true} if the partition's end offset can be requested, 
{@code false} if there's already an
+     *         in-flight request
+     */
+    boolean maybeSetPartitionEndOffsetRequest(TopicPartition partition) {
+        if (subscriptionState.partitionEndOffsetRequested(partition)) {
+            log.info("Not requesting the log end offset for {} to compute lag 
as an outstanding request already exists", partition);
+            return false;
+        } else {
+            log.info("Requesting the log end offset for {} in order to compute 
lag", partition);
+            subscriptionState.requestPartitionEndOffset(partition);
+            return true;
+        }
+    }
+
+    /**
+     * If any of the given partitions are assigned, this will clear the 
partition's 'end offset requested' flag so
+     * that the next attempt to look up the lag will properly issue another 
<code>LIST_OFFSETS</code> request. This
+     * is only intended to be called when <code>LIST_OFFSETS</code> fails. 
Successful <code>LIST_OFFSETS</code> calls
+     * should use {@link #updateSubscriptionState(Map, IsolationLevel)}.
+     *
+     * @param partitions Partitions for which the 'end offset requested' flag 
should be cleared (if still assigned)
+     */
+    void clearPartitionEndOffsetRequests(Collection<TopicPartition> 
partitions) {
+        for (final TopicPartition partition : partitions) {
+            if 
(subscriptionState.maybeClearPartitionEndOffsetRequested(partition)) {
+                log.trace("Clearing end offset requested for partition {}", 
partition);
             }
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 38ed6c668d9..820b99235e8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -667,6 +667,17 @@ public class SubscriptionState {
         return topicPartitionState.endOffsetRequested();
     }
 
+    public synchronized boolean 
maybeClearPartitionEndOffsetRequested(TopicPartition tp) {
+        TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
+
+        if (topicPartitionState != null && 
topicPartitionState.endOffsetRequested()) {
+            topicPartitionState.clearEndOffset();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     synchronized Long partitionLead(TopicPartition tp) {
         TopicPartitionState topicPartitionState = assignedState(tp);
         return topicPartitionState.logStartOffset == null ? null : 
topicPartitionState.position.offset - topicPartitionState.logStartOffset;
@@ -1037,6 +1048,10 @@ public class SubscriptionState {
             endOffsetRequested = true;
         }
 
+        public void clearEndOffset() {
+            endOffsetRequested = false;
+        }
+
         private void transitionState(FetchState newState, Runnable 
runIfTransitioned) {
             FetchState nextState = this.fetchState.transitionTo(newState);
             if (nextState.equals(newState)) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index a6fa2f0fa79..9d1011601ab 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.Headers;
@@ -177,6 +178,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
@@ -233,7 +235,7 @@ public class KafkaConsumerTest {
 
     private final Collection<TopicPartition> singleTopicPartition = Set.of(new 
TopicPartition(topic, 0));
     private final Time time = new MockTime();
-    private final SubscriptionState subscription = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.EARLIEST);
+    private final SubscriptionState subscription = spy(new 
SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST));
     private final ConsumerPartitionAssignor assignor = new 
RoundRobinAssignor();
 
     private KafkaConsumer<?, ?> consumer;
@@ -2663,21 +2665,7 @@ public class KafkaConsumerTest {
     public void testCurrentLag(GroupProtocol groupProtocol) throws 
InterruptedException {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
-
-        initMetadata(client, Map.of(topic, 1));
-
-        consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
-
-        // throws for unassigned partition
-        assertThrows(IllegalStateException.class, () -> 
consumer.currentLag(tp0));
-
-        consumer.assign(Set.of(tp0));
-
-        // poll once to update with the current metadata
-        consumer.poll(Duration.ofMillis(0));
-        TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FIND_COORDINATOR),
-                "No metadata requests sent");
-        client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
+        consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata);
 
         // no error for no current position
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
@@ -2729,6 +2717,155 @@ public class KafkaConsumerTest {
         assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
     }
 
+    // TODO: this test validates that the consumer clears the 
endOffsetRequested flag, but this is not yet implemented
+    //       in the CONSUMER group protocol (see KAFKA-20187).
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    public void testCurrentLagPreventsMultipleInFlightRequests(GroupProtocol 
groupProtocol) throws InterruptedException {
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+        consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata);
+
+        // Validate the state of the endOffsetRequested flag. It should be 
unset before the call to currentLag(),
+        // then set immediately afterward.
+        assertFalse(subscription.partitionEndOffsetRequested(tp0));
+        assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+        assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+            // different from the new async consumer, that will send the 
LIST_OFFSETS request in the background
+            // thread on the next background thread poll.
+            consumer.poll(Duration.ofMillis(0));
+        }
+
+        long count = client.requests().stream()
+            .filter(request -> 
request.requestBuilder().apiKey().equals(ApiKeys.LIST_OFFSETS))
+            .count();
+        assertEquals(
+            1L,
+            count,
+            "Expected only one in-flight LIST_OFFSETS request for 
consumerLag(), but consumer submitted " + count + " requests"
+        );
+    }
+
+    // TODO: this test validates that the consumer clears the 
endOffsetRequested flag, but this is not yet implemented
+    //       in the CONSUMER group protocol (see KAFKA-20187).
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    public void testCurrentLagClearsFlagOnFatalPartitionError(GroupProtocol 
groupProtocol) throws InterruptedException {
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+        consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata);
+
+        // Validate the state of the endOffsetRequested flag. It should be 
unset before the call to currentLag(),
+        // then set immediately afterward.
+        assertFalse(subscription.partitionEndOffsetRequested(tp0));
+        assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+        assertTrue(subscription.partitionEndOffsetRequested(tp0));
+        verify(subscription).requestPartitionEndOffset(tp0);
+
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+            // different from the new async consumer, that will send the 
LIST_OFFSETS request in the background
+            // thread on the next background thread poll.
+            consumer.poll(Duration.ofMillis(0));
+        }
+
+        TestUtils.waitForCondition(
+            () -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
+            "No LIST_OFFSETS request sent within allotted timeout"
+        );
+
+        clearInvocations(subscription);
+
+        // Validate the state of the endOffsetRequested flag. It should still 
be set before the call to
+        // currentLag(), because the previous LIST_OFFSETS call has not 
received a response. In this case,
+        // the SubscriptionState.requestPartitionEndOffset() method should 
*not* have been invoked.
+        assertTrue(subscription.partitionEndOffsetRequested(tp0));
+        assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+        assertTrue(subscription.partitionEndOffsetRequested(tp0));
+        verify(subscription, never()).requestPartitionEndOffset(tp0);
+
+        // Now respond to the LIST_OFFSETS request with an error in the 
partition.
+        ClientRequest listOffsetRequest = findRequest(client, 
ApiKeys.LIST_OFFSETS);
+        client.respondToRequest(listOffsetRequest, 
listOffsetsResponse(Map.of(), Map.of(tp0, Errors.TOPIC_AUTHORIZATION_FAILED)));
+
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+            // different from the new async consumer, that will send the 
LIST_OFFSETS request in the background
+            // thread on the next background thread poll.
+            assertThrows(TopicAuthorizationException.class, () -> 
consumer.poll(Duration.ofMillis(0)));
+        }
+
+        // AsyncKafkaConsumer may take a moment to poll and process the 
LIST_OFFSETS response, so a repeated
+        // wait is appropriate here.
+        TestUtils.waitForCondition(
+            () -> !subscription.partitionEndOffsetRequested(tp0),
+            "endOffsetRequested flag was not cleared within allotted timeout"
+        );
+    }
+
+    // TODO: this test validates that the consumer clears the 
endOffsetRequested flag, but this is not yet implemented
+    //       in the CONSUMER group protocol (see KAFKA-20187).
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    public void 
testCurrentLagClearsFlagOnRetriablePartitionError(GroupProtocol groupProtocol) 
throws InterruptedException {
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+        consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata);
+
+        // Validate the state of the endOffsetRequested flag. It should be 
unset before the call to currentLag(),
+        // then set immediately afterward.
+        assertFalse(subscription.partitionEndOffsetRequested(tp0));
+        assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+        assertTrue(subscription.partitionEndOffsetRequested(tp0));
+        verify(subscription).requestPartitionEndOffset(tp0);
+
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+            // different from the new async consumer, that will send the 
LIST_OFFSETS request in the background
+            // thread on the next background thread poll.
+            consumer.poll(Duration.ofMillis(0));
+        }
+
+        TestUtils.waitForCondition(
+            () -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
+            "No LIST_OFFSETS request sent within allotted timeout"
+        );
+
+        clearInvocations(subscription);
+
+        // Validate the state of the endOffsetRequested flag. It should still 
be set before the call to
+        // currentLag(), because the previous LIST_OFFSETS call has not 
received a response. In this case,
+        // the SubscriptionState.requestPartitionEndOffset() method should 
*not* have been invoked.
+        assertTrue(subscription.partitionEndOffsetRequested(tp0));
+        assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+        assertTrue(subscription.partitionEndOffsetRequested(tp0));
+        verify(subscription, never()).requestPartitionEndOffset(tp0);
+
+        // Now respond to the LIST_OFFSETS request with an error in the 
partition.
+        ClientRequest listOffsetRequest = findRequest(client, 
ApiKeys.LIST_OFFSETS);
+        client.respondToRequest(listOffsetRequest, 
listOffsetsResponse(Map.of(), Map.of(tp0, Errors.OFFSET_NOT_AVAILABLE)));
+
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+            // different from the new async consumer, that will send the 
LIST_OFFSETS request in the background
+            // thread on the next background thread poll.
+            consumer.poll(Duration.ofMillis(0));
+        }
+
+        // AsyncKafkaConsumer may take a moment to poll and process the 
LIST_OFFSETS response, so a repeated
+        // wait is appropriate here.
+        TestUtils.waitForCondition(
+            () -> !subscription.partitionEndOffsetRequested(tp0),
+            "endOffsetRequested flag was not cleared within allotted timeout"
+        );
+    }
+
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     public void testListOffsetShouldUpdateSubscriptions(GroupProtocol 
groupProtocol) {
@@ -2748,6 +2885,30 @@ public class KafkaConsumerTest {
         assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
     }
 
+    private KafkaConsumer<String, String> 
setUpConsumerForCurrentLag(GroupProtocol groupProtocol,
+                                                                     
MockClient client,
+                                                                     
ConsumerMetadata metadata) throws InterruptedException {
+        initMetadata(client, Map.of(topic, 1));
+
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, 
time, client, subscription, metadata, assignor, false,
+            groupId, groupInstanceId, false);
+
+        // throws for unassigned partition
+        assertThrows(IllegalStateException.class, () -> 
consumer.currentLag(tp0));
+
+        consumer.assign(Set.of(tp0));
+
+        // poll once to update with the current metadata
+        consumer.poll(Duration.ofMillis(0));
+        TestUtils.waitForCondition(
+            () -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
+            "No FIND_COORDINATOR request sent within allotted timeout"
+        );
+        client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
+
+        return consumer;
+    }
+
     private ClientRequest findRequest(MockClient client, ApiKeys apiKey) {
         Optional<ClientRequest> request = client.requests().stream().filter(r 
-> r.requestBuilder().apiKey().equals(apiKey)).findFirst();
         assertTrue(request.isPresent(), "No " + apiKey + " request was 
submitted to the client");

Reply via email to