Thanks for the updates, Mickael. The proposal looks good to me.

-Jason

On Wed, Jan 18, 2017 at 10:19 AM, Mickael Maison <mickael.mai...@gmail.com>
wrote:

> I've updated the KIP to mention this.
> I've currently set the size limit to 1Kb. It's large enough that
> group/heartbeat messages stay below it, and also small enough that the
> consumer memory usage stays under control.
>
> If there are no more comments, I'll restart the vote.
>
> On Wed, Jan 18, 2017 at 2:28 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > I have (hopefully?) addressed Rajini's concern of muting all connections
> > ahead of time on the KIP-72 PR.
> > As for avoiding the pool for small allocations, I think that's a great idea.
> > I also think you could implement it as a composite pool :-)
> > (composite redirects all requests under size X to the NONE pool and above X
> > to some "real" pool)
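(For illustration, a rough sketch of the composite-pool idea described above. The interface below is a simplified stand-in with assumed names, not the actual KIP-72 MemoryPool API.)

    import java.nio.ByteBuffer;

    // Simplified stand-in for a KIP-72-style memory pool (names assumed).
    interface SimpleMemoryPool {
        ByteBuffer tryAllocate(int sizeBytes); // returns null if no memory is available
        void release(ByteBuffer buffer);
    }

    // Routes "small" allocations to an unbounded pool and everything else to a bounded one.
    class CompositeMemoryPool implements SimpleMemoryPool {
        private final int smallThresholdBytes;  // e.g. 1024, the hard limit discussed above
        private final SimpleMemoryPool small;   // typically a NONE/unbounded pool
        private final SimpleMemoryPool large;   // the "real" bounded pool

        CompositeMemoryPool(int smallThresholdBytes, SimpleMemoryPool small, SimpleMemoryPool large) {
            this.smallThresholdBytes = smallThresholdBytes;
            this.small = small;
            this.large = large;
        }

        public ByteBuffer tryAllocate(int sizeBytes) {
            return sizeBytes <= smallThresholdBytes ? small.tryAllocate(sizeBytes) : large.tryAllocate(sizeBytes);
        }

        public void release(ByteBuffer buffer) {
            // Delegates are chosen by capacity, so releases follow the same routing as allocations.
            if (buffer.capacity() <= smallThresholdBytes) {
                small.release(buffer);
            } else {
                large.release(buffer);
            }
        }
    }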
> >
> > On Wed, Jan 11, 2017 at 8:05 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >
> >> Ok, thanks for the clarification.
> >> I agree too, I don't want a new config parameter. From the numbers we
> >> gathered (see Edoardo's comment above), it shouldn't be too hard to
> >> pick a meaningful value.
> >>
> >> On Wed, Jan 11, 2017 at 3:58 PM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:
> >> > Mickael,
> >> >
> >> > I had based the comment on the KIP-72 description where brokers were muting all
> >> > client channels once the memory pool was empty. Having reviewed the PR today, I
> >> > think it may be fine to delay muting and allocate small buffers outside of
> >> > the pool. I would still not want to have a config parameter to decide what
> >> > "small" means; a well chosen hard limit would suffice.
> >> >
> >> > On Wed, Jan 11, 2017 at 3:05 PM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >
> >> >> Rajini,
> >> >>
> >> >> Why do you think we don't want to do the same for brokers?
> >> >> It feels like brokers would be affected the same way and could end up
> >> >> delaying group/heartbeat requests.
> >> >>
> >> >> Also, given queued.max.requests, it seems unlikely that small requests
> >> >> (<<1Kb) being allocated outside of the memory pool would cause OOM
> >> >> exceptions.
> >> >>
> >> >>
> >> >> On Wed, Dec 14, 2016 at 12:29 PM, Rajini Sivaram <rsiva...@pivotal.io> wrote:
> >> >> > Edo,
> >> >> >
> >> >> > I wouldn't introduce a new config entry, especially since you don't need it
> >> >> > after KAFKA-4137. As a temporary measure that would work for consumers. But
> >> >> > you probably don't want to do the same for brokers - it will be worth
> >> >> > checking with Radai since the implementation will be based on KIP-72. To do
> >> >> > this only for consumers, you will need some conditions in the common network
> >> >> > code while allocating and releasing buffers. A bit messy, but doable.
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Wed, Dec 14, 2016 at 11:32 AM, Edoardo Comar <eco...@uk.ibm.com> wrote:
> >> >> >
> >> >> >> Thanks Rajini,
> >> >> >> Before KAFKA-4137, we could avoid coordinator starvation without making a
> >> >> >> special case for a special connection, but rather simply by applying the
> >> >> >> buffer.memory check only to 'large' responses (e.g. size > 1k, possibly
> >> >> >> introducing a new config entry) in
> >> >> >>
> >> >> >> NetworkReceive.readFromReadableChannel(ReadableByteChannel)
> >> >> >>
> >> >> >> Essentially this would limit reading fetch responses but allow for other
> >> >> >> responses to be processed.
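(For illustration only: a simplified version of the kind of size check described above, reusing the SimpleMemoryPool stand-in from the earlier sketch. This is not the actual NetworkReceive code, and the threshold is an assumption.)

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    class SizeAwareReceive {
        private static final int SMALL_RESPONSE_BYTES = 1024; // assumed "large" vs "small" cut-off

        private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);

        // Reads the 4-byte size header, then allocates the payload buffer either directly
        // (small responses such as heartbeats) or from the bounded memory pool.
        ByteBuffer allocatePayload(ReadableByteChannel channel, SimpleMemoryPool pool) throws IOException {
            channel.read(sizeBuffer);
            if (sizeBuffer.hasRemaining()) {
                return null; // size header not fully read yet, try again later
            }
            sizeBuffer.rewind();
            int size = sizeBuffer.getInt();
            if (size <= SMALL_RESPONSE_BYTES) {
                return ByteBuffer.allocate(size); // bypass the pool, so small responses cannot be starved
            }
            return pool.tryAllocate(size);        // may return null when the pool is exhausted
        }
    }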
> >> >> >>
> >> >> >> This is a sample of sizes for responses I collected:
> >> >> >>
> >> >> >> *****  size=108 APIKEY=3 METADATA
> >> >> >> *****  size=28 APIKEY=10 GROUP_COORDINATOR
> >> >> >> *****  size=193 APIKEY=11 JOIN_GROUP
> >> >> >> *****  size=39 APIKEY=14 SYNC_GROUP
> >> >> >> *****  size=39 APIKEY=9 OFFSET_FETCH
> >> >> >> *****  size=45 APIKEY=2 LIST_OFFSETS
> >> >> >> *****  size=88926 APIKEY=1 FETCH
> >> >> >> *****  size=45 APIKEY=1 FETCH
> >> >> >> *****  size=6 APIKEY=12 HEARTBEAT
> >> >> >> *****  size=45 APIKEY=1 FETCH
> >> >> >> *****  size=45 APIKEY=1 FETCH
> >> >> >> *****  size=45 APIKEY=1 FETCH
> >> >> >> *****  size=6 APIKEY=12 HEARTBEAT
> >> >> >> *****  size=45 APIKEY=1 FETCH
> >> >> >> *****  size=45 APIKEY=1 FETCH
> >> >> >>
> >> >> >> What do you think?
> >> >> >> --------------------------------------------------
> >> >> >> Edoardo Comar
> >> >> >> IBM MessageHub
> >> >> >> eco...@uk.ibm.com
> >> >> >> IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> From:   Rajini Sivaram <rajinisiva...@googlemail.com>
> >> >> >> To:     dev@kafka.apache.org
> >> >> >> Date:   13/12/2016 17:27
> >> >> >> Subject:        Re: [DISCUSS] KIP-81: Max in-flight fetches
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> Coordinator starvation: For an implementation based on KIP-72, there will
> >> >> >> be coordinator starvation without KAFKA-4137 since you would stop reading
> >> >> >> from sockets when the memory pool is full (the fact that coordinator
> >> >> >> messages are small doesn't help). I imagine you can work around this by
> >> >> >> treating coordinator connections as special connections but that spills
> >> >> >> over to common network code. The separate NetworkClient for the coordinator
> >> >> >> proposed in KAFKA-4137 would be much better.
> >> >> >>
> >> >> >> On Tue, Dec 13, 2016 at 3:47 PM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >> >>
> >> >> >> > Thanks for all the feedback.
> >> >> >> >
> >> >> >> > I've updated the KIP with all the details.
> >> >> >> > Below are a few of the main points:
> >> >> >> >
> >> >> >> > - Overall memory usage of the consumer:
> >> >> >> > I made it clear the memory pool is only used to store the raw bytes from
> >> >> >> > the network and that the decompressed/deserialized messages are not stored
> >> >> >> > in it but as extra memory on the heap. In addition, the consumer also keeps
> >> >> >> > track of other things (in-flight requests, subscriptions, etc.) that account
> >> >> >> > for extra memory as well. So this is not a hard memory bound, but it should
> >> >> >> > still allow users to roughly size how much memory can be used.
> >> >> >> >
> >> >> >> > - Relation with the existing settings:
> >> >> >> > There are already 2 settings that deal with memory usage of the consumer.
> >> >> >> > I suggest we lower the priority of `max.partition.fetch.bytes` (I wonder
> >> >> >> > if we should attempt to deprecate it or increase its default value so it's
> >> >> >> > a constraint less likely to be hit) and have the new setting `buffer.memory`
> >> >> >> > as High. I'm a bit unsure what the best default value for `buffer.memory`
> >> >> >> > is; I suggested 100MB in the KIP (2 x `fetch.max.bytes`), but I'd appreciate
> >> >> >> > feedback. It should always be at least equal to `fetch.max.bytes`.
> >> >> >> >
> >> >> >> > - Configuration name `buffer.memory`:
> >> >> >> > I think it's the name that makes the most sense. It's aligned with the
> >> >> >> > producer and, as mentioned, generic enough to allow future changes if needed.
> >> >> >> >
> >> >> >> > - Coordinator starvation:
> >> >> >> > Yes, this is a potential issue. I'd expect these requests to be small
> >> >> >> > enough to not be affected too much. If that's the case, KAFKA-4137
> >> >> >> > suggests a possible fix.
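(To illustrate how the settings discussed above relate, here is what a consumer configuration might look like under the proposal. buffer.memory is the new setting being proposed and is not in any released client; the values simply follow the 2 x fetch.max.bytes suggestion.)

    import java.util.Properties;

    public class Kip81ConfigExample {
        public static Properties consumerProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            // Existing settings: overall and per-partition fetch response limits.
            props.put("fetch.max.bytes", "52428800");           // 50 MB, the current default
            props.put("max.partition.fetch.bytes", "1048576");  // 1 MB, the current default (a soft limit)
            // Proposed setting (KIP-81): bound on the memory used to buffer raw fetched bytes.
            // Suggested above as 2 x fetch.max.bytes; it should be at least fetch.max.bytes.
            props.put("buffer.memory", "104857600");            // 100 MB
            return props;
        }
    }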
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > On Tue, Dec 13, 2016 at 9:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >> >> >> > > Makes sense Jay.
> >> >> >> > >
> >> >> >> > > Mickael, in addition to how we can compute defaults of the other settings
> >> >> >> > > from `buffer.memory`, it would be good to specify what is allowed and how
> >> >> >> > > we handle the different cases (e.g. what do we do if
> >> >> >> > > `max.partition.fetch.bytes` is greater than `buffer.memory`, is that
> >> >> >> > > simply not allowed?).
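(One possible way of handling the case Ismael raises, sketched purely as an illustration; rejecting the configuration at startup is only one option and is not something the KIP specifies.)

    public class Kip81ConfigValidation {
        // Hypothetical validation run when the consumer is configured.
        static void validate(long bufferMemory, long fetchMaxBytes, long maxPartitionFetchBytes) {
            if (fetchMaxBytes > bufferMemory) {
                throw new IllegalArgumentException("fetch.max.bytes (" + fetchMaxBytes
                        + ") must not exceed buffer.memory (" + bufferMemory + ")");
            }
            if (maxPartitionFetchBytes > bufferMemory) {
                throw new IllegalArgumentException("max.partition.fetch.bytes (" + maxPartitionFetchBytes
                        + ") must not exceed buffer.memory (" + bufferMemory + ")");
            }
        }
    }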
> >> >> >> > >
> >> >> >> > > To summarise the gap between the ideal scenario (user specifies how much
> >> >> >> > > memory the consumer can use) and what is being proposed:
> >> >> >> > >
> >> >> >> > > 1. We will decompress and deserialize the data for one or more partitions
> >> >> >> > > in order to return them to the user and we don't account for the increased
> >> >> >> > > memory usage resulting from that. This is likely to be significant on a
> >> >> >> > > per record basis, but we try to do it for the minimal number of records
> >> >> >> > > possible within the constraints of the system. Currently the constraints
> >> >> >> > > are: we decompress and deserialize the data for a partition at a time
> >> >> >> > > (default `max.partition.fetch.bytes` is 1MB, but this is a soft limit in
> >> >> >> > > case there are oversized messages) until we have enough records to satisfy
> >> >> >> > > `max.poll.records` (default 500) or there are no more completed fetches.
> >> >> >> > > It seems like this may be OK for a lot of cases, but some tuning will
> >> >> >> > > still be required in others.
> >> >> >> > >
> >> >> >> > > 2. We don't account for bookkeeping data structures or intermediate objects
> >> >> >> > > allocated during the general operation of the consumer. Probably something
> >> >> >> > > we have to live with as the cost/benefit of fixing this doesn't seem worth it.
> >> >> >> > >
> >> >> >> > > Ismael
> >> >> >> > >
> >> >> >> > > On Tue, Dec 13, 2016 at 8:34 AM, Jay Kreps <j...@confluent.io> wrote:
> >> >> >> > >
> >> >> >> > >> Hey Ismael,
> >> >> >> > >>
> >> >> >> > >> Yeah I think we are both saying the same thing---removing only works if
> >> >> >> > >> you have a truly optimal strategy. Actually even dynamically computing a
> >> >> >> > >> reasonable default isn't totally obvious (do you set fetch.max.bytes to
> >> >> >> > >> equal buffer.memory to try to queue up as much data in the network buffers?
> >> >> >> > >> Do you try to limit it to your socket.receive.buffer size so that you can
> >> >> >> > >> read it in a single shot?).
> >> >> >> > >>
> >> >> >> > >> Regarding what is being measured, my interpretation was the same as yours.
> >> >> >> > >> I was just adding to the previous point that the buffer.memory setting would
> >> >> >> > >> not be a very close proxy for memory usage. Someone was pointing out that
> >> >> >> > >> compression would make this true, and I was just adding that even without
> >> >> >> > >> compression the object overhead would lead to a high expansion factor.
> >> >> >> > >>
> >> >> >> > >> -Jay
> >> >> >> > >>
> >> >> >> > >> On Mon, Dec 12, 2016 at 11:53 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> >> >> >> > >>
> >> >> >> > >> > Hi Jay,
> >> >> >> > >> >
> >> >> >> > >> > About `max.partition.fetch.bytes`, yes it was an oversight not to lower
> >> >> >> > >> > its priority as part of KIP-74 given the existence of `fetch.max.bytes` and
> >> >> >> > >> > the fact that we can now make progress in the presence of oversized messages
> >> >> >> > >> > independently of either of those settings.
> >> >> >> > >> >
> >> >> >> > >> > I agree that we should try to set those values automatically based on
> >> >> >> > >> > `buffer.memory`, but I am not sure if we can have a truly optimal strategy.
> >> >> >> > >> > So, I'd go with reducing the priority to "low" instead of removing
> >> >> >> > >> > `fetch.max.bytes` and `max.partition.fetch.bytes` altogether for now. If
> >> >> >> > >> > experience in the field tells us that the auto strategy is good enough, we
> >> >> >> > >> > can consider removing them (yes, I know, it's unlikely to happen as there
> >> >> >> > >> > won't be that much motivation then).
> >> >> >> > >> >
> >> >> >> > >> > Regarding the "conversion from packed bytes to java objects" comment, that
> >> >> >> > >> > raises the question: what are we actually measuring here? From the KIP,
> >> >> >> > >> > it's not too clear. My interpretation was that we were not measuring the
> >> >> >> > >> > memory usage of the Java objects. In that case, `buffer.memory` seems like
> >> >> >> > >> > a reasonable name although perhaps the user's expectation is that we would
> >> >> >> > >> > measure the memory usage of the Java objects?
> >> >> >> > >> >
> >> >> >> > >> > Ismael
> >> >> >> > >> >
> >> >> >> > >> > On Tue, Dec 13, 2016 at 6:21 AM, Jay Kreps <j...@confluent.io> wrote:
> >> >> >> > >> >
> >> >> >> > >> > > I think the question is whether we have a truly optimal strategy for
> >> >> >> > >> > > deriving the partition- and fetch-level configs from the global setting.
> >> >> >> > >> > > If we do, then we should just get rid of them. If not, then if we can at
> >> >> >> > >> > > least derive usually good and never terrible settings from the global
> >> >> >> > >> > > limit at initialization time, maybe we can set them automatically unless
> >> >> >> > >> > > the user overrides with an explicit config. Even the latter would let us
> >> >> >> > >> > > mark it low priority, which at least takes it off the list of things you
> >> >> >> > >> > > have to grok to use the consumer, which I suspect would be much
> >> >> >> > >> > > appreciated by our poor users.
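(A rough illustration of the kind of derivation Jay describes: fill in the fetch-level settings from the global limit at initialization time unless the user set them explicitly. The helper names and ratios below are placeholders, not a proposal.)

    public class DerivedFetchDefaults {
        // Derive fetch-level limits from the global buffer.memory bound when the user
        // has not set them explicitly. The divisors below are arbitrary placeholders.
        static long deriveFetchMaxBytes(long bufferMemory, Long explicitFetchMaxBytes) {
            if (explicitFetchMaxBytes != null) {
                return explicitFetchMaxBytes;   // an explicit user override always wins
            }
            return bufferMemory / 2;            // e.g. leave room for more than one in-flight fetch
        }

        static long deriveMaxPartitionFetchBytes(long fetchMaxBytes, Long explicitMaxPartitionFetchBytes,
                                                 int expectedPartitions) {
            if (explicitMaxPartitionFetchBytes != null) {
                return explicitMaxPartitionFetchBytes;
            }
            return Math.max(1L, fetchMaxBytes / Math.max(1, expectedPartitions));
        }
    }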
> >> >> >> > >> > >
> >> >> >> > >> > > Regardless it'd be nice to make sure we get an explanation of the
> >> >> >> > >> > > relationships between the remaining memory configs in the KIP and in
> >> >> >> > >> > > the docs.
> >> >> >> > >> > >
> >> >> >> > >> > > I agree that buffer.memory isn't bad.
> >> >> >> > >> > >
> >> >> >> > >> > > -Jay
> >> >> >> > >> > >
> >> >> >> > >> > >
> >> >> >> > >> > > On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >> >> >> > >> > >
> >> >> >> > >> > > > Yeah, that's a good point. Perhaps in retrospect, it would have been
> >> >> >> > >> > > > better to define `buffer.memory` first and let `fetch.max.bytes` be based
> >> >> >> > >> > > > off of it. I like `buffer.memory` since it gives the consumer nice symmetry
> >> >> >> > >> > > > with the producer and its generic naming gives us some flexibility
> >> >> >> > >> > > > internally with how we use it. We could still do that I guess, if we're
> >> >> >> > >> > > > willing to deprecate `fetch.max.bytes` (one release after adding it!).
> >> >> >> > >> > > >
> >> >> >> > >> > > > As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is still
> >> >> >> > >> > > > useful in Kafka Streams, but I agree it makes sense to lower its priority
> >> >> >> > >> > > > in favor of `fetch.max.bytes`.
> >> >> >> > >> > > >
> >> >> >> > >> > > > -Jason
> >> >> >> > >> > > >
> >> >> >> > >> > > > On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:
> >> >> >> > >> > > >
> >> >> >> > >> > > > > Jason, it's not just decompression but also the conversion from packed
> >> >> >> > >> > > > > bytes to java objects, right? That can be even larger than the
> >> >> >> > >> > > > > decompression blow-up. I think this may be okay; the problem may just be
> >> >> >> > >> > > > > that the naming is a bit misleading. In the producer you are literally
> >> >> >> > >> > > > > allocating a buffer of that size, so the name buffer.memory makes sense.
> >> >> >> > >> > > > > In this case it is something more like max.bytes.read.per.poll.call
> >> >> >> > >> > > > > (terrible name, but maybe something like that?).
> >> >> >> > >> > > > >
> >> >> >> > >> > > > > Mickael, I'd second Jason's request for the default and expand on it. We
> >> >> >> > >> > > > > currently have several consumer-related memory
> >> >> >> > >> > > > > settings--max.partition.fetch.bytes, fetch.max.bytes. I don't think it is
> >> >> >> > >> > > > > clear today how to set these. For example we mark max.partition.fetch.bytes
> >> >> >> > >> > > > > as high importance and fetch.max.bytes as medium, but it seems like it
> >> >> >> > >> > > > > would be the other way around. Can we think this through from the point of
> >> >> >> > >> > > > > view of a lazy user? I.e. I have 64MB of space to use for my consumer; in
> >> >> >> > >> > > > > an ideal world I'd say, "hey consumer, here is 64MB, go use that as
> >> >> >> > >> > > > > efficiently as possible" and not have to tune a bunch of individual things
> >> >> >> > >> > > > > with complex relationships. Maybe one or both of the existing settings can
> >> >> >> > >> > > > > either be eliminated or at the least marked as low priority, and we can
> >> >> >> > >> > > > > infer a reasonable default from the new config you're introducing?
> >> >> >> > >> > > > >
> >> >> >> > >> > > > > -jay
> >> >> >> > >> > > > >
> >> >> >> > >> > > > > On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >> >> >> > >> > > > >
> >> >> >> > >> > > > > > Hi Mickael,
> >> >> >> > >> > > > > >
> >> >> >> > >> > > > > > I think the approach looks good, just a few minor questions:
> >> >> >> > >> > > > > >
> >> >> >> > >> > > > > > 1. The KIP doesn't say what the default value of `buffer.memory` will be.
> >> >> >> > >> > > > > > Looks like we use 50MB as the default for `fetch.max.bytes`, so perhaps it
> >> >> >> > >> > > > > > makes sense to set the default based on that. Might also be worth
> >> >> >> > >> > > > > > mentioning somewhere the constraint between the two configs.
> >> >> >> > >> > > > > > 2. To clarify, this limit only affects the uncompressed size of the fetched
> >> >> >> > >> > > > > > data, right? The consumer may still exceed it in order to store the
> >> >> >> > >> > > > > > decompressed record data. We delay decompression until the records are
> >> >> >> > >> > > > > > returned to the user, but because of max.poll.records, we may end up
> >> >> >> > >> > > > > > holding onto the decompressed data from a single partition for a few
> >> >> >> > >> > > > > > iterations. I think this is fine, but probably worth noting in the KIP.
> >> >> >> > >> > > > > > 3. Is there any risk using the MemoryPool that, after we fill up the
> >> >> >> > >> > > > > > memory with fetch data, we can starve the coordinator's connection?
> >> >> >> > >> > > > > > Suppose, for example, that we send a bunch of pre-fetches right before
> >> >> >> > >> > > > > > returning to the user. These fetches might return before the next call to
> >> >> >> > >> > > > > > poll(), in which case we might not have enough memory to receive
> >> >> >> > >> > > > > > heartbeats, which would block us from sending additional heartbeats until
> >> >> >> > >> > > > > > the next call to poll(). Not sure it's a big problem since heartbeats are
> >> >> >> > >> > > > > > tiny, but might be worth thinking about.
> >> >> >> > >> > > > > >
> >> >> >> > >> > > > > > Thanks,
> >> >> >> > >> > > > > > Jason
> >> >> >> > >> > > > > >
> >> >> >> > >> > > > > >
> >> >> >> > >> > > > > > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >> >> > >> > > > > >
> >> >> >> > >> > > > > > > It's been a few days since the last comments. KIP-72 vote seems to
> >> >> >> > >> > > > > > > have passed so if I don't get any new comments I'll start the vote on
> >> >> >> > >> > > > > > > Monday.
> >> >> >> > >> > > > > > > Thanks
> >> >> >> > >> > > > > > >
> >> >> >> > >> > > > > > > On Mon, Nov 14, 2016 at 6:25 PM, radai <radai.rosenbl...@gmail.com> wrote:
> >> >> >> > >> > > > > > > > +1 - there is a need for an effective way to control kafka memory
> >> >> >> > >> > > > > > > > consumption - both on the broker and on clients.
> >> >> >> > >> > > > > > > > I think we could even reuse the exact same param name -
> >> >> >> > >> > > > > > > > *queued.max.bytes* - as it would serve the exact same purpose.
> >> >> >> > >> > > > > > > >
> >> >> >> > >> > > > > > > > Also (and again it's the same across the broker and clients) this bound
> >> >> >> > >> > > > > > > > should also cover decompression, at some point.
> >> >> >> > >> > > > > > > > The problem with that is that, to the best of my knowledge, the current
> >> >> >> > >> > > > > > > > wire protocol does not declare the final, uncompressed size of anything up
> >> >> >> > >> > > > > > > > front - all we know is the size of the compressed buffer. This may require
> >> >> >> > >> > > > > > > > a format change in the future to properly support?
> >> >> >> > >> > > > > > > >
> >> >> >> > >> > > > > > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >> >> > >> > > > > > > >
> >> >> >> > >> > > > > > > >> Thanks for all the replies.
> >> >> >> > >> > > > > > > >>
> >> >> >> > >> > > > > > > >> I've updated the KIP:
> >> >> >> > >> > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> > >> > > > > > > >> The main point is to selectively read from sockets instead of throttling
> >> >> >> > >> > > > > > > >> FetchRequest sends. I also mentioned it will be reusing the MemoryPool
> >> >> >> > >> > > > > > > >> implementation created in KIP-72 instead of adding another memory
> >> >> >> > >> > > > > > > >> tracking method.
> >> >> >> > >> > > > > > > >>
> >> >> >> > >> > > > > > > >> Please have another look. As always, comments are welcome!
> >> >> >> > >> > > > > > > >>
> >> >> >> > >> > > > > > > >> On Thu, Nov 10, 2016 at 2:47 AM, radai <radai.rosenbl...@gmail.com> wrote:
> >> >> >> > >> > > > > > > >> > Selectively reading from sockets achieves memory control (up to and not
> >> >> >> > >> > > > > > > >> > including talk of (de)compression).
> >> >> >> > >> > > > > > > >> >
> >> >> >> > >> > > > > > > >> > This is exactly what I (also, even mostly) did for KIP-72 - which I hope
> >> >> >> > >> > > > > > > >> > in itself should be a reason to think about both KIPs at the same time,
> >> >> >> > >> > > > > > > >> > because the changes will be similar (at least in intent) and might result
> >> >> >> > >> > > > > > > >> > in duplicated effort.
> >> >> >> > >> > > > > > > >> >
> >> >> >> > >> > > > > > > >> > A pool API is a way to "scale" all the way from just maintaining a
> >> >> >> > >> > > > > > > >> > variable holding the amount of available memory (which is what my current
> >> >> >> > >> > > > > > > >> > KIP-72 code does and what this KIP proposes IIUC) all the way up to
> >> >> >> > >> > > > > > > >> > actually re-using buffers without any changes to the code using the
> >> >> >> > >> > > > > > > >> > pool - just drop in a different pool impl.
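(For illustration, the simplest end of that spectrum: an accounting-only pool that just tracks available memory and performs plain allocations, reusing the SimpleMemoryPool stand-in from the earlier sketch. A "real" pool could keep released buffers for reuse instead. This is not the KIP-72 code.)

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicLong;

    // Accounting-only pool: no buffer reuse, just a bound on outstanding bytes.
    class AccountingMemoryPool implements SimpleMemoryPool {
        private final AtomicLong availableBytes;

        AccountingMemoryPool(long capacityBytes) {
            this.availableBytes = new AtomicLong(capacityBytes);
        }

        public ByteBuffer tryAllocate(int sizeBytes) {
            long remaining = availableBytes.addAndGet(-sizeBytes);
            if (remaining < 0) {
                availableBytes.addAndGet(sizeBytes); // roll back, not enough memory right now
                return null;
            }
            return ByteBuffer.allocate(sizeBytes);   // plain allocation, nothing is recycled
        }

        public void release(ByteBuffer buffer) {
            availableBytes.addAndGet(buffer.capacity());
            // A pooling implementation would stash the buffer here for later reuse.
        }
    }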
> >> >> >> > >> > > > > > > >> >
> >> >> >> > >> > > > > > > >> > For "edge nodes" (producer/consumer) the performance gain in actually
> >> >> >> > >> > > > > > > >> > pooling large buffers may be arguable, but I suspect for brokers regularly
> >> >> >> > >> > > > > > > >> > operating on 1MB-sized requests (which is the norm at linkedin) the
> >> >> >> > >> > > > > > > >> > resulting memory fragmentation is an actual bottleneck (I have initial
> >> >> >> > >> > > > > > > >> > micro-benchmark results to back this up but have not had the time to do a
> >> >> >> > >> > > > > > > >> > full profiling run).
> >> >> >> > >> > > > > > > >> >
> >> >> >> > >> > > > > > > >> > So basically I'm saying we may be doing (very) similar things in mostly
> >> >> >> > >> > > > > > > >> > the same areas of code.
> >> >> >> > >> > > > > > > >> >
> >> >> >> > >> > > > > > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >> >> > >> > > > > > > >> >
> >> >> >> > >> > > > > > > >> >> Selectively reading from the socket should enable us to control the
> >> >> >> > >> > > > > > > >> >> memory usage without impacting performance. I've had a look at that today
> >> >> >> > >> > > > > > > >> >> and I can see how that would work.
> >> >> >> > >> > > > > > > >> >> I'll update the KIP accordingly tomorrow.
> >> >> >> > >> > > > > > > >> >>
> >> >> >> > >> > > > > > > >>
> >> >> >> > >> > > > > > >
> >> >> >> > >> > > > > >
> >> >> >> > >> > > > >
> >> >> >> > >> > > >
> >> >> >> > >> > >
> >> >> >> > >> >
> >> >> >> > >>
> >> >> >> >
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >> Regards,
> >> >> >>
> >> >> >> Rajini
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >>
> >>
>
