Repository: kafka
Updated Branches:
  refs/heads/trunk f0acc4a0d -> efeaf1298


KAFKA-4405; Avoid unnecessary network poll in consumer if no fetches sent

Author: Eno Thereska <eno.there...@gmail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2193 from enothereska/KAFKA-4405-prefetch


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efeaf129
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efeaf129
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efeaf129

Branch: refs/heads/trunk
Commit: efeaf129890c2195b4753d5b9eece4f1b7cdf756
Parents: f0acc4a
Author: Eno Thereska <eno.there...@gmail.com>
Authored: Mon Dec 12 09:38:25 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Dec 12 09:38:25 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  5 +--
 .../clients/consumer/internals/Fetcher.java     | 19 ++++++----
 .../consumer/internals/SubscriptionState.java   |  2 +-
 .../clients/consumer/internals/FetcherTest.java | 38 ++++++++++----------
 4 files changed, 36 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/efeaf129/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f89460b..5e66a8d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -992,8 +992,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     //
                     // NOTE: since the consumed position has already been updated, we must not allow
                     // wakeups or any other errors to be triggered prior to returning the fetched records.
-                    fetcher.sendFetches();
-                    client.pollNoWakeup();
+                    if (fetcher.sendFetches() > 0) {
+                        client.pollNoWakeup();
+                    }
 
                     if (this.interceptors == null)
                         return new ConsumerRecords<>(records);

http://git-wip-us.apache.org/repos/asf/kafka/blob/efeaf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4bfe466..6fb4229 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -143,9 +143,11 @@ public class Fetcher<K, V> {
     /**
      * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
      * an in-flight fetch or pending fetch data.
+     * @return number of fetches sent
      */
-    public void sendFetches() {
-        for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
+    public int sendFetches() {
+        Map<Node, FetchRequest> fetchRequestMap = createFetchRequests();
+        for (Map.Entry<Node, FetchRequest> fetchEntry : fetchRequestMap.entrySet()) {
             final FetchRequest request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
 
@@ -183,6 +185,7 @@ public class Fetcher<K, V> {
                         }
                     });
         }
+        return fetchRequestMap.size();
     }
 
     /**
@@ -605,11 +608,15 @@ public class Fetcher<K, V> {
     }
 
     private List<TopicPartition> fetchablePartitions() {
+        Set<TopicPartition> exclude = new HashSet<>();
         List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
-        if (nextInLineRecords != null && !nextInLineRecords.isDrained())
-            fetchable.remove(nextInLineRecords.partition);
-        for (CompletedFetch completedFetch : completedFetches)
-            fetchable.remove(completedFetch.partition);
+        if (nextInLineRecords != null && !nextInLineRecords.isDrained()) {
+            exclude.add(nextInLineRecords.partition);
+        }
+        for (CompletedFetch completedFetch : completedFetches) {
+            exclude.add(completedFetch.partition);
+        }
+        fetchable.removeAll(exclude);
         return fetchable;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/efeaf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 003d1a1..a476221 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -285,7 +285,7 @@ public class SubscriptionState {
     }
 
     public List<TopicPartition> fetchablePartitions() {
-        List<TopicPartition> fetchable = new ArrayList<>();
+        List<TopicPartition> fetchable = new ArrayList<>(assignment.size());
         for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
             if (state.value().isFetchable())
                 fetchable.add(state.topicPartition());

http://git-wip-us.apache.org/repos/asf/kafka/blob/efeaf129/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 140e041..6d5896f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -126,7 +126,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // normal fetch
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -151,7 +151,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
@@ -194,7 +194,7 @@ public class FetcherTest {
 
         client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -235,7 +235,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // normal fetch
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         try {
@@ -258,7 +258,7 @@ public class FetcherTest {
         client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0));
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(2, records.size());
@@ -266,14 +266,14 @@ public class FetcherTest {
         assertEquals(1, records.get(0).offset());
         assertEquals(2, records.get(1).offset());
 
-        fetcher.sendFetches();
+        assertEquals(0, fetcher.sendFetches());
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(1, records.size());
         assertEquals(4L, subscriptions.position(tp).longValue());
         assertEquals(3, records.get(0).offset());
 
-        fetcher.sendFetches();
+        assertTrue(fetcher.sendFetches() > 0);
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(2, records.size());
@@ -298,7 +298,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // normal fetch
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
@@ -316,7 +316,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // resize the limit of the buffer to pretend it is only fetch-size large
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
         consumerClient.poll(0);
         try {
@@ -333,7 +333,7 @@ public class FetcherTest {
         subscriptions.assignFromSubscribed(singleton(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
 
         // Now the rebalance happens and fetch positions are cleared
         subscriptions.assignFromSubscribed(singleton(tp));
@@ -349,7 +349,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         subscriptions.pause(tp);
 
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -363,7 +363,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         subscriptions.pause(tp);
-        fetcher.sendFetches();
+        assertFalse(fetcher.sendFetches() > 0);
         assertTrue(client.requests().isEmpty());
     }
 
@@ -372,7 +372,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -384,7 +384,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -396,7 +396,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -411,7 +411,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         subscriptions.seek(tp, 1);
         consumerClient.poll(0);
@@ -425,7 +425,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.assignFromUser(singleton(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
-        fetcherNoAutoReset.sendFetches();
+        assertTrue(fetcherNoAutoReset.sendFetches() > 0);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -458,7 +458,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches();
+        assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -617,7 +617,7 @@ public class FetcherTest {
                 }
                 this.records.close();
             }
-            fetcher.sendFetches();
+            assertEquals(1, fetcher.sendFetches());
             client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
             consumerClient.poll(0);
             records = fetcher.fetchedRecords().get(tp);

Reply via email to