[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-598:
-----------------------------

    Attachment: KAFKA-598-v2.patch

Here is an updated patch. After discussing with Jay, this is the proposal
that the patch implements:

- fetch.size now means the maximum total fetch size that will be used across
  all partitions in any given multi-fetch request.
- If any partitions come back incomplete, log a warning and then serially
  re-fetch each of those partitions with the full fetch.size (see the sketch
  after this list).
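
A minimal sketch of the proposed flow, using hypothetical names
(maxFetchSize, fetchPartition, PartitionResult) rather than the actual
AbstractFetcherThread code:

    // Hypothetical sketch only: divide fetch.size across the partitions of a
    // multi-fetch, then serially re-fetch any partition that came back empty.
    object FetchSizeSketch {
      case class PartitionResult(partition: Int, validBytes: Int)

      def fetch(partitions: Seq[Int],
                maxFetchSize: Int,
                fetchPartition: (Int, Int) => PartitionResult): Seq[PartitionResult] = {
        val partitionFetchSize = maxFetchSize / math.max(partitions.size, 1)
        val results = partitions.map(p => fetchPartition(p, partitionFetchSize))
        // A partition that yielded no valid bytes is treated as incomplete
        // and retried on its own with the full fetch.size.
        val (complete, incomplete) = results.partition(_.validBytes > 0)
        if (incomplete.nonEmpty)
          println("WARN: serially re-fetching %d incomplete partitions".format(incomplete.size))
        complete ++ incomplete.map(r => fetchPartition(r.partition, maxFetchSize))
      }
    }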

The main drawbacks are as discussed earlier - i.e., the semantics of
fetch.size change, so many clients may need to be aware of this and
reconfigure their fetch size. If there are several incomplete partitions,
the serial fetches could cause the consumer to start lagging. However, that
is still slightly better (and certainly no worse) than the current behavior
of getting wedged.

A couple of other comments:

- In the patch, partitionFetchSize (in AbstractFetcherThread) may be very
  small or even zero, but I think that is fine: there is no sensible
  "minMessageSize" to enforce, and not increasing the fetch size when
  consuming a large number of partitions would be a client misconfiguration
  (see the rough arithmetic below).
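
  As a rough illustration with made-up numbers: with fetch.size = 1 MB spread
  across 2000 partitions, integer division gives a partitionFetchSize of only
  1048576 / 2000 = 524 bytes. The remedy is for the client to raise fetch.size
  (or subscribe to fewer partitions), not for the fetcher to invent a minimum.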

- In this patch, the check for whether a partition is incomplete is done by
  fetching with the configured fetch.size and then measuring validBytes
  (sketched below). This is wasteful and can be addressed by preventing the
  broker from responding with an incomplete message and instead setting a new
  error code (say, FetchSizeTooSmall) in the FetchResponse. I think we can
  defer that to trunk.
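
  Roughly, the check amounts to something like the following (illustrative
  only; sizeInBytes/validBytes stand in for the message-set accounting in the
  consumer, not the exact API):

      // A fetch that returned bytes but not a single complete message is
      // treated as incomplete and will be retried with a larger fetch size.
      def isIncomplete(sizeInBytes: Int, validBytes: Int): Boolean =
        sizeInBytes > 0 && validBytes == 0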

- Another thing Jay and I discussed a bit: the use of folds vs. mutables. I
  had refactored the code to use foldLeft to avoid using mutable sets.
  Folding over immutable collections forces you to check that you have
  accounted for all possible paths to the end result, which is helpful
  especially when there are multiple such paths (e.g., the cases of a match).
  Personally I think it is more readable as well, but that is a matter of
  preference, so if people think we are better off with mutables let me know
  (both styles are illustrated below). In this case Set.reduce would actually
  be more suitable, but it is unavailable in Scala 2.8.x.
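
  For illustration only (this is not the patch code), here is the shape of
  the two styles when accumulating, say, the set of incomplete partitions:

      // Hypothetical inputs: (topic, partition, fetchWasComplete)
      val results = List(("t0", 0, true), ("t0", 1, false), ("t1", 0, false))

      // foldLeft over an immutable Set: every branch must yield the accumulator.
      val incomplete = results.foldLeft(Set.empty[(String, Int)]) {
        case (acc, (topic, partition, complete)) =>
          if (complete) acc else acc + (topic -> partition)
      }

      // Equivalent with a mutable set: shorter, but nothing forces each
      // branch to account for the accumulator.
      val incompleteMutable = collection.mutable.Set[(String, Int)]()
      for ((topic, partition, complete) <- results if !complete)
        incompleteMutable += (topic -> partition)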

- I would like to further divide the fetch size in ConsumerFetcherThread by
  (queuedchunks.max * numBrokersToFetchFrom), so that regular consumers can
  also bound their memory usage (worked example below). Let me know if there
  are any objections to that. For one, consumer.max.mem may be a better
  config name than repurposing the fetch.size config, but at the same time I
  would prefer not to change any configs. I can post that diff separately.
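
  As a rough worked example with made-up numbers: fetch.size = 10 MB,
  queuedchunks.max = 10 and two brokers to fetch from gives
  10 MB / (10 * 2) = 512 KB per fetch request; each of the two fetchers can
  then queue 10 such chunks, so the consumer holds roughly
  2 * 10 * 512 KB = 10 MB in total - i.e., buffered memory stays near
  fetch.size rather than fetch.size * queuedchunks.max * numBrokersToFetchFrom.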

- The above also raises the separate question of whether we want to queue
  chunks for the replica fetchers. The main issue with this is that advance
  fetch requests from followers could (currently) cause the leader to move
  the HW prematurely; i.e., we would have to handle that, but it might be
  useful to implement in the future. Right now, all partitions must be
  appended before the next fetch is issued - if we "fetch ahead" then the
  fetch request's network I/O can proceed in parallel with the disk I/O.

So this is a 0.8 patch that can be applied to trunk as well (unless we
decide it is not a "blocker" and should go only into trunk). After this, we
can add the additional error code to the FetchResponse if people are okay
with the overall approach.

                
> decouple fetch size from max message size
> -----------------------------------------
>
>                 Key: KAFKA-598
>                 URL: https://issues.apache.org/jira/browse/KAFKA-598
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch
>
>
> Currently, a consumer has to set the fetch size larger than the max message
> size. This increases the memory footprint on the consumer, especially when a
> large number of topics/partitions is subscribed. By decoupling the fetch size
> from the max message size, we can use a smaller fetch size for normal
> consumption and, when hitting a large message (hopefully rare), automatically
> increase the fetch size to the max message size temporarily.
