Thanks Rajini,
Before KAFKA-4137, we could avoid coordinator starvation without
special-casing the coordinator connection, but rather simply by applying the
buffer.memory check only to 'large' responses
(e.g. size > 1KB, possibly introducing a new config entry) in

NetworkReceive.readFromReadableChannel(ReadableByteChannel)

Essentially this would limit the reading of fetch responses while still
allowing other responses to be processed.
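
As a rough illustration of the idea, the allocation decision could look
something like the sketch below. This is not the actual NetworkReceive code;
MemoryPool stands in for the KIP-72 pool API and all the names (including the
threshold constant) are illustrative only:

    import java.nio.ByteBuffer;

    // Stand-in for the KIP-72 memory pool API (illustrative only).
    interface MemoryPool {
        ByteBuffer tryAllocate(int sizeBytes); // null when the pool is exhausted
    }

    class ResponseBufferPolicy {
        // e.g. 1KB; could become a new config entry
        static final int SMALL_RESPONSE_THRESHOLD_BYTES = 1024;

        private final MemoryPool pool;

        ResponseBufferPolicy(MemoryPool pool) {
            this.pool = pool;
        }

        // Returns a buffer for a response of the given size, or null if the
        // read should be deferred because the pool is currently exhausted.
        ByteBuffer allocate(int responseSize) {
            if (responseSize <= SMALL_RESPONSE_THRESHOLD_BYTES) {
                // Small responses (heartbeats, coordinator and metadata
                // traffic) bypass the pool so they cannot be starved by fetches.
                return ByteBuffer.allocate(responseSize);
            }
            // Large responses (in practice, fetches) are bounded by
            // buffer.memory; defer the read if the pool has no room.
            return pool.tryAllocate(responseSize);
        }
    }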

Here is a sample of response sizes I collected:

***** size=108 APIKEY=3 METADATA
*****  size=28 APIKEY=10 GROUP_COORDINATOR
*****  size=193 APIKEY=11 JOIN_GROUP
*****  size=39 APIKEY=14 SYNC_GROUP
*****  size=39 APIKEY=9 OFFSET_FETCH
*****  size=45 APIKEY=2 LIST_OFFSETS
*****  size=88926 APIKEY=1 FETCH
*****  size=45 APIKEY=1 FETCH
*****  size=6 APIKEY=12 HEARTBEAT
*****  size=45 APIKEY=1 FETCH
*****  size=45 APIKEY=1 FETCH
*****  size=45 APIKEY=1 FETCH
*****  size=6 APIKEY=12 HEARTBEAT
*****  size=45 APIKEY=1 FETCH
*****  size=45 APIKEY=1 FETCH

What do you think?
--------------------------------------------------
Edoardo Comar
IBM MessageHub
eco...@uk.ibm.com
IBM UK Ltd, Hursley Park, SO21 2JN




From:   Rajini Sivaram <rajinisiva...@googlemail.com>
To:     dev@kafka.apache.org
Date:   13/12/2016 17:27
Subject:        Re: [DISCUSS] KIP-81: Max in-flight fetches



Coordinator starvation: For an implementation based on KIP-72, there will
be coordinator starvation without KAFKA-4137 since you would stop reading
from sockets when the memory pool is full (the fact that coordinator
messages are small doesn't help). I imagine you can work around this by
treating coordinator connections as special connections but that spills
over to common network code. The separate NetworkClient for the coordinator
proposed in KAFKA-4137 would be much better.

On Tue, Dec 13, 2016 at 3:47 PM, Mickael Maison <mickael.mai...@gmail.com> wrote:

> Thanks for all the feedback.
>
> I've updated the KIP with all the details.
> Below are a few of the main points:
>
> - Overall memory usage of the consumer:
> I made it clear the memory pool is only used to store the raw bytes
> from the network and that the decompressed/deserialized messages are
> not stored in it but as extra memory on the heap. In addition, the
> consumer also keeps track of other things (in flight requests,
> subscriptions, etc.) that account for extra memory as well. So this
> is not a hard memory bound but should still allow users to roughly
> size how much memory can be used.
>
> - Relation with the existing settings:
> There are already 2 settings that deal with memory usage of the
> consumer. I suggest we lower the priority of
> `max.partition.fetch.bytes` (I wonder if we should attempt to
> deprecate it or increase its default value so it's a constraint less
> likely to be hit) and have the new setting `buffer.memory` as High.
> I'm a bit unsure what the best default value for `buffer.memory` is; I
> suggested 100MB in the KIP (2 x `fetch.max.bytes`), but I'd appreciate
> feedback. It should always be at least equal to `fetch.max.bytes`.
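>
> To make the relationship concrete, here is a purely illustrative sketch
> (values are just the defaults discussed above; bootstrap.servers,
> deserializers and the other required consumer configs are omitted):
>
>     import java.util.Properties;
>
>     Properties props = new Properties();
>     // existing default for fetch.max.bytes: 50MB
>     props.put("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
>     // proposed default for the new setting: 2 x fetch.max.bytes = 100MB;
>     // it should never be smaller than fetch.max.bytes
>     props.put("buffer.memory", String.valueOf(100 * 1024 * 1024));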
>
> - Configuration name `buffer.memory`:
> I think it's the name that makes the most sense. It's aligned with the
> producer and as mentioned generic enough to allow future changes if
> needed.
>
> - Coordinator starvation:
> Yes this is a potential issue. I'd expect these requests to be small
> enough to not be affected too much. If that's the case KAFKA-4137
> suggests a possible fix.
>
>
>
> On Tue, Dec 13, 2016 at 9:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > Makes sense Jay.
> >
> > Mickael, in addition to how we can compute defaults of the other settings
> > from `buffer.memory`, it would be good to specify what is allowed and how
> > we handle the different cases (e.g. what do we do if
> > `max.partition.fetch.bytes` is greater than `buffer.memory`, is that
> > simply not allowed?).
> >
> > To summarise the gap between the ideal scenario (user specifies how much
> > memory the consumer can use) and what is being proposed:
> >
> > 1. We will decompress and deserialize the data for one or more partitions
> > in order to return them to the user and we don't account for the increased
> > memory usage resulting from that. This is likely to be significant on a
> > per record basis, but we try to do it for the minimal number of records
> > possible within the constraints of the system. Currently the constraints
> > are: we decompress and deserialize the data for a partition at a time
> > (default `max.partition.fetch.bytes` is 1MB, but this is a soft limit in
> > case there are oversized messages) until we have enough records to
> > satisfy `max.poll.records` (default 500) or there are no more completed
> > fetches. It seems like this may be OK for a lot of cases, but some tuning
> > will still be required in others.
> >
> > 2. We don't account for bookkeeping data structures or intermediate
> > objects allocated during the general operation of the consumer. Probably
> > something we have to live with as the cost/benefit of fixing this doesn't
> > seem worth it.
> >
> > Ismael
> >
> > On Tue, Dec 13, 2016 at 8:34 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> >> Hey Ismael,
> >>
> >> Yeah I think we are both saying the same thing---removing only works if
> >> you have a truly optimal strategy. Actually even dynamically computing a
> >> reasonable default isn't totally obvious (do you set fetch.max.bytes to
> >> equal buffer.memory to try to queue up as much data in the network
> >> buffers? Do you try to limit it to your socket.receive.buffer size so
> >> that you can read it in a single shot?).
> >>
> >> Regarding what is being measured, my interpretation was the same as
> >> yours. I was just adding to the previous point that buffer.memory setting
> >> would not be a very close proxy for memory usage. Someone was pointing
> >> out that compression would make this true, and I was just adding that
> >> even without compression the object overhead would lead to a high
> >> expansion factor.
> >>
> >> -Jay
> >>
> >> On Mon, Dec 12, 2016 at 11:53 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> >>
> >> > Hi Jay,
> >> >
> >> > About `max.partition.fetch.bytes`, yes it was an oversight not to lower
> >> > its priority as part of KIP-74 given the existence of `fetch.max.bytes`
> >> > and the fact that we can now make progress in the presence of oversized
> >> > messages independently of either of those settings.
> >> >
> >> > I agree that we should try to set those values automatically based on
> >> > `buffer.memory`, but I am not sure if we can have a truly optimal
> >> > strategy. So, I'd go with reducing the priority to "low" instead of
> >> > removing `fetch.max.bytes` and `max.partition.fetch.bytes` altogether
> >> > for now. If experience in the field tells us that the auto strategy is
> >> > good enough, we can consider removing them (yes, I know, it's unlikely
> >> > to happen as there won't be that much motivation then).
> >> >
> >> > Regarding the "conversion from packed bytes to java objects" comment,
> >> > that raises the question: what are we actually measuring here? From the
> >> > KIP, it's not too clear. My interpretation was that we were not
> >> > measuring the memory usage of the Java objects. In that case,
> >> > `buffer.memory` seems like a reasonable name although perhaps the
> >> > user's expectation is that we would measure the memory usage of the
> >> > Java objects?
> >> >
> >> > Ismael
> >> >
> >> > On Tue, Dec 13, 2016 at 6:21 AM, Jay Kreps <j...@confluent.io> wrote:
> >> >
> >> > > I think the question is whether we have a truly optimal strategy for
> >> > > deriving the partition- and fetch-level configs from the global
> >> > > setting. If we do then we should just get rid of them. If not, then
> >> > > if we can at least derive usually good and never terrible settings
> >> > > from the global limit at initialization time maybe we can set them
> >> > > automatically unless the user overrides with an explicit config. Even
> >> > > the latter would let us mark it low priority which at least takes it
> >> > > off the list of things you have to grok to use the consumer which I
> >> > > suspect would be much appreciated by our poor users.
> >> > >
> >> > > Regardless it'd be nice to make sure we get an explanation of the
> >> > > relationships between the remaining memory configs in the KIP and in
> >> > > the docs.
> >> > >
> >> > > I agree that buffer.memory isn't bad.
> >> > >
> >> > > -Jay
> >> > >
> >> > >
> >> > > On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >> > >
> >> > > > Yeah, that's a good point. Perhaps in retrospect, it would have
> >> > > > been better to define `buffer.memory` first and let
> >> > > > `fetch.max.bytes` be based off of it. I like `buffer.memory` since
> >> > > > it gives the consumer nice symmetry with the producer and its
> >> > > > generic naming gives us some flexibility internally with how we use
> >> > > > it. We could still do that I guess, if we're willing to deprecate
> >> > > > `fetch.max.bytes` (one release after adding it!).
> >> > > >
> >> > > > As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is
> >> > > > still useful in Kafka Streams, but I agree it makes sense to lower
> >> > > > its priority in favor of `fetch.max.bytes`.
> >> > > >
> >> > > > -Jason
> >> > > >
> >> > > > On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:
> >> > > >
> >> > > > > Jason, it's not just decompression but also the conversion from
> >> > > > > packed bytes to java objects, right? That can be even larger than
> >> > > > > the decompression blow up. I think this may be okay, the problem
> >> > > > > may just be that the naming is a bit misleading. In the producer
> >> > > > > you are literally allocating a buffer of that size, so the name
> >> > > > > buffer.memory makes sense. In this case it is something more like
> >> > > > > max.bytes.read.per.poll.call (terrible name, but maybe something
> >> > > > > like that?).
> >> > > > >
> >> > > > > Mickael, I'd second Jason's request for the default and expand on
> >> > > > > it. We currently have several consumer-related memory
> >> > > > > settings--max.partition.fetch.bytes, fetch.max.bytes. I don't
> >> > > > > think it is clear today how to set these. For example we mark
> >> > > > > max.partition.fetch.bytes as high importance and fetch.max.bytes
> >> > > > > as medium, but it seems like it would be the other way around.
> >> > > > > Can we think this through from the point of view of a lazy user?
> >> > > > > I.e. I have 64MB of space to use for my consumer, in an ideal
> >> > > > > world I'd say, "hey consumer here is 64MB go use that as
> >> > > > > efficiently as possible" and not have to tune a bunch of
> >> > > > > individual things with complex relationships. Maybe one or both
> >> > > > > of the existing settings can either be eliminated or at the least
> >> > > > > marked as low priority and we can infer a reasonable default from
> >> > > > > the new config you're introducing?
> >> > > > >
> >> > > > > -jay
> >> > > > >
> >> > > > > On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >> > > > >
> >> > > > > > Hi Mickael,
> >> > > > > >
> >> > > > > > I think the approach looks good, just a few minor questions:
> >> > > > > >
> >> > > > > > 1. The KIP doesn't say what the default value of
> >> > > > > > `buffer.memory` will be. Looks like we use 50MB as the default
> >> > > > > > for `fetch.max.bytes`, so perhaps it makes sense to set the
> >> > > > > > default based on that. Might also be worth mentioning somewhere
> >> > > > > > the constraint between the two configs.
> >> > > > > > 2. To clarify, this limit only affects the uncompressed size of
> >> > > > > > the fetched data, right? The consumer may still exceed it in
> >> > > > > > order to store the decompressed record data. We delay
> >> > > > > > decompression until the records are returned to the user, but
> >> > > > > > because of max.poll.records, we may end up holding onto the
> >> > > > > > decompressed data from a single partition for a few iterations.
> >> > > > > > I think this is fine, but probably worth noting in the KIP.
> >> > > > > > 3. Is there any risk using the MemoryPool that, after we fill
> >> > > > > > up the memory with fetch data, we can starve the coordinator's
> >> > > > > > connection? Suppose, for example, that we send a bunch of
> >> > > > > > pre-fetches right before returning to the user. These fetches
> >> > > > > > might return before the next call to poll(), in which case we
> >> > > > > > might not have enough memory to receive heartbeats, which would
> >> > > > > > block us from sending additional heartbeats until the next call
> >> > > > > > to poll(). Not sure it's a big problem since heartbeats are
> >> > > > > > tiny, but might be worth thinking about.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Jason
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > It's been a few days since the last comments. KIP-72 vote
> >> > > > > > > seems to have passed so if I don't get any new comments I'll
> >> > > > > > > start the vote on Monday.
> >> > > > > > > Thanks
> >> > > > > > >
> >> > > > > > > On Mon, Nov 14, 2016 at 6:25 PM, radai <radai.rosenbl...@gmail.com> wrote:
> >> > > > > > > > +1 - there is a need for an effective way to control kafka
> >> > > > > > > > memory consumption - both on the broker and on clients.
> >> > > > > > > > i think we could even reuse the exact same param name -
> >> > > > > > > > *queued.max.bytes* - as it would serve the exact same
> >> > > > > > > > purpose.
> >> > > > > > > >
> >> > > > > > > > also (and again it's the same across the broker and
> >> > > > > > > > clients) this bound should also cover decompression, at
> >> > > > > > > > some point. the problem with that is that to the best of my
> >> > > > > > > > knowledge the current wire protocol does not declare the
> >> > > > > > > > final, uncompressed size of anything up front - all we know
> >> > > > > > > > is the size of the compressed buffer. this may require a
> >> > > > > > > > format change in the future to properly support?
> >> > > > > > > >
> >> > > > > > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> > > > > > > >
> >> > > > > > > >> Thanks for all the replies.
> >> > > > > > > >>
> >> > > > > > > >> I've updated the KIP:
> >> > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> > > > > > > >> The main point is to selectively read from sockets instead
> >> > > > > > > >> of throttling FetchRequest sends. I also mentioned it will
> >> > > > > > > >> be reusing the MemoryPool implementation created in KIP-72
> >> > > > > > > >> instead of adding another memory tracking method.
> >> > > > > > > >>
> >> > > > > > > >> Please have another look. As always, comments are welcome!
> >> > > > > > > >>
> >> > > > > > > >> On Thu, Nov 10, 2016 at 2:47 AM, radai <radai.rosenbl...@gmail.com> wrote:
> >> > > > > > > >> > selectively reading from sockets achieves memory control
> >> > > > > > > >> > (up to and not including talk of (de)compression)
> >> > > > > > > >> >
> >> > > > > > > >> > this is exactly what i (also, even mostly) did for
> >> > > > > > > >> > kip-72 - which i hope in itself should be a reason to
> >> > > > > > > >> > think about both KIPs at the same time because the
> >> > > > > > > >> > changes will be similar (at least in intent) and might
> >> > > > > > > >> > result in duplicated effort.
> >> > > > > > > >> >
> >> > > > > > > >> > a pool API is a way to "scale" all the way from just
> >> > > > > > > >> > maintaining a variable holding the amount of available
> >> > > > > > > >> > memory (which is what my current kip-72 code does and
> >> > > > > > > >> > what this kip proposes IIUC) all the way up to actually
> >> > > > > > > >> > re-using buffers without any changes to the code using
> >> > > > > > > >> > the pool - just drop in a different pool impl.
> >> > > > > > > >> >
> >> > > > > > > >> > for "edge nodes" (producer/consumer) the performance
> gain
> >> in
> >> > > > > > actually
> >> > > > > > > >> > pooling large buffers may be arguable, but i suspect
> for
> >> > > brokers
> >> > > > > > > >> regularly
> >> > > > > > > >> > operating on 1MB-sized requests (which is the norm 
at
> >> > > linkedin)
> >> > > > > the
> >> > > > > > > >> > resulting memory fragmentation is an actual 
bottleneck
> (i
> >> > have
> >> > > > > > initial
> >> > > > > > > >> > micro-benchmark results to back this up but have not
> had
> >> the
> >> > > > time
> >> > > > > to
> >> > > > > > > do a
> >> > > > > > > >> > full profiling run).
> >> > > > > > > >> >
> >> > > > > > > >> > so basically I'm saying we may be doing (very) similar
> >> > > > > > > >> > things in mostly the same areas of code.
> >> > > > > > > >> >
> >> > > > > > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> > > > > > > >> >
> >> > > > > > > >> >> Selectively reading from the socket should enable us to
> >> > > > > > > >> >> control the memory usage without impacting performance.
> >> > > > > > > >> >> I've had a look at that today and I can see how that
> >> > > > > > > >> >> would work.
> >> > > > > > > >> >> I'll update the KIP accordingly tomorrow.
> >> > > > > > > >> >>
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
>



-- 
Regards,

Rajini



