kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364699488


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.assertOptional;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class FetchRequestManagerTest {
+
+    // Tolerance for floating-point comparisons.
+    private static final double EPSILON = 0.0001;
+
+    // Identifiers shared by the test cases.
+    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
+    private String topicName = "test";
+    private String groupId = "test-group";
+    private Uuid topicId = Uuid.randomUuid();
+    // Plain mutable HashMap seeded with the single test topic. This replaces a double-brace initializer,
+    // which created an anonymous HashMap subclass holding a reference to the enclosing test instance.
+    private Map<String, Uuid> topicIds = new HashMap<>(singletonMap(topicName, topicId));
+    private Map<Uuid, String> topicNames = singletonMap(topicId, topicName);
+    // NOTE(review): there is no separator between "consumer" and the group id - confirm this is intended.
+    private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
+
+    // Four partitions of the test topic, with and without the topic id attached.
+    private TopicPartition tp0 = new TopicPartition(topicName, 0);
+    private TopicPartition tp1 = new TopicPartition(topicName, 1);
+    private TopicPartition tp2 = new TopicPartition(topicName, 2);
+    private TopicPartition tp3 = new TopicPartition(topicName, 3);
+    private TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0);
+    private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1);
+    private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2);
+    private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3);
+    private int validLeaderEpoch = 0;
+    // Metadata response for the test topic with 4 partitions, applied by assignFromUser(Set).
+    private MetadataResponse initialUpdateResponse =
+            RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 4), topicIds);
+
+    // Fetch tuning values; fetchSize is the max-bytes value the request matchers expect per partition.
+    private int minBytes = 1;
+    private int maxBytes = Integer.MAX_VALUE;
+    private int maxWaitMs = 0;
+    private int fetchSize = 1000;
+    private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 30000;
+
+    // Collaborators; those without an initializer are presumably created by buildFetcher() - confirm.
+    private MockTime time = new MockTime(1);
+    private SubscriptionState subscriptions;
+    private ConsumerMetadata metadata;
+    private FetchMetricsRegistry metricsRegistry;
+    private FetchMetricsManager metricsManager;
+    private MockClient client;
+    private Metrics metrics;
+    private ApiVersions apiVersions = new ApiVersions();
+    private TestableFetchRequestManager<?, ?> fetcher;
+    private TestableNetworkClientDelegate networkClientDelegate;
+    private OffsetFetcher offsetFetcher;
+
+    // Record fixtures, rebuilt before each test in setup().
+    private MemoryRecords records;
+    private MemoryRecords nextRecords;
+    private MemoryRecords emptyRecords;
+    private MemoryRecords partialRecords;
+
+    @BeforeEach
+    public void setup() {
+        records = buildRecords(1L, 3, 1);
+        nextRecords = buildRecords(4L, 2, 4);
+        emptyRecords = buildRecords(0L, 0, 0);
+        partialRecords = buildRecords(4L, 1, 0);
+        partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000);
+    }
+
+    /**
+     * Manually assigns the given partitions and feeds the client the initial metadata response,
+     * followed by a second metadata update that carries {@code validLeaderEpoch}.
+     *
+     * @param partitions the partitions to assign
+     */
+    private void assignFromUser(Set<TopicPartition> partitions) {
+        subscriptions.assignFromUser(partitions);
+        client.updateMetadata(initialUpdateResponse);
+
+        // A dummy metadata update to ensure valid leader epoch.
+        MetadataResponse epochUpdate = RequestTestUtils.metadataUpdateWithIds(
+                "dummy",
+                1,
+                Collections.emptyMap(),
+                singletonMap(topicName, 4),
+                partition -> validLeaderEpoch,
+                topicIds);
+        metadata.updateWithCurrentRequestVersion(epochUpdate, false, 0L);
+    }
+
+    /**
+     * Manually assigns a single partition and feeds the client metadata for that partition's topic
+     * (one partition, no topic ids), followed by a second update that carries {@code validLeaderEpoch}.
+     *
+     * @param partition the partition to assign
+     */
+    private void assignFromUser(TopicPartition partition) {
+        subscriptions.assignFromUser(singleton(partition));
+        MetadataResponse initialUpdate =
+                RequestTestUtils.metadataUpdateWithIds(1, singletonMap(partition.topic(), 1), Collections.emptyMap());
+        client.updateMetadata(initialUpdate);
+
+        // A dummy metadata update to ensure valid leader epoch.
+        MetadataResponse epochUpdate = RequestTestUtils.metadataUpdateWithIds(
+                "dummy",
+                1,
+                Collections.emptyMap(),
+                singletonMap(partition.topic(), 1),
+                unused -> validLeaderEpoch,
+                topicIds);
+        metadata.update(9, epochUpdate, false, 0L);
+    }
+
+    /**
+     * Releases the per-test resources that were actually created: the metrics registry and the fetcher.
+     * The fetcher is closed in a {@code finally} block so that a failure while closing the metrics
+     * cannot leave the fetcher open.
+     *
+     * @throws Exception if closing either resource fails
+     */
+    @AfterEach
+    public void teardown() throws Exception {
+        try {
+            if (metrics != null)
+                metrics.close();
+        } finally {
+            if (fetcher != null)
+                fetcher.close();
+        }
+    }
+
+    private int sendFetches() {
+        offsetFetcher.validatePositionsOnMetadataChange();
+        return fetcher.sendFetches();
+    }
+
+    /**
+     * Happy path: one fetch is sent for tp0, the response delivers three records with consecutive
+     * offsets starting at 1, and the position moves to offset 4.
+     */
+    @Test
+    public void testFetchNormal() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // One request goes out; nothing is complete until the response has been polled.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        FetchResponse response = fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0);
+        client.prepareResponse(response);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetchedRecords();
+        assertTrue(recordsByPartition.containsKey(tp0));
+
+        List<ConsumerRecord<byte[], byte[]>> fetched = recordsByPartition.get(tp0);
+        assertEquals(3, fetched.size());
+        // The position is the next offset to fetch.
+        assertEquals(4L, subscriptions.position(tp0).offset);
+        for (int i = 0; i < fetched.size(); i++) {
+            assertEquals(1L + i, fetched.get(i).offset());
+        }
+    }
+
+    @Test
+    public void testInflightFetchOnPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        subscriptions.markPendingRevocation(singleton(tp0));
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertNull(fetchedRecords().get(tp0));
+    }
+
+    /**
+     * Calling {@code close()} three times must invoke {@code closeInternal} exactly once.
+     */
+    @Test
+    public void testCloseShouldBeIdempotent() {
+        buildFetcher();
+
+        for (int attempt = 0; attempt < 3; attempt++) {
+            fetcher.close();
+        }
+
+        verify(fetcher, times(1)).closeInternal(any(Timer.class));
+    }
+
+    /**
+     * After a normal fetch has completed, running the close logic must send a second request that
+     * reuses the session id of the first response, carries the final epoch and has no partition data.
+     */
+    @Test
+    public void testFetcherCloseClosesFetchSessionsInBroker() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        final FetchResponse fetchResponse = fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0);
+        client.prepareResponse(fetchResponse);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        assertEquals(0, networkClientDelegate.pendingRequestCount());
+
+        // send request to close the fetcher
+        Timer closeTimer = time.timer(Duration.ofSeconds(10));
+        // fetcher.close(timer);
+        //
+        // NOTE: by design the FetchRequestManager doesn't perform network I/O internally. That means that calling
+        // close with a Timer will NOT send out the close session requests on close. The network I/O logic is
+        // handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here.
+        ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, closeTimer);
+
+        networkClientDelegate.addAll(fetcher.poll(time.milliseconds()).unsentRequests);
+        networkClientDelegate.poll(closeTimer);
+
+        // Two requests are expected in total: the normal fetch earlier and the session-closing fetch here.
+        // The captor's value is the most recently captured request, i.e. the closing one.
+        final ArgumentCaptor<NetworkClientDelegate.UnsentRequest> sentRequests =
+                ArgumentCaptor.forClass(NetworkClientDelegate.UnsentRequest.class);
+        verify(networkClientDelegate, times(2)).doSend(sentRequests.capture(), any(Long.class));
+        NetworkClientDelegate.UnsentRequest closeRequest = sentRequests.getValue();
+        FetchRequest.Builder closeBuilder = (FetchRequest.Builder) closeRequest.requestBuilder();
+
+        // session Id is the same
+        assertEquals(fetchResponse.sessionId(), closeBuilder.metadata().sessionId());
+        // final epoch indicates we want to close the session
+        assertEquals(FetchMetadata.FINAL_EPOCH, closeBuilder.metadata().epoch());
+        // partition data should be empty
+        assertTrue(closeBuilder.fetchData().isEmpty());
+    }
+
+    /**
+     * Once a partition is marked as pending revocation no further fetch is sent for it and its
+     * position stays where the previous fetch left it.
+     */
+    @Test
+    public void testFetchingPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        FetchResponse response = fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0);
+        client.prepareResponse(response);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        // The position is the next offset to fetch.
+        assertEquals(4L, subscriptions.position(tp0).offset);
+
+        // mark partition unfetchable
+        subscriptions.markPendingRevocation(singleton(tp0));
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(4L, subscriptions.position(tp0).offset);
+    }
+
+    /**
+     * A partition whose topic id is {@code Uuid.ZERO_UUID} is still fetched; the request is expected
+     * at version 12.
+     */
+    @Test
+    public void testFetchWithNoTopicId() {
+        // Should work and default to using old request type.
+        buildFetcher();
+
+        TopicIdPartition noId = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("noId", 0));
+        TopicPartition noIdPartition = noId.topicPartition();
+        assignFromUser(noIdPartition);
+        subscriptions.seek(noIdPartition, 0);
+
+        // Fetch should use request version 12
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        MockClient.RequestMatcher expectedRequest =
+                fetchRequestMatcher((short) 12, noId, 0, Optional.of(validLeaderEpoch));
+        client.prepareResponse(expectedRequest, fullFetchResponse(noId, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetchedRecords();
+        assertTrue(recordsByPartition.containsKey(noIdPartition));
+
+        List<ConsumerRecord<byte[], byte[]>> fetched = recordsByPartition.get(noIdPartition);
+        assertEquals(3, fetched.size());
+        // The position is the next offset to fetch.
+        assertEquals(4L, subscriptions.position(noIdPartition).offset);
+        for (int i = 0; i < fetched.size(); i++) {
+            assertEquals(1L + i, fetched.get(i).offset());
+        }
+    }
+
+    /**
+     * A partition with a real topic id is fetched with the latest FETCH request version.
+     */
+    @Test
+    public void testFetchWithTopicId() {
+        buildFetcher();
+
+        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, 0));
+        TopicPartition partition = topicIdPartition.topicPartition();
+        assignFromUser(singleton(partition));
+        subscriptions.seek(partition, 0);
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Fetch should use latest version
+        MockClient.RequestMatcher expectedRequest =
+                fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), topicIdPartition, 0, Optional.of(validLeaderEpoch));
+        client.prepareResponse(expectedRequest, fullFetchResponse(topicIdPartition, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetchedRecords();
+        assertTrue(recordsByPartition.containsKey(partition));
+
+        List<ConsumerRecord<byte[], byte[]>> fetched = recordsByPartition.get(partition);
+        assertEquals(3, fetched.size());
+        // The position is the next offset to fetch.
+        assertEquals(4L, subscriptions.position(partition).offset);
+        for (int i = 0; i < fetched.size(); i++) {
+            assertEquals(1L + i, fetched.get(i).offset());
+        }
+    }
+
+    /**
+     * When the assignment switches from foo to bar, the next fetch request is expected to contain bar
+     * as fetch data and foo in its forgotten topics.
+     */
+    @Test
+    public void testFetchForgetTopicIdWhenUnassigned() {
+        buildFetcher();
+
+        TopicIdPartition foo = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition bar = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
+
+        // Assign foo only.
+        subscriptions.assignFromUser(singleton(foo.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(foo), tp -> validLeaderEpoch));
+        subscriptions.seek(foo.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+
+        // First request: foo at offset 0, nothing forgotten.
+        client.prepareResponse(
+                fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), foo, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, foo, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Assign bar and unassign foo.
+        subscriptions.assignFromUser(singleton(bar.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(bar), tp -> validLeaderEpoch));
+        subscriptions.seek(bar.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Second request: bar at offset 0, with foo listed as forgotten.
+        PartitionData expectedBarData = new PartitionData(
+                bar.topicId(),
+                0,
+                FetchRequest.INVALID_LOG_START_OFFSET,
+                fetchSize,
+                Optional.of(validLeaderEpoch));
+        client.prepareResponse(
+                fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), singletonMap(bar, expectedBarData), singletonList(foo)),
+                fullFetchResponse(1, bar, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+    }
+
+    /**
+     * When topic foo reappears with a different topic id, the next fetch request is expected to fetch
+     * foo under the new id and list foo under the old id in its forgotten topics.
+     */
+    @Test
+    public void testFetchForgetTopicIdWhenReplaced() {
+        buildFetcher();
+
+        TopicIdPartition fooWithOldTopicId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition fooWithNewTopicId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+
+        // Assign foo with old topic id.
+        subscriptions.assignFromUser(singleton(fooWithOldTopicId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(fooWithOldTopicId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithOldTopicId.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+
+        // First request: foo (old id) at offset 0, nothing forgotten.
+        client.prepareResponse(
+                fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), fooWithOldTopicId, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, fooWithOldTopicId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Replace foo with old topic id with foo with new topic id.
+        subscriptions.assignFromUser(singleton(fooWithNewTopicId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(fooWithNewTopicId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithNewTopicId.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // foo with old topic id should be removed from the session.
+        PartitionData expectedNewData = new PartitionData(
+                fooWithNewTopicId.topicId(),
+                0,
+                FetchRequest.INVALID_LOG_START_OFFSET,
+                fetchSize,
+                Optional.of(validLeaderEpoch));
+        client.prepareResponse(
+                fetchRequestMatcher(
+                        ApiKeys.FETCH.latestVersion(),
+                        singletonMap(fooWithNewTopicId, expectedNewData),
+                        singletonList(fooWithOldTopicId)),
+                fullFetchResponse(1, fooWithNewTopicId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+    }
+
+    /**
+     * Walks foo through three phases - no topic id, topic id present, no topic id again - and checks
+     * that the request version follows: 12, latest, 12. In every phase the expected request carries
+     * an empty forgotten-topics list.
+     */
+    @Test
+    public void testFetchTopicIdUpgradeDowngrade() {
+        buildFetcher();
+
+        TopicIdPartition fooWithoutId = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("foo", 0));
+        TopicIdPartition fooWithId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+
+        // Assign foo without a topic id.
+        subscriptions.assignFromUser(singleton(fooWithoutId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(fooWithoutId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithoutId.topicPartition(), 0);
+
+        // Fetch should use version 12.
+        assertEquals(1, sendFetches());
+
+        client.prepareResponse(
+                fetchRequestMatcher((short) 12, fooWithoutId, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, fooWithoutId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Upgrade.
+        subscriptions.assignFromUser(singleton(fooWithId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(fooWithId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithId.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // The request now carries foo under its topic id; nothing is expected in the forgotten list.
+        client.prepareResponse(
+                fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), fooWithId, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, fooWithId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Downgrade.
+        subscriptions.assignFromUser(singleton(fooWithoutId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(fooWithoutId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithoutId.topicPartition(), 0);
+
+        // Fetch should use version 12.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Back to the id-less form of foo; again nothing is expected in the forgotten list.
+        client.prepareResponse(
+                fetchRequestMatcher((short) 12, fooWithoutId, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, fooWithoutId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+    }
+
+    /**
+     * Builds a matcher for a fetch request that contains exactly one partition and no forgotten topics.
+     *
+     * @param expectedVersion            the request version the matcher asserts
+     * @param tp                         the single partition expected in the fetch data
+     * @param expectedFetchOffset        the fetch offset expected for that partition
+     * @param expectedCurrentLeaderEpoch the current leader epoch expected for that partition
+     * @return a matcher delegating to the map-based {@code fetchRequestMatcher} overload
+     */
+    private MockClient.RequestMatcher fetchRequestMatcher(
+            short expectedVersion,
+            TopicIdPartition tp,
+            long expectedFetchOffset,
+            Optional<Integer> expectedCurrentLeaderEpoch
+    ) {
+        PartitionData expectedData = new PartitionData(
+                tp.topicId(),
+                expectedFetchOffset,
+                FetchRequest.INVALID_LOG_START_OFFSET,
+                fetchSize,
+                expectedCurrentLeaderEpoch
+        );
+        Map<TopicIdPartition, PartitionData> expectedFetch = singletonMap(tp, expectedData);
+        List<TopicIdPartition> nothingForgotten = emptyList();
+        return fetchRequestMatcher(expectedVersion, expectedFetch, nothingForgotten);
+    }
+
+    /**
+     * Builds a matcher that asserts a request is a {@link FetchRequest} with the given version,
+     * fetch data and forgotten topics. Anything other than a {@link FetchRequest} fails the test.
+     *
+     * @param expectedVersion the request version to assert
+     * @param fetch           the expected fetch data, keyed by partition
+     * @param forgotten       the expected forgotten partitions
+     * @return the matcher; it returns {@code true} once all assertions have passed
+     */
+    private MockClient.RequestMatcher fetchRequestMatcher(
+            short expectedVersion,
+            Map<TopicIdPartition, PartitionData> fetch,
+            List<TopicIdPartition> forgotten
+    ) {
+        return body -> {
+            if (!(body instanceof FetchRequest)) {
+                fail("Should have seen FetchRequest");
+                return false;
+            }
+
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(expectedVersion, request.version());
+
+            // Both lookups need an id-to-name map built from the partitions being compared.
+            Map<Uuid, String> fetchTopicNames = topicNames(new ArrayList<>(fetch.keySet()));
+            assertEquals(fetch, request.fetchData(fetchTopicNames));
+            assertEquals(forgotten, request.forgottenTopics(topicNames(forgotten)));
+            return true;
+        };
+    }
+
+    /**
+     * Collects a topic-id-to-topic-name map from the given partitions. When several partitions share
+     * a topic id, the name of the first one encountered is kept.
+     *
+     * @param partitions the partitions to read ids and names from
+     * @return a new mutable map from topic id to topic name
+     */
+    private Map<Uuid, String> topicNames(List<TopicIdPartition> partitions) {
+        Map<Uuid, String> namesById = new HashMap<>();
+        for (TopicIdPartition partition : partitions) {
+            namesById.putIfAbsent(partition.topicId(), partition.topic());
+        }
+        return namesById;
+    }
+
+    /**
+     * Records built with magic v0 and {@code RecordBatch.NO_PARTITION_LEADER_EPOCH} must be returned
+     * as consumer records whose leader epoch is {@code Optional.empty()}.
+     */
+    @Test
+    public void testMissingLeaderEpochInRecords() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
+                CompressionType.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(),
+                RecordBatch.NO_PARTITION_LEADER_EPOCH);
+        // Encode with an explicit charset so the fixture bytes don't depend on the platform default.
+        builder.append(0L, "key".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8));
+        builder.append(0L, "key".getBytes(StandardCharsets.UTF_8), "2".getBytes(StandardCharsets.UTF_8));
+        MemoryRecords records = builder.build();
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertEquals(2, partitionRecords.get(tp0).size());
+
+        // Neither record may report a leader epoch.
+        for (ConsumerRecord<byte[], byte[]> record : partitionRecords.get(tp0)) {
+            assertEquals(Optional.empty(), record.leaderEpoch());
+        }
+    }
+
+    /**
+     * Verifies that each consumed record reports the partition leader epoch of the batch it was read from.
+     * Three batches with different leader epochs are written back to back into one buffer (2, 1 and 3
+     * records); every record's value holds the epoch of its own batch, so the expected epoch can be read
+     * back from the record itself.
+     */
+    @Test
+    public void testLeaderEpochInConsumerRecord() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        Integer partitionLeaderEpoch = 1;
+        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
+
+        // Values are encoded as UTF-8 explicitly because they are decoded with Utils.utf8 further down;
+        // relying on the platform default charset would make encode and decode disagree in principle.
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+                CompressionType.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(),
+                partitionLeaderEpoch);
+        byte[] epochValue = partitionLeaderEpoch.toString().getBytes(StandardCharsets.UTF_8);
+        builder.append(0L, key, epochValue);
+        builder.append(0L, key, epochValue);
+        builder.close();
+
+        partitionLeaderEpoch += 7;
+
+        builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 2L, System.currentTimeMillis(), partitionLeaderEpoch);
+        epochValue = partitionLeaderEpoch.toString().getBytes(StandardCharsets.UTF_8);
+        builder.append(0L, key, epochValue);
+        builder.close();
+
+        partitionLeaderEpoch += 5;
+        builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 3L, System.currentTimeMillis(), partitionLeaderEpoch);
+        epochValue = partitionLeaderEpoch.toString().getBytes(StandardCharsets.UTF_8);
+        builder.append(0L, key, epochValue);
+        builder.append(0L, key, epochValue);
+        builder.append(0L, key, epochValue);
+        builder.close();
+
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertEquals(6, partitionRecords.get(tp0).size());
+
+        for (ConsumerRecord<byte[], byte[]> record : partitionRecords.get(tp0)) {
+            int expectedLeaderEpoch = Integer.parseInt(Utils.utf8(record.value()));
+            assertEquals(Optional.of(expectedLeaderEpoch), record.leaderEpoch());
+        }
+    }
+
+    /**
+     * After a fetch for tp0 has completed, clearing buffered data against an assignment that only holds tp1
+     * must leave the fetcher without any completed fetches.
+     */
+    @Test
+    public void testClearBufferedDataForTopicPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        // tp0 is absent from the assignment handed to the fetcher
+        Set<TopicPartition> remainingAssignment = new HashSet<>(singleton(tp1));
+        fetcher.clearBufferedDataForUnassignedPartitions(remainingAssignment);
+        assertFalse(fetcher.hasCompletedFetches());
+    }
+
+    @Test
+    public void testFetchSkipsBlackedOutNodes() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        Node node = initialUpdateResponse.brokers().iterator().next();
+
+        client.backoff(node, 500);
+        assertEquals(0, sendFetches());
+
+        time.sleep(500);
+        assertEquals(1, sendFetches());
+    }
+
+    /**
+     * Fetches a buffer holding one idempotent data batch followed by an ABORT end-transaction marker and
+     * checks that only the data record is returned while the position ends up at offset 2.
+     */
+    @Test
+    public void testFetcherIgnoresControlRecords() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        long producerId = 1;
+        short producerEpoch = 0;
+        int baseSequence = 0;
+        int partitionLeaderEpoch = 0;
+
+        // Base offset 0: a single data record with a null value
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder dataBatch = MemoryRecords.idempotentBuilder(buffer, CompressionType.NONE, 0L,
+                producerId, producerEpoch, baseSequence);
+        dataBatch.append(0L, "key".getBytes(), null);
+        dataBatch.close();
+
+        // Offset 1: the control record
+        MemoryRecords.writeEndTransactionalMarker(buffer, 1L, time.milliseconds(), partitionLeaderEpoch,
+                producerId, producerEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0));
+
+        buffer.flip();
+
+        client.prepareResponse(fullFetchResponse(tidp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = fetchedRecords();
+        assertTrue(fetched.containsKey(tp0));
+
+        List<ConsumerRecord<byte[], byte[]>> tp0Records = fetched.get(tp0);
+        assertEquals(1, tp0Records.size());
+        assertEquals(2L, subscriptions.position(tp0).offset);
+
+        assertArrayEquals("key".getBytes(), tp0Records.get(0).key());
+    }
+
+    /**
+     * A NOT_LEADER_OR_FOLLOWER response still completes the fetch, but no records are returned for tp0.
+     */
+    @Test
+    public void testFetchError() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = fetchedRecords();
+        assertFalse(fetched.containsKey(tp0));
+    }
+
+    /**
+     * Returns a request matcher that accepts a {@link FetchRequest} only when it asks for the given
+     * partition at exactly the given fetch offset.
+     */
+    private MockClient.RequestMatcher matchesOffset(final TopicIdPartition tp, final long offset) {
+        return body -> {
+            FetchRequest request = (FetchRequest) body;
+            Map<TopicIdPartition, FetchRequest.PartitionData> requested = request.fetchData(topicNames);
+            if (!requested.containsKey(tp)) {
+                return false;
+            }
+            return requested.get(tp).fetchOffset == offset;
+        };
+    }
+
+    @Test
+    public void testFetchedRecordsRaisesOnSerializationErrors() {
+        // raise an exception from somewhere in the middle of the fetch 
response
+        // so that we can verify that our position does not advance after 
raising
+        ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
+            int i = 0;
+            @Override
+            public byte[] deserialize(String topic, byte[] data) {
+                if (i++ % 2 == 1) {
+                    // Should be blocked on the value deserialization of the 
first record.
+                    assertEquals("value-1", new String(data, 
StandardCharsets.UTF_8));
+                    throw new SerializationException();
+                }
+                return data;
+            }
+        };
+
+        buildFetcher(deserializer, deserializer);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
+
+        client.prepareResponse(matchesOffset(tidp0, 1), 
fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+
+        assertEquals(1, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        // The fetcher should block on Deserialization error
+        for (int i = 0; i < 2; i++) {
+            try {
+                fetcher.collectFetch();
+                fail("fetchedRecords should have raised");
+            } catch (SerializationException e) {
+                // the position should not advance since no data has been 
returned
+                assertEquals(1, subscriptions.position(tp0).offset);
+            }
+        }
+    }
+
+    /**
+     * Hand-writes five legacy (magic v1) entries into one buffer: valid at offset 0, wrong CRC at offset 1,
+     * valid at offset 2, an entry with an invalid size field at offset 3 and valid at offset 4. The test
+     * then checks that the fetcher returns the first record, blocks on each bad entry without advancing
+     * the position, can seek over the bad-CRC record, and cannot seek over the entry with the bad size.
+     */
+    @Test
+    public void testParseCorruptedRecord() throws Exception {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+
+        byte magic = RecordBatch.MAGIC_VALUE_V1;
+        byte[] key = "foo".getBytes();
+        byte[] value = "baz".getBytes();
+        long offset = 0;
+        long timestamp = 500L;
+
+        int size = LegacyRecord.recordSize(magic, key.length, value.length);
+        // Computed once and shared by the checksum and by every write below, so the attributes that are
+        // checksummed and the attributes that are written cannot drift apart.
+        byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
+        long crc = LegacyRecord.computeChecksum(magic, attributes, timestamp, key, value);
+
+        // write one valid record
+        out.writeLong(offset);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);
+
+        // and one invalid record (note the crc)
+        out.writeLong(offset + 1);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc + 1, attributes, timestamp, key, value);
+
+        // write one valid record
+        out.writeLong(offset + 2);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);
+
+        // Write a record whose size field is invalid.
+        out.writeLong(offset + 3);
+        out.writeInt(1);
+
+        // write one valid record
+        out.writeLong(offset + 4);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);
+
+        buffer.flip();
+
+        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.currentLeader(tp0)));
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        // the first fetchedRecords() should return the first valid message
+        assertEquals(1, fetchedRecords().get(tp0).size());
+        assertEquals(1, subscriptions.position(tp0).offset);
+
+        ensureBlockOnRecord(1L);
+        seekAndConsumeRecord(buffer, 2L);
+        ensureBlockOnRecord(3L);
+        try {
+            // For a record that cannot be retrieved from the iterator, we cannot seek over it within the batch.
+            seekAndConsumeRecord(buffer, 4L);
+            fail("Should have thrown exception when fail to retrieve a record from iterator.");
+        } catch (KafkaException ke) {
+            // let it go
+        }
+        ensureBlockOnRecord(4L);
+    }
+
+    /**
+     * Collects the fetch twice and expects a {@link KafkaException} both times, with the position of tp0
+     * staying at {@code blockedOffset}.
+     */
+    private void ensureBlockOnRecord(long blockedOffset) {
+        // the fetchedRecords() should always throw exception due to the invalid message at the starting offset.
+        int attempt = 0;
+        while (attempt < 2) {
+            try {
+                fetcher.collectFetch();
+                fail("fetchedRecords should have raised KafkaException");
+            } catch (KafkaException expected) {
+                assertEquals(blockedOffset, subscriptions.position(tp0).offset);
+            }
+            attempt++;
+        }
+    }
+
+    /**
+     * Seeks tp0 to {@code toOffset}, sends a new fetch answered with {@code responseBuffer}, and expects
+     * exactly one record at {@code toOffset} with the position moving to {@code toOffset + 1}.
+     */
+    private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) {
+        // Seek to skip the bad record and fetch again.
+        SubscriptionState.FetchPosition target =
+                new SubscriptionState.FetchPosition(toOffset, Optional.empty(), metadata.currentLeader(tp0));
+        subscriptions.seekUnvalidated(tp0, target);
+        // Should not throw exception after the seek.
+        fetcher.collectFetch();
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, MemoryRecords.readableRecords(responseBuffer), Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        List<ConsumerRecord<byte[], byte[]>> consumed = fetchedRecords().get(tp0);
+        assertEquals(1, consumed.size());
+        assertEquals(toOffset, consumed.get(0).offset());
+        assertEquals(toOffset + 1, subscriptions.position(tp0).offset);
+    }
+
+    /**
+     * Overwrites four bytes at position 17 of a freshly built batch (the test calls this garbling the CRC)
+     * and checks that collecting the fetch repeatedly raises {@link KafkaException} without moving the
+     * position off offset 0.
+     */
+    @Test
+    public void testInvalidDefaultRecordBatch() {
+        buildFetcher();
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder batchBuilder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer),
+                DefaultRecordBatch.CURRENT_MAGIC_VALUE,
+                CompressionType.NONE,
+                TimestampType.CREATE_TIME,
+                0L, 10L, 0L, (short) 0, 0, false, false, 0, 1024);
+        batchBuilder.append(10L, "key".getBytes(), "value".getBytes());
+        batchBuilder.close();
+        buffer.flip();
+
+        // Garble the CRC
+        buffer.position(17);
+        buffer.put("beef".getBytes());
+        buffer.position(0);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        // the fetchedRecords() should always throw exception due to the bad batch.
+        for (int attempt = 0; attempt < 2; attempt++) {
+            try {
+                fetcher.collectFetch();
+                fail("fetchedRecords should have raised KafkaException");
+            } catch (KafkaException expected) {
+                assertEquals(0, subscriptions.position(tp0).offset);
+            }
+        }
+    }
+
+    /**
+     * Corrupts a three-record v2 batch in place and checks that collecting the fetch raises a
+     * {@link KafkaException} while the position of tp0 stays at offset 0.
+     */
+    @Test
+    public void testParseInvalidRecordBatch() {
+        buildFetcher();
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        ByteBuffer buffer = records.buffer();
+
+        // flip some bits to fail the crc. The word is read back with getInt so that the XOR toggles bits of
+        // the stored 4-byte value; reading a single byte with get() would instead overwrite the word with a
+        // value derived from one byte, which is not guaranteed to differ from what was there.
+        buffer.putInt(32, buffer.getInt(32) ^ 87238423);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        try {
+            fetcher.collectFetch();
+            fail("fetchedRecords should have raised");
+        } catch (KafkaException e) {
+            // the position should not advance since no data has been returned
+            assertEquals(0, subscriptions.position(tp0).offset);
+        }
+    }
+
+    /**
+     * Fetches three records carrying zero, one and two headers under the same key and checks what
+     * {@code lastHeader("headerKey")} returns for each of them.
+     */
+    @Test
+    public void testHeaders() {
+        buildFetcher();
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
+
+        // record without headers
+        builder.append(0L, "key".getBytes(), "value-1".getBytes());
+
+        // record with a single header
+        Header[] singleHeader = {
+            new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))
+        };
+        builder.append(0L, "key".getBytes(), "value-2".getBytes(), singleHeader);
+
+        // record with two headers sharing one key
+        Header[] repeatedKeyHeaders = {
+            new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8)),
+            new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8))
+        };
+        builder.append(0L, "key".getBytes(), "value-3".getBytes(), repeatedKeyHeaders);
+
+        MemoryRecords memoryRecords = builder.build();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
+
+        client.prepareResponse(matchesOffset(tidp0, 1), fullFetchResponse(tidp0, memoryRecords, Errors.NONE, 100L, 0));
+
+        assertEquals(1, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        List<ConsumerRecord<byte[], byte[]>> consumed = fetchedRecords().get(tp0);
+
+        assertEquals(3, consumed.size());
+
+        Iterator<ConsumerRecord<byte[], byte[]>> remaining = consumed.iterator();
+
+        ConsumerRecord<byte[], byte[]> current = remaining.next();
+        assertNull(current.headers().lastHeader("headerKey"));
+
+        current = remaining.next();
+        assertEquals("headerValue", new String(current.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
+        assertEquals("headerKey", current.headers().lastHeader("headerKey").key());
+
+        current = remaining.next();
+        assertEquals("headerValue2", new String(current.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
+        assertEquals("headerKey", current.headers().lastHeader("headerKey").key());
+    }
+
+    /**
+     * With the fetcher built via {@code buildFetcher(2)}, a three-record response is handed out as two
+     * records and then one, without a new fetch being sent in between; the following response is then
+     * fetched and returned.
+     */
+    @Test
+    public void testFetchMaxPollRecords() {
+        buildFetcher(2);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
+
+        client.prepareResponse(matchesOffset(tidp0, 1), fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tidp0, 4), fullFetchResponse(tidp0, nextRecords, Errors.NONE, 100L, 0));
+
+        // first poll: two of the three buffered records
+        assertEquals(1, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        List<ConsumerRecord<byte[], byte[]>> polled = fetchedRecords().get(tp0);
+        assertEquals(2, polled.size());
+        assertEquals(3L, subscriptions.position(tp0).offset);
+        assertEquals(1, polled.get(0).offset());
+        assertEquals(2, polled.get(1).offset());
+
+        // second poll: the remaining buffered record, no new fetch is sent
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        polled = fetchedRecords().get(tp0);
+        assertEquals(1, polled.size());
+        assertEquals(4L, subscriptions.position(tp0).offset);
+        assertEquals(3, polled.get(0).offset());
+
+        // third poll: a new fetch is needed and returns the next response
+        assertTrue(sendFetches() > 0);
+        networkClientDelegate.poll(time.timer(0));
+        polled = fetchedRecords().get(tp0);
+        assertEquals(2, polled.size());
+        assertEquals(6L, subscriptions.position(tp0).offset);
+        assertEquals(4, polled.get(0).offset());
+        assertEquals(5, polled.get(1).offset());
+    }
+
+    /**
+     * Covers a partition that still has fetched-but-unconsumed records (max.poll.records is smaller than
+     * the number of fetched records) being unassigned while a different partition gets assigned. Streams
+     * state restoration follows this pattern, and this test would have caught KAFKA-5097.
+     */
+    @Test
+    public void testFetchAfterPartitionWithFetchedRecordsIsUnassigned() {
+        buildFetcher(2);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
+
+        // Returns 3 records while `max.poll.records` is configured to 2
+        client.prepareResponse(matchesOffset(tidp0, 1), fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+
+        assertEquals(1, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        List<ConsumerRecord<byte[], byte[]>> tp0Records = fetchedRecords().get(tp0);
+        assertEquals(2, tp0Records.size());
+        assertEquals(3L, subscriptions.position(tp0).offset);
+        assertEquals(1, tp0Records.get(0).offset());
+        assertEquals(2, tp0Records.get(1).offset());
+
+        // Switch the assignment to tp1 while one record of tp0 is still buffered
+        assignFromUser(singleton(tp1));
+        client.prepareResponse(matchesOffset(tidp1, 4), fullFetchResponse(tidp1, nextRecords, Errors.NONE, 100L, 0));
+        subscriptions.seek(tp1, 4);
+
+        assertEquals(1, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> afterReassignment = fetchedRecords();
+        assertNull(afterReassignment.get(tp0));
+        List<ConsumerRecord<byte[], byte[]>> tp1Records = afterReassignment.get(tp1);
+        assertEquals(2, tp1Records.size());
+        assertEquals(6L, subscriptions.position(tp1).offset);
+        assertEquals(4, tp1Records.get(0).offset());
+        assertEquals(5, tp1Records.get(1).offset());
+    }
+
+    /**
+     * Fetches records whose offsets have gaps (15, 20, 30), as can happen on a compacted topic, and checks
+     * that all three are returned and the position ends up right after the last one.
+     */
+    @Test
+    public void testFetchNonContinuousRecords() {
+        // if we are fetching from a compacted topic, there may be gaps in the returned records
+        // this test verifies the fetcher updates the current fetched/consumed positions correctly for this case
+        buildFetcher();
+
+        MemoryRecordsBuilder gappedBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        gappedBuilder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes());
+        gappedBuilder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes());
+        gappedBuilder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes());
+        MemoryRecords gappedRecords = gappedBuilder.build();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, gappedRecords, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        List<ConsumerRecord<byte[], byte[]>> consumerRecords = fetchedRecords().get(tp0);
+        assertEquals(3, consumerRecords.size());
+        assertEquals(31L, subscriptions.position(tp0).offset); // this is the next fetching position
+
+        assertEquals(15L, consumerRecords.get(0).offset());
+        assertEquals(20L, consumerRecords.get(1).offset());
+        assertEquals(30L, consumerRecords.get(2).offset());
+    }
+
+    /**
+     * Test the case where the client makes a pre-v3 FetchRequest, but the 
server replies with only a partial
+     * request. This happens when a single message is larger than the 
per-partition limit.
+     */
+    @Test
+    public void testFetchRequestWhenRecordTooLarge() {
+        try {
+            buildFetcher();
+
+            client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.FETCH.id, 
(short) 2, (short) 2));
+            makeFetchRequestWithIncompleteRecord();
+            try {
+                fetcher.collectFetch();
+                fail("RecordTooLargeException should have been raised");
+            } catch (RecordTooLargeException e) {
+                assertTrue(e.getMessage().startsWith("There are some messages 
at [Partition=Offset]: "));
+                // the position should not advance since no data has been 
returned
+                assertEquals(0, subscriptions.position(tp0).offset);
+            }
+        } finally {
+            client.setNodeApiVersions(NodeApiVersions.create());
+        }
+    }
+
+    /**
+     * Test the case where the client makes a post KIP-74 FetchRequest, but the server replies with only a
+     * partial request. For v3 and later FetchRequests, the implementation of KIP-74 changed the behavior
+     * so that at least one message is always returned. Therefore, this case should not happen, and it indicates
+     * that an internal error has taken place.
+     *
+     * The expected outcome is a {@link KafkaException} whose message starts with
+     * "Failed to make progress reading messages", with the position of tp0 left at offset 0.
+     */
+    @Test
+    public void testFetchRequestInternalError() {
+        buildFetcher();
+        makeFetchRequestWithIncompleteRecord();
+        try {
+            fetcher.collectFetch();
+            // The failure message names the exception this test actually expects; it previously mentioned
+            // RecordTooLargeException, which belongs to the pre-v3 sibling test.
+            fail("KafkaException should have been raised");
+        } catch (KafkaException e) {
+            assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
+            // the position should not advance since no data has been returned
+            assertEquals(0, subscriptions.position(tp0).offset);
+        }
+    }
+
+    /**
+     * Assigns tp0 at offset 0, sends a fetch and completes it with a response whose record data consists
+     * of just eight zero bytes.
+     */
+    private void makeFetchRequestWithIncompleteRecord() {
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // a freshly allocated byte array is zero-filled
+        ByteBuffer eightZeroBytes = ByteBuffer.wrap(new byte[8]);
+        MemoryRecords partialRecord = MemoryRecords.readableRecords(eightZeroBytes);
+        client.prepareResponse(fullFetchResponse(tidp0, partialRecord, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+    }
+
+    /**
+     * A TOPIC_AUTHORIZATION_FAILED response makes collecting the fetch raise a
+     * {@link TopicAuthorizationException} that names the topic.
+     */
+    @Test
+    public void testUnauthorizedTopic() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // answer the fetch with an authorization failure for the partition
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        try {
+            fetcher.collectFetch();
+            fail("fetchedRecords should have thrown");
+        } catch (TopicAuthorizationException unauthorized) {
+            assertEquals(singleton(topicName), unauthorized.unauthorizedTopics());
+        }
+    }
+
+    /**
+     * While a fetch for tp0 is in flight, the assignment is emptied and tp0 is then assigned again. The
+     * response that arrives afterwards must not produce any records.
+     */
+    @Test
+    public void testFetchDuringEagerRebalance() {
+        buildFetcher();
+
+        subscriptions.subscribe(singleton(topicName), listener);
+        subscriptions.assignFromSubscribed(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+                1, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds));
+
+        assertEquals(1, sendFetches());
+
+        // Now the eager rebalance happens and fetch positions are cleared
+        subscriptions.assignFromSubscribed(Collections.emptyList());
+        subscriptions.assignFromSubscribed(singleton(tp0));
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        // The active fetch should be ignored since its position is no longer valid
+        assertTrue(fetchedRecords().isEmpty());
+    }
+
+    /**
+     * While a fetch for tp0 is in flight, tp0 is assigned again without ever being removed. The response
+     * that arrives afterwards must still be returned.
+     */
+    @Test
+    public void testFetchDuringCooperativeRebalance() {
+        buildFetcher();
+
+        subscriptions.subscribe(singleton(topicName), listener);
+        subscriptions.assignFromSubscribed(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+                1, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds));
+
+        assertEquals(1, sendFetches());
+
+        // Now the cooperative rebalance happens and fetch positions are NOT cleared for unrevoked partitions
+        subscriptions.assignFromSubscribed(singleton(tp0));
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> afterRebalance = fetchedRecords();
+
+        // The active fetch should NOT be ignored since the position for tp0 is still valid
+        assertEquals(1, afterRebalance.size());
+        assertEquals(3, afterRebalance.get(tp0).size());
+    }
+
+    @Test
+    public void testInFlightFetchOnPausedPartition() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        subscriptions.pause(tp0);
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertNull(fetchedRecords().get(tp0));
+    }
+
+    /**
+     * A partition that is already paused must not trigger a fetch: no fetch is reported as sent and the
+     * mock client sees no request.
+     */
+    @Test
+    public void testFetchOnPausedPartition() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        subscriptions.pause(tp0);
+        // assertEquals reports the actual number of fetches on failure and matches the sibling tests
+        assertEquals(0, sendFetches());
+        assertTrue(client.requests().isEmpty());
+    }
+
+    @Test
+    public void testFetchOnCompletedFetchesForPausedAndResumedPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+
+        subscriptions.pause(tp0);
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEmptyFetch("Should not return any records or advance position 
when partition is paused");
+
+        assertTrue(fetcher.hasCompletedFetches(), "Should still contain 
completed fetches");
+        assertFalse(fetcher.hasAvailableFetches(), "Should not have any 
available (non-paused) completed fetches");
+        assertEquals(0, sendFetches());
+
+        subscriptions.resume(tp0);
+
+        assertTrue(fetcher.hasAvailableFetches(), "Should have available 
(non-paused) completed fetches");
+
+        networkClientDelegate.poll(time.timer(0));
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchedRecords = fetchedRecords();
+        assertEquals(1, fetchedRecords.size(), "Should return records when 
partition is resumed");
+        assertNotNull(fetchedRecords.get(tp0));
+        assertEquals(3, fetchedRecords.get(tp0).size());
+
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position after 
previously paused partitions are fetched");
+        assertFalse(fetcher.hasCompletedFetches(), "Should no longer contain 
completed fetches");
+    }
+
+    @Test
+    public void testFetchOnCompletedFetchesForSomePausedPartitions() {
+        buildFetcher();
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchedRecords;
+
+        assignFromUser(mkSet(tp0, tp1));
+
+        // seek to tp0 and tp1 in two polls to generate 2 complete requests 
and responses
+
+        // #1 seek, request, poll, response
+        subscriptions.seekUnvalidated(tp0, new 
SubscriptionState.FetchPosition(1, Optional.empty(), 
metadata.currentLeader(tp0)));
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        // #2 seek, request, poll, response
+        subscriptions.seekUnvalidated(tp1, new 
SubscriptionState.FetchPosition(1, Optional.empty(), 
metadata.currentLeader(tp1)));
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp1, nextRecords, 
Errors.NONE, 100L, 0));
+
+        subscriptions.pause(tp0);
+        networkClientDelegate.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+        assertEquals(1, fetchedRecords.size(), "Should return completed fetch 
for unpaused partitions");
+        assertTrue(fetcher.hasCompletedFetches(), "Should still contain 
completed fetches");
+        assertNotNull(fetchedRecords.get(tp1));
+        assertNull(fetchedRecords.get(tp0));
+
+        assertEmptyFetch("Should not return records or advance position for 
remaining paused partition");
+        assertTrue(fetcher.hasCompletedFetches(), "Should still contain 
completed fetches");
+    }
+
+    @Test
+    public void testFetchOnCompletedFetchesForAllPausedPartitions() {
+        buildFetcher();
+
+        assignFromUser(mkSet(tp0, tp1));
+
+        // seek to tp0 and tp1 in two polls to generate 2 complete requests 
and responses
+
+        // #1 seek, request, poll, response
+        subscriptions.seekUnvalidated(tp0, new 
SubscriptionState.FetchPosition(1, Optional.empty(), 
metadata.currentLeader(tp0)));
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        // #2 seek, request, poll, response
+        subscriptions.seekUnvalidated(tp1, new 
SubscriptionState.FetchPosition(1, Optional.empty(), 
metadata.currentLeader(tp1)));
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp1, nextRecords, 
Errors.NONE, 100L, 0));
+
+        subscriptions.pause(tp0);
+        subscriptions.pause(tp1);
+
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEmptyFetch("Should not return records or advance position for 
all paused partitions");
+        assertTrue(fetcher.hasCompletedFetches(), "Should still contain 
completed fetches");
+        assertFalse(fetcher.hasAvailableFetches(), "Should not have any 
available (non-paused) completed fetches");
+    }
+
+    @Test
+    public void testPartialFetchWithPausedPartitions() {
+        // this test sends creates a completed fetch with 3 records and a max 
poll of 2 records to assert
+        // that a fetch that must be returned over at least 2 polls can be 
cached successfully when its partition is
+        // paused, then returned successfully after its been resumed again 
later
+        buildFetcher(2);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchedRecords;
+
+        assignFromUser(mkSet(tp0, tp1));
+
+        subscriptions.seek(tp0, 1);
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+
+        assertEquals(2, fetchedRecords.get(tp0).size(), "Should return 2 
records from fetch with 3 records");
+        assertFalse(fetcher.hasCompletedFetches(), "Should have no completed 
fetches");
+
+        subscriptions.pause(tp0);
+        networkClientDelegate.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+
+        assertEmptyFetch("Should not return records or advance position for 
paused partitions");
+        assertTrue(fetcher.hasCompletedFetches(), "Should have 1 entry in 
completed fetches");
+        assertFalse(fetcher.hasAvailableFetches(), "Should not have any 
available (non-paused) completed fetches");
+
+        subscriptions.resume(tp0);
+
+        networkClientDelegate.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+
+        assertEquals(1, fetchedRecords.get(tp0).size(), "Should return last 
remaining record");
+        assertFalse(fetcher.hasCompletedFetches(), "Should have no completed 
fetches");
+    }
+
+    @Test
+    public void 
testFetchDiscardedAfterPausedPartitionResumedAndSeekedToNewOffset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        subscriptions.pause(tp0);
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+
+        subscriptions.seek(tp0, 3);
+        subscriptions.resume(tp0);
+        networkClientDelegate.poll(time.timer(0));
+
+        assertTrue(fetcher.hasCompletedFetches(), "Should have 1 entry in 
completed fetches");
+        Fetch<byte[], byte[]> fetch = collectFetch();
+        assertEquals(emptyMap(), fetch.records(), "Should not return any 
records because we seeked to a new offset");
+        assertFalse(fetch.positionAdvanced());
+        assertFalse(fetcher.hasCompletedFetches(), "Should have no completed 
fetches");
+    }
+
+    @Test
+    public void testFetchNotLeaderOrFollower() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchUnknownTopicOrPartition() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchUnknownTopicId() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.UNKNOWN_TOPIC_ID, -1L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchSessionIdError() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fetchResponseWithTopLevelError(tidp0, 
Errors.FETCH_SESSION_TOPIC_ID_ERROR, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchInconsistentTopicId() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.INCONSISTENT_TOPIC_ID, -1L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchFencedLeaderEpoch() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.FENCED_LEADER_EPOCH, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()), 
"Should have requested metadata update");
+    }
+
+    @Test
+    public void testFetchUnknownLeaderEpoch() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.UNKNOWN_LEADER_EPOCH, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertNotEquals(0L, metadata.timeToNextUpdate(time.milliseconds()), 
"Should not have requested metadata update");
+    }
+
+    @Test
+    public void testEpochSetInFetchRequest() {
+        buildFetcher();
+        subscriptions.assignFromUser(singleton(tp0));
+
+        // Install metadata in which every partition reports leader epoch 99
+        MetadataResponse epochMetadata = RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+                Collections.emptyMap(), Collections.singletonMap(topicName, 4), tp -> 99, topicIds);
+        client.updateMetadata(epochMetadata);
+
+        subscriptions.seek(tp0, 10);
+        assertEquals(1, sendFetches());
+
+        // Inspect the outgoing request: every partition must carry the epoch from the metadata update
+        MockClient.RequestMatcher epochMatcher = body -> {
+            if (!(body instanceof FetchRequest)) {
+                fail("Should have seen FetchRequest");
+                return false;
+            }
+            FetchRequest outgoing = (FetchRequest) body;
+            outgoing.fetchData(topicNames).values().forEach(requested -> {
+                assertTrue(requested.currentLeaderEpoch.isPresent(), "Expected Fetcher to set leader epoch in request");
+                assertEquals(99, requested.currentLeaderEpoch.get().longValue(), "Expected leader epoch to match epoch from metadata update");
+            });
+            return true;
+        };
+        client.prepareResponse(epochMatcher, fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.pollNoWakeup();
+    }
+
+    @Test
+    public void testFetchOffsetOutOfRange() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+        assertNull(subscriptions.validPosition(tp0));
+        assertNull(subscriptions.position(tp0));
+    }
+
+    @Test
+    public void testStaleOutOfRangeError() {
+        // verify that an out of range error which arrives after a seek
+        // does not cause us to reset our position or throw an exception
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        subscriptions.seek(tp0, 1);
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position on 
fetch error");
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertEquals(1, subscriptions.position(tp0).offset);
+    }
+
+    @Test
+    public void testFetchedRecordsAfterSeek() {
+        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), 2, 
IsolationLevel.READ_UNCOMMITTED);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertTrue(sendFetches() > 0);
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        subscriptions.seek(tp0, 2);
+        assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
+    }
+
+    @Test
+    public void testFetchOffsetOutOfRangeException() {
+        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), 2, 
IsolationLevel.READ_UNCOMMITTED);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        sendFetches();
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        for (int i = 0; i < 2; i++) {
+            OffsetOutOfRangeException e = 
assertThrows(OffsetOutOfRangeException.class, () ->
+                    fetcher.collectFetch());
+            assertEquals(singleton(tp0), 
e.offsetOutOfRangePartitions().keySet());
+            assertEquals(0L, 
e.offsetOutOfRangePartitions().get(tp0).longValue());
+        }
+    }
+
+    @Test
+    public void testFetchPositionAfterException() {
+        // Verify that the next fetch offset advances by exactly the number of fetched records when
+        // one of the fetched partitions causes an exception, so that the consumer loses no record.
+        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
+        assignFromUser(mkSet(tp0, tp1));
+        subscriptions.seek(tp0, 1);
+        subscriptions.seek(tp1, 1);
+
+        assertEquals(1, sendFetches());
+
+        // tp1 carries records, tp0 carries an out-of-range error; tp1 is listed first
+        FetchResponseData.PartitionData tp1Data = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp1.partition())
+                .setHighWatermark(100)
+                .setRecords(records);
+        FetchResponseData.PartitionData tp0Data = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp0.partition())
+                .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
+                .setHighWatermark(100);
+        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
+        partitions.put(tidp1, tp1Data);
+        partitions.put(tidp0, tp0Data);
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        networkClientDelegate.poll(time.timer(0));
+
+        // First collection returns tp1's records and advances only tp1
+        List<ConsumerRecord<byte[], byte[]>> collected = new ArrayList<>();
+        fetchRecordsInto(collected);
+
+        assertEquals(1, subscriptions.position(tp0).offset);
+        assertEquals(4, subscriptions.position(tp1).offset);
+        assertEquals(3, collected.size());
+
+        // Second collection reaches tp0's error
+        OffsetOutOfRangeException thrown = assertThrows(OffsetOutOfRangeException.class, () ->
+                fetchRecordsInto(collected));
+
+        assertEquals(singleton(tp0), thrown.offsetOutOfRangePartitions().keySet());
+        assertEquals(1L, thrown.offsetOutOfRangePartitions().get(tp0).longValue());
+
+        // Positions and the collected records are unchanged by the failed collection
+        assertEquals(1, subscriptions.position(tp0).offset);
+        assertEquals(4, subscriptions.position(tp1).offset);
+        assertEquals(3, collected.size());
+    }
+
+    /**
+     * Collects the currently fetched records via {@code fetchedRecords()} and appends the records
+     * of every returned partition to the given list.
+     *
+     * @param allFetchedRecords the list that receives the collected records
+     */
+    private void fetchRecordsInto(List<ConsumerRecord<byte[], byte[]>> allFetchedRecords) {
+        for (List<ConsumerRecord<byte[], byte[]>> partitionRecords : fetchedRecords().values())
+            allFetchedRecords.addAll(partitionRecords);
+    }
+
+    @Test
+    public void testCompletedFetchRemoval() {
+        // Ensure the removal of completed fetches that cause an Exception if 
and only if they contain empty records.
+        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, 
IsolationLevel.READ_UNCOMMITTED);
+        assignFromUser(mkSet(tp0, tp1, tp2, tp3));
+
+        subscriptions.seek(tp0, 1);
+        subscriptions.seek(tp1, 1);
+        subscriptions.seek(tp2, 1);
+        subscriptions.seek(tp3, 1);
+
+        assertEquals(1, sendFetches());
+
+        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = 
new LinkedHashMap<>();
+        partitions.put(tidp1, new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp1.partition())
+                .setHighWatermark(100)
+                .setRecords(records));
+        partitions.put(tidp0, new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp0.partition())
+                .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
+                .setHighWatermark(100));
+        partitions.put(tidp2, new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp2.partition())
+                .setHighWatermark(100)
+                .setLastStableOffset(4)
+                .setLogStartOffset(0)
+                .setRecords(nextRecords));
+        partitions.put(tidp3, new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp3.partition())
+                .setHighWatermark(100)
+                .setLastStableOffset(4)
+                .setLogStartOffset(0)
+                .setRecords(partialRecords));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        networkClientDelegate.poll(time.timer(0));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new 
ArrayList<>();
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
recordsByPartition = fetchedRecords();
+        for (List<ConsumerRecord<byte[], byte[]>> records : 
recordsByPartition.values())
+            fetchedRecords.addAll(records);
+
+        assertEquals(fetchedRecords.size(), subscriptions.position(tp1).offset 
- 1);
+        assertEquals(4, subscriptions.position(tp1).offset);
+        assertEquals(3, fetchedRecords.size());
+
+        List<OffsetOutOfRangeException> oorExceptions = new ArrayList<>();
+        try {
+            recordsByPartition = fetchedRecords();
+            for (List<ConsumerRecord<byte[], byte[]>> records : 
recordsByPartition.values())
+                fetchedRecords.addAll(records);
+        } catch (OffsetOutOfRangeException oor) {
+            oorExceptions.add(oor);
+        }
+
+        // Should have received one OffsetOutOfRangeException for partition tp1
+        assertEquals(1, oorExceptions.size());
+        OffsetOutOfRangeException oor = oorExceptions.get(0);
+        assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
+        assertEquals(oor.offsetOutOfRangePartitions().size(), 1);
+
+        recordsByPartition = fetchedRecords();
+        for (List<ConsumerRecord<byte[], byte[]>> records : 
recordsByPartition.values())
+            fetchedRecords.addAll(records);
+
+        // Should not have received an Exception for tp2.
+        assertEquals(6, subscriptions.position(tp2).offset);
+        assertEquals(5, fetchedRecords.size());
+
+        int numExceptionsExpected = 3;
+        List<KafkaException> kafkaExceptions = new ArrayList<>();
+        for (int i = 1; i <= numExceptionsExpected; i++) {
+            try {
+                recordsByPartition = fetchedRecords();
+                for (List<ConsumerRecord<byte[], byte[]>> records : 
recordsByPartition.values())
+                    fetchedRecords.addAll(records);
+            } catch (KafkaException e) {
+                kafkaExceptions.add(e);
+            }
+        }
+        // Should have received as much as numExceptionsExpected Kafka 
exceptions for tp3.
+        assertEquals(numExceptionsExpected, kafkaExceptions.size());
+    }
+
+    @Test
+    public void testSeekBeforeException() {
+        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), 2, 
IsolationLevel.READ_UNCOMMITTED);
+
+        assignFromUser(mkSet(tp0));
+        subscriptions.seek(tp0, 1);
+        assertEquals(1, sendFetches());
+        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = 
new HashMap<>();
+        partitions.put(tidp0, new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp0.partition())
+                .setHighWatermark(100)
+                .setRecords(records));
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(2, fetchedRecords().get(tp0).size());
+
+        subscriptions.assignFromUser(mkSet(tp0, tp1));
+        subscriptions.seekUnvalidated(tp1, new 
SubscriptionState.FetchPosition(1, Optional.empty(), 
metadata.currentLeader(tp1)));
+
+        assertEquals(1, sendFetches());
+        partitions = new HashMap<>();
+        partitions.put(tidp1, new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp1.partition())
+                .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
+                .setHighWatermark(100));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(1, fetchedRecords().get(tp0).size());
+
+        subscriptions.seek(tp1, 10);
+        // Should not throw OffsetOutOfRangeException after the seek
+        assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partitions");
+    }
+
+    @Test
+    public void testFetchDisconnected() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0), true);
+        networkClientDelegate.poll(time.timer(0));
+        assertEmptyFetch("Should not return records or advance position on 
disconnect");
+
+        // disconnects should have no affect on subscription state
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(0, subscriptions.position(tp0).offset);
+    }
+
+    /*
+     * Send multiple requests. Verify that the client side quota metrics have the right values.
+     * The dedicated NetworkClient created here is closed in a finally block so that it is
+     * released even when an assertion fails.
+     */
+    @Test
+    public void testQuotaMetrics() {
+        buildFetcher();
+
+        MockSelector selector = new MockSelector(time);
+        Cluster cluster = TestUtils.singletonCluster("test", 1);
+        Node node = cluster.nodes().get(0);
+        // Named differently from the "client" field so that the field is not shadowed
+        NetworkClient networkClient = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
+                time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext());
+
+        try {
+            // Complete the ApiVersions exchange; this response carries a throttle time of 400
+            ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
+                    400, ApiMessageType.ListenerType.ZK_BROKER);
+            ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
+
+            selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+            while (!networkClient.ready(node, time.milliseconds())) {
+                networkClient.poll(1, time.milliseconds());
+                // If a throttled response is received, advance the time to ensure progress.
+                time.sleep(networkClient.throttleDelayMs(node, time.milliseconds()));
+            }
+            selector.clear();
+
+            // Three fetch round trips with throttle times of 100, 200 and 300
+            for (int i = 1; i <= 3; i++) {
+                int throttleTimeMs = 100 * i;
+                FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(ApiKeys.FETCH.latestVersion(), 100, 100, new LinkedHashMap<>());
+                builder.rackId("");
+                ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(), true);
+                networkClient.send(request, time.milliseconds());
+                networkClient.poll(1, time.milliseconds());
+                FetchResponse response = fullFetchResponse(tidp0, nextRecords, Errors.NONE, i, throttleTimeMs);
+                buffer = RequestTestUtils.serializeResponseWithHeader(response, ApiKeys.FETCH.latestVersion(), request.correlationId());
+                selector.completeReceive(new NetworkReceive(node.idString(), buffer));
+                networkClient.poll(1, time.milliseconds());
+                // If a throttled response is received, advance the time to ensure progress.
+                time.sleep(networkClient.throttleDelayMs(node, time.milliseconds()));
+                selector.clear();
+            }
+            Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+            KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg));
+            KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax));
+            // Throttle times are ApiVersions=400, Fetch=(100, 200, 300)
+            assertEquals(250, (Double) avgMetric.metricValue(), EPSILON);
+            assertEquals(400, (Double) maxMetric.metricValue(), EPSILON);
+        } finally {
+            networkClient.close();
+        }
+    }
+
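+    /*
+     * Fetch an empty and then a non-empty response. Verify that the records-lag metrics (max and
+     * per-partition) have the right values and that the partition metric is removed on unsubscribe
+     */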
+    @Test
+    public void testFetcherMetrics() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // Resolve the names of the max-lag metric and of tp0's per-partition lag metric
+        Map<String, String> partitionTags = new HashMap<>();
+        partitionTags.put("topic", tp0.topic());
+        partitionTags.put("partition", String.valueOf(tp0.partition()));
+        MetricName lagMaxName = metrics.metricInstance(metricsRegistry.recordsLagMax);
+        MetricName partitionLagName = metrics.metricName("records-lag", metricGroup, partitionTags);
+
+        Map<MetricName, KafkaMetric> registered = metrics.metrics();
+        KafkaMetric lagMax = registered.get(lagMaxName);
+
+        // Before any fetch the max lag is NaN
+        assertEquals(Double.NaN, (Double) lagMax.metricValue(), EPSILON);
+
+        // After an empty FetchResponse the lag is hw - fetchOffset
+        fetchRecords(tidp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
+        assertEquals(100, (Double) lagMax.metricValue(), EPSILON);
+
+        KafkaMetric partitionLag = registered.get(partitionLagName);
+        assertEquals(100, (Double) partitionLag.metricValue(), EPSILON);
+
+        // After a non-empty FetchResponse the lag is hw - offset of the last message
+        MemoryRecordsBuilder threeRecords = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int offset = 0; offset < 3; offset++) {
+            threeRecords.appendWithOffset(offset, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + offset).getBytes());
+        }
+        fetchRecords(tidp0, threeRecords.build(), Errors.NONE, 200L, 0);
+        assertEquals(197, (Double) lagMax.metricValue(), EPSILON);
+        assertEquals(197, (Double) partitionLag.metricValue(), EPSILON);
+
+        // Unsubscribing and sending fetches again de-registers the partition lag metric
+        subscriptions.unsubscribe();
+        sendFetches();
+        assertFalse(registered.containsKey(partitionLagName));
+    }
+
+    @Test
+    public void testFetcherLeadMetric() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // Resolve the names of the min-lead metric and of tp0's per-partition lead metric
+        Map<String, String> partitionTags = new HashMap<>(2);
+        partitionTags.put("topic", tp0.topic());
+        partitionTags.put("partition", String.valueOf(tp0.partition()));
+        MetricName leadMinName = metrics.metricInstance(metricsRegistry.recordsLeadMin);
+        MetricName partitionLeadName = metrics.metricName("records-lead", metricGroup, "", partitionTags);
+
+        Map<MetricName, KafkaMetric> registered = metrics.metrics();
+        KafkaMetric leadMin = registered.get(leadMinName);
+
+        // Before any fetch the min lead is NaN
+        assertEquals(Double.NaN, (Double) leadMin.metricValue(), EPSILON);
+
+        // After an empty FetchResponse the lead is position - logStartOffset
+        fetchRecords(tidp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
+        assertEquals(0L, (Double) leadMin.metricValue(), EPSILON);
+
+        KafkaMetric partitionLead = registered.get(partitionLeadName);
+        assertEquals(0L, (Double) partitionLead.metricValue(), EPSILON);
+
+        // After a non-empty FetchResponse the lead is again position - logStartOffset
+        MemoryRecordsBuilder threeRecords = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int offset = 0; offset < 3; offset++)
+            threeRecords.appendWithOffset(offset, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + offset).getBytes());
+        fetchRecords(tidp0, threeRecords.build(), Errors.NONE, 200L, -1L, 0L, 0);
+        assertEquals(0L, (Double) leadMin.metricValue(), EPSILON);
+        assertEquals(3L, (Double) partitionLead.metricValue(), EPSILON);
+
+        // Unsubscribing and sending fetches again de-registers the partition lead metric
+        subscriptions.unsubscribe();
+        sendFetches();
+        assertFalse(registered.containsKey(partitionLeadName));
+    }
+
+    @Test
+    public void testReadCommittedLagMetric() {
+        // Same flow as the plain lag-metric test, but under READ_COMMITTED the lag is measured
+        // against the last stable offset passed to fetchRecords rather than the high watermark.
+        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // Resolve the names of the max-lag metric and of tp0's per-partition lag metric
+        Map<String, String> partitionTags = new HashMap<>();
+        partitionTags.put("topic", tp0.topic());
+        partitionTags.put("partition", String.valueOf(tp0.partition()));
+        MetricName lagMaxName = metrics.metricInstance(metricsRegistry.recordsLagMax);
+        MetricName partitionLagName = metrics.metricName("records-lag", metricGroup, partitionTags);
+
+        Map<MetricName, KafkaMetric> registered = metrics.metrics();
+        KafkaMetric lagMax = registered.get(lagMaxName);
+
+        // Before any fetch the max lag is NaN
+        assertEquals(Double.NaN, (Double) lagMax.metricValue(), EPSILON);
+
+        // After an empty FetchResponse the lag is lso - fetchOffset
+        fetchRecords(tidp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
+        assertEquals(50, (Double) lagMax.metricValue(), EPSILON);
+
+        KafkaMetric partitionLag = registered.get(partitionLagName);
+        assertEquals(50, (Double) partitionLag.metricValue(), EPSILON);
+
+        // After a non-empty FetchResponse the lag is lso - offset of the last message
+        MemoryRecordsBuilder threeRecords = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int offset = 0; offset < 3; offset++) {
+            threeRecords.appendWithOffset(offset, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + offset).getBytes());
+        }
+        fetchRecords(tidp0, threeRecords.build(), Errors.NONE, 200L, 150L, 0);
+        assertEquals(147, (Double) lagMax.metricValue(), EPSILON);
+        assertEquals(147, (Double) partitionLag.metricValue(), EPSILON);
+
+        // Unsubscribing and sending fetches again de-registers the partition lag metric
+        subscriptions.unsubscribe();
+        sendFetches();
+        assertFalse(registered.containsKey(partitionLagName));
+    }
+
+    @Test
+    public void testFetchResponseMetrics() {
+        // One fetch response carries 3 records for each of two single-partition topics; the
+        // fetch-size average must equal the summed record sizes and the records-per-request
+        // average must be 6.
+        buildFetcher();
+
+        String fooTopic = "foo";
+        String barTopic = "bar";
+        TopicPartition fooPartition = new TopicPartition(fooTopic, 0);
+        TopicPartition barPartition = new TopicPartition(barTopic, 0);
+
+        subscriptions.assignFromUser(mkSet(fooPartition, barPartition));
+
+        // Register both topics (one partition each) together with freshly generated topic ids
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(fooTopic, 1);
+        partitionCounts.put(barTopic, 1);
+        topicIds.put(fooTopic, Uuid.randomUuid());
+        topicIds.put(barTopic, Uuid.randomUuid());
+        TopicIdPartition fooIdPartition = new TopicIdPartition(topicIds.get(fooTopic), fooPartition);
+        TopicIdPartition barIdPartition = new TopicIdPartition(topicIds.get(barTopic), barPartition);
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, partitionCounts, tp -> validLeaderEpoch, topicIds));
+
+        int expectedBytes = 0;
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> fetchPartitionData = new LinkedHashMap<>();
+
+        for (TopicIdPartition idPartition : mkSet(fooIdPartition, barIdPartition)) {
+            subscriptions.seek(idPartition.topicPartition(), 0);
+
+            // Three records per partition; their sizes accumulate into the expected byte count
+            MemoryRecordsBuilder threeRecords = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                    TimestampType.CREATE_TIME, 0L);
+            for (int offset = 0; offset < 3; offset++) {
+                threeRecords.appendWithOffset(offset, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + offset).getBytes());
+            }
+            MemoryRecords built = threeRecords.build();
+            for (Record single : built.records()) {
+                expectedBytes += single.sizeInBytes();
+            }
+
+            fetchPartitionData.put(idPartition, new FetchResponseData.PartitionData()
+                    .setPartitionIndex(idPartition.topicPartition().partition())
+                    .setHighWatermark(15)
+                    .setLogStartOffset(0)
+                    .setRecords(built));
+        }
+
+        assertEquals(1, sendFetches());
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, fetchPartitionData));
+        networkClientDelegate.poll(time.timer(0));
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> returned = fetchedRecords();
+        assertEquals(3, returned.get(fooPartition).size());
+        assertEquals(3, returned.get(barPartition).size());
+
+        Map<MetricName, KafkaMetric> registered = metrics.metrics();
+        KafkaMetric fetchSizeAverage = registered.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = registered.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
+        assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+        assertEquals(6, (Double) recordsCountAverage.metricValue(), EPSILON);
+    }
+
+    /**
+     * Seeks to offset 1, receives a batch holding offsets 0 through 2, and checks that the fetch
+     * metrics count only the records at or after the seek position.
+     */
+    @Test
+    public void testFetchResponseMetricsPartialResponse() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
+
+        Map<MetricName, KafkaMetric> registeredMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = registeredMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = registeredMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
+
+        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int offset = 0; offset < 3; offset++) {
+            recordsBuilder.appendWithOffset(offset, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + offset).getBytes());
+        }
+        MemoryRecords batch = recordsBuilder.build();
+
+        // Only the records at offset 1 or later contribute to the expected size.
+        int expectedBytes = 0;
+        for (Record record : batch.records()) {
+            if (record.offset() < 1) {
+                continue;
+            }
+            expectedBytes += record.sizeInBytes();
+        }
+
+        fetchRecords(tidp0, batch, Errors.NONE, 100L, 0);
+        assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+        assertEquals(2, (Double) recordsCountAverage.metricValue(), EPSILON);
+    }
+
+    /**
+     * Returns records for tp0 and an OFFSET_OUT_OF_RANGE error for tp1 in the same response, then
+     * checks that the fetch metrics reflect only the three records delivered for tp0.
+     */
+    @Test
+    public void testFetchResponseMetricsWithOnePartitionError() {
+        buildFetcher();
+        assignFromUser(mkSet(tp0, tp1));
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        Map<MetricName, KafkaMetric> registeredMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = registeredMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = registeredMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
+
+        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int offset = 0; offset < 3; offset++) {
+            recordsBuilder.appendWithOffset(offset, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + offset).getBytes());
+        }
+        MemoryRecords goodRecords = recordsBuilder.build();
+
+        FetchResponseData.PartitionData successData = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp0.partition())
+                .setHighWatermark(100)
+                .setLogStartOffset(0)
+                .setRecords(goodRecords);
+        FetchResponseData.PartitionData errorData = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp1.partition())
+                .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
+                .setHighWatermark(100)
+                .setLogStartOffset(0);
+        Map<TopicIdPartition, FetchResponseData.PartitionData> responseData = new HashMap<>();
+        responseData.put(tidp0, successData);
+        responseData.put(tidp1, errorData);
+
+        assertEquals(1, sendFetches());
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> orderedResponseData = new LinkedHashMap<>(responseData);
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, orderedResponseData));
+        networkClientDelegate.poll(time.timer(0));
+        fetcher.collectFetch();
+
+        int expectedBytes = 0;
+        for (Record record : goodRecords.records()) {
+            expectedBytes += record.sizeInBytes();
+        }
+
+        assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+        assertEquals(3, (Double) recordsCountAverage.metricValue(), EPSILON);
+    }
+
+    /**
+     * Moves tp1's position after the fetch has been sent, so the data returned for tp1 no longer
+     * matches its position, and checks that the fetch metrics count only tp0's three records.
+     */
+    @Test
+    public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
+        buildFetcher();
+
+        assignFromUser(mkSet(tp0, tp1));
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        Map<MetricName, KafkaMetric> registeredMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = registeredMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = registeredMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
+
+        // send the fetch and then seek to a new offset
+        assertEquals(1, sendFetches());
+        subscriptions.seek(tp1, 5);
+
+        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int offset = 0; offset < 3; offset++) {
+            recordsBuilder.appendWithOffset(offset, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + offset).getBytes());
+        }
+        MemoryRecords validRecords = recordsBuilder.build();
+
+        FetchResponseData.PartitionData validData = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp0.partition())
+                .setHighWatermark(100)
+                .setLogStartOffset(0)
+                .setRecords(validRecords);
+        FetchResponseData.PartitionData staleData = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp1.partition())
+                .setHighWatermark(100)
+                .setLogStartOffset(0)
+                .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes())));
+        Map<TopicIdPartition, FetchResponseData.PartitionData> responseData = new HashMap<>();
+        responseData.put(tidp0, validData);
+        responseData.put(tidp1, staleData);
+
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> orderedResponseData = new LinkedHashMap<>(responseData);
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, orderedResponseData));
+        networkClientDelegate.poll(time.timer(0));
+        fetcher.collectFetch();
+
+        // we should have ignored the record at the wrong offset
+        int expectedBytes = 0;
+        for (Record record : validRecords.records()) {
+            expectedBytes += record.sizeInBytes();
+        }
+
+        assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+        assertEquals(3, (Double) recordsCountAverage.metricValue(), EPSILON);
+    }
+
+    /**
+     * Performs one fetch so that topic and partition metrics get registered, then checks that the
+     * set of registered metrics (excluding the "kafka-metrics-count" group) equals the set of
+     * templates exposed by the metrics registry.
+     */
+    @Test
+    public void testFetcherMetricsTemplates() {
+        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
+        buildFetcher(new MetricConfig().tags(clientTags), OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
+
+        // Fetch from topic to generate topic metrics
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        // Verify that all metrics except metrics-count have registered templates
+        Set<MetricNameTemplate> allMetrics = new HashSet<>();
+        for (MetricName n : metrics.metrics().keySet()) {
+            // Literal replacement: the topic-partition string must not be interpreted as a regex.
+            String name = n.name().replace(tp0.toString(), "{topic}-{partition}");
+            if (!n.group().equals("kafka-metrics-count"))
+                allMetrics.add(new MetricNameTemplate(name, n.group(), "", n.tags().keySet()));
+        }
+        TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()), "metrics", "templates");
+    }
+
+    /**
+     * Convenience overload that performs a fetch round trip with
+     * {@link FetchResponse#INVALID_LAST_STABLE_OFFSET} as the last stable offset.
+     */
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
+            TopicIdPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched =
+                fetchRecords(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime);
+        return fetched;
+    }
+
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchRecords(
+            TopicIdPartition tp, MemoryRecords records, Errors error, long hw, 
long lastStableOffset, int throttleTime) {
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tp, records, error, hw, 
lastStableOffset, throttleTime));
+        networkClientDelegate.poll(time.timer(0));
+        return fetchedRecords();
+    }
+
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchRecords(
+            TopicIdPartition tp, MemoryRecords records, Errors error, long hw, 
long lastStableOffset, long logStartOffset, int throttleTime) {
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fetchResponse(tp, records, error, hw, 
lastStableOffset, logStartOffset, throttleTime));
+        networkClientDelegate.poll(time.timer(0));
+        return fetchedRecords();
+    }
+
+    /**
+     * With READ_COMMITTED isolation, a response whose only data belongs to an aborted transaction
+     * should produce no records while still reporting that the position advanced.
+     */
+    @Test
+    public void testSkippingAbortedTransactions() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer logBuffer = ByteBuffer.allocate(1024);
+        int nextOffset = 0;
+
+        nextOffset += appendTransactionalRecords(logBuffer, 1L, nextOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        abortTransaction(logBuffer, 1L, nextOffset);
+
+        logBuffer.flip();
+
+        FetchResponseData.AbortedTransaction abortedTransaction = new FetchResponseData.AbortedTransaction()
+                .setProducerId(1)
+                .setFirstOffset(0);
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = Collections.singletonList(abortedTransaction);
+        MemoryRecords transactionalRecords = MemoryRecords.readableRecords(logBuffer);
+        assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponseWithAbortedTransactions(transactionalRecords, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Fetch<byte[], byte[]> collected = collectFetch();
+        assertEquals(emptyMap(), collected.records());
+        assertTrue(collected.positionAdvanced());
+    }
+
+    /**
+     * With READ_COMMITTED isolation, both records of a committed transaction should be returned,
+     * and the fetch request itself should carry the READ_COMMITTED isolation level.
+     */
+    @Test
+    public void testReturnCommittedTransactions() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 0;
+
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // The matcher inspects the outgoing request before the canned response is delivered.
+        client.prepareResponse(body -> {
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+            return true;
+        }, fullFetchResponseWithAbortedTransactions(records, Collections.emptyList(), Errors.NONE, 100L, 100L, 0));
+
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        assertEquals(2, fetchedRecords.get(tp0).size());
+    }
+
+    /**
+     * Interleaves committed and aborted transactions from two producers in one buffer and checks
+     * that a READ_COMMITTED fetch returns exactly the keys written by the committed transactions.
+     */
+    @Test
+    public void testReadCommittedWithCommittedAndAbortedTransactions() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer logBuffer = ByteBuffer.allocate(1024);
+
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+
+        long firstProducerId = 1L;
+        long secondProducerId = 2L;
+
+        // Appends for producer 1 (eventually committed)
+        appendTransactionalRecords(logBuffer, firstProducerId, 0L,
+                new SimpleRecord("commit1-1".getBytes(), "value".getBytes()),
+                new SimpleRecord("commit1-2".getBytes(), "value".getBytes()));
+
+        // Appends for producer 2 (eventually aborted)
+        appendTransactionalRecords(logBuffer, secondProducerId, 2L,
+                new SimpleRecord("abort2-1".getBytes(), "value".getBytes()));
+
+        // commit producer 1
+        commitTransaction(logBuffer, firstProducerId, 3L);
+
+        // append more for producer 2 (eventually aborted)
+        appendTransactionalRecords(logBuffer, secondProducerId, 4L,
+                new SimpleRecord("abort2-2".getBytes(), "value".getBytes()));
+
+        // abort producer 2
+        abortTransaction(logBuffer, secondProducerId, 5L);
+        abortedTransactions.add(new FetchResponseData.AbortedTransaction().setProducerId(secondProducerId).setFirstOffset(2L));
+
+        // New transaction for producer 1 (eventually aborted)
+        appendTransactionalRecords(logBuffer, firstProducerId, 6L,
+                new SimpleRecord("abort1-1".getBytes(), "value".getBytes()));
+
+        // New transaction for producer 2 (eventually committed)
+        appendTransactionalRecords(logBuffer, secondProducerId, 7L,
+                new SimpleRecord("commit2-1".getBytes(), "value".getBytes()));
+
+        // Add messages for producer 1 (eventually aborted)
+        appendTransactionalRecords(logBuffer, firstProducerId, 8L,
+                new SimpleRecord("abort1-2".getBytes(), "value".getBytes()));
+
+        // abort producer 1
+        abortTransaction(logBuffer, firstProducerId, 9L);
+        abortedTransactions.add(new FetchResponseData.AbortedTransaction().setProducerId(firstProducerId).setFirstOffset(6L));
+
+        // commit producer 2
+        commitTransaction(logBuffer, secondProducerId, 10L);
+
+        logBuffer.flip();
+
+        MemoryRecords transactionalRecords = MemoryRecords.readableRecords(logBuffer);
+        assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponseWithAbortedTransactions(transactionalRecords, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetchedRecords();
+        assertTrue(recordsByPartition.containsKey(tp0));
+        // There are only 3 committed records
+        Set<String> fetchedKeys = new HashSet<>();
+        for (ConsumerRecord<byte[], byte[]> consumerRecord : recordsByPartition.get(tp0)) {
+            String key = new String(consumerRecord.key(), StandardCharsets.UTF_8);
+            fetchedKeys.add(key);
+        }
+        assertEquals(mkSet("commit1-1", "commit1-2", "commit2-1"), fetchedKeys);
+    }
+
+    /**
+     * Writes an aborted transaction followed by a duplicate abort marker and then a committed
+     * transaction, and checks that a READ_COMMITTED fetch returns only the two committed keys.
+     */
+    @Test
+    public void testMultipleAbortMarkers() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 0;
+
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
+
+        currentOffset += abortTransaction(buffer, 1L, currentOffset);
+        // Duplicate abort -- should be ignored.
+        currentOffset += abortTransaction(buffer, 1L, currentOffset);
+        // Now commit a transaction.
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = Collections.singletonList(
+                new FetchResponseData.AbortedTransaction().setProducerId(1).setFirstOffset(0)
+        );
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp0);
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        assertEquals(2, fetchedConsumerRecords.size());
+        Set<String> committedKeys = new HashSet<>(Arrays.asList("commit1-1", "commit1-2"));
+        Set<String> actuallyCommittedKeys = new HashSet<>();
+        for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
+            actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
+        }
+        assertEquals(committedKeys, actuallyCommittedKeys);
+    }
+
+    /**
+     * Places an abort marker ahead of a committed transaction from the same producer, reports an
+     * aborted transaction starting at offset 0, and checks that a READ_COMMITTED fetch still
+     * returns the committed records at offsets 6, 7 and 8.
+     */
+    @Test
+    public void testReadCommittedAbortMarkerWithNoData() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new StringDeserializer(),
+                new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer logBuffer = ByteBuffer.allocate(1024);
+
+        long producerId = 1L;
+
+        abortTransaction(logBuffer, producerId, 5L);
+
+        appendTransactionalRecords(logBuffer, producerId, 6L,
+                new SimpleRecord("6".getBytes(), null),
+                new SimpleRecord("7".getBytes(), null),
+                new SimpleRecord("8".getBytes(), null));
+
+        commitTransaction(logBuffer, producerId, 9L);
+
+        logBuffer.flip();
+
+        // send the fetch
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+
+        // prepare the response. the aborted transactions begin at offsets which are no longer in the log
+        FetchResponseData.AbortedTransaction abortedTransaction = new FetchResponseData.AbortedTransaction()
+                .setProducerId(producerId)
+                .setFirstOffset(0L);
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = Collections.singletonList(abortedTransaction);
+
+        client.prepareResponse(fullFetchResponseWithAbortedTransactions(MemoryRecords.readableRecords(logBuffer),
+                abortedTransactions, Errors.NONE, 100L, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsByPartition = fetchedRecords();
+        assertTrue(recordsByPartition.containsKey(tp0));
+        List<ConsumerRecord<String, String>> tp0Records = recordsByPartition.get(tp0);
+        assertEquals(3, tp0Records.size());
+        assertEquals(Arrays.asList(6L, 7L, 8L), collectRecordOffsets(tp0Records));
+    }
+
+    /**
+     * Filters the last (null-key) record out of a four-record batch to simulate compaction and
+     * checks that the three remaining records are returned and the position moves to offset 4.
+     */
+    @Test
+    public void testUpdatePositionWithLastRecordMissingFromBatch() {
+        buildFetcher();
+
+        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
+                new SimpleRecord("0".getBytes(), "v".getBytes()),
+                new SimpleRecord("1".getBytes(), "v".getBytes()),
+                new SimpleRecord("2".getBytes(), "v".getBytes()),
+                new SimpleRecord(null, "value".getBytes()));
+
+        // Remove the last record to simulate compaction
+        MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
+            @Override
+            protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+                return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
+            }
+
+            @Override
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                // Only the last record was created with a null key, so it is the one dropped.
+                return record.key() != null;
+            }
+        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+        result.outputBuffer().flip();
+        MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, compactedRecords, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> allFetchedRecords = fetchedRecords();
+        assertTrue(allFetchedRecords.containsKey(tp0));
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = allFetchedRecords.get(tp0);
+        assertEquals(3, fetchedRecords.size());
+
+        for (int i = 0; i < 3; i++) {
+            // Decode with an explicit charset, as the other tests in this class do, rather than
+            // relying on the platform default.
+            assertEquals(Integer.toString(i), new String(fetchedRecords.get(i).key(), StandardCharsets.UTF_8));
+        }
+
+        // The next offset should point to the next batch
+        assertEquals(4L, subscriptions.position(tp0).offset);
+    }
+
+    /**
+     * Returns a response that holds only an empty batch header spanning offsets 37 to 54 and
+     * checks that no records are produced while the position moves to one past the last offset.
+     */
+    @Test
+    public void testUpdatePositionOnEmptyBatch() {
+        buildFetcher();
+
+        long producerId = 1;
+        short producerEpoch = 0;
+        int sequence = 1;
+        long baseOffset = 37;
+        long lastOffset = 54;
+        int partitionLeaderEpoch = 7;
+
+        // The buffer is sized for the batch header only; no records are written.
+        ByteBuffer headerBuffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+        DefaultRecordBatch.writeEmptyHeader(headerBuffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch,
+                sequence, baseOffset, lastOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME,
+                System.currentTimeMillis(), false, false);
+        headerBuffer.flip();
+        MemoryRecords emptyBatchRecords = MemoryRecords.readableRecords(headerBuffer);
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, emptyBatchRecords, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Fetch<byte[], byte[]> collected = collectFetch();
+        assertEquals(emptyMap(), collected.records());
+        assertTrue(collected.positionAdvanced());
+
+        // The next offset should point to the next batch
+        long expectedPosition = lastOffset + 1;
+        assertEquals(expectedPosition, subscriptions.position(tp0).offset);
+    }
+
+    /**
+     * Builds a buffer with gaps between offsets, as on a compacted topic, containing transactions
+     * from three producers, and checks that a READ_COMMITTED fetch returns only the records at
+     * offsets 3, 4, 30, 31 and 32.
+     */
+    @Test
+    public void testReadCommittedWithCompactedTopic() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new StringDeserializer(),
+                new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer logBuffer = ByteBuffer.allocate(1024);
+
+        long firstProducerId = 1L;
+        long secondProducerId = 2L;
+        long thirdProducerId = 3L;
+
+        appendTransactionalRecords(logBuffer, thirdProducerId, 3L,
+                new SimpleRecord("3".getBytes(), "value".getBytes()),
+                new SimpleRecord("4".getBytes(), "value".getBytes()));
+
+        appendTransactionalRecords(logBuffer, secondProducerId, 15L,
+                new SimpleRecord("15".getBytes(), "value".getBytes()),
+                new SimpleRecord("16".getBytes(), "value".getBytes()),
+                new SimpleRecord("17".getBytes(), "value".getBytes()));
+
+        appendTransactionalRecords(logBuffer, firstProducerId, 22L,
+                new SimpleRecord("22".getBytes(), "value".getBytes()),
+                new SimpleRecord("23".getBytes(), "value".getBytes()));
+
+        abortTransaction(logBuffer, secondProducerId, 28L);
+
+        appendTransactionalRecords(logBuffer, thirdProducerId, 30L,
+                new SimpleRecord("30".getBytes(), "value".getBytes()),
+                new SimpleRecord("31".getBytes(), "value".getBytes()),
+                new SimpleRecord("32".getBytes(), "value".getBytes()));
+
+        commitTransaction(logBuffer, thirdProducerId, 35L);
+
+        appendTransactionalRecords(logBuffer, firstProducerId, 39L,
+                new SimpleRecord("39".getBytes(), "value".getBytes()),
+                new SimpleRecord("40".getBytes(), "value".getBytes()));
+
+        // transaction from pid1 is aborted, but the marker is not included in the fetch
+
+        logBuffer.flip();
+
+        // send the fetch
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+
+        // prepare the response. the aborted transactions begin at offsets which are no longer in the log
+        FetchResponseData.AbortedTransaction secondProducerAbort = new FetchResponseData.AbortedTransaction()
+                .setProducerId(secondProducerId)
+                .setFirstOffset(6);
+        FetchResponseData.AbortedTransaction firstProducerAbort = new FetchResponseData.AbortedTransaction()
+                .setProducerId(firstProducerId)
+                .setFirstOffset(0);
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = Arrays.asList(secondProducerAbort, firstProducerAbort);
+
+        client.prepareResponse(fullFetchResponseWithAbortedTransactions(MemoryRecords.readableRecords(logBuffer),
+                abortedTransactions, Errors.NONE, 100L, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsByPartition = fetchedRecords();
+        assertTrue(recordsByPartition.containsKey(tp0));
+        List<ConsumerRecord<String, String>> tp0Records = recordsByPartition.get(tp0);
+        assertEquals(5, tp0Records.size());
+        assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), collectRecordOffsets(tp0Records));
+    }
+
+    /**
+     * With READ_UNCOMMITTED isolation, a response whose data belongs to an aborted transaction
+     * should still yield fetched records for the partition.
+     */
+    @Test
+    public void testReturnAbortedTransactionsinUncommittedMode() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
+        ByteBuffer logBuffer = ByteBuffer.allocate(1024);
+        int nextOffset = 0;
+
+        nextOffset += appendTransactionalRecords(logBuffer, 1L, nextOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        abortTransaction(logBuffer, 1L, nextOffset);
+
+        logBuffer.flip();
+
+        FetchResponseData.AbortedTransaction abortedTransaction = new FetchResponseData.AbortedTransaction()
+                .setProducerId(1)
+                .setFirstOffset(0);
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = Collections.singletonList(abortedTransaction);
+        MemoryRecords transactionalRecords = MemoryRecords.readableRecords(logBuffer);
+        assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponseWithAbortedTransactions(transactionalRecords, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetchedRecords();
+        assertTrue(recordsByPartition.containsKey(tp0));
+    }
+
+    /**
+     * With READ_COMMITTED isolation, checks that skipping an aborted transaction returns no
+     * records for the partition but still moves the consumer position past the abort marker.
+     */
+    @Test
+    public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer logBuffer = ByteBuffer.allocate(1024);
+        long nextOffset = 0;
+
+        nextOffset += appendTransactionalRecords(logBuffer, 1L, nextOffset,
+                new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
+
+        nextOffset += abortTransaction(logBuffer, 1L, nextOffset);
+        logBuffer.flip();
+
+        FetchResponseData.AbortedTransaction abortedTransaction = new FetchResponseData.AbortedTransaction()
+                .setProducerId(1)
+                .setFirstOffset(0);
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = Collections.singletonList(abortedTransaction);
+        MemoryRecords transactionalRecords = MemoryRecords.readableRecords(logBuffer);
+        assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponseWithAbortedTransactions(transactionalRecords, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetchedRecords();
+
+        // Ensure that we don't return any of the aborted records, but yet advance the consumer position.
+        assertFalse(recordsByPartition.containsKey(tp0));
+        assertEquals(nextOffset, subscriptions.position(tp0).offset);
+    }
+
+    /**
+     * Drives a consumer assigned to tp0 and tp1 through three prepared fetch responses and, after each one,
+     * checks which records are handed back and where the positions of both partitions end up.
+     */
+    @Test
+    public void testConsumingViaIncrementalFetchRequests() {
+        buildFetcher(2);
+
+        assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        SubscriptionState.FetchPosition tp0Start =
+                new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.currentLeader(tp0));
+        subscriptions.seekValidated(tp0, tp0Start);
+        SubscriptionState.FetchPosition tp1Start =
+                new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1));
+        subscriptions.seekValidated(tp1, tp1Start);
+
+        // First response: establishes the incremental fetch session; records for tp0, an empty set for tp1.
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> firstData = new LinkedHashMap<>();
+        FetchResponseData.PartitionData tp0First = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp0.partition())
+                .setHighWatermark(2)
+                .setLastStableOffset(2)
+                .setLogStartOffset(0)
+                .setRecords(this.records);
+        firstData.put(tidp0, tp0First);
+        FetchResponseData.PartitionData tp1First = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp1.partition())
+                .setHighWatermark(100)
+                .setLogStartOffset(0)
+                .setRecords(emptyRecords);
+        firstData.put(tidp1, tp1First);
+        FetchResponse firstResponse = FetchResponse.of(Errors.NONE, 0, 123, firstData);
+        client.prepareResponse(firstResponse);
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = fetchedRecords();
+        assertFalse(fetched.containsKey(tp1));
+        List<ConsumerRecord<byte[], byte[]>> tp0Batch = fetched.get(tp0);
+        assertEquals(2, tp0Batch.size());
+        assertEquals(3L, subscriptions.position(tp0).offset);
+        assertEquals(1L, subscriptions.position(tp1).offset);
+        assertEquals(1, tp0Batch.get(0).offset());
+        assertEquals(2, tp0Batch.get(1).offset());
+
+        // One record is still buffered, so nothing new is sent and the leftover record is returned.
+        assertEquals(0, sendFetches());
+        fetched = fetchedRecords();
+        assertFalse(fetched.containsKey(tp1));
+        tp0Batch = fetched.get(tp0);
+        assertEquals(1, tp0Batch.size());
+        assertEquals(3, tp0Batch.get(0).offset());
+        assertEquals(4L, subscriptions.position(tp0).offset);
+
+        // Second response: carries no partition data, so nothing is returned and no position moves.
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> secondData = new LinkedHashMap<>();
+        FetchResponse secondResponse = FetchResponse.of(Errors.NONE, 0, 123, secondData);
+        client.prepareResponse(secondResponse);
+        assertEquals(1, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        fetched = fetchedRecords();
+        assertTrue(fetched.isEmpty());
+        assertEquals(4L, subscriptions.position(tp0).offset);
+        assertEquals(1L, subscriptions.position(tp1).offset);
+
+        // Third response: new records for tp0 only.
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> thirdData = new LinkedHashMap<>();
+        FetchResponseData.PartitionData tp0Third = new FetchResponseData.PartitionData()
+                .setPartitionIndex(tp0.partition())
+                .setHighWatermark(100)
+                .setLastStableOffset(4)
+                .setLogStartOffset(0)
+                .setRecords(nextRecords);
+        thirdData.put(tidp0, tp0Third);
+        FetchResponse thirdResponse = FetchResponse.of(Errors.NONE, 0, 123, thirdData);
+        client.prepareResponse(thirdResponse);
+        assertEquals(1, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        fetched = fetchedRecords();
+        assertFalse(fetched.containsKey(tp1));
+        tp0Batch = fetched.get(tp0);
+        assertEquals(2, tp0Batch.size());
+        assertEquals(6L, subscriptions.position(tp0).offset);
+        assertEquals(1L, subscriptions.position(tp1).offset);
+        assertEquals(4, tp0Batch.get(0).offset());
+        assertEquals(5, tp0Batch.get(1).offset());
+    }
+
+    /**
+     * Checks that a READ_COMMITTED fetch whose data begins with an empty control batch still hands back the two
+     * transactional records written after it.
+     */
+    @Test
+    public void testEmptyControlBatch() {
+        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 1;
+
+        // Empty control batch should not cause an exception
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, 1L,
+                (short) 0, -1, 0, 0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH, TimestampType.CREATE_TIME, time.milliseconds(),
+                true, true);
+
+        // Two records for producer id 1, followed by a commit marker at the next offset.
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // The matcher also verifies that the outgoing request asks for READ_COMMITTED isolation.
+        client.prepareResponse(body -> {
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+            return true;
+        }, fullFetchResponseWithAbortedTransactions(records, Collections.emptyList(), Errors.NONE, 100L, 100L, 0));
+
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        // Expected value goes first so a failure message reports expected/actual the right way round.
+        assertEquals(2, fetchedRecords.get(tp0).size());
+    }
+
+    /**
+     * Builds an uncompressed, CREATE_TIME record set starting at {@code baseOffset}. It holds {@code count}
+     * records, each with key "key" and value "value-N", where N counts up from {@code firstMessageId}.
+     */
+    private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
+        ByteBuffer backing = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(backing, CompressionType.NONE,
+                TimestampType.CREATE_TIME, baseOffset);
+        int appended = 0;
+        while (appended < count) {
+            String value = "value-" + (firstMessageId + appended);
+            recordsBuilder.append(0L, "key".getBytes(), value.getBytes());
+            appended++;
+        }
+        return recordsBuilder.build();
+    }
+
+    private int appendTransactionalRecords(ByteBuffer buffer, long pid, long 
baseOffset, int baseSequence, SimpleRecord... records) {
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+                TimestampType.CREATE_TIME, baseOffset, time.milliseconds(), 
pid, (short) 0, baseSequence, true,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH);
+
+        for (SimpleRecord record : records) {
+            builder.append(record);
+        }
+        builder.build();
+        return records.length;
+    }
+
+    /**
+     * Convenience overload that uses {@code baseOffset}, narrowed to an int, as the base sequence.
+     *
+     * @return the number of records that were passed in
+     */
+    private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord... records) {
+        int baseSequence = (int) baseOffset;
+        return appendTransactionalRecords(buffer, pid, baseOffset, baseSequence, records);
+    }
+
+    private void commitTransaction(ByteBuffer buffer, long producerId, long 
baseOffset) {
+        short producerEpoch = 0;
+        int partitionLeaderEpoch = 0;
+        MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, 
time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
+                new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+    }
+
+    private int abortTransaction(ByteBuffer buffer, long producerId, long 
baseOffset) {
+        short producerEpoch = 0;
+        int partitionLeaderEpoch = 0;
+        MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, 
time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
+                new EndTransactionMarker(ControlRecordType.ABORT, 0));
+        return 1;
+    }
+
+    /**
+     * Fetches three records built with leader epoch 1 and checks that the position of tp0 afterwards is offset 3
+     * with an offset epoch of 1.
+     */
+    @Test
+    public void testSubscriptionPositionUpdatedWithEpoch() {
+        // Create some records that include a leader epoch (1)
+        MemoryRecordsBuilder builder = MemoryRecords.builder(
+                ByteBuffer.allocate(1024),
+                RecordBatch.CURRENT_MAGIC_VALUE,
+                CompressionType.NONE,
+                TimestampType.CREATE_TIME,
+                0L,
+                RecordBatch.NO_TIMESTAMP,
+                RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH,
+                RecordBatch.NO_SEQUENCE,
+                false,
+                1
+        );
+        builder.appendWithOffset(0L, 0L, "key".getBytes(), "value-1".getBytes());
+        builder.appendWithOffset(1L, 0L, "key".getBytes(), "value-2".getBytes());
+        builder.appendWithOffset(2L, 0L, "key".getBytes(), "value-3".getBytes());
+        MemoryRecords records = builder.build();
+
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        // Initialize the epoch=1
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+                Collections.emptyMap(), partitionCounts, tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Seek
+        subscriptions.seek(tp0, 0);
+
+        // Do a normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        networkClientDelegate.pollNoWakeup();
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        // Expected values go first so failure messages report expected/actual the right way round.
+        assertEquals(3L, subscriptions.position(tp0).offset);
+        assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(1, value.intValue()));
+    }
+
+    @Test
+    public void testPreferredReadReplica() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(), new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, 
Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, 
singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+
+        // Node preferred replica before first fetch response
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(-1, selected.id());
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        // Verify
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(1, selected.id());
+
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Set preferred read replica to node=2, which isn't in our metadata, 
should revert to leader
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(2)));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(-1, selected.id());
+    }
+
+    @Test
+    public void testFetchDisconnectedShouldClearPreferredReadReplica() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(), new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, 
Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, 
singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect - preferred read replica should be cleared.
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0), true);
+
+        networkClientDelegate.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(-1, selected.id());
+    }
+
+    @Test
+    public void 
testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(), new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, 
Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, 
singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect and remove tp0 from assignment
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0), true);
+        subscriptions.assignFromUser(emptySet());
+
+        // Preferred read replica should not be cleared
+        networkClientDelegate.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(-1, selected.id());
+    }
+
+    @Test
+    public void testFetchErrorShouldClearPreferredReadReplica() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(), new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, 
Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, 
singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Error - preferred read replica should be cleared. An actual error 
response will contain -1 as the
+        // preferred read replica. In the test we want to ensure that we are 
handling the error.
+        client.prepareResponse(fullFetchResponse(tidp0, MemoryRecords.EMPTY, 
Errors.NOT_LEADER_OR_FOLLOWER, -1L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(-1, selected.id());
+    }
+
+    /**
+     * After node 1 has been recorded as the preferred read replica for tp0, an OFFSET_OUT_OF_RANGE fetch
+     * response must leave {@code selectReadReplica} returning id -1.
+     */
+    @Test
+    public void testPreferredReadReplicaOffsetError() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(),
+                new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED,
+                Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4),
+                tp -> validLeaderEpoch, topicIds, false));
+
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // First response names node 1 as the preferred read replica.
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        fetchedRecords();
+
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        // Expected value goes first so a failure message reports expected/actual the right way round.
+        assertEquals(1, selected.id());
+
+        // Return an error, should unset the preferred read replica
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.empty()));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        fetchedRecords();
+
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(-1, selected.id());
+    }
+
+    @Test
+    public void testFetchCompletedBeforeHandlerAdded() {

Review Comment:
   Filed KAFKA-15637.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to