Yes it would be a protocol bump.

@Jason - on reducing the size of the assignment field, I would be
interested to see what savings we can get - but my hunch is that we would
end up picking one of either: a compact assignment field format or turn on
compression. We actually did a similar investigation in the context of
public access logs - we had to turn off PAL in some of our clusters at
LinkedIn because well PAL is extremely verbose, contains a ton of redundant
text (mainly repetitive topic strings) and our gzip processes couldn't even
keep up! So we revisited the option of binary logging - which is actually
simple to do given that we just need to take the Protocol.Schema of each
request and write it out as binary (instead of the verbose string) similar
to ZooKeeper's (jute) transaction logs - this actually gets rid of a lot of
the topic string (and other) redundancies. Anyway, a cursory evaluation of
this PAL scheme showed that the uncompressed binary PAL is significantly (>
60%) smaller than the plaintext PAL, but the post-compressed binary PAL is
just 25% smaller than the post-compressed plaintext PAL. IOW using a symbol
table helps a lot but further compression on that already compact format
would yield only marginal return.

So basically I feel we could get pretty far with a more compact field
format for assignment and if we do that then we would potentially not even
want to do any compression.

wrt the suggestion to auto-scale up the fetch response size I agree that is
the easiest work around for now.

Thanks,

Joel


On Tue, May 24, 2016 at 4:19 PM, Gwen Shapira <g...@confluent.io> wrote:

