Repository: kafka
Updated Branches:
  refs/heads/0.10.1 b1f8fc7b6 -> aec74ae5b


KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>

Closes #2190 from hachikuji/KAFKA-4469

(cherry picked from commit f9d7808bab52a0a6fa879aaac0b1da80e0f33adb)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aec74ae5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aec74ae5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aec74ae5

Branch: refs/heads/0.10.1
Commit: aec74ae5b2cd1815fce0227b0f2773ca972457bf
Parents: b1f8fc7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 30 13:18:04 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 30 13:33:50 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 82 ++++++++++----------
 1 file changed, 41 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aec74ae5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index bfc1a0b..fdcfc30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -62,7 +62,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -417,25 +416,40 @@ public class Fetcher<K, V> {
         int recordsRemaining = maxPollRecords;
 
         while (recordsRemaining > 0) {
-            if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
                 CompletedFetch completedFetch = completedFetches.poll();
                 if (completedFetch == null)
                     break;
 
                 nextInLineRecords = parseFetchedData(completedFetch);
             } else {
-                recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
+                TopicPartition partition = nextInLineRecords.partition;
+
+                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+                if (!records.isEmpty()) {
+                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+                    if (currentRecords == null) {
+                        drained.put(partition, records);
+                    } else {
+                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
+                        // but it might conceivably happen in some rare cases (such as partition leader changes).
+                        // we have to copy to a new list because the old one may be immutable
+                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
+                        newRecords.addAll(currentRecords);
+                        newRecords.addAll(records);
+                        drained.put(partition, newRecords);
+                    }
+                    recordsRemaining -= records.size();
+                }
             }
         }
 
         return drained;
     }
 
-    private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
-                       PartitionRecords<K, V> partitionRecords,
-                       int maxRecords) {
-        if (partitionRecords.isEmpty())
-            return 0;
+    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
+        if (partitionRecords.isDrained())
+            return Collections.emptyList();
 
         if (!subscriptions.isAssigned(partitionRecords.partition)) {
             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
@@ -448,22 +462,14 @@ public class Fetcher<K, V> {
                 log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
             } else if (partitionRecords.fetchOffset == position) {
                 // we are ensured to have at least one record since we already checked for emptiness
-                List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
+                List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
                 long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
 
                 log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                         "position to {}", position, partitionRecords.partition, nextOffset);
 
-                List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
-                if (records == null) {
-                    records = partRecords;
-                    drained.put(partitionRecords.partition, records);
-                } else {
-                    records.addAll(partRecords);
-                }
-
                 subscriptions.position(partitionRecords.partition, nextOffset);
-                return partRecords.size();
+                return partRecords;
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
                 // they must be from an obsolete request
@@ -472,8 +478,8 @@ public class Fetcher<K, V> {
             }
         }
 
-        partitionRecords.discard();
-        return 0;
+        partitionRecords.drain();
+        return Collections.emptyList();
     }
 
     /**
@@ -601,7 +607,7 @@ public class Fetcher<K, V> {
 
     private List<TopicPartition> fetchablePartitions() {
         List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
-        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
+        if (nextInLineRecords != null && !nextInLineRecords.isDrained())
             fetchable.remove(nextInLineRecords.partition);
         for (CompletedFetch completedFetch : completedFetches)
             fetchable.remove(completedFetch.partition);
@@ -771,6 +777,7 @@ public class Fetcher<K, V> {
         private long fetchOffset;
         private TopicPartition partition;
         private List<ConsumerRecord<K, V>> records;
+        private int position = 0;
 
         public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
             this.fetchOffset = fetchOffset;
@@ -778,33 +785,26 @@ public class Fetcher<K, V> {
             this.records = records;
         }
 
-        private boolean isEmpty() {
-            return records == null || records.isEmpty();
+        private boolean isDrained() {
+            return records == null || position >= records.size();
         }
 
-        private void discard() {
+        private void drain() {
             this.records = null;
         }
 
-        private List<ConsumerRecord<K, V>> take(int n) {
-            if (records == null)
-                return new ArrayList<>();
-
-            if (n >= records.size()) {
-                List<ConsumerRecord<K, V>> res = this.records;
-                this.records = null;
-                return res;
-            }
+        private List<ConsumerRecord<K, V>> drainRecords(int n) {
+            if (isDrained())
+                return Collections.emptyList();
 
-            List<ConsumerRecord<K, V>> res = new ArrayList<>(n);
-            Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
-            for (int i = 0; i < n; i++) {
-                res.add(iterator.next());
-                iterator.remove();
-            }
+            // using a sublist avoids a potentially expensive list copy (depending on the size of the records
+            // and the maximum we can return from poll). The cost is that we cannot mutate the returned sublist.
+            int limit = Math.min(records.size(), position + n);
+            List<ConsumerRecord<K, V>> res = Collections.unmodifiableList(records.subList(position, limit));
 
-            if (iterator.hasNext())
-                this.fetchOffset = iterator.next().offset();
+            position = limit;
+            if (position < records.size())
+                fetchOffset = records.get(position).offset();
 
             return res;
         }

Reply via email to