This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new de4185d128b KAFKA-20131: ClassicKafkaConsumer does not clear
endOffsetRequested flag on failed LIST_OFFSETS calls (#21596)
de4185d128b is described below
commit de4185d128bbcbdb41ae44566109c3bf24f95f29
Author: Kirk True <[email protected]>
AuthorDate: Fri Feb 27 06:36:13 2026 -0800
KAFKA-20131: ClassicKafkaConsumer does not clear endOffsetRequested flag on
failed LIST_OFFSETS calls (#21596)
Cherry-pick of #21457 to 4.2.
Updates the `ClassicKafkaConsumer` to clear out the `SubscriptionState`
`endOffsetRequested` flag if the `LIST_OFFSETS` call fails.
Reviewers: Andrew Schofield <[email protected]>
---
.../consumer/internals/ClassicKafkaConsumer.java | 20 +--
.../clients/consumer/internals/OffsetFetcher.java | 46 ++++-
.../consumer/internals/OffsetFetcherUtils.java | 41 +++++
.../consumer/internals/SubscriptionState.java | 15 ++
.../kafka/clients/consumer/KafkaConsumerTest.java | 193 +++++++++++++++++++--
5 files changed, 274 insertions(+), 41 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index c227a9511b7..5b54a759a98 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -1049,25 +1049,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public OptionalLong currentLag(TopicPartition topicPartition) {
acquireAndEnsureOpen();
try {
- final Long lag = subscriptions.partitionLag(topicPartition,
isolationLevel);
-
- // if the log end offset is not known and hence cannot return lag
and there is
- // no in-flight list offset requested yet,
- // issue a list offset request for that partition so that next time
- // we may get the answer; we do not need to wait for the return
value
- // since we would not try to poll the network client synchronously
- if (lag == null) {
- if (subscriptions.partitionEndOffset(topicPartition,
isolationLevel) == null &&
-
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
- log.info("Requesting the log end offset for {} in order to
compute lag", topicPartition);
- subscriptions.requestPartitionEndOffset(topicPartition);
-
offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
- }
-
- return OptionalLong.empty();
- }
-
- return OptionalLong.of(lag);
+ return offsetFetcher.currentLag(topicPartition);
} finally {
release();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index f9cca5c1339..446f39686de 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -44,6 +44,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -126,7 +127,7 @@ public class OffsetFetcher {
try {
Map<TopicPartition, ListOffsetData> fetchedOffsets =
fetchOffsetsByTimes(timestampsToSearch,
- timer, true).fetchedOffsets;
+ timer, true, false).fetchedOffsets;
return buildOffsetsForTimesResult(timestampsToSearch,
fetchedOffsets);
} finally {
@@ -136,7 +137,8 @@ public class OffsetFetcher {
private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long>
timestampsToSearch,
Timer timer,
- boolean requireTimestamps) {
+ boolean requireTimestamps,
+ boolean
updatePartitionEndOffsetsFlag) {
ListOffsetResult result = new ListOffsetResult();
if (timestampsToSearch.isEmpty())
return result;
@@ -153,11 +155,17 @@ public class OffsetFetcher {
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets,
isolationLevel);
+
+ if (updatePartitionEndOffsetsFlag)
+
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
}
}
@Override
public void onFailure(RuntimeException e) {
+ if (updatePartitionEndOffsetsFlag)
+
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
+
if (!(e instanceof RetriableException)) {
throw future.exception();
}
@@ -185,23 +193,49 @@ public class OffsetFetcher {
}
public Map<TopicPartition, Long>
beginningOffsets(Collection<TopicPartition> partitions, Timer timer) {
- return beginningOrEndOffset(partitions,
ListOffsetsRequest.EARLIEST_TIMESTAMP, timer);
+ return beginningOrEndOffset(partitions,
ListOffsetsRequest.EARLIEST_TIMESTAMP, timer, false);
}
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition>
partitions, Timer timer) {
- return beginningOrEndOffset(partitions,
ListOffsetsRequest.LATEST_TIMESTAMP, timer);
+ return beginningOrEndOffset(partitions,
ListOffsetsRequest.LATEST_TIMESTAMP, timer, false);
+ }
+
+ public OptionalLong currentLag(TopicPartition topicPartition) {
+ final Long lag = subscriptions.partitionLag(topicPartition,
isolationLevel);
+
+ // if the log end offset is not known and hence cannot return lag and
there is
+ // no in-flight list offset requested yet,
+ // issue a list offset request for that partition so that next time
+ // we may get the answer; we do not need to wait for the return value
+ // since we would not try to poll the network client synchronously
+ if (lag == null) {
+ if (subscriptions.partitionEndOffset(topicPartition,
isolationLevel) == null &&
+
offsetFetcherUtils.maybeSetPartitionEndOffsetRequest(topicPartition)) {
+ beginningOrEndOffset(
+ Set.of(topicPartition),
+ ListOffsetsRequest.LATEST_TIMESTAMP,
+ time.timer(0L),
+ true
+ );
+ }
+
+ return OptionalLong.empty();
+ }
+
+ return OptionalLong.of(lag);
}
private Map<TopicPartition, Long>
beginningOrEndOffset(Collection<TopicPartition> partitions,
long timestamp,
- Timer timer) {
+ Timer timer,
+ boolean
updatePartitionEndOffsetsFlag) {
metadata.addTransientTopics(topicsForPartitions(partitions));
try {
Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
.distinct()
.collect(Collectors.toMap(Function.identity(), tp ->
timestamp));
- ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch,
timer, false);
+ ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch,
timer, false, updatePartitionEndOffsetsFlag);
return result.fetchedOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry ->
entry.getValue().offset));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index a92237d0fc9..f6f324be070 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -277,6 +277,47 @@ class OffsetFetcherUtils {
log.trace("Updating high watermark for partition {} to
{}", partition, offset);
subscriptionState.updateHighWatermark(partition, offset);
}
+ } else {
+ if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+ log.warn("Not updating last stable offset for partition {}
as it is no longer assigned", partition);
+ } else {
+ log.warn("Not updating high watermark for partition {} as
it is no longer assigned", partition);
+ }
+ }
+ }
+ }
+
+ /**
+ * The {@code LIST_OFFSETS} lag lookup is serialized, so if there's an
inflight request it must finish before
+ * another request can be issued. This serialization mechanism is
controlled by the 'end offset requested'
+ * flag in {@link SubscriptionState}.
+ *
+ * @return {@code true} if the partition's end offset can be requested,
{@code false} if there's already an
+ * in-flight request
+ */
+ boolean maybeSetPartitionEndOffsetRequest(TopicPartition partition) {
+ if (subscriptionState.partitionEndOffsetRequested(partition)) {
+ log.info("Not requesting the log end offset for {} to compute lag
as an outstanding request already exists", partition);
+ return false;
+ } else {
+ log.info("Requesting the log end offset for {} in order to compute
lag", partition);
+ subscriptionState.requestPartitionEndOffset(partition);
+ return true;
+ }
+ }
+
+ /**
+ * If any of the given partitions are assigned, this will clear the
partition's 'end offset requested' flag so
+ * that the next attempt to look up the lag will properly issue another
<code>LIST_OFFSETS</code> request. This
+ * is only intended to be called when <code>LIST_OFFSETS</code> fails.
Successful <code>LIST_OFFSETS</code> calls
+ * should use {@link #updateSubscriptionState(Map, IsolationLevel)}.
+ *
+ * @param partitions Partitions for which the 'end offset requested' flag
should be cleared (if still assigned)
+ */
+ void clearPartitionEndOffsetRequests(Collection<TopicPartition>
partitions) {
+ for (final TopicPartition partition : partitions) {
+ if
(subscriptionState.maybeClearPartitionEndOffsetRequested(partition)) {
+ log.trace("Clearing end offset requested for partition {}",
partition);
}
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 38ed6c668d9..820b99235e8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -667,6 +667,17 @@ public class SubscriptionState {
return topicPartitionState.endOffsetRequested();
}
+ public synchronized boolean
maybeClearPartitionEndOffsetRequested(TopicPartition tp) {
+ TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
+
+ if (topicPartitionState != null &&
topicPartitionState.endOffsetRequested()) {
+ topicPartitionState.clearEndOffset();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
synchronized Long partitionLead(TopicPartition tp) {
TopicPartitionState topicPartitionState = assignedState(tp);
return topicPartitionState.logStartOffset == null ? null :
topicPartitionState.position.offset - topicPartitionState.logStartOffset;
@@ -1037,6 +1048,10 @@ public class SubscriptionState {
endOffsetRequested = true;
}
+ public void clearEndOffset() {
+ endOffsetRequested = false;
+ }
+
private void transitionState(FetchState newState, Runnable
runIfTransitioned) {
FetchState nextState = this.fetchState.transitionTo(newState);
if (nextState.equals(newState)) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index db2b652d836..5687c422552 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Headers;
@@ -177,6 +178,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
@@ -233,7 +235,7 @@ public class KafkaConsumerTest {
private final Collection<TopicPartition> singleTopicPartition = Set.of(new
TopicPartition(topic, 0));
private final Time time = new MockTime();
- private final SubscriptionState subscription = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.EARLIEST);
+ private final SubscriptionState subscription = spy(new
SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST));
private final ConsumerPartitionAssignor assignor = new
RoundRobinAssignor();
private KafkaConsumer<?, ?> consumer;
@@ -2663,21 +2665,7 @@ public class KafkaConsumerTest {
public void testCurrentLag(GroupProtocol groupProtocol) throws
InterruptedException {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
-
- initMetadata(client, Map.of(topic, 1));
-
- consumer = newConsumer(groupProtocol, time, client, subscription,
metadata, assignor, true, groupInstanceId);
-
- // throws for unassigned partition
- assertThrows(IllegalStateException.class, () ->
consumer.currentLag(tp0));
-
- consumer.assign(Set.of(tp0));
-
- // poll once to update with the current metadata
- consumer.poll(Duration.ofMillis(0));
- TestUtils.waitForCondition(() -> requestGenerated(client,
ApiKeys.FIND_COORDINATOR),
- "No metadata requests sent");
- client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, metadata.fetch().nodes().get(0)));
+ consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata);
// no error for no current position
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
@@ -2729,6 +2717,155 @@ public class KafkaConsumerTest {
assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
}
+ // TODO: this test validate that the consumer clears the
endOffsetRequested flag, but this is not yet implemented
+ // in the CONSUMER group protocol (see KAFKA-20187).
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ public void testCurrentLagPreventsMultipleInFlightRequests(GroupProtocol
groupProtocol) throws InterruptedException {
+ final ConsumerMetadata metadata = createMetadata(subscription);
+ final MockClient client = new MockClient(time, metadata);
+ consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata);
+
+ // Validate the state of the endOffsetRequested flag. It should be
unset before the call to currentLag(),
+ // then set immediately afterward.
+ assertFalse(subscription.partitionEndOffsetRequested(tp0));
+ assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+ assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+
+ if (groupProtocol == GroupProtocol.CLASSIC) {
+ // Classic consumer does not send the LIST_OFFSETS right away
(requires an explicit poll),
+ // different from the new async consumer, that will send the
LIST_OFFSETS request in the background
+ // thread on the next background thread poll.
+ consumer.poll(Duration.ofMillis(0));
+ }
+
+ long count = client.requests().stream()
+ .filter(request ->
request.requestBuilder().apiKey().equals(ApiKeys.LIST_OFFSETS))
+ .count();
+ assertEquals(
+ 1L,
+ count,
+ "Expected only one in-flight LIST_OFFSETS request for
consumerLag(), but consumer submitted " + count + " requests"
+ );
+ }
+
+ // TODO: this test validate that the consumer clears the
endOffsetRequested flag, but this is not yet implemented
+ // in the CONSUMER group protocol (see KAFKA-20187).
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ public void testCurrentLagClearsFlagOnFatalPartitionError(GroupProtocol
groupProtocol) throws InterruptedException {
+ final ConsumerMetadata metadata = createMetadata(subscription);
+ final MockClient client = new MockClient(time, metadata);
+ consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata);
+
+ // Validate the state of the endOffsetRequested flag. It should be
unset before the call to currentLag(),
+ // then set immediately afterward.
+ assertFalse(subscription.partitionEndOffsetRequested(tp0));
+ assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+ assertTrue(subscription.partitionEndOffsetRequested(tp0));
+ verify(subscription).requestPartitionEndOffset(tp0);
+
+ if (groupProtocol == GroupProtocol.CLASSIC) {
+ // Classic consumer does not send the LIST_OFFSETS right away
(requires an explicit poll),
+ // different from the new async consumer, that will send the
LIST_OFFSETS request in the background
+ // thread on the next background thread poll.
+ consumer.poll(Duration.ofMillis(0));
+ }
+
+ TestUtils.waitForCondition(
+ () -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
+ "No LIST_OFFSETS request sent within allotted timeout"
+ );
+
+ clearInvocations(subscription);
+
+ // Validate the state of the endOffsetRequested flag. It should still
be set before the call to
+ // currentLag(), because the previous LIST_OFFSETS call has not
received a response. In this case,
+ // the SubscriptionState.requestPartitionEndOffset() method should
*not* have been invoked.
+ assertTrue(subscription.partitionEndOffsetRequested(tp0));
+ assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+ assertTrue(subscription.partitionEndOffsetRequested(tp0));
+ verify(subscription, never()).requestPartitionEndOffset(tp0);
+
+ // Now respond to the LIST_OFFSETS request with an error in the
partition.
+ ClientRequest listOffsetRequest = findRequest(client,
ApiKeys.LIST_OFFSETS);
+ client.respondToRequest(listOffsetRequest,
listOffsetsResponse(Map.of(), Map.of(tp0, Errors.TOPIC_AUTHORIZATION_FAILED)));
+
+ if (groupProtocol == GroupProtocol.CLASSIC) {
+ // Classic consumer does not send the LIST_OFFSETS right away
(requires an explicit poll),
+ // different from the new async consumer, that will send the
LIST_OFFSETS request in the background
+ // thread on the next background thread poll.
+ assertThrows(TopicAuthorizationException.class, () ->
consumer.poll(Duration.ofMillis(0)));
+ }
+
+ // AsyncKafkaConsumer may take a moment to poll and process the
LIST_OFFSETS response, so a repeated
+ // wait is appropriate here.
+ TestUtils.waitForCondition(
+ () -> !subscription.partitionEndOffsetRequested(tp0),
+ "endOffsetRequested flag was not cleared within allotted timeout"
+ );
+ }
+
+ // TODO: this test validate that the consumer clears the
endOffsetRequested flag, but this is not yet implemented
+ // in the CONSUMER group protocol (see KAFKA-20187).
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ public void
testCurrentLagClearsFlagOnRetriablePartitionError(GroupProtocol groupProtocol)
throws InterruptedException {
+ final ConsumerMetadata metadata = createMetadata(subscription);
+ final MockClient client = new MockClient(time, metadata);
+ consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata);
+
+ // Validate the state of the endOffsetRequested flag. It should be
unset before the call to currentLag(),
+ // then set immediately afterward.
+ assertFalse(subscription.partitionEndOffsetRequested(tp0));
+ assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+ assertTrue(subscription.partitionEndOffsetRequested(tp0));
+ verify(subscription).requestPartitionEndOffset(tp0);
+
+ if (groupProtocol == GroupProtocol.CLASSIC) {
+ // Classic consumer does not send the LIST_OFFSETS right away
(requires an explicit poll),
+ // different from the new async consumer, that will send the
LIST_OFFSETS request in the background
+ // thread on the next background thread poll.
+ consumer.poll(Duration.ofMillis(0));
+ }
+
+ TestUtils.waitForCondition(
+ () -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
+ "No LIST_OFFSETS request sent within allotted timeout"
+ );
+
+ clearInvocations(subscription);
+
+ // Validate the state of the endOffsetRequested flag. It should still
be set before the call to
+ // currentLag(), because the previous LIST_OFFSETS call has not
received a response. In this case,
+ // the SubscriptionState.requestPartitionEndOffset() method should
*not* have been invoked.
+ assertTrue(subscription.partitionEndOffsetRequested(tp0));
+ assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+ assertTrue(subscription.partitionEndOffsetRequested(tp0));
+ verify(subscription, never()).requestPartitionEndOffset(tp0);
+
+ // Now respond to the LIST_OFFSETS request with an error in the
partition.
+ ClientRequest listOffsetRequest = findRequest(client,
ApiKeys.LIST_OFFSETS);
+ client.respondToRequest(listOffsetRequest,
listOffsetsResponse(Map.of(), Map.of(tp0, Errors.OFFSET_NOT_AVAILABLE)));
+
+ if (groupProtocol == GroupProtocol.CLASSIC) {
+ // Classic consumer does not send the LIST_OFFSETS right away
(requires an explicit poll),
+ // different from the new async consumer, that will send the
LIST_OFFSETS request in the background
+ // thread on the next background thread poll.
+ consumer.poll(Duration.ofMillis(0));
+ }
+
+ // AsyncKafkaConsumer may take a moment to poll and process the
LIST_OFFSETS response, so a repeated
+ // wait is appropriate here.
+ TestUtils.waitForCondition(
+ () -> !subscription.partitionEndOffsetRequested(tp0),
+ "endOffsetRequested flag was not cleared within allotted timeout"
+ );
+ }
+
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testListOffsetShouldUpdateSubscriptions(GroupProtocol
groupProtocol) {
@@ -2748,6 +2885,30 @@ public class KafkaConsumerTest {
assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
}
+ private KafkaConsumer<String, String>
setUpConsumerForCurrentLag(GroupProtocol groupProtocol,
+
MockClient client,
+
ConsumerMetadata metadata) throws InterruptedException {
+ initMetadata(client, Map.of(topic, 1));
+
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol,
time, client, subscription, metadata, assignor, false,
+ groupId, groupInstanceId, false);
+
+ // throws for unassigned partition
+ assertThrows(IllegalStateException.class, () ->
consumer.currentLag(tp0));
+
+ consumer.assign(Set.of(tp0));
+
+ // poll once to update with the current metadata
+ consumer.poll(Duration.ofMillis(0));
+ TestUtils.waitForCondition(
+ () -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
+ "No FIND_COORDINATOR request sent within allotted timeout"
+ );
+ client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, metadata.fetch().nodes().get(0)));
+
+ return consumer;
+ }
+
private ClientRequest findRequest(MockClient client, ApiKeys apiKey) {
Optional<ClientRequest> request = client.requests().stream().filter(r
-> r.requestBuilder().apiKey().equals(apiKey)).findFirst();
assertTrue(request.isPresent(), "No " + apiKey + " request was
submitted to the client");