Thanks for the updates, Mickael. The proposal looks good to me. -Jason
On Wed, Jan 18, 2017 at 10:19 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:

I've updated the KIP to mention this.
I've currently set the size limit to 1Kb. It's large enough that group/heartbeat messages are smaller than it, and small enough that the consumer memory usage stays under control.

If there are no more comments, I'll restart the vote.

On Wed, Jan 18, 2017 at 2:28 AM, radai <radai.rosenbl...@gmail.com> wrote:

I have (hopefully?) addressed Rajini's concern of muting all connections ahead of time on the KIP-72 PR.

As for avoiding the pool for small allocations, I think that's a great idea. I also think you could implement it as a composite pool :-) (the composite redirects all requests under size X to the NONE pool and everything above X to some "real" pool).
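A minimal sketch of that composite-pool idea, using a simplified stand-in for the KIP-72 pool interface (the names and signatures below are illustrative, not the real API) and the 1Kb hard limit discussed above:

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the KIP-72 pool abstraction (illustrative only).
interface MemoryPool {
    ByteBuffer tryAllocate(int sizeBytes); // returns null when the budget is exhausted
    void release(ByteBuffer buffer);
}

// A bounded pool that only tracks how many bytes are outstanding.
class SimpleMemoryPool implements MemoryPool {
    private final AtomicLong available;

    SimpleMemoryPool(long limitBytes) {
        this.available = new AtomicLong(limitBytes);
    }

    @Override
    public ByteBuffer tryAllocate(int sizeBytes) {
        if (available.addAndGet(-sizeBytes) < 0) {
            available.addAndGet(sizeBytes); // roll back: not enough memory left
            return null;
        }
        return ByteBuffer.allocate(sizeBytes);
    }

    @Override
    public void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }
}

// The composite: allocations at or below the threshold bypass accounting
// (the equivalent of a NONE pool), larger ones hit the bounded pool.
// With a 1Kb threshold, group/heartbeat responses are never blocked.
class CompositeMemoryPool implements MemoryPool {
    private final int smallAllocationThreshold; // e.g. 1024 bytes
    private final MemoryPool delegate;

    CompositeMemoryPool(int smallAllocationThreshold, MemoryPool delegate) {
        this.smallAllocationThreshold = smallAllocationThreshold;
        this.delegate = delegate;
    }

    @Override
    public ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes <= smallAllocationThreshold)
            return ByteBuffer.allocate(sizeBytes); // untracked "NONE" allocation
        return delegate.tryAllocate(sizeBytes);
    }

    @Override
    public void release(ByteBuffer buffer) {
        if (buffer.capacity() > smallAllocationThreshold)
            delegate.release(buffer);
        // small buffers were never counted, so there is nothing to give back
    }
}

Usage would simply be new CompositeMemoryPool(1024, new SimpleMemoryPool(bufferMemory)); the reading code never needs to know which allocations are tracked.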
On Wed, Jan 11, 2017 at 8:05 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:

Ok, thanks for the clarification.
I agree too, I don't want a new config parameter. From the numbers we gathered (see Edoardo's comment above), it shouldn't be too hard to pick a meaningful value.

On Wed, Jan 11, 2017 at 3:58 PM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:

Mickael,

I had based the comment on the KIP-72 description, where brokers were muting all client channels once the memory pool was empty. Having reviewed the PR today, I think it may be fine to delay muting and allocate small buffers outside of the pool. I would still not want a config parameter to decide what "small" means; a well chosen hard limit would suffice.

On Wed, Jan 11, 2017 at 3:05 PM, Mickael Maison <mickael.mai...@gmail.com> wrote:

Rajini,

Why do you think we don't want to do the same for brokers? It feels like brokers would be affected the same way and could end up delaying group/heartbeat requests.

Also, given queued.max.requests, it seems unlikely that small requests (<<1Kb) being allocated outside of the memory pool would cause OOM exceptions.

On Wed, Dec 14, 2016 at 12:29 PM, Rajini Sivaram <rsiva...@pivotal.io> wrote:

Edo,

I wouldn't introduce a new config entry, especially since you don't need it after KAFKA-4137. As a temporary measure that would work for consumers. But you probably don't want to do the same for brokers - it will be worth checking with Radai since the implementation will be based on KIP-72. To do this only for consumers, you will need some conditions in the common network code while allocating and releasing buffers. A bit messy, but doable.

On Wed, Dec 14, 2016 at 11:32 AM, Edoardo Comar <eco...@uk.ibm.com> wrote:

Thanks Rajini,
Before KAFKA-4137, we could avoid coordinator starvation without making a special case for a special connection, but rather simply by applying the buffer.memory check only to 'large' responses (e.g. size > 1k, possibly introducing a new config entry) in NetworkReceive.readFromReadableChannel(ReadableByteChannel).

Essentially this would limit reading fetch responses but allow other responses to be processed.

This is a sample of sizes for responses I collected:

size=108    APIKEY=3  METADATA
size=28     APIKEY=10 GROUP_COORDINATOR
size=193    APIKEY=11 JOIN_GROUP
size=39     APIKEY=14 SYNC_GROUP
size=39     APIKEY=9  OFFSET_FETCH
size=45     APIKEY=2  LIST_OFFSETS
size=88926  APIKEY=1  FETCH
size=45     APIKEY=1  FETCH
size=6      APIKEY=12 HEARTBEAT
size=45     APIKEY=1  FETCH
size=45     APIKEY=1  FETCH
size=45     APIKEY=1  FETCH
size=6      APIKEY=12 HEARTBEAT
size=45     APIKEY=1  FETCH
size=45     APIKEY=1  FETCH

What do you think?
--------------------------------------------------
Edoardo Comar
IBM MessageHub
eco...@uk.ibm.com
IBM UK Ltd, Hursley Park, SO21 2JN
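As an illustration of where such a check could sit, here is a sketch of a size-gated receive. This is not the actual NetworkReceive code; the field names and the 1k hard limit are assumptions based on the discussion above:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Sketch: apply the buffer.memory check only to 'large' responses, so
// heartbeat/group responses keep flowing when the fetch budget is used up.
class SizeGatedReceive {
    private static final int SMALL_RESPONSE_LIMIT = 1024; // "large" means bigger than this

    private final ByteBuffer sizeHeader = ByteBuffer.allocate(4);
    private ByteBuffer body;

    /**
     * @param availableMemory bytes still available under buffer.memory
     * @return bytes consumed from the channel (0 if the read was deferred)
     */
    long readFrom(ReadableByteChannel channel, long availableMemory) throws IOException {
        long read = 0;
        if (sizeHeader.hasRemaining()) {
            read += channel.read(sizeHeader); // read the 4-byte size prefix first
            if (sizeHeader.hasRemaining())
                return read;                  // size not fully known yet
        }
        if (body == null) {
            int size = sizeHeader.getInt(0);
            if (size > SMALL_RESPONSE_LIMIT && size > availableMemory)
                return read;                  // defer: large response and no budget left
            body = ByteBuffer.allocate(size); // small responses are always admitted
        }
        read += channel.read(body);
        return read;
    }
}

Given the sample above, every non-FETCH response observed is well under 1k (the largest is the 193-byte JOIN_GROUP), while only FETCH responses reach tens of kilobytes, which is what makes a fixed hard limit workable.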
On 13/12/2016 17:27, Rajini Sivaram <rajinisiva...@googlemail.com> wrote (Re: [DISCUSS] KIP-81: Max in-flight fetches):

Coordinator starvation: for an implementation based on KIP-72, there will be coordinator starvation without KAFKA-4137, since you would stop reading from sockets when the memory pool is full (the fact that coordinator messages are small doesn't help). I imagine you can work around this by treating coordinator connections as special connections, but that spills over into common network code. The separate NetworkClient for the coordinator proposed in KAFKA-4137 would be much better.

On Tue, Dec 13, 2016 at 3:47 PM, Mickael Maison <mickael.mai...@gmail.com> wrote:

Thanks for all the feedback.

I've updated the KIP with all the details. Below are a few of the main points:

- Overall memory usage of the consumer: I made it clear the memory pool is only used to store the raw bytes from the network and that the decompressed/deserialized messages are not stored in it but as extra memory on the heap. In addition, the consumer also keeps track of other things (in-flight requests, subscriptions, etc.) that account for extra memory as well. So this is not a hard memory bound, but it should still allow users to roughly size how much memory the consumer can use.

- Relation with the existing settings: there are already 2 settings that deal with memory usage of the consumer. I suggest we lower the priority of `max.partition.fetch.bytes` (I wonder if we should attempt to deprecate it or increase its default value so it's a constraint less likely to be hit) and have the new setting `buffer.memory` as High. I'm a bit unsure what's the best default value for `buffer.memory`; I suggested 100MB in the KIP (2 x `fetch.max.bytes`), but I'd appreciate feedback. It should always be at least equal to `fetch.max.bytes`.

- Configuration name `buffer.memory`: I think it's the name that makes the most sense. It's aligned with the producer and, as mentioned, generic enough to allow future changes if needed.

- Coordinator starvation: yes, this is a potential issue. I'd expect these requests to be small enough to not be affected too much. If that's the case, KAFKA-4137 suggests a possible fix.

On Tue, Dec 13, 2016 at 9:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:

Makes sense Jay.

Mickael, in addition to how we can compute defaults of the other settings from `buffer.memory`, it would be good to specify what is allowed and how we handle the different cases (e.g. what do we do if `max.partition.fetch.bytes` is greater than `buffer.memory`, is that simply not allowed?).

To summarise the gap between the ideal scenario (the user specifies how much memory the consumer can use) and what is being proposed:

1. We will decompress and deserialize the data for one or more partitions in order to return them to the user, and we don't account for the increased memory usage resulting from that. This is likely to be significant on a per-record basis, but we try to do it for the minimal number of records possible within the constraints of the system. Currently the constraints are: we decompress and deserialize the data for a partition at a time (the default `max.partition.fetch.bytes` is 1MB, but this is a soft limit in case there are oversized messages) until we have enough records to satisfy `max.poll.records` (default 500) or there are no more completed fetches. It seems like this may be OK for a lot of cases, but some tuning will still be required in others.

2. We don't account for bookkeeping data structures or intermediate objects allocated during the general operation of the consumer. Probably something we have to live with, as the cost/benefit of fixing this doesn't seem worth it.

Ismael
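A sketch of how those rules could be pinned down. The 2x derivation mirrors the 100MB / 50MB defaults mentioned above, and the ordering checks are one possible policy rather than anything decided; none of this is actual Kafka code:

// Illustrative sketch of how the memory-related consumer settings could be
// derived from and validated against each other.
final class ConsumerMemoryConfig {
    final long bufferMemory;            // proposed by KIP-81
    final long fetchMaxBytes;           // fetch.max.bytes (KIP-74), default 50MB
    final long maxPartitionFetchBytes;  // max.partition.fetch.bytes, default 1MB

    ConsumerMemoryConfig(long bufferMemory, Long fetchMaxBytes, Long maxPartitionFetchBytes) {
        this.bufferMemory = bufferMemory;
        // If the user only sets buffer.memory, derive fetch.max.bytes from it
        // (buffer.memory = 2 x fetch.max.bytes, as suggested in the KIP).
        this.fetchMaxBytes = fetchMaxBytes != null ? fetchMaxBytes : bufferMemory / 2;
        this.maxPartitionFetchBytes =
                maxPartitionFetchBytes != null ? maxPartitionFetchBytes : 1024 * 1024;

        // One possible answer to "is max.partition.fetch.bytes > buffer.memory allowed?": no.
        if (this.fetchMaxBytes > bufferMemory)
            throw new IllegalArgumentException("fetch.max.bytes must not exceed buffer.memory");
        if (this.maxPartitionFetchBytes > this.fetchMaxBytes)
            throw new IllegalArgumentException("max.partition.fetch.bytes must not exceed fetch.max.bytes");
    }
}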
On Tue, Dec 13, 2016 at 8:34 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Ismael,

Yeah, I think we are both saying the same thing: removing only works if you have a truly optimal strategy. Actually, even dynamically computing a reasonable default isn't totally obvious (do you set fetch.max.bytes equal to buffer.memory to try to queue up as much data as possible in the network buffers? Do you try to limit it to your socket.receive.buffer size so that you can read it in a single shot?).

Regarding what is being measured, my interpretation was the same as yours. I was just adding to the previous point that the buffer.memory setting would not be a very close proxy for memory usage. Someone was pointing out that compression would make this true, and I was just adding that even without compression the object overhead would lead to a high expansion factor.

-Jay

On Mon, Dec 12, 2016 at 11:53 PM, Ismael Juma <ism...@juma.me.uk> wrote:

Hi Jay,

About `max.partition.fetch.bytes`, yes, it was an oversight not to lower its priority as part of KIP-74, given the existence of `fetch.max.bytes` and the fact that we can now make progress in the presence of oversized messages independently of either of those settings.

I agree that we should try to set those values automatically based on `buffer.memory`, but I am not sure we can have a truly optimal strategy. So I'd go with reducing the priority to "low" instead of removing `fetch.max.bytes` and `max.partition.fetch.bytes` altogether for now. If experience in the field tells us that the auto strategy is good enough, we can consider removing them (yes, I know, it's unlikely to happen as there won't be that much motivation then).

Regarding the "conversion from packed bytes to java objects" comment, that raises the question: what are we actually measuring here? From the KIP, it's not too clear. My interpretation was that we were not measuring the memory usage of the Java objects. In that case, `buffer.memory` seems like a reasonable name, although perhaps the user's expectation is that we would measure the memory usage of the Java objects?

Ismael
On Tue, Dec 13, 2016 at 6:21 AM, Jay Kreps <j...@confluent.io> wrote:

I think the question is whether we have a truly optimal strategy for deriving the partition- and fetch-level configs from the global setting. If we do, then we should just get rid of them. If not, then if we can at least derive usually-good and never-terrible settings from the global limit at initialization time, maybe we can set them automatically unless the user overrides with an explicit config. Even the latter would let us mark them low priority, which at least takes them off the list of things you have to grok to use the consumer, which I suspect would be much appreciated by our poor users.

Regardless, it'd be nice to make sure we get an explanation of the relationships between the remaining memory configs in the KIP and in the docs.

I agree that buffer.memory isn't bad.

-Jay

On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <ja...@confluent.io> wrote:

Yeah, that's a good point. Perhaps in retrospect it would have been better to define `buffer.memory` first and let `fetch.max.bytes` be based off of it. I like `buffer.memory` since it gives the consumer nice symmetry with the producer, and its generic naming gives us some flexibility internally with how we use it. We could still do that, I guess, if we're willing to deprecate `fetch.max.bytes` (one release after adding it!).

As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is still useful in Kafka Streams, but I agree it makes sense to lower its priority in favor of `fetch.max.bytes`.

-Jason
On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:

Jason, it's not just decompression but also the conversion from packed bytes to java objects, right? That can be even larger than the decompression blow-up. I think this may be okay; the problem may just be that the naming is a bit misleading. In the producer you are literally allocating a buffer of that size, so the name buffer.memory makes sense. In this case it is something more like max.bytes.read.per.poll.call (terrible name, but maybe something like that?).

Mickael, I'd second Jason's request for the default and expand on it. We currently have several consumer-related memory settings: max.partition.fetch.bytes and fetch.max.bytes. I don't think it is clear today how to set these. For example, we mark max.partition.fetch.bytes as high importance and fetch.max.bytes as medium, but it seems like it should be the other way around. Can we think this through from the point of view of a lazy user? I.e. I have 64MB of space to use for my consumer; in an ideal world I'd say, "hey consumer, here is 64MB, go use that as efficiently as possible" and not have to tune a bunch of individual things with complex relationships. Maybe one or both of the existing settings can either be eliminated or at least marked as low priority, and we can infer a reasonable default from the new config you're introducing?

-jay
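For illustration, that lazy-user experience would boil down to something like the snippet below. Note that buffer.memory for the consumer is only the config proposed by this KIP, not an existing setting, and the internal derivation of the other limits is hypothetical:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LazyUserExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The only memory knob: give the consumer a 64MB budget and let it derive
        // fetch.max.bytes / max.partition.fetch.bytes internally (hypothetical behaviour).
        props.put("buffer.memory", String.valueOf(64 * 1024 * 1024));

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... subscribe and poll as usual ...
        consumer.close();
    }
}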
On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Mickael,

I think the approach looks good, just a few minor questions:

1. The KIP doesn't say what the default value of `buffer.memory` will be. It looks like we use 50MB as the default for `fetch.max.bytes`, so perhaps it makes sense to set the default based on that. It might also be worth mentioning somewhere the constraint between the two configs.

2. To clarify, this limit only applies to the raw (possibly still compressed) size of the fetched data, right? The consumer may still exceed it in order to store the decompressed record data. We delay decompression until the records are returned to the user, but because of max.poll.records we may end up holding onto the decompressed data from a single partition for a few iterations. I think this is fine, but it's probably worth noting in the KIP.

3. Is there any risk, using the MemoryPool, that after we fill up the memory with fetch data we can starve the coordinator's connection? Suppose, for example, that we send a bunch of pre-fetches right before returning to the user. These fetches might return before the next call to poll(), in which case we might not have enough memory to receive heartbeats, which would block us from sending additional heartbeats until the next call to poll(). Not sure it's a big problem since heartbeats are tiny, but it might be worth thinking about.

Thanks,
Jason

On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:

It's been a few days since the last comments. The KIP-72 vote seems to have passed, so if I don't get any new comments I'll start the vote on Monday.
Thanks
On Mon, Nov 14, 2016 at 6:25 PM, radai <radai.rosenbl...@gmail.com> wrote:

+1 - there is a need for an effective way to control Kafka memory consumption, both on the broker and on clients. I think we could even reuse the exact same param name - queued.max.bytes - as it would serve the exact same purpose.

Also (and again it's the same across the broker and clients), this bound should also cover decompression at some point. The problem with that is that, to the best of my knowledge, the current wire protocol does not declare the final, uncompressed size of anything up front - all we know is the size of the compressed buffer. This may require a format change in the future to properly support?

On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:

Thanks for all the replies.

I've updated the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
The main point is to selectively read from sockets instead of throttling FetchRequest sends. I also mentioned it will be reusing the MemoryPool implementation created in KIP-72 instead of adding another memory tracking method.

Please have another look. As always, comments are welcome!
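A rough sketch of what "selectively reading from sockets" means in practice: the channel is muted when the pool cannot supply a buffer and unmuted once memory is handed back. The types and method names below are simplified stand-ins, not the actual KIP-72/KIP-81 classes:

import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: backpressure by muting channels instead of throttling
// the FetchRequest send path.
class SelectiveReader {

    interface Pool {
        ByteBuffer tryAllocate(int sizeBytes); // null when the budget is exhausted
        void release(ByteBuffer buffer);
    }

    interface Channel {
        int pendingResponseSize();   // from the 4-byte size prefix already read
        void read(ByteBuffer into);  // read the response body into the buffer
        void mute();                 // stop selecting this channel for reads
        void unmute();
    }

    private final Pool pool;
    private final Set<Channel> muted = new HashSet<>();

    SelectiveReader(Pool pool) {
        this.pool = pool;
    }

    // Called for every channel that has data ready on the socket.
    void maybeRead(Channel channel) {
        ByteBuffer buffer = pool.tryAllocate(channel.pendingResponseSize());
        if (buffer == null) {
            // No memory left: stop reading this socket. The data stays in the
            // TCP buffer, so nothing is dropped and the broker sees backpressure.
            channel.mute();
            muted.add(channel);
            return;
        }
        channel.read(buffer);
    }

    // Called whenever buffers are released back to the pool.
    void onMemoryFreed() {
        for (Channel channel : muted)
            channel.unmute();
        muted.clear();
    }
}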
On Thu, Nov 10, 2016 at 2:47 AM, radai <radai.rosenbl...@gmail.com> wrote:

Selectively reading from sockets achieves memory control (up to, and not including, talk of (de)compression).

This is exactly what I (also, even mostly) did for KIP-72 - which I hope in itself should be a reason to think about both KIPs at the same time, because the changes will be similar (at least in intent) and might result in duplicated effort.

A pool API is a way to "scale" all the way from just maintaining a variable holding the amount of available memory (which is what my current KIP-72 code does and what this KIP proposes, IIUC) up to actually re-using buffers, without any changes to the code using the pool - just drop in a different pool impl.

For "edge nodes" (producer/consumer) the performance gain from actually pooling large buffers may be arguable, but I suspect that for brokers regularly operating on 1MB-sized requests (which is the norm at LinkedIn) the resulting memory fragmentation is an actual bottleneck (I have initial micro-benchmark results to back this up but have not had the time to do a full profiling run).

So basically I'm saying we may be doing (very) similar things in mostly the same areas of code.

On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:

Selectively reading from the socket should enable us to control the memory usage without impacting performance. I've had a look at that today and I can see how that would work.
I'll update the KIP accordingly tomorrow.