Repository: kafka Updated Branches: refs/heads/trunk 148f8c254 -> 17ce2a730
KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased Author: Dong Lin <[email protected]> Author: Dong Lin <[email protected]> Reviewers: Jiangjie Qin <[email protected]> Closes #2859 from lindong28/KAFKA-5075 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/17ce2a73 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/17ce2a73 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/17ce2a73 Branch: refs/heads/trunk Commit: 17ce2a7307222d7476eb60318fab2e672eebe559 Parents: 148f8c2 Author: Dong Lin <[email protected]> Authored: Sun Apr 16 23:23:25 2017 -0700 Committer: Jiangjie Qin <[email protected]> Committed: Sun Apr 16 23:23:25 2017 -0700 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 81 ++++++++++++++------ .../clients/consumer/internals/FetcherTest.java | 77 +++++++++++++++++++ .../kafka/api/AuthorizerIntegrationTest.scala | 2 + 3 files changed, 135 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 2eeef11..6b0adc7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -99,8 +99,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { private final ConcurrentLinkedQueue<CompletedFetch> completedFetches; private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; - private PartitionRecords nextInLineRecords = null; + private ExceptionMetadata nextInLineExceptionMetadata = null; public Fetcher(ConsumerNetworkClient client, int minBytes, @@ -467,37 +467,55 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { * the defaultResetPolicy is NONE */ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { + if (nextInLineExceptionMetadata != null) { + ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata; + nextInLineExceptionMetadata = null; + TopicPartition tp = exceptionMetadata.partition; + if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset) + throw exceptionMetadata.exception; + } Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; + // Needed to construct ExceptionMetadata if any exception is found when processing completedFetch + TopicPartition fetchedPartition = null; + long fetchedOffset = -1; - while (recordsRemaining > 0) { - if (nextInLineRecords == null || nextInLineRecords.isFetched) { - CompletedFetch completedFetch = completedFetches.poll(); - if (completedFetch == null) - break; - - nextInLineRecords = parseCompletedFetch(completedFetch); - } else { - TopicPartition partition = nextInLineRecords.partition; - List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); - if (!records.isEmpty()) { - List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition); - if (currentRecords == null) { - fetched.put(partition, records); - } else { - // this case shouldn't usually happen because we only send one fetch at a time per partition, - // but it might conceivably happen in some rare cases (such as partition leader changes). - // we have to copy to a new list because the old one may be immutable - List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); - newRecords.addAll(currentRecords); - newRecords.addAll(records); - fetched.put(partition, newRecords); + try { + while (recordsRemaining > 0) { + if (nextInLineRecords == null || nextInLineRecords.isFetched) { + CompletedFetch completedFetch = completedFetches.poll(); + if (completedFetch == null) break; + + fetchedPartition = completedFetch.partition; + fetchedOffset = completedFetch.fetchedOffset; + nextInLineRecords = parseCompletedFetch(completedFetch); + } else { + fetchedPartition = nextInLineRecords.partition; + fetchedOffset = nextInLineRecords.nextFetchOffset; + List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); + TopicPartition partition = nextInLineRecords.partition; + if (!records.isEmpty()) { + List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition); + if (currentRecords == null) { + fetched.put(partition, records); + } else { + // this case shouldn't usually happen because we only send one fetch at a time per partition, + // but it might conceivably happen in some rare cases (such as partition leader changes). + // we have to copy to a new list because the old one may be immutable + List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); + newRecords.addAll(currentRecords); + newRecords.addAll(records); + fetched.put(partition, newRecords); + } + recordsRemaining -= records.size(); } - recordsRemaining -= records.size(); } } + } catch (KafkaException e) { + if (fetched.isEmpty()) + throw e; + nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset, e); } - return fetched; } @@ -969,6 +987,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } } + private static class ExceptionMetadata { + private final TopicPartition partition; + private final long fetchedOffset; + private final KafkaException exception; + + private ExceptionMetadata(TopicPartition partition, long fetchedOffset, KafkaException exception) { + this.partition = partition; + this.fetchedOffset = fetchedOffset; + this.exception = exception; + } + } + private static class CompletedFetch { private final TopicPartition partition; private final long fetchedOffset; @@ -1189,6 +1219,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { public void close() { if (nextInLineRecords != null) nextInLineRecords.drain(); + nextInLineExceptionMetadata = null; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 092f549..b8f493a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -593,6 +593,83 @@ public class FetcherTest { } @Test + public void testFetchPositionAfterException() { + // verify the advancement in the next fetch offset equals the number of fetched records when + // some fetched partitions cause Exception. This ensures that consumer won't lose record upon exception + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2)); + subscriptionsNoAutoReset.seek(tp1, 1); + subscriptionsNoAutoReset.seek(tp2, 1); + + assertEquals(1, fetcherNoAutoReset.sendFetches()); + + Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); + partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)); + partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100, + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records)); + client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0)); + consumerClient.poll(0); + + List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>(); + List<OffsetOutOfRangeException> exceptions = new ArrayList<>(); + + try { + for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values()) + fetchedRecords.addAll(records); + } catch (OffsetOutOfRangeException e) { + exceptions.add(e); + } + + assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2).longValue() - 1); + + try { + for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values()) + fetchedRecords.addAll(records); + } catch (OffsetOutOfRangeException e) { + exceptions.add(e); + } + + assertEquals(4, subscriptionsNoAutoReset.position(tp2).longValue()); + assertEquals(3, fetchedRecords.size()); + + // Should have received one OffsetOutOfRangeException for partition tp1 + assertEquals(1, exceptions.size()); + OffsetOutOfRangeException e = exceptions.get(0); + assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1)); + assertEquals(e.offsetOutOfRangePartitions().size(), 1); + } + + @Test + public void testSeekBeforeException() { + Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2); + + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1)); + subscriptionsNoAutoReset.seek(tp1, 1); + assertEquals(1, fetcher.sendFetches()); + Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); + partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records)); + client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0)); + consumerClient.poll(0); + + assertEquals(2, fetcher.fetchedRecords().get(tp1).size()); + + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2)); + subscriptionsNoAutoReset.seek(tp2, 1); + assertEquals(1, fetcher.sendFetches()); + partitions = new HashMap<>(); + partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)); + client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0)); + consumerClient.poll(0); + assertEquals(1, fetcher.fetchedRecords().get(tp1).size()); + + subscriptionsNoAutoReset.seek(tp2, 10); + // Should not throw OffsetOutOfRangeException after the seek + assertEquals(0, fetcher.fetchedRecords().size()); + } + + @Test public void testFetchDisconnected() { subscriptions.assignFromUser(singleton(tp1)); subscriptions.seek(tp1, 0); http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 1300629..757e216 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -614,6 +614,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) try { consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) + // It is possible that the first call returns records of "topic" and the second call throws TopicAuthorizationException + consumeRecords(consumer) consumeRecords(consumer) Assert.fail("Expected TopicAuthorizationException") } catch {
