Repository: kafka Updated Branches: refs/heads/trunk f0acc4a0d -> efeaf1298
KAFKA-4405; Avoid unnecessary network poll in consumer if no fetches sent Author: Eno Thereska <eno.there...@gmail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #2193 from enothereska/KAFKA-4405-prefetch Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efeaf129 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efeaf129 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efeaf129 Branch: refs/heads/trunk Commit: efeaf129890c2195b4753d5b9eece4f1b7cdf756 Parents: f0acc4a Author: Eno Thereska <eno.there...@gmail.com> Authored: Mon Dec 12 09:38:25 2016 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Dec 12 09:38:25 2016 -0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 5 +-- .../clients/consumer/internals/Fetcher.java | 19 ++++++---- .../consumer/internals/SubscriptionState.java | 2 +- .../clients/consumer/internals/FetcherTest.java | 38 ++++++++++---------- 4 files changed, 36 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/efeaf129/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f89460b..5e66a8d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -992,8 +992,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. - fetcher.sendFetches(); - client.pollNoWakeup(); + if (fetcher.sendFetches() > 0) { + client.pollNoWakeup(); + } if (this.interceptors == null) return new ConsumerRecords<>(records); http://git-wip-us.apache.org/repos/asf/kafka/blob/efeaf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 4bfe466..6fb4229 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -143,9 +143,11 @@ public class Fetcher<K, V> { /** * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have * an in-flight fetch or pending fetch data. + * @return number of fetches sent */ - public void sendFetches() { - for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) { + public int sendFetches() { + Map<Node, FetchRequest> fetchRequestMap = createFetchRequests(); + for (Map.Entry<Node, FetchRequest> fetchEntry : fetchRequestMap.entrySet()) { final FetchRequest request = fetchEntry.getValue(); final Node fetchTarget = fetchEntry.getKey(); @@ -183,6 +185,7 @@ public class Fetcher<K, V> { } }); } + return fetchRequestMap.size(); } /** @@ -605,11 +608,15 @@ public class Fetcher<K, V> { } private List<TopicPartition> fetchablePartitions() { + Set<TopicPartition> exclude = new HashSet<>(); List<TopicPartition> fetchable = subscriptions.fetchablePartitions(); - if (nextInLineRecords != null && !nextInLineRecords.isDrained()) - fetchable.remove(nextInLineRecords.partition); - for (CompletedFetch completedFetch : completedFetches) - fetchable.remove(completedFetch.partition); + if (nextInLineRecords != null && !nextInLineRecords.isDrained()) { + exclude.add(nextInLineRecords.partition); + } + for (CompletedFetch completedFetch : completedFetches) { + exclude.add(completedFetch.partition); + } + fetchable.removeAll(exclude); return fetchable; } http://git-wip-us.apache.org/repos/asf/kafka/blob/efeaf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 003d1a1..a476221 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -285,7 +285,7 @@ public class SubscriptionState { } public List<TopicPartition> fetchablePartitions() { - List<TopicPartition> fetchable = new ArrayList<>(); + List<TopicPartition> fetchable = new ArrayList<>(assignment.size()); for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) { if (state.value().isFetchable()) fetchable.add(state.topicPartition()); http://git-wip-us.apache.org/repos/asf/kafka/blob/efeaf129/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 140e041..6d5896f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -126,7 +126,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); // normal fetch - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); @@ -151,7 +151,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); @@ -194,7 +194,7 @@ public class FetcherTest { client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); try { fetcher.fetchedRecords(); @@ -235,7 +235,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); // normal fetch - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); try { @@ -258,7 +258,7 @@ public class FetcherTest { client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0)); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(2, records.size()); @@ -266,14 +266,14 @@ public class FetcherTest { assertEquals(1, records.get(0).offset()); assertEquals(2, records.get(1).offset()); - fetcher.sendFetches(); + assertEquals(0, fetcher.sendFetches()); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(1, records.size()); assertEquals(4L, subscriptions.position(tp).longValue()); assertEquals(3, records.get(0).offset()); - fetcher.sendFetches(); + assertTrue(fetcher.sendFetches() > 0); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(2, records.size()); @@ -298,7 +298,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); // normal fetch - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); consumerRecords = fetcher.fetchedRecords().get(tp); @@ -316,7 +316,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); // resize the limit of the buffer to pretend it is only fetch-size large - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0)); consumerClient.poll(0); try { @@ -333,7 +333,7 @@ public class FetcherTest { subscriptions.assignFromSubscribed(singleton(tp)); subscriptions.seek(tp, 0); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); // Now the rebalance happens and fetch positions are cleared subscriptions.assignFromSubscribed(singleton(tp)); @@ -349,7 +349,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); subscriptions.pause(tp); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); @@ -363,7 +363,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); subscriptions.pause(tp); - fetcher.sendFetches(); + assertFalse(fetcher.sendFetches() > 0); assertTrue(client.requests().isEmpty()); } @@ -372,7 +372,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); @@ -384,7 +384,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); @@ -396,7 +396,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); @@ -411,7 +411,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); subscriptions.seek(tp, 1); consumerClient.poll(0); @@ -425,7 +425,7 @@ public class FetcherTest { subscriptionsNoAutoReset.assignFromUser(singleton(tp)); subscriptionsNoAutoReset.seek(tp, 0); - fetcherNoAutoReset.sendFetches(); + assertTrue(fetcherNoAutoReset.sendFetches() > 0); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); consumerClient.poll(0); assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp)); @@ -458,7 +458,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); @@ -617,7 +617,7 @@ public class FetcherTest { } this.records.close(); } - fetcher.sendFetches(); + assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i)); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp);