[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joel Koshy updated KAFKA-598:
-----------------------------

    Attachment: KAFKA-598-v2.patch

Here is an updated patch. After discussing with Jay, this is the proposal that the patch implements:

- fetch.size now means the maximum fetch size that will be used across all partitions in any given multi-fetch attempt.
- If there are incomplete partitions, log a warning and then serially attempt to fetch each of those partitions with the full fetch.size.

The main drawbacks are as discussed earlier: the semantics of fetch.size change, so many clients may need to be aware of this and reconfigure their fetch size, and if there are several incomplete partitions, the serial fetches could cause the consumer to start lagging. However, this is slightly better (and no worse) than the current behavior of getting wedged.

A couple of other comments:

- In the patch, partitionFetchSize (in AbstractFetcherThread) may be very small or even zero, but I think that is fine: there is no sensible "minMessageSize", and not increasing the fetch size when consuming a large number of partitions would be a client misconfiguration.
- In this patch, whether a partition is incomplete is determined by fetching with the configured fetch.size and then measuring validBytes. This is wasteful; it can be addressed by preventing the broker from responding with an incomplete message and setting a new error code (say, FetchSizeTooSmall) in the FetchResponse. I think we can defer this to trunk.
- Another thing Jay and I discussed a bit: the use of folds vs. mutables. I had refactored the code to use foldLeft to avoid using mutable sets. Folding over immutable collections forces you to account for every possible path to the end result, which is helpful especially when there are multiple such paths (e.g., cases in a match). Personally I think it is more readable as well, but that is a matter of preference.
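As a rough sketch of the proposal (and of the foldLeft style mentioned above), something along these lines could work; the names (PartitionData, doFetch, the fetch callback) are illustrative, not the actual patch code:

```scala
// Hypothetical sketch: fetch.size is a budget shared across all partitions in
// a multi-fetch, and any partition whose response came back incomplete is
// retried serially with the full fetch.size.

case class PartitionData(partition: Int, validBytes: Int, incomplete: Boolean)

def doFetch(partitions: Seq[Int],
            fetchSize: Int,
            fetch: (Seq[Int], Int) => Seq[PartitionData]): Seq[PartitionData] = {
  // Divide the overall budget evenly; this may be very small (even zero)
  // when many partitions are subscribed -- treated as a misconfiguration.
  val partitionFetchSize = fetchSize / math.max(partitions.size, 1)
  val first = fetch(partitions, partitionFetchSize)

  // foldLeft over an immutable accumulator: every path (complete vs.
  // incomplete partition) must explicitly contribute to the result.
  first.foldLeft(Seq.empty[PartitionData]) { (acc, pd) =>
    if (pd.incomplete)
      acc ++ fetch(Seq(pd.partition), fetchSize)  // serial retry, full budget
    else
      acc :+ pd
  }
}
```

With four partitions and fetch.size = 200, the first round would use a per-partition budget of 50; any partition incomplete at that size gets one serial follow-up fetch at 200.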
So if people think we're better off with mutables, let me know. In this case Set.reduce would actually be more suitable, but it is unavailable in Scala 2.8.x.

- I would like to further divide the fetch size in ConsumerFetcherThread by (queuedchunks.max * numBrokersToFetchFrom), so that regular consumers can also bound their memory usage. Let me know if there are any objections to that. For one, consumer.max.mem may be a better config name than repurposing the fetch.size config, but at the same time I would prefer not to change any configs. I can post that diff separately.
- The above also raises the separate question of whether we want to queue chunks for the replica fetchers. The main issue is that advance fetch requests from followers could (currently) cause the leader to move the HW prematurely; we would have to handle that, but it might be useful to implement in the future. Right now, all partitions must be appended before the next fetch is issued; if we "fetch ahead", the fetch request network I/O can proceed in parallel with disk I/O.

So this is an 0.8 patch that can be applied to trunk as well (unless we decide it is not a "blocker" and should go only into trunk). After this, we can add the additional error code to the FetchResponse if people are okay with the overall approach.

> decouple fetch size from max message size
> -----------------------------------------
>
>                 Key: KAFKA-598
>                 URL: https://issues.apache.org/jira/browse/KAFKA-598
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch
>
> Currently, a consumer has to set fetch size larger than the max message size.
> This increases the memory footprint on the consumer, especially when a large
> number of topic/partitions is subscribed.
> By decoupling the fetch size from the max message size, we can use a smaller
> fetch size for normal consumption and, when hitting a large message
> (hopefully rare), automatically increase the fetch size to the max message
> size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira