Repository: kafka Updated Branches: refs/heads/0.10.2 863867582 -> a26731674
KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased Author: Dong Lin <[email protected]> Author: Dong Lin <[email protected]> Reviewers: Jiangjie Qin <[email protected]> Closes #2859 from lindong28/KAFKA-5075 This is a backport patch for 0.10.2 after resolving the following conflict. Conflicts: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a2673167 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a2673167 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a2673167 Branch: refs/heads/0.10.2 Commit: a26731674815a57368ad4500a77e84a218a40e63 Parents: 8638675 Author: Dong Lin <[email protected]> Authored: Sun Apr 16 23:23:25 2017 -0700 Committer: Jiangjie Qin <[email protected]> Committed: Mon Apr 17 17:27:41 2017 -0700 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 78 ++++++++++----- .../clients/consumer/internals/FetcherTest.java | 99 +++++++++++++++++--- .../kafka/api/AuthorizerIntegrationTest.scala | 1 + 3 files changed, 143 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a2673167/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 6a13d46..c8bbfa3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -95,6 +95,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { private final Deserializer<V> valueDeserializer; private PartitionRecords<K, V> nextInLineRecords = null; + private ExceptionMetadata nextInLineExceptionMetadata = null; public Fetcher(ConsumerNetworkClient client, int minBytes, @@ -461,35 +462,54 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { * the defaultResetPolicy is NONE */ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { + if (nextInLineExceptionMetadata != null) { + ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata; + nextInLineExceptionMetadata = null; + TopicPartition tp = exceptionMetadata.partition; + if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset) + throw exceptionMetadata.exception; + } + Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>(); int recordsRemaining = maxPollRecords; + TopicPartition fetchedPartition = null; + long fetchedOffsets = -1L; - while (recordsRemaining > 0) { - if (nextInLineRecords == null || nextInLineRecords.isDrained()) { - CompletedFetch completedFetch = completedFetches.poll(); - if (completedFetch == null) - break; - - nextInLineRecords = parseCompletedFetch(completedFetch); - } else { - TopicPartition partition = nextInLineRecords.partition; - List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining); - if (!records.isEmpty()) { - List<ConsumerRecord<K, V>> currentRecords = drained.get(partition); - if (currentRecords == null) { - drained.put(partition, records); - } else { - // this case shouldn't usually happen because we only send one fetch at a time per partition, - // but it might conceivably happen in some rare cases (such as partition leader changes). - // we have to copy to a new list because the old one may be immutable - List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); - newRecords.addAll(currentRecords); - newRecords.addAll(records); - drained.put(partition, newRecords); + try { + while (recordsRemaining > 0) { + if (nextInLineRecords == null || nextInLineRecords.isDrained()) { + CompletedFetch completedFetch = completedFetches.poll(); + if (completedFetch == null) break; + + fetchedPartition = completedFetch.partition; + fetchedOffsets = completedFetch.fetchedOffset; + nextInLineRecords = parseCompletedFetch(completedFetch); + } else { + TopicPartition partition = nextInLineRecords.partition; + fetchedPartition = partition; + fetchedOffsets = subscriptions.position(partition); + List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining); + if (!records.isEmpty()) { + List<ConsumerRecord<K, V>> currentRecords = drained.get(partition); + if (currentRecords == null) { + drained.put(partition, records); + } else { + // this case shouldn't usually happen because we only send one fetch at a time per partition, + // but it might conceivably happen in some rare cases (such as partition leader changes). + // we have to copy to a new list because the old one may be immutable + List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); + newRecords.addAll(currentRecords); + newRecords.addAll(records); + drained.put(partition, newRecords); + } + recordsRemaining -= records.size(); } - recordsRemaining -= records.size(); } } + } catch (KafkaException e) { + if (drained.isEmpty()) + throw e; + nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffsets, e); } return drained; @@ -1139,4 +1159,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { } } + private static class ExceptionMetadata { + private final TopicPartition partition; + private final long fetchedOffset; + private final KafkaException exception; + + private ExceptionMetadata(TopicPartition partition, long fetchedOffset, KafkaException exception) { + this.partition = partition; + this.fetchedOffset = fetchedOffset; + this.exception = exception; + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/a2673167/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4d388e6..24ba434 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -57,6 +57,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -84,6 +85,7 @@ public class FetcherTest { private String groupId = "test-group"; private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics"; private TopicPartition tp = new TopicPartition(topicName, 0); + private TopicPartition tp1 = new TopicPartition(topicName, 1); private int minBytes = 1; private int maxBytes = Integer.MAX_VALUE; private int maxWaitMs = 0; @@ -92,7 +94,7 @@ public class FetcherTest { private MockTime time = new MockTime(1); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private MockClient client = new MockClient(time, metadata); - private Cluster cluster = TestUtils.singletonCluster(topicName, 1); + private Cluster cluster = TestUtils.singletonCluster(topicName, 2); private Node node = cluster.nodes().get(0); private Metrics metrics = new Metrics(time); private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); @@ -827,28 +829,28 @@ public class FetcherTest { } private void testGetOffsetsForTimesWithError(Errors errorForTp0, - Errors errorForTp1, + Errors errorFortp, long offsetForTp0, - long offsetForTp1, + long offsetFortp, Long expectedOffsetForTp0, - Long expectedOffsetForTp1) { + Long expectedOffsetFortp) { client.reset(); TopicPartition tp0 = tp; - TopicPartition tp1 = new TopicPartition(topicName, 1); + TopicPartition tp = new TopicPartition(topicName, 1); // Ensure metadata has both partition. Cluster cluster = TestUtils.clusterWith(2, topicName, 2); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); // First try should fail due to metadata error. client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0)); - client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1)); + client.prepareResponseFrom(listOffsetResponse(tp, errorFortp, offsetFortp, offsetFortp), cluster.leaderFor(tp)); // Second try should succeed. client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0)); - client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1)); + client.prepareResponseFrom(listOffsetResponse(tp, Errors.NONE, offsetFortp, offsetFortp), cluster.leaderFor(tp)); Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); timestampToSearch.put(tp0, 0L); - timestampToSearch.put(tp1, 0L); + timestampToSearch.put(tp, 0L); Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE); if (expectedOffsetForTp0 == null) @@ -858,14 +860,87 @@ public class FetcherTest { assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset()); } - if (expectedOffsetForTp1 == null) - assertNull(offsetAndTimestampMap.get(tp1)); + if (expectedOffsetFortp == null) + assertNull(offsetAndTimestampMap.get(tp)); else { - assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp()); - assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset()); + assertEquals(expectedOffsetFortp.longValue(), offsetAndTimestampMap.get(tp).timestamp()); + assertEquals(expectedOffsetFortp.longValue(), offsetAndTimestampMap.get(tp).offset()); } } + @Test + public void testFetchPositionAfterException() { + // verify the advancement in the next fetch offset equals the number of fetched records when + // some fetched partitions cause Exception. This ensures that consumer won't lose record upon exception + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp, tp1)); + subscriptionsNoAutoReset.seek(tp, 1); + subscriptionsNoAutoReset.seek(tp1, 1); + + assertEquals(1, fetcherNoAutoReset.sendFetches()); + + Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); + partitions.put(tp, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code(), 100, MemoryRecords.EMPTY)); + partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE.code(), 100, records)); + client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0)); + consumerClient.poll(0); + + List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>(); + List<OffsetOutOfRangeException> exceptions = new ArrayList<>(); + + try { + for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values()) + fetchedRecords.addAll(records); + } catch (OffsetOutOfRangeException e) { + exceptions.add(e); + } + + assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp1) - 1); + + try { + for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values()) + fetchedRecords.addAll(records); + } catch (OffsetOutOfRangeException e) { + exceptions.add(e); + } + + assertEquals(4, subscriptionsNoAutoReset.position(tp1).longValue()); + assertEquals(3, fetchedRecords.size()); + + // Should have received one OffsetOutOfRangeException for partition tp + assertEquals(1, exceptions.size()); + OffsetOutOfRangeException e = exceptions.get(0); + assertTrue(e.offsetOutOfRangePartitions().containsKey(tp)); + assertEquals(e.offsetOutOfRangePartitions().size(), 1); + } + + @Test + public void testSeekBeforeException() { + Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2); + + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp)); + subscriptionsNoAutoReset.seek(tp, 1); + assertEquals(1, fetcher.sendFetches()); + Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); + partitions.put(tp, new FetchResponse.PartitionData(Errors.NONE.code(), 100, records)); + client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0)); + consumerClient.poll(0); + + assertEquals(2, fetcher.fetchedRecords().get(tp).size()); + + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp, tp1)); + subscriptionsNoAutoReset.seek(tp1, 1); + assertEquals(1, fetcher.sendFetches()); + partitions = new HashMap<>(); + partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code(), 100, MemoryRecords.EMPTY)); + client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0)); + consumerClient.poll(0); + assertEquals(1, fetcher.fetchedRecords().get(tp).size()); + + subscriptionsNoAutoReset.seek(tp1, 10); + // Should not throw OffsetOutOfRangeException after the seek + assertEquals(0, fetcher.fetchedRecords().size()); + } + private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) { // matches any list offset request with the provided timestamp return new MockClient.RequestMatcher() { http://git-wip-us.apache.org/repos/asf/kafka/blob/a2673167/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index c9d35af..1ba3214 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -605,6 +605,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { try { consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) consumeRecords(consumer) + consumeRecords(consumer) Assert.fail("Expected TopicAuthorizationException") } catch { case _: TopicAuthorizationException => //expected
