[ https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15705691#comment-15705691 ]
Ismael Juma commented on KAFKA-4405:
------------------------------------
[~ysysberserk], are you really seeing the behaviour you have described here?
The Fetcher should only try to prefetch for a partition if we don't already have
records for that partition; see the code below (`fetchablePartitions` in particular):
{code}
private List<TopicPartition> fetchablePartitions() {
    List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
    if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
        fetchable.remove(nextInLineRecords.partition);
    for (CompletedFetch completedFetch : completedFetches)
        fetchable.remove(completedFetch.partition);
    return fetchable;
}

/**
 * Create fetch requests for all nodes for which we have assigned partitions
 * that have no existing requests in flight.
 */
private Map<Node, FetchRequest> createFetchRequests() {
    // create the fetch info
    Cluster cluster = metadata.fetch();
    Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable =
            new LinkedHashMap<>();
    for (TopicPartition partition : fetchablePartitions()) {
        Node node = cluster.leaderFor(partition);
        if (node == null) {
            metadata.requestUpdate();
        } else if (this.client.pendingRequestCount(node) == 0) {
            // if there is a leader and no in-flight requests, issue a new fetch
            LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
            if (fetch == null) {
                fetch = new LinkedHashMap<>();
                fetchable.put(node, fetch);
            }

            long position = this.subscriptions.position(partition);
            fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
            log.trace("Added fetch request for partition {} at offset {}", partition, position);
        } else {
            log.trace("Skipping fetch for partition {} because there is an in-flight request to {}",
                    partition, node);
        }
    }

    // create the fetches
    Map<Node, FetchRequest> requests = new HashMap<>();
    for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry :
            fetchable.entrySet()) {
        Node node = entry.getKey();
        FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes,
                entry.getValue());
        requests.put(node, fetch);
    }
    return requests;
}
{code}
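To make the two gates in this excerpt concrete, here is a minimal, hypothetical model; it is not Kafka's API. Partitions and brokers are plain strings, the `buffered` collection stands in for both `nextInLineRecords` and `completedFetches`, and a set of node names stands in for `client.pendingRequestCount(node) > 0`. Under these assumptions, a partition that still has buffered records is never fetched, and a leader with a request in flight gets no new fetch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of the two gates in the Fetcher excerpt:
 * (1) partitions with buffered records are excluded, and
 * (2) a leader node with an in-flight request is skipped.
 * Partitions and nodes are plain strings; this is not Kafka's API.
 */
public class PrefetchModel {

    /** Assigned partitions minus those that still have buffered (retained) records. */
    static List<String> fetchablePartitions(List<String> assigned, Collection<String> buffered) {
        List<String> fetchable = new ArrayList<>(assigned);
        fetchable.removeAll(buffered);
        return fetchable;
    }

    /** Groups fetchable partitions by leader, skipping leaders that have a pending request. */
    static Map<String, List<String>> createFetchRequests(List<String> fetchable,
                                                         Map<String, String> leaderFor,
                                                         Set<String> nodesWithPendingRequests) {
        Map<String, List<String>> requests = new LinkedHashMap<>();
        for (String partition : fetchable) {
            String node = leaderFor.get(partition);
            if (node == null) {
                continue; // the real code calls metadata.requestUpdate() here
            }
            if (nodesWithPendingRequests.contains(node)) {
                continue; // a request to this node is already in flight: skip
            }
            requests.computeIfAbsent(node, n -> new ArrayList<>()).add(partition);
        }
        return requests;
    }

    public static void main(String[] args) {
        List<String> assigned = List.of("t-0", "t-1", "t-2");
        Map<String, String> leaders = Map.of("t-0", "broker-1", "t-1", "broker-1", "t-2", "broker-2");
        // t-0 still has retained records, so only t-1 and t-2 are candidates
        List<String> fetchable = fetchablePartitions(assigned, List.of("t-0"));
        // broker-2 has a request in flight, so only broker-1 receives a fetch (for t-1)
        System.out.println(createFetchRequests(fetchable, leaders, Set.of("broker-2"))); // prints {broker-1=[t-1]}
    }
}
```

In this toy run, `t-0` is held back because it is buffered and `t-2` is held back because its leader is busy, so the only request produced is for `t-1` on `broker-1`.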
> Kafka consumer improperly send prefetch request
> -----------------------------------------------
>
> Key: KAFKA-4405
> URL: https://issues.apache.org/jira/browse/KAFKA-4405
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.0.1
> Reporter: ysysberserk
>
> The Kafka consumer now has max.poll.records to limit the number of messages
> returned by poll().
> According to KIP-41, to implement max.poll.records, the prefetch request
> should only be sent when the total number of retained records is less than
> max.poll.records.
> But in the 0.10.0.1 code, the consumer sends a prefetch request if it has
> retained any records and never checks whether the total number of retained
> records is less than max.poll.records.
> If max.poll.records is set much lower than the number of messages returned
> by a fetch, the poll() loop will send many more requests than expected and
> will accumulate more and more fetched records in memory before they can be
> consumed.
> So, before sending a prefetch request, the consumer must check whether the
> total number of retained records is less than max.poll.records.
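The check proposed in the issue description can be sketched as a toy simulation. Everything here is hypothetical: `shouldPrefetch` and `peakRetained` are illustrative names, each loop iteration stands for one poll() that hands out up to `maxPollRecords` retained records, after which at most one fetch of `fetchedPerRequest` records is buffered. The `gated = false` case is a deliberately naive contrast that fetches on every poll; it is not a model of the actual 0.10.0.1 Fetcher, which (per the excerpt quoted above) already skips partitions that have buffered records.

```java
/**
 * Hypothetical sketch of the check proposed in KAFKA-4405: before sending a
 * prefetch, compare the total number of retained (fetched but not yet returned)
 * records with max.poll.records. Names are illustrative, not Kafka's API.
 */
public class RetainedRecordsGate {

    /** Prefetch only while fewer than maxPollRecords records are retained. */
    static boolean shouldPrefetch(int retainedRecords, int maxPollRecords) {
        return retainedRecords < maxPollRecords;
    }

    /**
     * Simulates `polls` iterations: each poll() returns up to maxPollRecords records,
     * then a fetch of fetchedPerRequest records is buffered if allowed.
     * With gated == false a fetch is buffered on every poll (naive contrast).
     * Returns the largest number of records ever retained.
     */
    static int peakRetained(int polls, int maxPollRecords, int fetchedPerRequest, boolean gated) {
        int retained = 0;
        int peak = 0;
        for (int i = 0; i < polls; i++) {
            retained -= Math.min(retained, maxPollRecords); // records handed to the application
            if (!gated || shouldPrefetch(retained, maxPollRecords)) {
                retained += fetchedPerRequest; // the fetch response is buffered
            }
            peak = Math.max(peak, retained);
        }
        return peak;
    }

    public static void main(String[] args) {
        // max.poll.records = 10, each fetch returns 500 records
        System.out.println(peakRetained(1000, 10, 500, true));  // prints 500
        System.out.println(peakRetained(1000, 10, 500, false)); // prints 490010
    }
}
```

With the gate, the number of retained records never exceeds `maxPollRecords - 1 + fetchedPerRequest`, because a fetch is added only when fewer than `maxPollRecords` records remain after a poll; without it, the buffer grows by `fetchedPerRequest - maxPollRecords` on every poll.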