[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140419#comment-13140419 ]
Jay Kreps commented on KAFKA-48:
--------------------------------

Yes, these are all good points. The work I have done so far just splits request processing into a separate thread pool and enables asynchronous handling. This is a fairly general thing we need for a few different use cases; perhaps it should be broken into a separate JIRA.

I have thought a little bit about how to do long poll, though. Logically what I want to do is make it possible to give a minimum byte size for the response and a maximum delay in ms, then have the server delay the response until we have at least min_bytes of messages in the response OR we hit the maximum delay time. The goals are to improve latency (by avoiding waiting in between poll requests), to reduce load on the server (by not polling), and to make it possible to improve throughput. If you set min_bytes = 0 or max_delay_ms = 0 you effectively get the current behavior. The throughput improvement comes if you set min_bytes > 1; this gives a way to artificially increase the response size for requests to the topic (i.e. avoid fetching only a few messages at a time) while still giving hard latency guarantees. We have seen that request size is one of the most important factors for network throughput.

As you say, the only case to really consider is the multi-fetch case; the single-topic fetch can just be seen as a special case of it. I think your first proposal is closer to what I had in mind. Having the response contain an empty message set for the topics that have no data has very little overhead since the response is positionally indexed, so it is like 4 bytes or something. A poll()-style interface that just returns ready topics doesn't seem very useful to me, because the only logical thing you can then do is initiate a fetch on those topics, right? So we might as well just send back the data and have a single request type to worry about.
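The min_bytes/max_delay semantics above boil down to a small predicate. A minimal sketch (class and method names are hypothetical, not from any patch):

```java
// Sketch of the proposed fetch parameters and the decision of whether the
// broker should answer a fetch immediately or keep it parked.
public class FetchDelayPolicy {
    private final int minBytes;   // respond once at least this many bytes are available...
    private final long maxWaitMs; // ...or once this much time has elapsed

    public FetchDelayPolicy(int minBytes, long maxWaitMs) {
        this.minBytes = minBytes;
        this.maxWaitMs = maxWaitMs;
    }

    /** True if the broker should send a response right away. */
    public boolean shouldRespondNow(int availableBytes, long elapsedMs) {
        // Setting minBytes = 0 or maxWaitMs = 0 reduces to the current
        // non-blocking behavior: the first check (or the second) is
        // immediately satisfied on arrival.
        return availableBytes >= minBytes || elapsedMs >= maxWaitMs;
    }
}
```

Note how both degenerate settings fall out of the same check, so no separate request type is needed for the old behavior.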
One of the tricky questions for multi-fetch is what the minimum byte size pertains to. A straightforward implementation in the current system would be to add the min_bytes and timeout to the fetch request, which would effectively bundle them up N times in the multi-fetch (currently multi-fetch is just N fetches glued together). This doesn't really make sense, though: which of these minimum sizes would cause the single response to be sent? Would it be when all the conditions were satisfied, or when any one was? I think the only thing that makes sense is to set these things at the request level. Ideally what I would like to do is remove the fetch request entirely, since it is redundant, and fix multi-fetch to have the following form:

[(topic1, partitions1), (topic2, partitions2), ...], max_total_size, max_wait_ms

This also fixes the weird thing in multi-fetch now where you have to specify the topic with each partition, so a request for 10 partitions on the same topic repeats the topic name 10 times. It is an invasive change, though, since it means request format changes.

I am also not 100% sure how to implement the min_bytes parameter efficiently for multi-fetch. For the single-fetch case it is pretty easy: the implementation would keep a sort of hybrid priority queue keyed by timeout time (i.e. the unix timestamp at which we owe a response). When a fetch request came in we would try to service it immediately, and if we could meet its requirements we would send the response right away. If we can't meet its min_bytes requirement, we would calculate the offset for that topic/partition at which the request would be unblocked (e.g. if the current offset is X and the min_bytes is M, then the target size is X + M). We would insert new requests into this watchers list, maintaining a sort by increasing target size.
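The single-fetch watcher list could be sketched as below. The `Watcher` record and method names are illustrative only; the key properties from the description above are that watchers are kept sorted by target offset, and that a produce only walks the head of the list, examining at most one watcher that stays blocked:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a per-partition watcher list for blocked single-topic fetches,
// sorted by the log offset ("target size") at which each can be satisfied.
public class FetchWatchers {
    public static final class Watcher {
        final long targetOffset; // currentOffset + minBytes at request time
        final long deadlineMs;   // arrival time + maxWaitMs (served separately by a timeout queue)
        Watcher(long targetOffset, long deadlineMs) {
            this.targetOffset = targetOffset;
            this.deadlineMs = deadlineMs;
        }
    }

    // Kept sorted by increasing targetOffset.
    private final List<Watcher> watchers = new ArrayList<>();

    /** Called when a fetch cannot be served immediately. */
    public void watch(long currentOffset, int minBytes, long nowMs, long maxWaitMs) {
        Watcher w = new Watcher(currentOffset + minBytes, nowMs + maxWaitMs);
        int i = 0;
        while (i < watchers.size() && watchers.get(i).targetOffset <= w.targetOffset)
            i++; // find insertion point preserving the sort
        watchers.add(i, w);
    }

    /**
     * Called after a produce advances the log to newOffset: pop every watcher
     * whose target has been reached. The loop stops at the first watcher that
     * stays blocked, so the produce-side cost is proportional to the number
     * of requests actually unblocked, plus one.
     */
    public List<Watcher> unblock(long newOffset) {
        List<Watcher> ready = new ArrayList<>();
        while (!watchers.isEmpty() && watchers.get(0).targetOffset <= newOffset)
            ready.add(watchers.remove(0));
        return ready;
    }
}
```

Returned watchers would then be moved to the response queue; expired deadlines would be handled by a separate priority queue ordered by `deadlineMs`.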
Each time a produce request is handled we would respond to all the watching requests whose target size is less than the new offset; this just requires walking the list until we see a request with a target size greater than the current offset. All the newly unblocked requests would be added to the response queue. So the only work added to a produce request is the work of transferring newly unblocked requests to the response queue, plus examining at most one still-blocked request.

The timeout could be implemented by keeping a priority queue of requests ordered by the unix timestamp of the latest allowable response (i.e. the timestamp the request came in, plus max_wait_ms). A background thread could remove items from this queue as their timeouts occur and add them to the response queue with an empty response.

For the multi-fetch case, things are harder to do efficiently. The timeouts can still work the same way, but the min_bytes is now over all the topics the request covers. The only way I can see to implement this is to keep a counter associated with each watcher, and have the watcher watch all the requested topics. But now, on each produce request, we need to increment ALL the watchers on the topic produced to. Dunno, maybe for practical numbers of blocked requests (a few hundred? a thousand?) this doesn't matter. Or maybe there is a more clever approach. Ideas welcome.

> Implement optional "long poll" support in fetch request
> -------------------------------------------------------
>
>                 Key: KAFKA-48
>                 URL: https://issues.apache.org/jira/browse/KAFKA-48
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Alan Cabrera
>            Assignee: Jay Kreps
>         Attachments: KAFKA-48-socket-server-refactor-draft.patch
>
> Currently, the fetch request is non-blocking. If there is nothing on the
> broker for the consumer to retrieve, the broker simply returns an empty set
> to the consumer.
> This can be inefficient if you want to ensure low latency, because you keep
> polling over and over. We should make a blocking version of the fetch
> request so that the fetch request is not returned until the broker has at
> least one message for the fetcher or some timeout passes.
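The per-watcher counter idea for multi-fetch discussed in the comment above could be sketched as follows. All names are hypothetical; the sketch tracks the bytes still needed (decrementing toward zero), which is equivalent to incrementing an accumulated-bytes counter toward min_bytes. It makes the discussed cost explicit: a produce touches every watcher registered on that topic:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch of multi-fetch long poll: each blocked multi-fetch holds one shared
// byte counter and is registered on every topic it covers.
public class MultiFetchWatchers {
    public static final class Watcher {
        long bytesStillNeeded; // starts at the request's min_bytes
        Watcher(long minBytes) { this.bytesStillNeeded = minBytes; }
    }

    private final Map<String, List<Watcher>> byTopic = new HashMap<>();

    /** Register one blocked multi-fetch covering the given topics. */
    public Watcher watch(Collection<String> topics, long minBytes) {
        Watcher w = new Watcher(minBytes);
        for (String t : topics)
            byTopic.computeIfAbsent(t, k -> new ArrayList<>()).add(w);
        return w;
    }

    /**
     * Called after a produce appends `bytes` bytes to `topic`; returns the
     * newly unblocked watchers. Note the loop visits ALL watchers on the
     * topic, which is the per-produce cost the comment worries about.
     */
    public List<Watcher> onProduce(String topic, long bytes) {
        List<Watcher> ready = new ArrayList<>();
        List<Watcher> ws = byTopic.getOrDefault(topic, Collections.emptyList());
        for (Iterator<Watcher> it = ws.iterator(); it.hasNext(); ) {
            Watcher w = it.next();
            w.bytesStillNeeded -= bytes;
            if (w.bytesStillNeeded <= 0) {
                ready.add(w);
                it.remove(); // a full version would also deregister from the other topics
            }
        }
        return ready;
    }
}
```

For a few hundred to a thousand blocked requests per topic this linear scan is likely cheap relative to the produce itself, which is the practical question raised above.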