> Regarding the change to the assignment field. It would be a protocol bump,
> otherwise consumers will not know how to parse the bytes the broker is
> returning, right?
> Or did I misunderstand the suggestion?
>
> On Tue, May 24, 2016 at 2:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I think for just solving issue 1), Jun's suggestion is sufficient and
> > simple. So I'd prefer that approach.
> >
> > In addition, Jason's optimization on the assignment field would be good
> for
> > 2) and 3) as well, and I like that optimization for its simplicity and no
> > format change as well. And in the future I'm in favor of considering to
> > change the in-memory cache format as Jiangjie suggested.
> >
> > Guozhang
> >
> >
> > On Tue, May 24, 2016 at 12:42 PM, Becket Qin <becket....@gmail.com>
> wrote:
> >
> > > Hi Jason,
> > >
> > > There are a few problems we want to solve here:
> > > 1. The group metadata is too big to be appended to the log.
> > > 2. Reduce the memory footprint on the broker
> > > 3. Reduce the bytes transferred over the wire.
> > >
> > > To solve (1), I like your idea of having separate messages per member.
> > The
> > > proposal (Onur's option 8) is to break metadata into small records in
> the
> > > same uncompressed message set so each record is small. I agree it would
> > be
> > > ideal if we are able to store the metadata separately for each member.
> I
> > > was also thinking about storing the metadata into multiple messages,
> too.
> > > What concerns me was that having multiple messages seems breaking the
> > > atomicity. I am not sure how we are going to deal with the potential
> > > issues. For example, What if group metadata is replicated but the
> member
> > > metadata is not? It might be fine depending on the implementation
> though,
> > > but I am not sure.
> > >
> > > For (2) we want to store the metadata onto the disk, which is what we
> > have
> > > to do anyway. The only question is in what format should we store them.
> > >
> > > To address (3) we want to have the metadata to be compressed, which is
> > > contradict to the the above solution of (1).
> > >
> > > I think Jun's suggestion is probably still the simplest. To avoid
> > changing
> > > the behavior for consumers, maybe we can do that only for offset_topic,
> > > i.e, if the max fetch bytes of the fetch request is smaller than the
> > > message size on the offset topic, we always return at least one full
> > > message. This should avoid the unexpected problem on the client side
> > > because supposedly only tools and brokers will fetch from the the
> > internal
> > > topics,
> > >
> > > As a modification to what you suggested, one solution I was thinking
> was
> > to
> > > have multiple messages in a single compressed message. That means for
> > > SyncGroupResponse we still need to read the entire compressed messages
> > and
> > > extract the inner messages, which seems not quite different from
> having a
> > > single message containing everything. But let me just put it here and
> see
> > > if that makes sense.
> > >
> > > We can have a map of GroupMetadataKey -> GroupMetadataValueOffset.
> > >
> > > The GroupMetadataValue is stored in a compressed message. The inner
> > > messages are the following:
> > >
> > > Inner Message 0: Version GroupId Generation
> > >
> > > Inner Message 1: MemberId MemberMetadata_1 (we can compress the bytes
> > here)
> > >
> > > Inner Message 2: MemberId MemberMetadata_2
> > > ....
> > > Inner Message N: MemberId MemberMetadata_N
> > >
> > > The MemberMetadata format is the following:
> > >   MemberMetadata => Version Generation ClientId Host Subscription
> > > Assignment
> > >
> > > So DescribeGroupResponse will just return the entire compressed
> > > GroupMetadataMessage. SyncGroupResponse will return the corresponding
> > inner
> > > message.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > > On Tue, May 24, 2016 at 9:14 AM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hey Becket,
> > > >
> > > > I like your idea to store only the offset for the group metadata in
> > > memory.
> > > > I think it would be safe to keep it in memory for a short time after
> > the
> > > > rebalance completes, but after that, it's only real purpose is to
> > answer
> > > > DescribeGroup requests, so your proposal makes a lot of sense to me.
> > > >
> > > > As for the specific problem with the size of the group metadata
> message
> > > for
> > > > the MM case, if we cannot succeed in reducing the size of the
> > > > subscription/assignment (which I think is still probably the best
> > > > alternative if it can work), then I think there are some options for
> > > > changing the message format (option #8 in Onur's initial e-mail).
> > > > Currently, the key used for storing the group metadata is this:
> > > >
> > > > GroupMetadataKey => Version GroupId
> > > >
> > > > And the value is something like this (some details elided):
> > > >
> > > > GroupMetadataValue => Version GroupId Generation [MemberMetadata]
> > > >   MemberMetadata => ClientId Host Subscription Assignment
> > > >
> > > > I don't think we can change the key without a lot of pain, but it
> seems
> > > > like we can change the value format. Maybe we can take the
> > > > subscription/assignment payloads out of the value and introduce a new
> > > > "MemberMetadata" message for each member in the group. For example:
> > > >
> > > > MemberMetadataKey => Version GroupId MemberId
> > > >
> > > > MemberMetadataValue => Version Generation ClientId Host Subscription
> > > > Assignment
> > > >
> > > > When a new generation is created, we would first write the group
> > metadata
> > > > message which includes the generation and all of the memberIds, and
> > then
> > > > we'd write the member metadata messages. To answer the DescribeGroup
> > > > request, we'd read the group metadata at the cached offset and,
> > depending
> > > > on the version, all of the following member metadata. This would be
> > more
> > > > complex to maintain, but it seems doable if it comes to it.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Mon, May 23, 2016 at 6:15 PM, Becket Qin <becket....@gmail.com>
> > > wrote:
> > > >
> > > > > It might worth thinking a little further. We have discussed this
> > before
> > > > > that we want to avoid holding all the group metadata in memory.
> > > > >
> > > > > I am thinking about the following end state:
> > > > >
> > > > > 1. Enable compression on the offset topic.
> > > > > 2. Instead of holding the entire group metadata in memory on the
> > > brokers,
> > > > > each broker only keeps a [group -> Offset] map, the offset points
> to
> > > the
> > > > > message in the offset topic which holds the latest metadata of the
> > > group.
> > > > > 3. DescribeGroupResponse will read from the offset topic directly
> > like
> > > a
> > > > > normal consumption, except that only exactly one message will be
> > > > returned.
> > > > > 4. SyncGroupResponse will read the message, extract the assignment
> > part
> > > > and
> > > > > send back the partition assignment. We can compress the partition
> > > > > assignment before sends it out if we want.
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Mon, May 23, 2016 at 5:08 PM, Jason Gustafson <
> ja...@confluent.io
> > >
> > > > > wrote:
> > > > >
> > > > > > >
> > > > > > > Jason, doesn't gzip (or other compression) basically do this?
> If
> > > the
> > > > > > topic
> > > > > > > is a string and the topic is repeated throughout, won't
> > compression
> > > > > > > basically replace all repeated instances of it with an index
> > > > reference
> > > > > to
> > > > > > > the full string?
> > > > > >
> > > > > >
> > > > > > Hey James, yeah, that's probably true, but keep in mind that the
> > > > > > compression happens on the broker side. It would be nice to have
> a
> > > more
> > > > > > compact representation so that get some benefit over the wire as
> > > well.
> > > > > This
> > > > > > seems to be less of a concern here, so the bigger gains are
> > probably
> > > > from
> > > > > > reducing the number of partitions that need to be listed
> > > individually.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > > On Mon, May 23, 2016 at 4:23 PM, Onur Karaman <
> > > > > > onurkaraman.apa...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > When figuring out these optimizations, it's worth keeping in
> mind
> > > the
> > > > > > > improvements when the message is uncompressed vs when it's
> > > > compressed.
> > > > > > >
> > > > > > > When uncompressed:
> > > > > > > Fixing the Assignment serialization to instead be a topic index
> > > into
> > > > > the
> > > > > > > corresponding member's subscription list would usually be a
> good
> > > > thing.
> > > > > > >
> > > > > > > I think the proposal is only worse when the topic names are
> > small.
> > > > The
> > > > > > > Type.STRING we use in our protocol for the assignment's
> > > > TOPIC_KEY_NAME
> > > > > is
> > > > > > > limited in length to Short.MAX_VALUE, so our strings are first
> > > > > prepended
> > > > > > > with 2 bytes to indicate the string size.
> > > > > > >
> > > > > > > The new proposal does worse when:
> > > > > > > 2 + utf_encoded_string_payload_size < index_type_size
> > > > > > > in other words when:
> > > > > > > utf_encoded_string_payload_size < index_type_size - 2
> > > > > > >
> > > > > > > If the index type ends up being Type.INT32, then the proposal
> is
> > > > worse
> > > > > > when
> > > > > > > the topic is length 1.
> > > > > > > If the index type ends up being Type.INT64, then the proposal
> is
> > > > worse
> > > > > > when
> > > > > > > the topic is less than length 6.
> > > > > > >
> > > > > > > When compressed:
> > > > > > > As James Cheng brought up, I'm not sure how things change when
> > > > > > compression
> > > > > > > comes into the picture. This would be worth investigating.
> > > > > > >
> > > > > > > On Mon, May 23, 2016 at 4:05 PM, James Cheng <
> > wushuja...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > >
> > > > > > > > > On May 23, 2016, at 10:59 AM, Jason Gustafson <
> > > > ja...@confluent.io>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > 2. Maybe there's a better way to lay out the assignment
> > without
> > > > > > needing
> > > > > > > > to
> > > > > > > > > explicitly repeat the topic? For example, the leader could
> > sort
> > > > the
> > > > > > > > topics
> > > > > > > > > for each member and just use an integer to represent the
> > index
> > > of
> > > > > > each
> > > > > > > > > topic within the sorted list (note this depends on the
> > > > subscription
> > > > > > > > > including the full topic list).
> > > > > > > > >
> > > > > > > > > Assignment -> [TopicIndex [Partition]]
> > > > > > > > >
> > > > > > > >
> > > > > > > > Jason, doesn't gzip (or other compression) basically do this?
> > If
> > > > the
> > > > > > > topic
> > > > > > > > is a string and the topic is repeated throughout, won't
> > > compression
> > > > > > > > basically replace all repeated instances of it with an index
> > > > > reference
> > > > > > to
> > > > > > > > the full string?
> > > > > > > >
> > > > > > > > -James
> > > > > > > >
> > > > > > > > > You could even combine these two options so that you have
> > only
> > > 3
> > > > > > > integers
> > > > > > > > > for each topic assignment:
> > > > > > > > >
> > > > > > > > > Assignment -> [TopicIndex MinPartition MaxPartition]
> > > > > > > > >
> > > > > > > > > There may even be better options with a little more
> thought.
> > > All
> > > > of
> > > > > > > this
> > > > > > > > is
> > > > > > > > > just part of the client-side protocol, so it wouldn't
> require
> > > any
> > > > > > > version
> > > > > > > > > bumps on the broker. What do you think?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <
> > > > wangg...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> The original concern is that regex may not be efficiently
> > > > > supported
> > > > > > > > >> across-languages, but if there is a neat workaround I
> would
> > > love
> > > > > to
> > > > > > > > learn.
> > > > > > > > >>
> > > > > > > > >> Guozhang
> > > > > > > > >>
> > > > > > > > >> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <
> > > ism...@juma.me.uk
> > > > >
> > > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >>> +1 to Jun's suggestion.
> > > > > > > > >>>
> > > > > > > > >>> Having said that, as a general point, I think we should
> > > > consider
> > > > > > > > >> supporting
> > > > > > > > >>> topic patterns in the wire protocol. It requires some
> > > thinking
> > > > > for
> > > > > > > > >>> cross-language support, but it seems surmountable and it
> > > could
> > > > > make
> > > > > > > > >> certain
> > > > > > > > >>> operations a lot more efficient (the fact that a basic
> > regex
> > > > > > > > subscription
> > > > > > > > >>> causes the consumer to request metadata for all topics is
> > not
> > > > > > great).
> > > > > > > > >>>
> > > > > > > > >>> Ismael
> > > > > > > > >>>
> > > > > > > > >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <
> > > > > > wangg...@gmail.com>
> > > > > > > > >>> wrote:
> > > > > > > > >>>
> > > > > > > > >>>> I like Jun's suggestion in changing the handling logics
> of
> > > > > single
> > > > > > > > large
> > > > > > > > >>>> message on the consumer side.
> > > > > > > > >>>>
> > > > > > > > >>>> As for the case of "a single group subscribing to 3000
> > > > topics",
> > > > > > with
> > > > > > > > >> 100
> > > > > > > > >>>> consumers the 2.5Mb Gzip size is reasonable to me (when
> > > > storing
> > > > > in
> > > > > > > ZK,
> > > > > > > > >> we
> > > > > > > > >>>> also have the znode limit which is set to 1Mb by
> default,
> > > > though
> > > > > > > > >>> admittedly
> > > > > > > > >>>> it is only for one consumer). And if we do the change as
> > Jun
> > > > > > > > suggested,
> > > > > > > > >>>> 2.5Mb on follower's memory pressure is OK I think.
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> Guozhang
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> > > > > > > > >>>> onurkaraman.apa...@gmail.com
> > > > > > > > >>>>> wrote:
> > > > > > > > >>>>
> > > > > > > > >>>>> Results without compression:
> > > > > > > > >>>>> 1 consumer 292383 bytes
> > > > > > > > >>>>> 5 consumers 1079579 bytes * the tipping point
> > > > > > > > >>>>> 10 consumers 1855018 bytes
> > > > > > > > >>>>> 20 consumers 2780220 bytes
> > > > > > > > >>>>> 30 consumers 3705422 bytes
> > > > > > > > >>>>> 40 consumers 4630624 bytes
> > > > > > > > >>>>> 50 consumers 5555826 bytes
> > > > > > > > >>>>> 60 consumers 6480788 bytes
> > > > > > > > >>>>> 70 consumers 7405750 bytes
> > > > > > > > >>>>> 80 consumers 8330712 bytes
> > > > > > > > >>>>> 90 consumers 9255674 bytes
> > > > > > > > >>>>> 100 consumers 10180636 bytes
> > > > > > > > >>>>>
> > > > > > > > >>>>> So it looks like gzip compression shrinks the message
> > size
> > > by
> > > > > 4x.
> > > > > > > > >>>>>
> > > > > > > > >>>>> On Sat, May 21, 2016 at 9:47 AM, Jun Rao <
> > j...@confluent.io
> > > >
> > > > > > wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>>> Onur,
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Thanks for the investigation.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Another option is to just fix how we deal with the
> case
> > > > when a
> > > > > > > > >>> message
> > > > > > > > >>>> is
> > > > > > > > >>>>>> larger than the fetch size. Today, if the fetch size
> is
> > > > > smaller
> > > > > > > > >> than
> > > > > > > > >>>> the
> > > > > > > > >>>>>> fetch size, the consumer will get stuck. Instead, we
> can
> > > > > simply
> > > > > > > > >>> return
> > > > > > > > >>>>> the
> > > > > > > > >>>>>> full message if it's larger than the fetch size w/o
> > > > requiring
> > > > > > the
> > > > > > > > >>>>> consumer
> > > > > > > > >>>>>> to manually adjust the fetch size. On the broker side,
> > to
> > > > > serve
> > > > > > a
> > > > > > > > >>> fetch
> > > > > > > > >>>>>> request, we already do an index lookup and then scan
> the
> > > > log a
> > > > > > bit
> > > > > > > > >> to
> > > > > > > > >>>>> find
> > > > > > > > >>>>>> the message with the requested offset. We can just
> check
> > > the
> > > > > > size
> > > > > > > > >> of
> > > > > > > > >>>> that
> > > > > > > > >>>>>> message and return the full message if its size is
> > larger
> > > > than
> > > > > > the
> > > > > > > > >>>> fetch
> > > > > > > > >>>>>> size. This way, fetch size is really for performance
> > > > > > optimization,
> > > > > > > > >>> i.e.
> > > > > > > > >>>>> in
> > > > > > > > >>>>>> the common case, we will not return more bytes than
> > fetch
> > > > > size,
> > > > > > > but
> > > > > > > > >>> if
> > > > > > > > >>>>>> there is a large message, we will return more bytes
> than
> > > the
> > > > > > > > >>> specified
> > > > > > > > >>>>>> fetch size. In practice, large messages are rare. So,
> it
> > > > > > shouldn't
> > > > > > > > >>>>> increase
> > > > > > > > >>>>>> the memory consumption on the client too much.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Jun
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <
> > > > > > > > >>>>>> onurkaraman.apa...@gmail.com>
> > > > > > > > >>>>>> wrote:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>> Hey everyone. So I started doing some tests on the
> new
> > > > > > > > >>>>>> consumer/coordinator
> > > > > > > > >>>>>>> to see if it could handle more strenuous use cases
> like
> > > > > > mirroring
> > > > > > > > >>>>>> clusters
> > > > > > > > >>>>>>> with thousands of topics and thought I'd share
> > whatever I
> > > > > have
> > > > > > so
> > > > > > > > >>>> far.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> The scalability limit: the amount of group metadata
> we
> > > can
> > > > > fit
> > > > > > > > >> into
> > > > > > > > >>>> one
> > > > > > > > >>>>>>> message
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Some background:
> > > > > > > > >>>>>>> Client-side assignment is implemented in two phases
> > > > > > > > >>>>>>> 1. a PreparingRebalance phase that identifies members
> > of
> > > > the
> > > > > > > > >> group
> > > > > > > > >>>> and
> > > > > > > > >>>>>>> aggregates member subscriptions.
> > > > > > > > >>>>>>> 2. an AwaitingSync phase that waits for the group
> > leader
> > > to
> > > > > > > > >> decide
> > > > > > > > >>>>> member
> > > > > > > > >>>>>>> assignments based on the member subscriptions across
> > the
> > > > > group.
> > > > > > > > >>>>>>>  - The leader announces this decision with a
> > > > > SyncGroupRequest.
> > > > > > > > >> The
> > > > > > > > >>>>>>> GroupCoordinator handles SyncGroupRequests by
> appending
> > > all
> > > > > > group
> > > > > > > > >>>> state
> > > > > > > > >>>>>>> into a single message under the __consumer_offsets
> > topic.
> > > > > This
> > > > > > > > >>>> message
> > > > > > > > >>>>> is
> > > > > > > > >>>>>>> keyed on the group id and contains each member
> > > subscription
> > > > > as
> > > > > > > > >> well
> > > > > > > > >>>> as
> > > > > > > > >>>>>> the
> > > > > > > > >>>>>>> decided assignment for each member.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> The environment:
> > > > > > > > >>>>>>> - one broker
> > > > > > > > >>>>>>> - one __consumer_offsets partition
> > > > > > > > >>>>>>> - offsets.topic.compression.codec=1 // this is gzip
> > > > > > > > >>>>>>> - broker has my pending KAFKA-3718 patch that
> actually
> > > > makes
> > > > > > use
> > > > > > > > >> of
> > > > > > > > >>>>>>> offsets.topic.compression.codec:
> > > > > > > > >>>>>> https://github.com/apache/kafka/pull/1394
> > > > > > > > >>>>>>> - around 3000 topics. This is an actual subset of
> > topics
> > > > from
> > > > > > one
> > > > > > > > >>> of
> > > > > > > > >>>>> our
> > > > > > > > >>>>>>> clusters.
> > > > > > > > >>>>>>> - topics have 8 partitions
> > > > > > > > >>>>>>> - topics are 25 characters long on average
> > > > > > > > >>>>>>> - one group with a varying number of consumers each
> > > > hardcoded
> > > > > > > > >> with
> > > > > > > > >>>> all
> > > > > > > > >>>>>> the
> > > > > > > > >>>>>>> topics just to make the tests more consistent.
> > > wildcarding
> > > > > with
> > > > > > > > >> .*
> > > > > > > > >>>>> should
> > > > > > > > >>>>>>> have the same effect once the subscription hits the
> > > > > coordinator
> > > > > > > > >> as
> > > > > > > > >>>> the
> > > > > > > > >>>>>>> subscription has already been fully expanded out to
> the
> > > > list
> > > > > of
> > > > > > > > >>>> topics
> > > > > > > > >>>>> by
> > > > > > > > >>>>>>> the consumers.
> > > > > > > > >>>>>>> - I added some log messages to Log.scala to print out
> > the
> > > > > > message
> > > > > > > > >>>> sizes
> > > > > > > > >>>>>>> after compression
> > > > > > > > >>>>>>> - there are no producers at all and auto commits are
> > > > > disabled.
> > > > > > > > >> The
> > > > > > > > >>>> only
> > > > > > > > >>>>>>> topic with messages getting added is the
> > > __consumer_offsets
> > > > > > topic
> > > > > > > > >>> and
> > > > > > > > >>>>>>> they're only from storing group metadata while
> > processing
> > > > > > > > >>>>>>> SyncGroupRequests.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Results:
> > > > > > > > >>>>>>> The results below show that we exceed the 1000012
> byte
> > > > > > > > >>>>>>> KafkaConfig.messageMaxBytes limit relatively quickly
> > > > (between
> > > > > > > > >> 30-40
> > > > > > > > >>>>>>> consumers):
> > > > > > > > >>>>>>> 1 consumer 54739 bytes
> > > > > > > > >>>>>>> 5 consumers 261524 bytes
> > > > > > > > >>>>>>> 10 consumers 459804 bytes
> > > > > > > > >>>>>>> 20 consumers 702499 bytes
> > > > > > > > >>>>>>> 30 consumers 930525 bytes
> > > > > > > > >>>>>>> 40 consumers 1115657 bytes * the tipping point
> > > > > > > > >>>>>>> 50 consumers 1363112 bytes
> > > > > > > > >>>>>>> 60 consumers 1598621 bytes
> > > > > > > > >>>>>>> 70 consumers 1837359 bytes
> > > > > > > > >>>>>>> 80 consumers 2066934 bytes
> > > > > > > > >>>>>>> 90 consumers 2310970 bytes
> > > > > > > > >>>>>>> 100 consumers 2542735 bytes
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Note that the growth itself is pretty gradual.
> Plotting
> > > the
> > > > > > > > >> points
> > > > > > > > >>>>> makes
> > > > > > > > >>>>>> it
> > > > > > > > >>>>>>> look roughly linear w.r.t the number of consumers:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>
> > > > > > > > >>>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Also note that these numbers aren't averages or
> medians
> > > or
> > > > > > > > >> anything
> > > > > > > > >>>>> like
> > > > > > > > >>>>>>> that. It's just the byte size from a given run. I did
> > run
> > > > > them
> > > > > > a
> > > > > > > > >>> few
> > > > > > > > >>>>>> times
> > > > > > > > >>>>>>> and saw similar results.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Impact:
> > > > > > > > >>>>>>> Even after adding gzip to the __consumer_offsets
> topic
> > > with
> > > > > my
> > > > > > > > >>>> pending
> > > > > > > > >>>>>>> KAFKA-3718 patch, the AwaitingSync phase of the group
> > > fails
> > > > > > with
> > > > > > > > >>>>>>> RecordTooLargeException. This means the combined size
> > of
> > > > each
> > > > > > > > >>>> member's
> > > > > > > > >>>>>>> subscriptions and assignments exceeded the
> > > > > > > > >>>> KafkaConfig.messageMaxBytes
> > > > > > > > >>>>> of
> > > > > > > > >>>>>>> 1000012 bytes. The group ends up dying.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Options:
> > > > > > > > >>>>>>> 1. Config change: reduce the number of consumers in
> the
> > > > > group.
> > > > > > > > >> This
> > > > > > > > >>>>> isn't
> > > > > > > > >>>>>>> always a realistic answer in more strenuous use cases
> > > like
> > > > > > > > >>>> MirrorMaker
> > > > > > > > >>>>>>> clusters or for auditing.
> > > > > > > > >>>>>>> 2. Config change: split the group into smaller groups
> > > which
> > > > > > > > >>> together
> > > > > > > > >>>>> will
> > > > > > > > >>>>>>> get full coverage of the topics. This gives each
> group
> > > > > member a
> > > > > > > > >>>> smaller
> > > > > > > > >>>>>>> subscription.(ex: g1 has topics starting with a-m
> while
> > > g2
> > > > > has
> > > > > > > > >>> topics
> > > > > > > > >>>>>>> starting ith n-z). This would be operationally
> painful
> > to
> > > > > > manage.
> > > > > > > > >>>>>>> 3. Config change: split the topics among members of
> the
> > > > > group.
> > > > > > > > >>> Again
> > > > > > > > >>>>> this
> > > > > > > > >>>>>>> gives each group member a smaller subscription. This
> > > would
> > > > > also
> > > > > > > > >> be
> > > > > > > > >>>>>>> operationally painful to manage.
> > > > > > > > >>>>>>> 4. Config change: bump up KafkaConfig.messageMaxBytes
> > (a
> > > > > > > > >>> topic-level
> > > > > > > > >>>>>>> config) and KafkaConfig.replicaFetchMaxBytes (a
> > > > broker-level
> > > > > > > > >>> config).
> > > > > > > > >>>>>>> Applying messageMaxBytes to just the
> __consumer_offsets
> > > > topic
> > > > > > > > >> seems
> > > > > > > > >>>>>>> relatively harmless, but bumping up the broker-level
> > > > > > > > >>>>> replicaFetchMaxBytes
> > > > > > > > >>>>>>> would probably need more attention.
> > > > > > > > >>>>>>> 5. Config change: try different compression codecs.
> > Based
> > > > on
> > > > > 2
> > > > > > > > >>>> minutes
> > > > > > > > >>>>> of
> > > > > > > > >>>>>>> googling, it seems like lz4 and snappy are faster
> than
> > > gzip
> > > > > but
> > > > > > > > >>> have
> > > > > > > > >>>>>> worse
> > > > > > > > >>>>>>> compression, so this probably won't help.
> > > > > > > > >>>>>>> 6. Implementation change: support sending the regex
> > over
> > > > the
> > > > > > wire
> > > > > > > > >>>>> instead
> > > > > > > > >>>>>>> of the fully expanded topic subscriptions. I think
> > people
> > > > > said
> > > > > > in
> > > > > > > > >>> the
> > > > > > > > >>>>>> past
> > > > > > > > >>>>>>> that different languages have subtle differences in
> > > regex,
> > > > so
> > > > > > > > >> this
> > > > > > > > >>>>>> doesn't
> > > > > > > > >>>>>>> play nicely with cross-language groups.
> > > > > > > > >>>>>>> 7. Implementation change: maybe we can reverse the
> > > mapping?
> > > > > > > > >> Instead
> > > > > > > > >>>> of
> > > > > > > > >>>>>>> mapping from member to subscriptions, we can map a
> > > > > subscription
> > > > > > > > >> to
> > > > > > > > >>> a
> > > > > > > > >>>>> list
> > > > > > > > >>>>>>> of members.
> > > > > > > > >>>>>>> 8. Implementation change: maybe we can try to break
> > apart
> > > > the
> > > > > > > > >>>>>> subscription
> > > > > > > > >>>>>>> and assignments from the same SyncGroupRequest into
> > > > multiple
> > > > > > > > >>> records?
> > > > > > > > >>>>>> They
> > > > > > > > >>>>>>> can still go to the same message set and get appended
> > > > > together.
> > > > > > > > >>> This
> > > > > > > > >>>>> way
> > > > > > > > >>>>>>> the limit become the segment size, which shouldn't
> be a
> > > > > > problem.
> > > > > > > > >>> This
> > > > > > > > >>>>> can
> > > > > > > > >>>>>>> be tricky to get right because we're currently keying
> > > these
> > > > > > > > >>> messages
> > > > > > > > >>>> on
> > > > > > > > >>>>>> the
> > > > > > > > >>>>>>> group, so I think records from the same rebalance
> might
> > > > > > > > >>> accidentally
> > > > > > > > >>>>>>> compact one another, but my understanding of
> compaction
> > > > isn't
> > > > > > > > >> that
> > > > > > > > >>>>> great.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Todo:
> > > > > > > > >>>>>>> It would be interesting to rerun the tests with no
> > > > > compression
> > > > > > > > >> just
> > > > > > > > >>>> to
> > > > > > > > >>>>>> see
> > > > > > > > >>>>>>> how much gzip is helping but it's getting late. Maybe
> > > > > tomorrow?
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> - Onur
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> --
> > > > > > > > >>>> -- Guozhang
> > > > > > > > >>>>
> > > > > > > > >>>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> --
> > > > > > > > >> -- Guozhang
> > > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to