[ https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15705691#comment-15705691 ]

Ismael Juma commented on KAFKA-4405:
------------------------------------

[~ysysberserk], are you really seeing the behaviour you have described here? 
Fetcher should only try to prefetch if we don't have records for that 
partition, see the code below (`fetchablePartitions` in particular):

{code}
    private List<TopicPartition> fetchablePartitions() {
        List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
            fetchable.remove(nextInLineRecords.partition);
        for (CompletedFetch completedFetch : completedFetches)
            fetchable.remove(completedFetch.partition);
        return fetchable;
    }

    /**
     * Create fetch requests for all nodes for which we have assigned partitions
     * that have no existing requests in flight.
     */
    private Map<Node, FetchRequest> createFetchRequests() {
        // create the fetch info
        Cluster cluster = metadata.fetch();
        Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
        for (TopicPartition partition : fetchablePartitions()) {
            Node node = cluster.leaderFor(partition);
            if (node == null) {
                metadata.requestUpdate();
            } else if (this.client.pendingRequestCount(node) == 0) {
                // if there is a leader and no in-flight requests, issue a new fetch
                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                if (fetch == null) {
                    fetch = new LinkedHashMap<>();
                    fetchable.put(node, fetch);
                }

                long position = this.subscriptions.position(partition);
                fetch.put(partition, new FetchRequest.PartitionData(position, 
this.fetchSize));
                log.trace("Added fetch request for partition {} at offset {}", partition, position);
            } else {
                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
            }
        }

        // create the fetches
        Map<Node, FetchRequest> requests = new HashMap<>();
        for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            Node node = entry.getKey();
            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue());
            requests.put(node, fetch);
        }
        return requests;
    }
{code}
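To make the filtering step in `fetchablePartitions()` concrete, here is a simplified, hypothetical model of it: a partition that still has buffered records is dropped from the set of partitions eligible for a new (pre)fetch. The class name, the use of plain strings instead of `TopicPartition`, and the per-partition buffered-count map standing in for `nextInLineRecords` and `completedFetches` are all assumptions for illustration, not Kafka code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of Fetcher.fetchablePartitions():
 * partitions with records still buffered are excluded from the next fetch.
 * Strings stand in for TopicPartition; the buffered map stands in for
 * nextInLineRecords and completedFetches.
 */
class FetchableFilterSketch {

    static List<String> fetchablePartitions(List<String> assigned, Map<String, Integer> buffered) {
        // Start from every assigned partition (stand-in for subscriptions.fetchablePartitions()).
        List<String> fetchable = new ArrayList<>(assigned);
        for (Map.Entry<String, Integer> e : buffered.entrySet()) {
            if (e.getValue() > 0) {
                // Records for this partition are still waiting to be returned by poll(),
                // so no new fetch is issued for it.
                fetchable.remove(e.getKey());
            }
        }
        return fetchable;
    }

    public static void main(String[] args) {
        List<String> assigned = Arrays.asList("topic-0", "topic-1", "topic-2");
        Map<String, Integer> buffered = new LinkedHashMap<>();
        buffered.put("topic-0", 350); // leftover records after poll() returned max.poll.records
        buffered.put("topic-2", 0);   // fully drained
        System.out.println(fetchablePartitions(assigned, buffered)); // prints [topic-1, topic-2]
    }
}
```

In this model only partitions with nothing buffered get a new fetch, which is the behaviour the comment points to; it says nothing about how many records are buffered in total across partitions.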

> Kafka consumer improperly send prefetch request
> -----------------------------------------------
>
>                 Key: KAFKA-4405
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4405
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.1
>            Reporter: ysysberserk
>
> The Kafka consumer now supports max.poll.records to limit the number of messages
> returned by poll().
> According to KIP-41, to implement max.poll.records, the prefetch request
> should only be sent when the total number of retained records is less than
> max.poll.records.
> But in the 0.10.0.1 code, the consumer will send a prefetch request if it has
> retained any records, and it never checks whether the total number of retained
> records is less than max.poll.records.
> If max.poll.records is set much lower than the number of messages
> fetched, the poll() loop will send many more requests than expected and will
> accumulate more and more fetched records in memory before they can be
> consumed.
> So before sending a prefetch request, the consumer must check whether the total
> number of retained records is less than max.poll.records.
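A minimal sketch of the check the report asks for, assuming a single counter of retained records across all partitions and that each fetch response arrives immediately. `PrefetchGateSketch` and its methods are hypothetical and not part of the Kafka consumer API; the numbers (1000 records per fetch response, max.poll.records = 100) are made up for illustration.

```java
/**
 * Hypothetical sketch of the proposed KIP-41 condition: send a prefetch only
 * while the number of retained (fetched but not yet returned) records is
 * below max.poll.records. Names are illustrative, not Kafka's API.
 */
class PrefetchGateSketch {
    private final int maxPollRecords;
    private long retained; // records buffered in memory, summed over all partitions

    PrefetchGateSketch(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    /** The proposed condition for sending a prefetch request. */
    boolean shouldPrefetch() {
        return retained < maxPollRecords;
    }

    /** A fetch response added this many records to the buffer. */
    void onFetchResponse(int records) {
        retained += records;
    }

    /** poll() hands out at most max.poll.records; returns how many it handed out. */
    int poll() {
        int returned = (int) Math.min(retained, maxPollRecords);
        retained -= returned;
        return returned;
    }

    long retained() {
        return retained;
    }

    public static void main(String[] args) {
        // Assumed numbers: each fetch response carries 1000 records, max.poll.records = 100.
        PrefetchGateSketch gate = new PrefetchGateSketch(100);
        int fetches = 0;
        for (int i = 0; i < 20; i++) {
            if (gate.shouldPrefetch()) {
                gate.onFetchResponse(1000); // pretend the response arrives immediately
                fetches++;
            }
            gate.poll();
        }
        System.out.println("fetches=" + fetches + ", retained=" + gate.retained()); // prints fetches=2, retained=0
    }
}
```

In this model a new prefetch goes out only once the buffer holds less than one poll's worth of records, so the retained count stays below max.poll.records plus the size of one fetch response.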



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
