I've updated the KIP to mention this.
I've currently set the size limit to 1KB. It's large enough that
group/heartbeat messages stay below it, and small enough that the
consumer's memory usage stays under control.
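
For reference, the check is conceptually something like this (a
simplified sketch, not the actual PR code; MAX_UNPOOLED_SIZE is an
illustrative name for the hard-coded 1KB limit, and tryAllocate is the
MemoryPool call from KIP-72):

// sketch only - small receives bypass the pool entirely
private static final int MAX_UNPOOLED_SIZE = 1024; // 1KB

private ByteBuffer allocateBuffer(int receiveSize, MemoryPool pool) {
    if (receiveSize <= MAX_UNPOOLED_SIZE)
        return ByteBuffer.allocate(receiveSize); // plain heap, never blocks
    return pool.tryAllocate(receiveSize); // null when the pool is exhausted
}

This way group/heartbeat responses can still be read even when fetch
data has filled the pool.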

If there are no more comments, I'll restart the vote.

On Wed, Jan 18, 2017 at 2:28 AM, radai <radai.rosenbl...@gmail.com> wrote:
> i have (hopefully?) addressed Rajini's concern of muting all connections
> ahead of time on the KIP-72 PR.
> as for avoiding the pool for small allocations i think that's a great idea.
> I also think you could implement it as a composite pool :-)
> (composite redirects all requests under size X to the NONE pool and above X
> to some "real" pool)
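>
> something like this rough sketch, assuming the MemoryPool interface
> from the KIP-72 PR (method names may differ in the final patch; this
> class itself is hypothetical):
>
> import java.nio.ByteBuffer;
>
> public class CompositeMemoryPool implements MemoryPool {
>     private final int threshold;       // "size X"
>     private final MemoryPool delegate; // the "real" pool
>
>     public CompositeMemoryPool(int threshold, MemoryPool delegate) {
>         this.threshold = threshold;
>         this.delegate = delegate;
>     }
>
>     @Override
>     public ByteBuffer tryAllocate(int sizeBytes) {
>         // small requests go to the NONE pool (plain heap allocation)
>         return pick(sizeBytes).tryAllocate(sizeBytes);
>     }
>
>     @Override
>     public void release(ByteBuffer buffer) {
>         pick(buffer.capacity()).release(buffer);
>     }
>
>     private MemoryPool pick(int sizeBytes) {
>         return sizeBytes <= threshold ? MemoryPool.NONE : delegate;
>     }
>
>     @Override
>     public long size() { return delegate.size(); }
>
>     @Override
>     public long availableMemory() { return delegate.availableMemory(); }
>
>     @Override
>     public boolean isOutOfMemory() { return delegate.isOutOfMemory(); }
> }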
>
> On Wed, Jan 11, 2017 at 8:05 AM, Mickael Maison <mickael.mai...@gmail.com>
> wrote:
>
>> Ok thanks for the clarification.
>> I agree too, I don't want a new config parameter. From the numbers we
>> gathered (see Edoardo's comment above), it shouldn't be too hard to
>> pick a meaningful value.
>>
>> On Wed, Jan 11, 2017 at 3:58 PM, Rajini Sivaram <rajinisiva...@gmail.com>
>> wrote:
>> > Mickael,
>> >
>> > I had based the comment on the KIP-72 description where brokers were
>> > muting all client channels once the memory pool was empty. Having
>> > reviewed the PR today, I think it may be fine to delay muting and
>> > allocate small buffers outside of the pool. I would still not want to
>> > have a config parameter to decide what "small" means; a well chosen
>> > hard limit would suffice.
>> >
>> > On Wed, Jan 11, 2017 at 3:05 PM, Mickael Maison <
>> mickael.mai...@gmail.com>
>> > wrote:
>> >
>> >> Rajini,
>> >>
>> >> Why do you think we don't want to do the same for brokers?
>> >> It feels like brokers would be affected the same way and could end up
>> >> delaying group/heartbeat requests.
>> >>
>> >> Also given queued.max.requests it seems unlikely that small requests
>> >> (<<1KB) being allocated outside of the memory pool would cause OOM
>> >> exceptions.
>> >>
>> >>
>> >> On Wed, Dec 14, 2016 at 12:29 PM, Rajini Sivaram <rsiva...@pivotal.io>
>> >> wrote:
>> >> > Edo,
>> >> >
>> >> > I wouldn't introduce a new config entry, especially since you don't
>> >> > need it after KAFKA-4137. As a temporary measure that would work for
>> >> > consumers, but you probably don't want to do the same for brokers -
>> >> > it will be worth checking with Radai since the implementation will
>> >> > be based on KIP-72. To do this only for consumers, you will need
>> >> > some conditions in the common network code while allocating and
>> >> > releasing buffers. A bit messy, but doable.
>> >> >
>> >> >
>> >> >
>> >> > On Wed, Dec 14, 2016 at 11:32 AM, Edoardo Comar <eco...@uk.ibm.com>
>> >> wrote:
>> >> >
>> >> >> Thanks Rajini,
>> >> >> Before KAFKA-4137, we could avoid coordinator starvation without
>> >> >> making a special case for a special connection, but rather simply by
>> >> >> applying the buffer.memory check only to 'large' responses
>> >> >> (e.g. size > 1KB, possibly introducing a new config entry) in
>> >> >>
>> >> >> NetworkReceive.readFromReadableChannel(ReadableByteChannel)
>> >> >>
>> >> >> Essentially this would limit reading fetch responses but allow other
>> >> >> responses to be processed.
>> >> >>
>> >> >> This is a sample of response sizes I collected:
>> >> >>
>> >> >> *****  size=108 APIKEY=3 METADATA
>> >> >> *****  size=28 APIKEY=10 GROUP_COORDINATOR
>> >> >> *****  size=193 APIKEY=11 JOIN_GROUP
>> >> >> *****  size=39 APIKEY=14 SYNC_GROUP
>> >> >> *****  size=39 APIKEY=9 OFFSET_FETCH
>> >> >> *****  size=45 APIKEY=2 LIST_OFFSETS
>> >> >> *****  size=88926 APIKEY=1 FETCH
>> >> >> *****  size=45 APIKEY=1 FETCH
>> >> >> *****  size=6 APIKEY=12 HEARTBEAT
>> >> >> *****  size=45 APIKEY=1 FETCH
>> >> >> *****  size=45 APIKEY=1 FETCH
>> >> >> *****  size=45 APIKEY=1 FETCH
>> >> >> *****  size=6 APIKEY=12 HEARTBEAT
>> >> >> *****  size=45 APIKEY=1 FETCH
>> >> >> *****  size=45 APIKEY=1 FETCH
>> >> >>
>> >> >> What do you think?
>> >> >> --------------------------------------------------
>> >> >> Edoardo Comar
>> >> >> IBM MessageHub
>> >> >> eco...@uk.ibm.com
>> >> >> IBM UK Ltd, Hursley Park, SO21 2JN
>> >> >>
>> >> >>
>> >> >> From:   Rajini Sivaram <rajinisiva...@googlemail.com>
>> >> >> To:     dev@kafka.apache.org
>> >> >> Date:   13/12/2016 17:27
>> >> >> Subject:        Re: [DISCUSS] KIP-81: Max in-flight fetches
>> >> >>
>> >> >>
>> >> >>
>> >> >> Coordinator starvation: For an implementation based on KIP-72, there
>> >> >> will be coordinator starvation without KAFKA-4137 since you would
>> >> >> stop reading from sockets when the memory pool is full (the fact
>> >> >> that coordinator messages are small doesn't help). I imagine you can
>> >> >> work around this by treating coordinator connections as special
>> >> >> connections, but that spills over to common network code. The
>> >> >> separate NetworkClient for the coordinator proposed in KAFKA-4137
>> >> >> would be much better.
>> >> >>
>> >> >> On Tue, Dec 13, 2016 at 3:47 PM, Mickael Maison <
>> >> mickael.mai...@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >> > Thanks for all the feedback.
>> >> >> >
>> >> >> > I've updated the KIP with all the details.
>> >> >> > Below are a few of the main points:
>> >> >> >
>> >> >> > - Overall memory usage of the consumer:
>> >> >> > I made it clear the memory pool is only used to store the raw bytes
>> >> >> > from the network and that the decompressed/deserialized messages
>> >> >> > are not stored in it but as extra memory on the heap. In addition,
>> >> >> > the consumer also keeps track of other things (in-flight requests,
>> >> >> > subscriptions, etc.) that account for extra memory as well. So this
>> >> >> > is not a hard memory bound, but it should still allow users to
>> >> >> > roughly size how much memory the consumer can use.
>> >> >> >
>> >> >> > - Relation with the existing settings:
>> >> >> > There are already 2 settings that deal with memory usage of the
>> >> >> > consumer. I suggest we lower the priority of
>> >> >> > `max.partition.fetch.bytes` (I wonder if we should attempt to
>> >> >> > deprecate it or increase its default value so it's a constraint
>> >> >> > less likely to be hit) and mark the new setting `buffer.memory` as
>> >> >> > high priority. I'm a bit unsure what the best default value for
>> >> >> > `buffer.memory` is; I suggested 100MB in the KIP (2 x
>> >> >> > `fetch.max.bytes`), but I'd appreciate feedback. It should always
>> >> >> > be at least equal to `fetch.max.bytes` (see the example config
>> >> >> > after this list).
>> >> >> >
>> >> >> > - Configuration name `buffer.memory`:
>> >> >> > I think it's the name that makes the most sense. It's aligned with
>> >> >> > the producer and, as mentioned, generic enough to allow future
>> >> >> > changes if needed.
>> >> >> >
>> >> >> > - Coordinator starvation:
>> >> >> > Yes this is a potential issue. I'd expect these requests to be
>> >> >> > small enough not to be affected too much. If that's the case,
>> >> >> > KAFKA-4137 suggests a possible fix.
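>> >> >> >
>> >> >> > To make the constraint concrete, a consistent configuration would
>> >> >> > look something like this (values are only an example, and
>> >> >> > `buffer.memory` is the proposed new setting):
>> >> >> >
>> >> >> > Properties props = new Properties();
>> >> >> > props.put("bootstrap.servers", "localhost:9092");
>> >> >> > // proposed: bounds the pool holding raw fetched bytes
>> >> >> > props.put("buffer.memory", 100 * 1024 * 1024);        // 100MB
>> >> >> > // must always be <= buffer.memory
>> >> >> > props.put("fetch.max.bytes", 50 * 1024 * 1024);       // 50MB
>> >> >> > // soft per-partition limit, suggested to become low priority
>> >> >> > props.put("max.partition.fetch.bytes", 1024 * 1024);  // 1MB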
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > On Tue, Dec 13, 2016 at 9:31 AM, Ismael Juma <ism...@juma.me.uk>
>> >> wrote:
>> >> >> > > Makes sense Jay.
>> >> >> > >
>> >> >> > > Mickael, in addition to how we can compute defaults of the other
>> >> >> > > settings from `buffer.memory`, it would be good to specify what
>> >> >> > > is allowed and how we handle the different cases (e.g. what do we
>> >> >> > > do if `max.partition.fetch.bytes` is greater than
>> >> >> > > `buffer.memory`, is that simply not allowed?).
>> >> >> > >
>> >> >> > > To summarise the gap between the ideal scenario (user specifies
>> >> >> > > how much memory the consumer can use) and what is being proposed:
>> >> >> > >
>> >> >> > > 1. We will decompress and deserialize the data for one or more
>> >> >> > > partitions in order to return them to the user and we don't
>> >> >> > > account for the increased memory usage resulting from that. This
>> >> >> > > is likely to be significant on a per record basis, but we try to
>> >> >> > > do it for the minimal number of records possible within the
>> >> >> > > constraints of the system. Currently the constraints are: we
>> >> >> > > decompress and deserialize the data for one partition at a time
>> >> >> > > (default `max.partition.fetch.bytes` is 1MB, but this is a soft
>> >> >> > > limit in case there are oversized messages) until we have enough
>> >> >> > > records to satisfy `max.poll.records` (default 500) or there are
>> >> >> > > no more completed fetches (see the sketch after point 2). It
>> >> >> > > seems like this may be OK for a lot of cases, but some tuning
>> >> >> > > will still be required in others.
>> >> >> > >
>> >> >> > > 2. We don't account for bookkeeping data structures or
>> >> >> > > intermediate objects allocated during the general operation of
>> >> >> > > the consumer. Probably something we have to live with as the
>> >> >> > > cost/benefit of fixing this doesn't seem worth it.
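>> >> >> > >
>> >> >> > > For point 1, the current behaviour is roughly the following
>> >> >> > > (illustrative pseudo-Java, not the actual Fetcher code; the
>> >> >> > > helper names are made up):
>> >> >> > >
>> >> >> > > List<ConsumerRecord<K, V>> drained = new ArrayList<>();
>> >> >> > > while (drained.size() < maxPollRecords && !completedFetches.isEmpty()) {
>> >> >> > >     // decompress/deserialize one partition's data at a time
>> >> >> > >     CompletedFetch fetch = completedFetches.peek();
>> >> >> > >     drained.addAll(fetch.drainRecords(maxPollRecords - drained.size()));
>> >> >> > >     if (fetch.isDrained())
>> >> >> > >         completedFetches.poll(); // move on to the next partition
>> >> >> > > }
>> >> >> > > return drained;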
>> >> >> > >
>> >> >> > > Ismael
>> >> >> > >
>> >> >> > > On Tue, Dec 13, 2016 at 8:34 AM, Jay Kreps <j...@confluent.io>
>> >> wrote:
>> >> >> > >
>> >> >> > >> Hey Ismael,
>> >> >> > >>
>> >> >> > >> Yeah I think we are both saying the same thing---removing only
>> >> >> > >> works if you have a truly optimal strategy. Actually even
>> >> >> > >> dynamically computing a reasonable default isn't totally obvious
>> >> >> > >> (do you set fetch.max.bytes to equal buffer.memory to try to
>> >> >> > >> queue up as much data in the network buffers? Do you try to
>> >> >> > >> limit it to your socket.receive.buffer size so that you can read
>> >> >> > >> it in a single shot?).
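>> >> >> > >>
>> >> >> > >> Just to make that trade-off concrete (hypothetical derivations
>> >> >> > >> only, not a proposal; the variable names are made up):
>> >> >> > >>
>> >> >> > >> // (a) let in-flight fetch data fill the whole pool
>> >> >> > >> long fetchMaxBytes = bufferMemory;
>> >> >> > >> // (b) or cap it at the socket buffer so a response can be read
>> >> >> > >> // in a single shot
>> >> >> > >> long fetchMaxBytesAlt = Math.min(bufferMemory, socketReceiveBufferBytes);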
>> >> >> > >>
>> >> >> > >> Regarding what is being measured, my interpretation was the
>> >> >> > >> same as yours. I was just adding to the previous point that the
>> >> >> > >> buffer.memory setting would not be a very close proxy for memory
>> >> >> > >> usage. Someone was pointing out that compression would make this
>> >> >> > >> true, and I was just adding that even without compression the
>> >> >> > >> object overhead would lead to a high expansion factor.
>> >> >> > >>
>> >> >> > >> -Jay
>> >> >> > >>
>> >> >> > >> On Mon, Dec 12, 2016 at 11:53 PM, Ismael Juma <
>> ism...@juma.me.uk>
>> >> >> > wrote:
>> >> >> > >>
>> >> >> > >> > Hi Jay,
>> >> >> > >> >
>> >> >> > >> > About `max.partition.fetch.bytes`, yes it was an oversight
>> >> >> > >> > not to lower its priority as part of KIP-74 given the
>> >> >> > >> > existence of `fetch.max.bytes` and the fact that we can now
>> >> >> > >> > make progress in the presence of oversized messages
>> >> >> > >> > independently of either of those settings.
>> >> >> > >> >
>> >> >> > >> > I agree that we should try to set those values automatically
>> >> >> > >> > based on `buffer.memory`, but I am not sure if we can have a
>> >> >> > >> > truly optimal strategy. So, I'd go with reducing the priority
>> >> >> > >> > to "low" instead of removing `fetch.max.bytes` and
>> >> >> > >> > `max.partition.fetch.bytes` altogether for now. If experience
>> >> >> > >> > in the field tells us that the auto strategy is good enough,
>> >> >> > >> > we can consider removing them (yes, I know, it's unlikely to
>> >> >> > >> > happen as there won't be that much motivation then).
>> >> >> > >> >
>> >> >> > >> > Regarding the "conversion from packed bytes to java objects"
>> >> >> > >> > comment, that raises the question: what are we actually
>> >> >> > >> > measuring here? From the KIP, it's not too clear. My
>> >> >> > >> > interpretation was that we were not measuring the memory usage
>> >> >> > >> > of the Java objects. In that case, `buffer.memory` seems like
>> >> >> > >> > a reasonable name, although perhaps the user's expectation is
>> >> >> > >> > that we would measure the memory usage of the Java objects?
>> >> >> > >> >
>> >> >> > >> > Ismael
>> >> >> > >> >
>> >> >> > >> > On Tue, Dec 13, 2016 at 6:21 AM, Jay Kreps <j...@confluent.io>
>> >> >> wrote:
>> >> >> > >> >
>> >> >> > >> > > I think the question is whether we have a truly optimal
>> >> >> > >> > > strategy for deriving the partition- and fetch-level configs
>> >> >> > >> > > from the global setting. If we do then we should just get
>> >> >> > >> > > rid of them. If not, then if we can at least derive usually
>> >> >> > >> > > good and never terrible settings from the global limit at
>> >> >> > >> > > initialization time, maybe we can set them automatically
>> >> >> > >> > > unless the user overrides with an explicit config. Even the
>> >> >> > >> > > latter would let us mark it low priority, which at least
>> >> >> > >> > > takes it off the list of things you have to grok to use the
>> >> >> > >> > > consumer, which I suspect would be much appreciated by our
>> >> >> > >> > > poor users.
>> >> >> > >> > >
>> >> >> > >> > > Regardless it'd be nice to make sure we get an explanation
>> >> >> > >> > > of the relationships between the remaining memory configs in
>> >> >> > >> > > the KIP and in the docs.
>> >> >> > >> > >
>> >> >> > >> > > I agree that buffer.memory isn't bad.
>> >> >> > >> > >
>> >> >> > >> > > -Jay
>> >> >> > >> > >
>> >> >> > >> > >
>> >> >> > >> > > On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <
>> >> >> > ja...@confluent.io>
>> >> >> > >> > > wrote:
>> >> >> > >> > >
>> >> >> > >> > > > Yeah, that's a good point. Perhaps in retrospect, it
>> >> >> > >> > > > would have been better to define `buffer.memory` first and
>> >> >> > >> > > > let `fetch.max.bytes` be based off of it. I like
>> >> >> > >> > > > `buffer.memory` since it gives the consumer nice symmetry
>> >> >> > >> > > > with the producer and its generic naming gives us some
>> >> >> > >> > > > flexibility internally with how we use it. We could still
>> >> >> > >> > > > do that I guess, if we're willing to deprecate
>> >> >> > >> > > > `fetch.max.bytes` (one release after adding it!).
>> >> >> > >> > > >
>> >> >> > >> > > > As for `max.partition.fetch.bytes`, it's noted in KIP-74
>> >> >> > >> > > > that it is still useful in Kafka Streams, but I agree it
>> >> >> > >> > > > makes sense to lower its priority in favor of
>> >> >> > >> > > > `fetch.max.bytes`.
>> >> >> > >> > > >
>> >> >> > >> > > > -Jason
>> >> >> > >> > > >
>> >> >> > >> > > > On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <
>> j...@confluent.io
>> >> >
>> >> >> > wrote:
>> >> >> > >> > > >
>> >> >> > >> > > > > Jason, it's not just decompression but also the
>> >> >> > >> > > > > conversion from packed bytes to java objects, right?
>> >> >> > >> > > > > That can be even larger than the decompression blow-up.
>> >> >> > >> > > > > I think this may be okay; the problem may just be that
>> >> >> > >> > > > > the naming is a bit misleading. In the producer you are
>> >> >> > >> > > > > literally allocating a buffer of that size, so the name
>> >> >> > >> > > > > buffer.memory makes sense. In this case it is something
>> >> >> > >> > > > > more like max.bytes.read.per.poll.call (terrible name,
>> >> >> > >> > > > > but maybe something like that?).
>> >> >> > >> > > > >
>> >> >> > >> > > > > Mickael, I'd second Jason's request for the default and
>> >> >> > >> > > > > expand on it. We currently have several consumer-related
>> >> >> > >> > > > > memory settings--max.partition.fetch.bytes,
>> >> >> > >> > > > > fetch.max.bytes. I don't think it is clear today how to
>> >> >> > >> > > > > set these. For example we mark max.partition.fetch.bytes
>> >> >> > >> > > > > as high importance and fetch.max.bytes as medium, but it
>> >> >> > >> > > > > seems like it should be the other way around. Can we
>> >> >> > >> > > > > think this through from the point of view of a lazy
>> >> >> > >> > > > > user? I.e. I have 64MB of space to use for my consumer;
>> >> >> > >> > > > > in an ideal world I'd say, "hey consumer, here is 64MB,
>> >> >> > >> > > > > go use that as efficiently as possible" and not have to
>> >> >> > >> > > > > tune a bunch of individual things with complex
>> >> >> > >> > > > > relationships. Maybe one or both of the existing
>> >> >> > >> > > > > settings can either be eliminated or at the least marked
>> >> >> > >> > > > > as low priority, and we can infer a reasonable default
>> >> >> > >> > > > > from the new config you're introducing?
>> >> >> > >> > > > >
>> >> >> > >> > > > > -jay
>> >> >> > >> > > > >
>> >> >> > >> > > > > On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <
>> >> >> > >> ja...@confluent.io>
>> >> >> > >> > > > > wrote:
>> >> >> > >> > > > >
>> >> >> > >> > > > > > Hi Mickael,
>> >> >> > >> > > > > >
>> >> >> > >> > > > > > I think the approach looks good, just a few minor
>> >> >> > >> > > > > > questions:
>> >> >> > >> > > > > >
>> >> >> > >> > > > > > 1. The KIP doesn't say what the default value of
>> >> >> > >> > > > > > `buffer.memory` will be. Looks like we use 50MB as the
>> >> >> > >> > > > > > default for `fetch.max.bytes`, so perhaps it makes
>> >> >> > >> > > > > > sense to set the default based on that. Might also be
>> >> >> > >> > > > > > worth mentioning somewhere the constraint between the
>> >> >> > >> > > > > > two configs.
>> >> >> > >> > > > > > 2. To clarify, this limit only affects the size of
>> >> >> > >> > > > > > the fetched data as received off the network, right?
>> >> >> > >> > > > > > The consumer may still exceed it in order to store the
>> >> >> > >> > > > > > decompressed record data. We delay decompression until
>> >> >> > >> > > > > > the records are returned to the user, but because of
>> >> >> > >> > > > > > max.poll.records, we may end up holding onto the
>> >> >> > >> > > > > > decompressed data from a single partition for a few
>> >> >> > >> > > > > > iterations. I think this is fine, but probably worth
>> >> >> > >> > > > > > noting in the KIP.
>> >> >> > >> > > > > > 3. Is there any risk using the MemoryPool that, after
>> >> >> > >> > > > > > we fill up the memory with fetch data, we can starve
>> >> >> > >> > > > > > the coordinator's connection? Suppose, for example,
>> >> >> > >> > > > > > that we send a bunch of pre-fetches right before
>> >> >> > >> > > > > > returning to the user. These fetches might return
>> >> >> > >> > > > > > before the next call to poll(), in which case we might
>> >> >> > >> > > > > > not have enough memory to receive heartbeats, which
>> >> >> > >> > > > > > would block us from sending additional heartbeats
>> >> >> > >> > > > > > until the next call to poll(). Not sure it's a big
>> >> >> > >> > > > > > problem since heartbeats are tiny, but might be worth
>> >> >> > >> > > > > > thinking about.
>> >> >> > >> > > > > >
>> >> >> > >> > > > > > Thanks,
>> >> >> > >> > > > > > Jason
>> >> >> > >> > > > > >
>> >> >> > >> > > > > >
>> >> >> > >> > > > > > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <
>> >> >> > >> > > > mickael.mai...@gmail.com
>> >> >> > >> > > > > >
>> >> >> > >> > > > > > wrote:
>> >> >> > >> > > > > >
>> >> >> > >> > > > > > > It's been a few days since the last comments. The
>> >> >> > >> > > > > > > KIP-72 vote seems to have passed, so if I don't get
>> >> >> > >> > > > > > > any new comments I'll start the vote on Monday.
>> >> >> > >> > > > > > > Thanks
>> >> >> > >> > > > > > >
>> >> >> > >> > > > > > > On Mon, Nov 14, 2016 at 6:25 PM, radai <
>> >> >> > >> > radai.rosenbl...@gmail.com
>> >> >> > >> > > >
>> >> >> > >> > > > > > wrote:
>> >> >> > >> > > > > > > > +1 - there is a need for an effective way to
>> >> >> > >> > > > > > > > control kafka memory consumption - both on the
>> >> >> > >> > > > > > > > broker and on clients.
>> >> >> > >> > > > > > > > i think we could even reuse the exact same param
>> >> >> > >> > > > > > > > name - *queued.max.bytes* - as it would serve the
>> >> >> > >> > > > > > > > exact same purpose.
>> >> >> > >> > > > > > > >
>> >> >> > >> > > > > > > > also (and again it's the same across the broker
>> >> >> > >> > > > > > > > and clients) this bound should also cover
>> >> >> > >> > > > > > > > decompression, at some point. the problem with
>> >> >> > >> > > > > > > > that is that to the best of my knowledge the
>> >> >> > >> > > > > > > > current wire protocol does not declare the final,
>> >> >> > >> > > > > > > > uncompressed size of anything up front - all we
>> >> >> > >> > > > > > > > know is the size of the compressed buffer. this
>> >> >> > >> > > > > > > > may require a format change in the future to
>> >> >> > >> > > > > > > > properly support?
>> >> >> > >> > > > > > > >
>> >> >> > >> > > > > > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <
>> >> >> > >> > > > > > > mickael.mai...@gmail.com>
>> >> >> > >> > > > > > > > wrote:
>> >> >> > >> > > > > > > >
>> >> >> > >> > > > > > > >> Thanks for all the replies.
>> >> >> > >> > > > > > > >>
>> >> >> > >> > > > > > > >> I've updated the KIP:
>> >> >> > >> > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> > >> > > > > > > >> The main point is to selectively read from
>> >> >> > >> > > > > > > >> sockets instead of throttling FetchRequest sends.
>> >> >> > >> > > > > > > >> I also mentioned it will be reusing the
>> >> >> > >> > > > > > > >> MemoryPool implementation created in KIP-72
>> >> >> > >> > > > > > > >> instead of adding another memory tracking method.
>> >> >> > >> > > > > > > >>
>> >> >> > >> > > > > > > >> Please have another look. As always, comments are
>> >> >> > >> > > > > > > >> welcome!
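>> >> >> > >> > > > > > > >>
>> >> >> > >> > > > > > > >> Conceptually, the selective read is just (a rough
>> >> >> > >> > > > > > > >> sketch, assuming the KIP-72 MemoryPool API):
>> >> >> > >> > > > > > > >>
>> >> >> > >> > > > > > > >> // before reading a response body off a socket
>> >> >> > >> > > > > > > >> ByteBuffer buffer = memoryPool.tryAllocate(receiveSize);
>> >> >> > >> > > > > > > >> if (buffer == null) {
>> >> >> > >> > > > > > > >>     // pool exhausted: leave the bytes in the socket
>> >> >> > >> > > > > > > >>     // buffer and retry on a later poll(), once the
>> >> >> > >> > > > > > > >>     // application has consumed data and freed memory
>> >> >> > >> > > > > > > >>     return;
>> >> >> > >> > > > > > > >> }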
>> >> >> > >> > > > > > > >>
>> >> >> > >> > > > > > > >> On Thu, Nov 10, 2016 at 2:47 AM, radai <
>> >> >> > >> > > > radai.rosenbl...@gmail.com>
>> >> >> > >> > > > > > > wrote:
>> >> >> > >> > > > > > > >> > selectively reading from sockets achieves
>> >> >> > >> > > > > > > >> > memory control (up to and not including talk of
>> >> >> > >> > > > > > > >> > (de)compression)
>> >> >> > >> > > > > > > >> >
>> >> >> > >> > > > > > > >> > this is exactly what i (also, even mostly) did
>> >> >> > >> > > > > > > >> > for kip-72 - which i hope in itself should be a
>> >> >> > >> > > > > > > >> > reason to think about both KIPs at the same
>> >> >> > >> > > > > > > >> > time, because the changes will be similar (at
>> >> >> > >> > > > > > > >> > least in intent) and might result in duplicated
>> >> >> > >> > > > > > > >> > effort.
>> >> >> > >> > > > > > > >> >
>> >> >> > >> > > > > > > >> > a pool API is a way to "scale" all the way from
>> >> >> > >> > > > > > > >> > just maintaining a variable holding the amount
>> >> >> > >> > > > > > > >> > of available memory (which is what my current
>> >> >> > >> > > > > > > >> > kip-72 code does and what this kip proposes
>> >> >> > >> > > > > > > >> > IIUC) all the way up to actually re-using
>> >> >> > >> > > > > > > >> > buffers without any changes to the code using
>> >> >> > >> > > > > > > >> > the pool - just drop in a different pool impl.
>> >> >> > >> > > > > > > >> >
>> >> >> > >> > > > > > > >> > for "edge nodes" (producer/consumer) the
>> >> >> > >> > > > > > > >> > performance gain in actually pooling large
>> >> >> > >> > > > > > > >> > buffers may be arguable, but i suspect for
>> >> >> > >> > > > > > > >> > brokers regularly operating on 1MB-sized
>> >> >> > >> > > > > > > >> > requests (which is the norm at linkedin) the
>> >> >> > >> > > > > > > >> > resulting memory fragmentation is an actual
>> >> >> > >> > > > > > > >> > bottleneck (i have initial micro-benchmark
>> >> >> > >> > > > > > > >> > results to back this up but have not had the
>> >> >> > >> > > > > > > >> > time to do a full profiling run).
>> >> >> > >> > > > > > > >> >
>> >> >> > >> > > > > > > >> > so basically I'm saying we may be doing (very)
>> >> >> > >> > > > > > > >> > similar things in mostly the same areas of code.
>> >> >> > >> > > > > > > >> >
>> >> >> > >> > > > > > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael
>> Maison <
>> >> >> > >> > > > > > > >> mickael.mai...@gmail.com>
>> >> >> > >> > > > > > > >> > wrote:
>> >> >> > >> > > > > > > >> >
>> >> >> > >> > > > > > > >> >> Selectively reading from the socket should
>> >> >> > >> > > > > > > >> >> enable us to control the memory usage without
>> >> >> > >> > > > > > > >> >> impacting performance. I've had a look at that
>> >> >> > >> > > > > > > >> >> today and I can see how that would work.
>> >> >> > >> > > > > > > >> >> I'll update the KIP accordingly tomorrow.
>> >> >> > >> > > > > > > >> >>
>> >> >> > >> > > > > > > >>
>> >> >> > >> > > > > > >
>> >> >> > >> > > > > >
>> >> >> > >> > > > >
>> >> >> > >> > > >
>> >> >> > >> > >
>> >> >> > >> >
>> >> >> > >>
>> >> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> Regards,
>> >> >>
>> >> >> Rajini
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>>
