[jira] [Updated] (KAFKA-598) decouple fetch size from max message size

2012-11-30 Thread Joel Koshy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-598:
-----------------------------

Attachment: KAFKA-598-v1.patch

I spent the better part of a day rebasing - serves me right for letting this
patch sit for so long.

The basic idea is to keep track of incomplete partitions in a multi-fetch
request and reissue a fetch request with a higher fetch size for those
incomplete partitions.
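
To make that concrete, here is a minimal sketch of the idea in Scala (all
identifiers - fetchOnce, upperFetchSize, FetchResult - are illustrative,
not the actual names in the patch):

    case class TopicAndPartition(topic: String, partition: Int)

    case class FetchResult(data: Map[TopicAndPartition, Array[Byte]],
                           incomplete: Set[TopicAndPartition])

    def fetchWithRetry(partitions: Set[TopicAndPartition],
                       fetchSize: Int,
                       upperFetchSize: Int,
                       fetchOnce: (Set[TopicAndPartition], Int) => FetchResult)
        : Map[TopicAndPartition, Array[Byte]] = {
      val first = fetchOnce(partitions, fetchSize)
      if (first.incomplete.isEmpty)
        first.data
      else {
        // Reissue the fetch only for the partitions whose next message did
        // not fit, this time with the larger bound.
        val retry = fetchOnce(first.incomplete, upperFetchSize)
        first.data ++ retry.data
      }
    }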

I had considered the possibility of "pipelining" fetch requests for
incomplete partitions - i.e., without using an "upper" fetch size. That
would entail issuing fetch requests at increasing offsets (with the same
fetch size) until the message is complete, buffering the partial message
along the way. With this approach we would probably add an additional
"maxFetchMem" config. However, with logical offsets we no longer have
byte-addressability, so this is not possible right now. Furthermore, with a
maxFetchMem param it becomes somewhat similar to the upperFetchSize
approach (in the sense that the client has to be prepared to set aside a
certain amount of memory), so we don't really gain much. The ideal case
would be to support streaming over a single fetch request, but that is
obviously a much more complex feature to implement.

Also fixed a bug in the use of partitionMapLock: one line called
synchronized on the ReentrantLock object (which only locks the object's
monitor) instead of acquiring the lock itself.
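
For illustration, the bug pattern and the fix look like this (sketch only,
not the actual diff):

    import java.util.concurrent.locks.ReentrantLock

    val partitionMapLock = new ReentrantLock()

    // Buggy: this synchronizes on the lock object's monitor, which does not
    // exclude threads that acquire the lock via partitionMapLock.lock().
    partitionMapLock synchronized {
      // ... mutate the partition map ...
    }

    // Fixed: actually acquire and release the ReentrantLock.
    partitionMapLock.lock()
    try {
      // ... mutate the partition map ...
    } finally {
      partitionMapLock.unlock()
    }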

BTW, for the ReplicaFetchTest change to make sense I could have it expect to 
"fail" with a smaller upper fetch size, and then repeat with a higher upper
fetch size, but that would add to the test duration - and it's not mocked
out.


> decouple fetch size from max message size
> -----------------------------------------
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
> Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set its fetch size larger than the max message
> size. This increases the memory footprint on the consumer, especially when a
> large number of topic/partitions are subscribed. By decoupling the fetch
> size from the max message size, we can use a smaller fetch size for normal
> consumption and, when hitting a large message (hopefully rare), temporarily
> increase the fetch size to the max message size automatically.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-598) decouple fetch size from max message size

2012-12-04 Thread Joel Koshy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-598:
-----------------------------

Attachment: KAFKA-598-v2.patch

Here is an updated patch. After discussing with Jay, here is a proposal that
the patch implements:

- fetch.size now means the maximum fetch size that will be used across all
  partitions in any given multi-fetch attempt.
- If there are incomplete partitions, log a warning and then serially
  attempt fetching each of those partitions with the full fetch.size (a
  sketch follows).

The main drawbacks are as discussed earlier - i.e., there is a change in the
semantics of fetch.size, so many clients may need to be aware of this and
reconfigure their fetch size. If there are several incomplete partitions,
the serial fetches could cause the consumer to start lagging. However, this
is slightly better (and no worse) than the current behavior of getting
wedged.

Couple other comments:

- In the patch, partitionFetchSize (in AbstractFetcherThread) may be very
  small or even zero - but I think that's fine: there is no sensible
  "minMessageSize", and a client that consumes a large number of partitions
  without increasing the fetch size is simply misconfigured.

- In this patch, the check on whether a partition is incomplete is done by
  fetching with the configured fetch.size and then measuring validBytes.
  This is wasteful and can be addressed by preventing the broker from
  responding with an incomplete message and setting a new error code (say,
  FetchSizeTooSmall) in the FetchResponse; a hypothetical sketch follows
  this list. I think we can defer this to trunk.

- Another thing that Jay and I discussed a bit: the use of folds vs.
  mutable collections. I had refactored the code to use foldLeft to avoid
  mutable sets. Folding over immutable values forces you to check that you
  have accounted for all possible paths to the end result, which is
  especially helpful when there are multiple such paths (e.g., the cases of
  a match); see the illustration after this list. Personally I think it is
  more readable as well, but that is a matter of preference, so if people
  think we're better off with mutables let me know. In this case Set.reduce
  would actually be more suitable, but it is unavailable in Scala 2.8.x.

- I would like to further divide the fetch size in ConsumerFetcherThread by
  (queuedchunks.max * numBrokersToFetchFrom) so that regular consumers can
  also bound their memory usage. Let me know if there are any objections to
  that. consumer.max.mem may be a better config name than repurposing the
  fetch.size config, but at the same time I would prefer not changing any
  configs. I can post that diff separately.

- The above also raises the separate question of whether we want to queue
  chunks for the replica fetchers. The main issue is that fetch requests
  issued in advance by followers could (currently) cause the leader to move
  the HW prematurely - i.e., we would have to handle that - but it might be
  useful to implement in the future. Right now, all partitions must be
  appended before the next fetch is issued; if we "fetch ahead", the fetch
  request network I/O can proceed in parallel with disk I/O.
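
On the validBytes point above, the deferred broker-side change could look
roughly like the following. This is entirely hypothetical - FetchSizeTooSmall
and its value are placeholders, not codes that exist in ErrorMapping today:

    // Instead of returning a truncated message and letting the consumer
    // infer truncation from validBytes, the broker would notice that the
    // next message cannot fit and return a dedicated error code.
    object FetchErrors {
      val NoError: Short = 0
      val FetchSizeTooSmall: Short = 99  // placeholder value
    }

    def fetchErrorCode(requestedFetchSize: Int, nextMessageSize: Int): Short =
      if (nextMessageSize > requestedFetchSize) FetchErrors.FetchSizeTooSmall
      else FetchErrors.NoError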
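
And as a generic illustration of the folds-vs-mutables point (not code from
the patch):

    import scala.collection.mutable

    // Mutable style: the accumulator is updated (or silently forgotten) on
    // whichever paths happen to touch it.
    def incompleteMutable(results: Seq[(String, Boolean)]): Set[String] = {
      val acc = mutable.Set[String]()
      for ((partition, complete) <- results)
        if (!complete) acc += partition
      acc.toSet
    }

    // foldLeft over an immutable set: every case of the match must produce
    // the next accumulator, so each path to the result is accounted for.
    def incompleteFold(results: Seq[(String, Boolean)]): Set[String] =
      results.foldLeft(Set.empty[String]) {
        case (acc, (partition, false)) => acc + partition
        case (acc, (_, true))          => acc
      }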

So this is an 0.8 patch that can be applied to trunk as well (unless we
decide it is not a "blocker", in which case it should go only into trunk).
After this, we can add the additional error code to the FetchResponse if
people are okay with the overall approach.




[jira] [Updated] (KAFKA-598) decouple fetch size from max message size

2012-12-05 Thread Jun Rao (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-598:
--------------------------

Priority: Blocker  (was: Major)



[jira] [Updated] (KAFKA-598) decouple fetch size from max message size

2012-12-14 Thread Joel Koshy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-598:
-----------------------------

Attachment: KAFKA-598-v3.patch

Quick overview of revised patch:

1 - Addressed your comment about the previous behavior in ConsumerIterator
  (good catch on that!) and the config defaults.
2 - Changed the semantics of fetch size to max memory. Max mem is a long (an
  int would currently limit it to 2G). The actual partition fetch size is
  checked for overflow, in which case it is set to Int.MaxValue (see the
  sketch after this list).
3 - Also introduced a DeprecatedProperties convenience class that is checked
  during config verification. I added this because I think max.memory is a
  more meaningful config than fetch.size, and we can use it to deprecate
  other configs if needed.
4 - The partition count is a volatile int - I chose that over a method only
  to avoid traversing the partition map (for each request) to determine the
  count.
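
A sketch of the computation in points 2 and 4 (names are illustrative, and
the divisor here ignores the queuedchunks.max factor discussed earlier):

    class FetcherSettings(maxMemory: Long) {
      // Kept as a volatile int so that sizing each request does not require
      // traversing the partition map to count partitions.
      @volatile var partitionCount = 1

      // maxMemory is a Long so the config itself is not capped at 2G, but
      // each partition's fetch size must fit in an Int - clamp on overflow.
      def partitionFetchSize: Int = {
        val size = maxMemory / math.max(partitionCount, 1)
        if (size > Int.MaxValue) Int.MaxValue else size.toInt
      }
    }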




[jira] [Updated] (KAFKA-598) decouple fetch size from max message size

2013-01-21 Thread Neha Narkhede (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-598:
--------------------------------

Labels: p4  (was: )
