[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2011-10-31 Thread Taylor Gautier (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140231#comment-13140231
 ] 

Taylor Gautier commented on KAFKA-48:
-

Hi - please keep in mind the use case where a consumer is interested in more 
than one topic.

If this feature is implemented for only one topic, it will not help this use
case, assuming it's infeasible to open multiple TCP connections.

The first proposal I have is to allow the request to contain a list of topics.
However, upon consideration, this would also require the response to carry the
name of each topic; otherwise it would be next to impossible to ascertain which
topic a response corresponds to. The alternative is to return responses in the
same order as the topics were requested, with an empty response for topics that
have no messages, but this seems pretty bad from a network bandwidth standpoint.

So my final proposal would be to introduce an epoll-like request/response. The
consumer would submit a request with a list of topics it is interested in, and
the response, sent once one or more of those topics have messages, would give
each such topic and the # of messages available on it.

The advantage of this solution is that it would be entirely backward
compatible, since you would simply introduce a new request/response pair. It
would also allow the consumer to decide which topics to poll (or pull) from
first, so that it could prioritize if it wanted.

Finally, I like the idea of allowing the consumer to specify a min # of
messages required to trigger the poll. You might want to copy the pattern you
already set up for log flushing, e.g. max time and/or min # of messages. So the
request might look like:

list-of :  topic-name:min msgs:max time

and the response might be:

list-of : topic-name:# msgs available




> Implement optional "long poll" support in fetch request
> ---
>
> Key: KAFKA-48
> URL: https://issues.apache.org/jira/browse/KAFKA-48
> Project: Kafka
>  Issue Type: Bug
>Reporter: Alan Cabrera
>Assignee: Jay Kreps
> Attachments: KAFKA-48-socket-server-refactor-draft.patch
>
>
> Currently, the fetch request is non-blocking. If there is nothing on the 
> broker for the consumer to retrieve, the broker simply returns an empty set 
> to the consumer. This can be inefficient, if you want to ensure low-latency 
> because you keep polling over and over. We should make a blocking version of 
> the fetch request so that the fetch request is not returned until the broker 
> has at least one message for the fetcher or some timeout passes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira




[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2011-10-31 Thread Jay Kreps (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140419#comment-13140419
 ] 

Jay Kreps commented on KAFKA-48:


Yes, these are all good points. The work I have done so far just splits request 
processing into a separate thread pool and enables asynchronous handling. This 
is a fairly general thing we need for a few different use cases. Perhaps this 
should be broken into a separate JIRA.

I have thought a little bit about how to do long poll, though. Logically what I
want to do is make it possible to give a minimum byte size for the response and
a maximum delay in ms, then have the server delay the response until we have at
least min_bytes of messages in the response OR we hit the maximum delay time.
The goals are to improve latency (by avoiding waiting in between poll
requests), to reduce load on the server (by not polling), and to make it
possible to improve throughput. If you set min_bytes = 0 or max_delay_ms = 0 you
effectively get the current behavior. The throughput improvement comes if you
set min_bytes > 1; this would give a way to artificially increase the
response size for requests to the topic (i.e. avoid fetching only a few
messages at a time) while still giving hard latency guarantees. We have seen
that the request size is one of the important things for network throughput.
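
As a rough illustration of the condition described above (Python is used only
for brevity; the function name and argument names other than min_bytes are
hypothetical and not taken from any patch), the broker-side check could be
sketched as:

```python
def should_respond(available_bytes, min_bytes, waited_ms, max_delay_ms):
    """Respond once enough bytes are available OR the maximum delay has passed."""
    return available_bytes >= min_bytes or waited_ms >= max_delay_ms


# With min_bytes = 0 or max_delay_ms = 0 the check passes immediately,
# which corresponds to the current non-blocking behavior.
immediate_by_size = should_respond(0, 0, 0, 1000)
immediate_by_delay = should_respond(0, 100, 0, 0)
still_waiting = should_respond(10, 100, 5, 50)
```

Here the first two calls evaluate to True and the last to False, since neither
the size nor the delay condition has been met yet.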

As you say, the only case to really consider is the multi-fetch case. The 
single topic fetch can just be seen as a special case of this. I think your 
first proposal is closer to what I had in mind. Having the response contain an 
empty message set for the topics that have no data has very little overhead 
since it is just positionally indexed, so it is like 4 bytes or something. I
don't like a poll() style interface that just returns ready topics; it
doesn't seem very useful to me because the only logical thing you can do is
then initiate a fetch on those topics, right? So we might as well just send
back the data and have a single request type to worry about.

One of the tricky questions for multifetch is what does the minimum byte size 
pertain to? A straight-forward implementation in the current system would be to 
add the min_bytes and timeout to the fetch request which would effectively 
bundle it up N times in the multi-fetch (currently multi-fetch is just N 
fetches glued together). This doesn't really make sense, though. Which of these 
minimum sizes would cause the single response to be sent? Would it be when all 
conditions were satisfied or when one was satisfied? I think the only thing 
that makes sense is to set these things at the request level. Ideally what I 
would like to do is remove the fetch request entirely because it is redundant 
and fix multi-fetch to have the following:
   [(topic1, partitions1), (topic2, partitions2),...], max_total_size, max_wait_ms
This also fixes the weird thing in multifetch now where you have to specify the 
topic with each partition, so a request for 10 partitions on the same topic 
repeats the topic name 10 times. This is an invasive change, though, since it 
means request format changes.
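
To make the saving concrete, here is a small sketch (Python for illustration;
the helper name group_by_topic and the tuple layout are assumptions, not the
actual wire format) that turns the current one-topic-per-partition list into
the grouped shape proposed above:

```python
def group_by_topic(fetches):
    """Group (topic, partition) pairs so each topic name appears only once."""
    grouped = {}
    for topic, partition in fetches:
        grouped.setdefault(topic, []).append(partition)
    # dicts keep insertion order, so topics come out in first-seen order
    return list(grouped.items())


# Current style: the topic name is repeated for every partition.
current = [("clicks", 0), ("clicks", 1), ("clicks", 2), ("views", 0)]
proposed = group_by_topic(current)
```

With this input, proposed holds one entry for "clicks" with partitions
[0, 1, 2] and one entry for "views" with partition [0].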

I am also not 100% sure how to implement the min_bytes parameter efficiently
for multi-fetch. For the single fetch case it is pretty easy: the
implementation would be to keep a sort of hybrid priority queue by timeout time
(e.g. the unix timestamp at which we owe a response). When a fetch request came
in we would try to service it immediately, and if we could meet its
requirements we would immediately send a response. If we can't meet its
min_bytes requirement then we would calculate the offset for that
topic/partition at which the request would be unblocked (e.g. if the current
offset is X and the min_bytes is M then the target size is X+M). We would
insert new requests into this watchers list maintaining a sort by increasing
target size. Each time a produce request is handled we would respond to all the
watching requests whose target size is less than the new offset; this would just
require walking the list until we see a request with a target size greater than
the current offset. All the newly unblocked requests would be added to the
response queue. So this means the only work added to a produce request is the
work of transferring newly unblocked requests to the response queue, and we
need to examine at most one request that stays blocked.
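
The single-fetch watcher list described above could be sketched roughly as
follows. This is Python for illustration only; the class and method names are
hypothetical, and the sketch treats a request as unblocked once the log offset
reaches its target (target <= new offset), which is an assumption about the
boundary case.

```python
import bisect
import itertools


class PartitionWatchers:
    """Blocked fetches for one topic/partition, sorted by target offset."""

    def __init__(self):
        self._waiting = []              # sorted (target, seq, request_id)
        self._seq = itertools.count()   # tie-breaker keeps arrival order

    def watch(self, request_id, current_offset, min_bytes):
        # Target size is X + M: current offset plus the requested minimum.
        target = current_offset + min_bytes
        bisect.insort(self._waiting, (target, next(self._seq), request_id))

    def on_produce(self, new_offset):
        """Return the requests unblocked by a produce that moved the log to new_offset."""
        i = 0
        # Walk from the head and stop at the first request that is still blocked.
        while i < len(self._waiting) and self._waiting[i][0] <= new_offset:
            i += 1
        released = [request_id for _, _, request_id in self._waiting[:i]]
        del self._waiting[:i]
        return released
```

A produce therefore touches only the requests it releases plus the first one
that remains blocked, which is the property the comment is after.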

The timeout could be implemented by keeping a priority queue of requests based 
on the unix timestamp of the latest allowable response (i.e. the ts the request 
came in, plus the max_wait_ms). We could add a background thread to remove 
items from this as their timeout occurs, and add them to the response queue 
with an empty response.
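
A minimal sketch of that timeout queue, again in Python purely for
illustration (the class name and methods are hypothetical, and the background
thread is left out; something would call pop_expired periodically):

```python
import heapq
import itertools


class TimeoutQueue:
    """Priority queue of requests ordered by latest allowable response time."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker for equal deadlines

    def add(self, request_id, arrival_ms, max_wait_ms):
        # Deadline is the timestamp the request came in plus max_wait_ms.
        deadline_ms = arrival_ms + max_wait_ms
        heapq.heappush(self._heap, (deadline_ms, next(self._seq), request_id))

    def pop_expired(self, now_ms):
        """Remove and return every request whose deadline has passed."""
        expired = []
        while self._heap and self._heap[0][0] <= now_ms:
            expired.append(heapq.heappop(self._heap)[2])
        return expired
```

Each request returned by pop_expired would then be placed on the response
queue with an empty response.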
 
For the multifetch case, things are harder to do efficiently. The timeouts can 
still work the same way. However the min_bytes is now over all the topics the 
request cove

[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2011-10-31 Thread Taylor Gautier (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140450#comment-13140450
 ] 

Taylor Gautier commented on KAFKA-48:
-

I can see how it would be reasonable to do the first approach. It does limit
one use case I was considering, which is to allow the consumer to decide in
which order to fetch the topics after the poll is triggered. However, this can
be done at request time when the topics are requested.

As you say, the response is 100% compatible; it's just the request that
changes. Therefore I think it would make sense to go ahead and add a new
request type. The current fetch request then remains the same on the wire, and
its behavior is just a degenerate case of the new request with delay and bytes
set to 0.

I think you might consider how useful it is to wait for a user-specified
time/bytes. It will add a lot of complexity to your implementation, and
frankly, if I just have the ability to do a multi-fetch that waits until
something has arrived and then sends me whatever it has at that moment, that
will be good enough. It should also probably provide a simple timeout
that will respond with nothing if the timeout expires.

I think that's a huge win and you might consider -- is that good enough?

For me - I would prefer to get the simple thing in the short term and work on 
the harder thing in the long-term rather than waiting a longer time for the 
harder thing to be done.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2011-10-31 Thread Jay Kreps (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140467#comment-13140467
 ] 

Jay Kreps commented on KAFKA-48:


Hi Taylor,

Could you give a little more detail on your use case for ordering the fetches? 
I think you have a use case I haven't thought of, but I don't know if I 
understand it. Is your motivation some kind of quality of service over the 
topics?

As you say, this would definitely be a new request type for compatibility, and 
we would probably try to deprecate the old format over the next few releases as 
we can get clients updated.

Your point about complexity is valid. I think for our usage, since we use Kafka
very heavily, the pain of grandfathering in new APIs is the hardest part, and
the socket server refactoring is next, so I was thinking the difficulty of
implementing a few internal data structures is not too bad. I suppose it
depends on whether I work out a concrete plan there or not. If the best we can
do is iterate over the full set of watchers it may not be worth it.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2011-10-31 Thread Taylor Gautier (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140531#comment-13140531
 ] 

Taylor Gautier commented on KAFKA-48:
-

Actually, I don't have a valid use case for priority fetches, I was just 
thinking ahead.

I agree that it's painful to have message format upgrades.  On the flip side of 
course we probably also agree it's bad to have parameters in the message header 
that don't correspond to real features.  

Can you make a trade-off and reserve some bytes for these two int (or long) 
parameters and/or a few others but just call the space reserved?





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2011-11-11 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13148835#comment-13148835
 ] 

Jun Rao commented on KAFKA-48:
--

Just had a chance to look at the patch. Agree in principle this would work. 
It's probably better to create a separate jira for moving the requesthandler 
out of socket server. The long poll jira will depend on that jira.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2011-11-11 Thread Jay Kreps (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13148971#comment-13148971
 ] 

Jay Kreps commented on KAFKA-48:


Cool, moved it.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2011-11-21 Thread Taylor Gautier (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13154069#comment-13154069
 ] 

Taylor Gautier commented on KAFKA-48:
-

I've been staring at the code for a while - and I'm not sure I understand why 
you need KAFKA-202 to implement this feature.

What I am thinking of doing is:
1) Every thread has to open a local socket for read/write
2) Each thread puts the socket into the poll set for reading
3) If a read request fails to read any messages, when it comes back to the 
handler, the handler adds a callback method to the appropriate log and puts the 
read request into a special queue.  When that log gets messages for write, it 
calls the callback.  The callback writes a byte into the special thread socket.
4) The byte wakes up the thread, which sees that the special socket had a byte 
written to it, and so it goes and re-handles the read requests in the special 
queue as if they had just come in from the network. Thus if there are any 
messages available in the log for a given request, they are read just like 
normal and transferred out onto the channel.  If not, they're re-queued as per 
step 3.
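
The four steps above amount to a self-wakeup over a local socket. A rough
sketch of the idea, in Python for illustration only (the class, its method
names, and the request ids are hypothetical; the real handler threads and
logs are not modeled):

```python
import selectors
import socket


class WakeableHandler:
    def __init__(self):
        # Steps 1 and 2: a local socket pair, with the read end in the poll set.
        self.wake_recv, self.wake_send = socket.socketpair()
        self.wake_recv.setblocking(False)
        self.selector = selectors.DefaultSelector()
        self.selector.register(self.wake_recv, selectors.EVENT_READ)
        self.parked = []  # step 3: read requests that found no messages

    def park(self, request_id):
        self.parked.append(request_id)

    def on_log_append(self):
        # Step 3: the callback the log would invoke; it writes a single byte.
        self.wake_send.send(b"\x01")

    def poll_once(self, timeout):
        # Step 4: if the wakeup socket is readable, hand back the parked
        # requests so they can be re-handled as if they had just arrived.
        retry = []
        for key, _events in self.selector.select(timeout):
            if key.fileobj is self.wake_recv:
                self.wake_recv.recv(4096)  # drain pending wakeup bytes
                retry, self.parked = self.parked, []
        return retry

    def close(self):
        self.selector.close()
        self.wake_recv.close()
        self.wake_send.close()
```

Requests that still find no messages after being retried would simply be
parked again, as in step 3.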

I think there are some pieces I haven't quite got right. In particular, I think
there can only be one active response at a time. Thus there will have to be
some sort of response queue built up as each request generates a response, but
I think that's simple: the handler just writes responses with non-zero
messages into a response queue, and the write logic of the socketserver is
updated to drain this queue on write events (at the moment it only deals with
one response at a time, but now it may have many queued up to send out).

Some other work that is probably going to be more difficult is that the binary 
protocol has to change to include the topic name or else there is no way to 
disambiguate the responses coming back.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-01-02 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13178466#comment-13178466
 ] 

Jun Rao commented on KAFKA-48:
--

Taylor,

Sorry for the late response. I am not sure that I understand your proposal. 

a. Why do we need a local socket? It seems that the same thing can be achieved 
by just turning on the write_interesting bit in the socket key corresponding to 
a client request.

b. It's not clear to me how you correlate a queued client request with the 
corresponding client socket.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-02-03 Thread Taylor Gautier (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13200138#comment-13200138
 ] 

Taylor Gautier commented on KAFKA-48:
-

Jay - that's great to hear!! Would you mind summarizing the way that the 
long-poll works?  I know that several different implementations were suggested 
here on the thread and I wanted to know which one you ultimately decided to go 
with.

> Implement optional "long poll" support in fetch request
> ---
>
> Key: KAFKA-48
> URL: https://issues.apache.org/jira/browse/KAFKA-48
> Project: Kafka
>  Issue Type: Bug
>Reporter: Alan Cabrera
>Assignee: Jay Kreps
> Attachments: KAFKA-48.patch
>
>




[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-02-03 Thread Jay Kreps (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13200155#comment-13200155
 ] 

Jay Kreps commented on KAFKA-48:


Hey Taylor, here are the nitty gritty details:
- When a fetch request comes in we immediately check if we have sufficient data 
to satisfy it
   - if so we respond immediately
   - If not we add a "watch" on the topics that the fetch is for, and add it to 
a delay queue to expire it after the given timeout
   - There is a background thread that checks the delay queue for expired 
requests and responds to them with whatever data is available
- When a produce request comes in we update the watchers for all the topics it
produces to, and increment their byte count. Any requests that have been
satisfied by this produce are then executed and responses are sent.
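
The flow in the bullets above can be sketched as follows. This is a
simplified Python illustration, not the code in the patch: the class names,
method names, and the per-request byte counter are assumptions, and the
background expiry thread is replaced by an explicit expire() call.

```python
import heapq
import itertools


class DelayedFetch:
    def __init__(self, request_id, min_bytes, bytes_seen, deadline_ms):
        self.request_id = request_id
        self.min_bytes = min_bytes
        self.bytes_seen = bytes_seen
        self.deadline_ms = deadline_ms
        self.done = False


class LongPollSketch:
    def __init__(self):
        self._watchers = {}   # topic -> list of DelayedFetch
        self._expiry = []     # heap of (deadline_ms, seq, DelayedFetch)
        self._seq = itertools.count()

    def fetch(self, request_id, topics, available_bytes, min_bytes, now_ms, max_wait_ms):
        """Return True if the fetch can be answered now, else park it."""
        if available_bytes >= min_bytes or max_wait_ms <= 0:
            return True
        delayed = DelayedFetch(request_id, min_bytes, available_bytes, now_ms + max_wait_ms)
        for topic in topics:
            self._watchers.setdefault(topic, []).append(delayed)
        heapq.heappush(self._expiry, (delayed.deadline_ms, next(self._seq), delayed))
        return False

    def produce(self, topic, nbytes):
        """Credit nbytes to the topic's watchers; return newly satisfied request ids."""
        satisfied, still_waiting = [], []
        for delayed in self._watchers.pop(topic, []):
            if delayed.done:
                continue  # already answered through another topic or a timeout
            delayed.bytes_seen += nbytes
            if delayed.bytes_seen >= delayed.min_bytes:
                delayed.done = True
                satisfied.append(delayed.request_id)
            else:
                still_waiting.append(delayed)
        if still_waiting:
            self._watchers[topic] = still_waiting
        return satisfied

    def expire(self, now_ms):
        """Return ids of parked requests whose timeout has passed."""
        expired = []
        while self._expiry and self._expiry[0][0] <= now_ms:
            delayed = heapq.heappop(self._expiry)[2]
            if not delayed.done:
                delayed.done = True
                expired.append(delayed.request_id)
        return expired
```

Note that in this sketch a satisfied request stays in the expiry heap until
its deadline passes, and stays in the watcher list of any topic that receives
no further produces.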

One of the earlier questions was how to support a consumer that polls a very
large number of topics AND wants very low latency. I think, as you described,
it would be possible to implement this by simply multiplexing the requests on
the single socket and letting the server respond to them as it can.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-02-03 Thread Jay Kreps (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13200159#comment-13200159
 ] 

Jay Kreps commented on KAFKA-48:


Two other issues with this patch that I forgot to mention:
- There is a race condition between checking the available bytes, and adding 
the watchers for the topics. I *think* this is okay since the min_bytes is a 
minimum not a maximum, so in the rare case that a produce comes in before the 
watchers are added we will just wait slightly longer than we should have. I 
think this is probably better than properly synchronizing and locking out all 
produces on that partition.
- The other issue is that the delay queue is only emptied right now when the 
delay expires. If the request is fulfilled before the delay expires, the 
request is marked completed, but it remains in the delay queue until it 
expires. This is a problem and needs to be fixed. The problem is that if the 
client sets a low min_bytes and a high max_wait these requests may accumulate. 
Currently we would have to do an O(N) walk of the waiting requests to fix this. 
I am going to try to come up with an improved set of data structures to fix 
this without requiring that.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-02-20 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13212329#comment-13212329
 ] 

Jun Rao commented on KAFKA-48:
--

Overall, the patch looks good. Some comments:

1. DelayedItem.compareTo: yourEnd should be delayed.createdMs + delayed.delayMs
2. Suppose that a client issues MultiFetch requests on a hot topic and a cold 
topic. What can happen is that the watcher list for the cold topic won't be 
cleaned up for a long time. One solution is to have a cleaner thread that 
periodically wakes up to remove satisfied items. The cleaner thread can be used 
to clean up the DelayQueue too.
3. MessageSetSend.empty is not used.
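
For comments 1 and 2, a small Python stand-in may help (this is not the
DelayedItem class from the patch; the field names created_ms, delay_ms and
satisfied, and the purge_satisfied helper, are assumptions for illustration).
The ordering compares each item's own end time, created + delay, and a cleaner
pass drops items that were already satisfied:

```python
import functools


@functools.total_ordering
class DelayedItem:
    def __init__(self, created_ms, delay_ms):
        self.created_ms = created_ms
        self.delay_ms = delay_ms
        self.satisfied = False

    def end_ms(self):
        return self.created_ms + self.delay_ms

    # Ordering uses each side's own created + delay, which is the point of
    # comment 1: the other item's end must come from the other item's fields.
    def __eq__(self, other):
        return self.end_ms() == other.end_ms()

    def __lt__(self, other):
        return self.end_ms() < other.end_ms()


def purge_satisfied(items):
    """One cleaner pass: keep only the items that are still waiting."""
    return [item for item in items if not item.satisfied]
```

A cleaner thread would run purge_satisfied periodically over both the watcher
lists and the delay queue contents.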






[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-04-05 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13247409#comment-13247409
 ] 

Jun Rao commented on KAFKA-48:
--

Thanks for patch v3. Some comments:

31. DelayedFetch is keyed off topic. It should be keyed off (topic, partition) 
since a consumer may be interested in only a subset of partitions within a 
topic.

32. KafkaApis: The following 3 lines are duplicated in 2 places.
  val topicData = readMessageSets(delayed.fetch.offsetInfo)
  val response = new FetchResponse(FetchRequest.CurrentVersion,
                                   delayed.fetch.correlationId, topicData)
  requestChannel.sendResponse(new RequestChannel.Response(delayed.request,
      new FetchResponseSend(response, ErrorMapping.NoError), -1))
Should we put them in a private method and share the code?

33. ExpiredRequestReaper.purgeExpired(): We need to decrement unsatisfied count 
here.

34. FetchRequest: Can we have the default constants for correlationId, 
clientid, etc. defined and shared between the constructor and the request
builder?

35. MessageSetSend.empty is unused. Should we remove it?
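
To illustrate comment 31, here is a tiny Python sketch (hypothetical names,
not the DelayedFetch code under review) of watchers keyed by
(topic, partition), so that an append to one partition does not wake
consumers interested only in another partition of the same topic:

```python
from collections import defaultdict

# Key is the (topic, partition) pair rather than the topic alone.
watchers = defaultdict(list)


def watch(request_id, partitions):
    """Register a request on each (topic, partition) it fetches from."""
    for topic, partition in partitions:
        watchers[(topic, partition)].append(request_id)


def on_append(topic, partition):
    """Return and clear the requests watching exactly this partition."""
    return watchers.pop((topic, partition), [])
```

For example, after watch("r1", [("logs", 0)]) and watch("r2", [("logs", 1)]),
an append to partition 1 of "logs" wakes only "r2".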

> Implement optional "long poll" support in fetch request
> ---
>
> Key: KAFKA-48
> URL: https://issues.apache.org/jira/browse/KAFKA-48
> Project: Kafka
> Issue Type: Bug
> Reporter: Jun Rao
> Assignee: Jay Kreps
> Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48.patch
>
>
> Currently, the fetch request is non-blocking. If there is nothing on the 
> broker for the consumer to retrieve, the broker simply returns an empty set 
> to the consumer. This can be inefficient, if you want to ensure low-latency 
> because you keep polling over and over. We should make a blocking version of 
> the fetch request so that the fetch request is not returned until the broker 
> has at least one message for the fetcher or some timeout passes.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-04-10 Thread Jun Rao (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13250856#comment-13250856
 ] 

Jun Rao commented on KAFKA-48:
--

Patch v4 looks good. Just one more comment.

41. RequestPurgatory.update(): if(w == null), could we return a singleton empty 
array, instead of creating a new one every time?

> Implement optional "long poll" support in fetch request
> ---
>
> Key: KAFKA-48
> URL: https://issues.apache.org/jira/browse/KAFKA-48
> Project: Kafka
> Issue Type: Bug
> Reporter: Jun Rao
> Assignee: Jay Kreps
> Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48-v4.patch, 
> KAFKA-48.patch, kafka-48-v3-to-v4-changes.diff
>
>
> Currently, the fetch request is non-blocking. If there is nothing on the 
> broker for the consumer to retrieve, the broker simply returns an empty set 
> to the consumer. This can be inefficient, if you want to ensure low-latency 
> because you keep polling over and over. We should make a blocking version of 
> the fetch request so that the fetch request is not returned until the broker 
> has at least one message for the fetcher or some timeout passes.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-04-10 Thread Jay Kreps (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13250878#comment-13250878
 ] 

Jay Kreps commented on KAFKA-48:


Good point, Jun. It is now:
  if(w == null)
    Seq.empty
  else
    w.collectSatisfiedRequests(request)

I will wait for more feedback before making a new patch since this is a pretty 
trivial change.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-04-10 Thread Neha Narkhede (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13250912#comment-13250912
 ] 

Neha Narkhede commented on KAFKA-48:


This patch looks very good. Here are a few questions:

1. I like the way the expired requests are handled by implementing the logic 
inside the FetchRequestPurgatory. However, can we not do the same for satisfied 
requests by providing an abstract satisfy() API in RequestPurgatory? That gets 
rid of the handling of fetch requests inside handleProducerRequest() in 
KafkaApis, which is a little awkward to read. When we have the 
ProduceRequestPurgatory, the same satisfy() operation can send responses for 
produce requests once the fetch responses for the followers come in.

2. I gave the RequestPurgatory data structure some thought. I am not sure the 
following buys us anything over the current data structure, but how about this 
layout for the RequestPurgatory:

2.1. The watchers would be a priority heap (PriorityQueue), with the head being 
the DelayedItem with the least delay value (earliest expiration time). So for 
each (topic, partition), we have a PQ of watchers. 

2.2. The expiration data structure is another PQ of size n, where n is the 
number of keys in RequestPurgatory. This expiration PQ has the heads of each of 
the watcher lists above. 

2.3. The expiration thread will await on a condition variable with a timeout = 
delay of the head of the expiration PQ. The condition also gets signaled 
whenever the head of any of the n watcher lists changes. 

2.4. When the expiration thread gets signaled, it removes its head element, 
expires it if it's ready, ignores it if it's satisfied, and adds an element from 
the watch list it came from. It keeps doing this until its head has an 
expiration time in the future. Then it goes back to awaiting on the condition 
variable.

2.5. The item to be expired gets removed from its watch list as well as 
expiration PQ in O(1). 

2.6. The item that gets satisfied sets a flag and gets removed from its watcher 
list. If the satisfied item is the head of the watcher list, the expiration 
thread gets signaled to add the new head to its PQ. 

2.7. Pros 
2.7.1. The watcher list doesn't maintain expired items, so it doesn't need 
state-keeping for liveCount and maybePurge() 
2.7.2. During a watch operation, items only enter the expiration PQ if they are 
the head of the watcher list 
2.7.3. The expiration thread does a more informed get operation, instead of 
polling the queue in a loop. 

2.8. Cons 
2.8.1. The watch operation is O(log n), where n is the number of DelayedItems 
for a key 
2.8.2. The forcePurge() operation on the expiration data structure still needs 
to happen in O(n) 

Did I miss something here? Thoughts?

3. On the other hand, this is a huge non-trivial patch and you must be pretty 
tired of rebasing and working through unit tests. We could just discuss the 
above changes, and maybe file another JIRA to track it, instead of delaying 
this patch further. But that is your call.
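
The dual priority queue layout from item 2 above can be sketched roughly as follows. This is a single-threaded illustration under stated assumptions, not code from the patch: DelayedItemSketch and DualQueuePurgatory are hypothetical names, the caller passes the current time explicitly instead of an expiration thread waiting on a condition variable, and satisfied items are dropped lazily when they reach the head of their watch list rather than being removed eagerly as step 2.6 describes.

```scala
import scala.collection.mutable

// Hypothetical delayed item; expiresAt is an absolute time in milliseconds.
final class DelayedItemSketch(val key: (String, Int), val expiresAt: Long) {
  var satisfied = false // set by whoever satisfies the request
  var queued = false    // true while the item sits in the expiration queue
}

final class DualQueuePurgatory {
  // scala's PriorityQueue is a max-heap, so reverse to get earliest expiry first.
  private val byExpiry: Ordering[DelayedItemSketch] =
    Ordering.by[DelayedItemSketch, Long](_.expiresAt).reverse

  // 2.1: one priority queue of watchers per (topic, partition).
  private val watchLists =
    mutable.Map.empty[(String, Int), mutable.PriorityQueue[DelayedItemSketch]]

  // 2.2: the expiration queue holds heads of the watch lists.
  private val expiration = mutable.PriorityQueue.empty[DelayedItemSketch](byExpiry)

  // 2.7.2: an item enters the expiration queue only when it is a list head.
  private def promoteHead(list: mutable.PriorityQueue[DelayedItemSketch]): Unit =
    list.headOption.foreach { h =>
      if (!h.queued) { h.queued = true; expiration.enqueue(h) }
    }

  def watch(item: DelayedItemSketch): Unit = {
    val list = watchLists.getOrElseUpdate(
      item.key, mutable.PriorityQueue.empty[DelayedItemSketch](byExpiry))
    list.enqueue(item)
    promoteHead(list)
  }

  // 2.3: the timeout an expiration thread would wait for.
  def nextExpiry: Option[Long] = expiration.headOption.map(_.expiresAt)

  // 2.4: pop heads until the next one expires in the future.
  def expireUpTo(now: Long): List[DelayedItemSketch] = {
    val expired = List.newBuilder[DelayedItemSketch]
    while (expiration.nonEmpty && expiration.head.expiresAt <= now) {
      val item = expiration.dequeue()
      item.queued = false
      val list = watchLists(item.key)
      // Skip entries displaced by a later watch with an earlier expiry;
      // they are re-queued once they become the head again.
      if (list.nonEmpty && (list.head eq item)) {
        list.dequeue()
        if (!item.satisfied) expired += item // satisfied items are ignored
        promoteHead(list)
      }
    }
    expired.result()
  }
}
```

Note that removing the head of a binary heap costs O(log n) in this sketch, so it does not demonstrate the O(1) removal mentioned in 2.5.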





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-04-10 Thread Jay Kreps (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13250982#comment-13250982
 ] 

Jay Kreps commented on KAFKA-48:


Hey Neha, yes, my hope is to get the patch evaluated as is, and then take 
another pass at cleaning up the way we handle the satisfaction action as Jun 
and you requested and try out other approaches to the purgatory data structure 
asynchronously. That should take these cleanup/polishing items out of the 
critical path.

I like your idea of the dual priority queues, but I need to work through it 
more to fully understand it.





[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-04-17 Thread Joel Koshy (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13256073#comment-13256073
 ] 

Joel Koshy commented on KAFKA-48:
-

+1 on the patch. I have a few minor comments:

KafkaRequestHandlers :
- requestLogger unused.

ConsumerConfig:
- maxFetchWait -> rename the prop to max.fetch.wait.ms and the val to
  maxFetchWaitMs
- Can we get rid of fetcherBackoffMs? It says it is deprecated, but had a
  reference in FetcherRunnable which you removed.
- May want to have an explicit constraint that consumerTimeoutMs <=
  maxFetchWait

RequestPurgatory:

- Unused import.
- The parameterized types and overall tricky nature of this component make
  it somewhat difficult to follow. I think I understood it only after
  looking at its usage in KafkaApis, so the comments and javadocs (including
  the class summary on top) can only go so far. Even so, I think the comments
  seem slightly out of sync with the code and can be improved a bit. E.g.,
  what is "given size" in the update method's comment? "Current keys" in the
  comment for watch means the given request's keys, and so on.
- Also, it may be easier to follow if we do some renaming, but it's a matter
  of taste and I may have misunderstood the code to begin with:
  - I find it confusing that there's a map called watchers which is a map
    from keys to Watcher objects, and the Watcher class itself has a
    linked-list of delayed requests called watchers. May be unwieldy, but
    how about renaming:
    - RequestPurgatory.watchers to watchedRequestsForKey
    - Watchers to WatchedRequests
    - Watchers.watchers to requests
  - Rename DelayedRequest.satisfied to satisfiedOrExpired (I find it weird
    that the reaper marks expired requests as satisfied.)
  - update -> maybeNotify?
- In collectSatisfiedRequests, the comment says "another thread has satisfied
  this request". That can only be the ExpiredRequestReaper thread, right?
- It is slightly odd that we have to call the reaper's satisfyRequest method
  from Watcher. Would it work to move the unsatisfied counter up to
  RequestPurgatory?
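
As a rough sketch of the last two suggestions (a satisfiedOrExpired flag and an unsatisfied counter owned by the purgatory rather than the reaper), assuming hypothetical class names that are not taken from the patch:

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for DelayedRequest with the proposed flag name.
class DelayedRequestSketch {
  val satisfiedOrExpired = new AtomicBoolean(false)
}

// Hypothetical purgatory that owns the unsatisfied counter itself, so
// neither the watcher list nor the reaper has to call into the other.
class PurgatoryCounterSketch {
  private val unsatisfied = new AtomicInteger(0)

  def watch(request: DelayedRequestSketch): Unit = {
    unsatisfied.incrementAndGet()
  }

  // Called by both the satisfying thread and the expiration thread.
  // Only the caller that flips the flag decrements the counter, so a
  // request is never counted down twice.
  def complete(request: DelayedRequestSketch): Boolean = {
    val won = request.satisfiedOrExpired.compareAndSet(false, true)
    if (won) unsatisfied.decrementAndGet()
    won
  }

  def unsatisfiedCount: Int = unsatisfied.get
}
```

The compare-and-set is one way to make the satisfy path and the expire path agree on who completes a request; the patch may handle this differently.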






[jira] [Commented] (KAFKA-48) Implement optional "long poll" support in fetch request

2012-04-17 Thread Jay Kreps (Commented) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13256099#comment-13256099
 ] 

Jay Kreps commented on KAFKA-48:


Joel, this is great feedback. I will address these issues in the commit since 
most are naming/documentation related.
