[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140419#comment-13140419
 ] 

Jay Kreps commented on KAFKA-48:
--------------------------------

Yes, these are all good points. The work I have done so far just splits request 
processing into a separate thread pool and enables asynchronous handling. This 
is a fairly general thing we need for a few different use cases. Perhaps this 
should be broken into a separate JIRA.

I have thought a little bit about how to do long poll, though. Logically what I 
want to do is make it possible to give a minimum byte size for the response and 
a maximum delay in ms; then have the server delay the response until we have at 
least min_bytes messages in the response OR we hit the maximum delay time. The 
goals are to improve latency (by avoiding the wait between poll requests), to 
reduce load on the server (by eliminating the polling itself), and to make it 
possible to improve throughput. If you set min_bytes = 0 or max_delay_ms = 0 
you effectively get the current behavior. The throughput improvement comes if 
you set min_bytes > 1; this gives a way to artificially increase the response 
size for requests to the topic (i.e. avoid fetching only a few messages at a 
time) while still giving hard latency guarantees. We have seen that request 
size is one of the important factors for network throughput.

As you say, the only case to really consider is the multi-fetch case. The 
single topic fetch can just be seen as a special case of this. I think your 
first proposal is closer to what I had in mind. Having the response contain an 
empty message set for the topics that have no data has very little overhead 
since it is just positionally indexed, so it is like 4 bytes or something. I 
don't like a poll()-style interface that just returns the ready topics; it 
doesn't seem very useful to me, because the only logical thing you can do next 
is initiate a fetch on those topics, right? So we might as well just send back 
the data and have a single request type to worry about.

One of the tricky questions for multifetch is what does the minimum byte size 
pertain to? A straightforward implementation in the current system would be to 
add the min_bytes and timeout to the fetch request which would effectively 
bundle it up N times in the multi-fetch (currently multi-fetch is just N 
fetches glued together). This doesn't really make sense, though. Which of these 
minimum sizes would cause the single response to be sent? Would it be when all 
conditions were satisfied or when one was satisfied? I think the only thing 
that makes sense is to set these things at the request level. Ideally what I 
would like to do is remove the fetch request entirely because it is redundant 
and fix multi-fetch to have the following:
   [(topic1, partitions1), (topic2, partitions2),...], max_total_size, 
max_wait_ms
This also fixes the weird thing in multifetch now where you have to specify the 
topic with each partition, so a request for 10 partitions on the same topic 
repeats the topic name 10 times. This is an invasive change, though, since it 
means request format changes.
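As a sketch of what that unified request shape might look like (hypothetical field and type names; the structure follows the [(topic, partitions), ...], max_total_size, max_wait_ms layout above):

```python
from dataclasses import dataclass

@dataclass
class FetchRequest:
    """Hypothetical unified fetch/multi-fetch request.

    Each topic appears once with its list of partitions, so a request
    for 10 partitions on one topic names the topic only once; the size
    and delay bounds apply at the request level, not per topic.
    """
    topic_partitions: list  # [(topic, [partition, ...]), ...]
    max_total_size: int     # request-level size bound, in bytes
    max_wait_ms: int        # request-level maximum delay

req = FetchRequest(
    topic_partitions=[("events", [0, 1, 2, 3]), ("logs", [0])],
    max_total_size=64 * 1024,
    max_wait_ms=500,
)
```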

I am also not 100% sure how to implement the min_bytes parameter efficiently 
for multi-fetch. For the single-fetch case it is pretty easy: the 
implementation would be to keep a sort of hybrid priority queue by timeout time 
(e.g. the unix timestamp at which we owe a response). When a fetch request came 
in we would try to service it immediately, and if we could meet its 
requirements we would immediately send a response. If we can't meet its 
min_bytes requirement then we would calculate the offset for that 
topic/partition at which the request would be unblocked (e.g. if the current 
offset is X and the min_bytes is M then the target size is X+M). We would 
insert new requests into this watchers list, maintaining a sort by increasing 
target size. Each time a produce request is handled we would respond to all 
the watching requests whose target size is less than the new offset; this just 
requires walking the list until we see a request with a target size greater 
than the current offset. All the newly unblocked requests would be added to 
the response queue. So the only work added to a produce request is 
transferring newly unblocked requests to the response queue, and at most one 
still-blocked request needs to be examined.
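A minimal sketch of that watcher list (illustrative only, not Kafka code; names like block/on_produce are made up):

```python
import bisect

class FetchWatchers:
    """Blocked single-fetch requests kept sorted by the target offset
    at which they unblock (current offset X plus min_bytes M)."""

    def __init__(self):
        self._targets = []   # sorted unblock targets
        self._requests = []  # blocked requests, parallel to _targets

    def block(self, current_offset, min_bytes, request):
        # Target offset at which this request has >= min_bytes available.
        target = current_offset + min_bytes
        i = bisect.bisect_right(self._targets, target)
        self._targets.insert(i, target)
        self._requests.insert(i, request)

    def on_produce(self, new_offset):
        """Walk from the front; stop at the first watcher whose target
        exceeds the new offset.  Everything before it unblocks and
        would be moved to the response queue."""
        i = bisect.bisect_right(self._targets, new_offset)
        ready = self._requests[:i]
        self._targets = self._targets[i:]
        self._requests = self._requests[i:]
        return ready
```

Note that on_produce touches only the watchers it unblocks plus (via the bisect) one that stays blocked, which matches the cost argument above.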

The timeout could be implemented by keeping a priority queue of requests 
ordered by the latest allowable response time (i.e. the timestamp at which the 
request came in plus max_wait_ms). A background thread could remove items from 
this queue as their timeouts expire and add them to the response queue with an 
empty response.
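The timeout side might look like this (again a hypothetical sketch; a real background thread and response queue are elided):

```python
import heapq

class TimeoutQueue:
    """Min-heap keyed by the latest allowable response time
    (arrival timestamp + max_wait_ms).  A background thread would
    call expire() periodically and send an empty response for
    every request it returns."""

    def __init__(self):
        self._heap = []

    def add(self, request, arrival_ms, max_wait_ms):
        heapq.heappush(self._heap, (arrival_ms + max_wait_ms, request))

    def expire(self, now_ms):
        expired = []
        while self._heap and self._heap[0][0] <= now_ms:
            expired.append(heapq.heappop(self._heap)[1])
        return expired
```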
 
For the multifetch case, things are harder to do efficiently. The timeouts can 
still work the same way. However the min_bytes is now over all the topics the 
request covers. The only way I can see to implement this is to keep a counter 
associated with each watcher, and have the watcher watch all the requested 
topics. But now on each produce request we need to increment ALL the watchers 
on the topic produced to.
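To make the cost concrete, here is one way that counter scheme could look (hypothetical sketch; the lazy cleanup of already-satisfied watchers is one possible choice, not something settled above):

```python
from collections import defaultdict

class MultiFetchWatchers:
    """Each blocked multi-fetch carries a byte counter and is
    registered on every topic it covers; every produce request must
    credit ALL watchers on the topic produced to."""

    def __init__(self):
        self._by_topic = defaultdict(list)  # topic -> [watcher, ...]

    def block(self, request_id, topics, min_bytes):
        watcher = {"id": request_id, "remaining": min_bytes, "done": False}
        for t in topics:
            self._by_topic[t].append(watcher)

    def on_produce(self, topic, num_bytes):
        """O(watchers on this topic) per produce -- the cost noted above."""
        ready, still_blocked = [], []
        for w in self._by_topic[topic]:
            if w["done"]:
                continue  # already answered via another topic; drop lazily
            w["remaining"] -= num_bytes
            if w["remaining"] <= 0:
                w["done"] = True
                ready.append(w["id"])
            else:
                still_blocked.append(w)
        self._by_topic[topic] = still_blocked
        return ready
```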

Dunno, maybe for practical numbers of blocked requests (a few hundred? a 
thousand?) this doesn't matter. Or maybe there is a more clever approach. Ideas 
welcome.

                
> Implement optional "long poll" support in fetch request
> -------------------------------------------------------
>
>                 Key: KAFKA-48
>                 URL: https://issues.apache.org/jira/browse/KAFKA-48
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Alan Cabrera
>            Assignee: Jay Kreps
>         Attachments: KAFKA-48-socket-server-refactor-draft.patch
>
>
> Currently, the fetch request is non-blocking. If there is nothing on the 
> broker for the consumer to retrieve, the broker simply returns an empty set 
> to the consumer. This can be inefficient, if you want to ensure low-latency 
> because you keep polling over and over. We should make a blocking version of 
> the fetch request so that the fetch request is not returned until the broker 
> has at least one message for the fetcher or some timeout passes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira