Repository: kafka
Updated Branches:
  refs/heads/trunk 8d188c911 -> f9d7808ba
KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>

Closes #2190 from hachikuji/KAFKA-4469


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9d7808b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9d7808b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9d7808b

Branch: refs/heads/trunk
Commit: f9d7808bab52a0a6fa879aaac0b1da80e0f33adb
Parents: 8d188c9
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 30 13:18:04 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 30 13:18:04 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java | 82 ++++++++++----------
 1 file changed, 41 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f9d7808b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 703ea29..e414fcb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,7 +61,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -416,25 +415,40 @@ public class Fetcher<K, V> {
         int recordsRemaining = maxPollRecords;
 
         while (recordsRemaining > 0) {
-            if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
                 CompletedFetch completedFetch = completedFetches.poll();
                 if (completedFetch == null)
                     break;
 
                 nextInLineRecords = parseFetchedData(completedFetch);
             } else {
-                recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
+                TopicPartition partition = nextInLineRecords.partition;
+
+                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+                if (!records.isEmpty()) {
+                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+                    if (currentRecords == null) {
+                        drained.put(partition, records);
+                    } else {
+                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
+                        // but it might conceivably happen in some rare cases (such as partition leader changes).
+                        // we have to copy to a new list because the old one may be immutable
+                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
+                        newRecords.addAll(currentRecords);
+                        newRecords.addAll(records);
+                        drained.put(partition, newRecords);
+                    }
+                    recordsRemaining -= records.size();
+                }
             }
         }
 
         return drained;
     }
 
-    private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
-                       PartitionRecords<K, V> partitionRecords,
-                       int maxRecords) {
-        if (partitionRecords.isEmpty())
-            return 0;
+    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
+        if (partitionRecords.isDrained())
+            return Collections.emptyList();
 
         if (!subscriptions.isAssigned(partitionRecords.partition)) {
             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
@@ -447,22 +461,14 @@ public class Fetcher<K, V> {
                 log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
             } else if (partitionRecords.fetchOffset == position) {
                 // we are ensured to have at least one record since we already checked for emptiness
-                List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
+                List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
                 long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
 
                 log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                         "position to {}", position, partitionRecords.partition, nextOffset);
 
-                List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
-                if (records == null) {
-                    records = partRecords;
-                    drained.put(partitionRecords.partition, records);
-                } else {
-                    records.addAll(partRecords);
-                }
-
                 subscriptions.position(partitionRecords.partition, nextOffset);
-                return partRecords.size();
+                return partRecords;
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
                 // they must be from an obsolete request
@@ -471,8 +477,8 @@ public class Fetcher<K, V> {
             }
         }
 
-        partitionRecords.discard();
-        return 0;
+        partitionRecords.drain();
+        return Collections.emptyList();
     }
 
     /**
@@ -600,7 +606,7 @@ public class Fetcher<K, V> {
 
     private List<TopicPartition> fetchablePartitions() {
         List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
-        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
+        if (nextInLineRecords != null && !nextInLineRecords.isDrained())
             fetchable.remove(nextInLineRecords.partition);
         for (CompletedFetch completedFetch : completedFetches)
             fetchable.remove(completedFetch.partition);
@@ -768,6 +774,7 @@ public class Fetcher<K, V> {
         private long fetchOffset;
         private TopicPartition partition;
         private List<ConsumerRecord<K, V>> records;
+        private int position = 0;
 
         public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
             this.fetchOffset = fetchOffset;
@@ -775,33 +782,26 @@ public class Fetcher<K, V> {
             this.records = records;
         }
 
-        private boolean isEmpty() {
-            return records == null || records.isEmpty();
+        private boolean isDrained() {
+            return records == null || position >= records.size();
         }
 
-        private void discard() {
+        private void drain() {
             this.records = null;
         }
 
-        private List<ConsumerRecord<K, V>> take(int n) {
-            if (records == null)
-                return new ArrayList<>();
-
-            if (n >= records.size()) {
-                List<ConsumerRecord<K, V>> res = this.records;
-                this.records = null;
-                return res;
-            }
+        private List<ConsumerRecord<K, V>> drainRecords(int n) {
+            if (isDrained())
+                return Collections.emptyList();
 
-            List<ConsumerRecord<K, V>> res = new ArrayList<>(n);
-            Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
-            for (int i = 0; i < n; i++) {
-                res.add(iterator.next());
-                iterator.remove();
-            }
+            // using a sublist avoids a potentially expensive list copy (depending on the size of the records
+            // and the maximum we can return from poll). The cost is that we cannot mutate the returned sublist.
+            int limit = Math.min(records.size(), position + n);
+            List<ConsumerRecord<K, V>> res = Collections.unmodifiableList(records.subList(position, limit));
 
-            if (iterator.hasNext())
-                this.fetchOffset = iterator.next().offset();
+            position = limit;
+            if (position < records.size())
+                fetchOffset = records.get(position).offset();
 
             return res;
         }