Hey everyone. So I started doing some tests on the new consumer/coordinator
to see if it could handle more strenuous use cases like mirroring clusters
with thousands of topics, and I thought I'd share what I have so far.

The scalability limit: the amount of group metadata we can fit into one
message

Some background:
Client-side assignment is implemented in two phases:
1. a PreparingRebalance phase that identifies members of the group and
aggregates member subscriptions.
2. an AwaitingSync phase that waits for the group leader to decide member
assignments based on the member subscriptions across the group.
  - The leader announces this decision with a SyncGroupRequest. The
GroupCoordinator handles SyncGroupRequests by appending all group state
into a single message under the __consumer_offsets topic. This message is
keyed on the group id and contains each member's subscription as well as
the decided assignment for each member.
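For a rough sense of what each member contributes to that message, here's a
back-of-envelope sketch in Java using the numbers from the test environment
described below (3000 topics with 25-character names, 8 partitions each). It
leans on the internal ConsumerProtocol and PartitionAssignor classes from
kafka-clients (0.10.x era), so treat it as illustrative only: the broker also
stores member ids, client ids, hosts, etc. per member, and the whole message
then goes through compression.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
    import org.apache.kafka.common.TopicPartition;

    public class MemberPayloadSize {
        public static void main(String[] args) {
            int numTopics = 3000;        // matches the test environment below
            int partitionsPerTopic = 8;

            // placeholder topic names, 25 characters each ("topic-" + 19 digits)
            List<String> topics = new ArrayList<>();
            for (int i = 0; i < numTopics; i++) {
                topics.add(String.format("topic-%019d", i));
            }

            // the subscription a single member sends when joining the group
            ByteBuffer subscription =
                ConsumerProtocol.serializeSubscription(new Subscription(topics));

            // an assignment covering every partition, as an upper bound on what
            // one member could be handed back
            List<TopicPartition> partitions = new ArrayList<>();
            for (String topic : topics) {
                for (int p = 0; p < partitionsPerTopic; p++) {
                    partitions.add(new TopicPartition(topic, p));
                }
            }
            ByteBuffer assignment =
                ConsumerProtocol.serializeAssignment(new Assignment(partitions));

            System.out.println("subscription bytes (uncompressed): " + subscription.remaining());
            System.out.println("assignment bytes (uncompressed): " + assignment.remaining());
        }
    }

The point is just that both pieces scale with the number of topics, and the
stored group metadata message repeats the subscription (plus a slice of the
assignment) once per member.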

The environment:
- one broker
- one __consumer_offsets partition
- offsets.topic.compression.codec=1 // this is gzip
- broker has my pending KAFKA-3718 patch that actually makes use of
offsets.topic.compression.codec: https://github.com/apache/kafka/pull/1394
- around 3000 topics. This is an actual subset of topics from one of our
clusters.
- each topic has 8 partitions
- topic names are 25 characters long on average
- one group with a varying number of consumers, each hardcoded with the full
topic list just to make the tests more consistent (see the consumer sketch
after this list). Wildcard subscription with .* should have the same effect
by the time the subscription hits the coordinator, since the consumer has
already expanded the pattern out to the full list of topics.
- I added some log messages to Log.scala to print out the message sizes
after compression
- there are no producers at all and auto commits are disabled. The only
topic getting new messages is __consumer_offsets, and those messages come
solely from storing group metadata while processing SyncGroupRequests.
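For reference, each test consumer looks roughly like the sketch below. The
group name and generated topic names are placeholders; in the real test each
consumer was hardcoded with the actual topic list from our cluster.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class TestGroupMember {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "big-group");        // placeholder group name
            props.put("enable.auto.commit", "false");  // auto commits are disabled in the test
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

            // ~3000 placeholder names standing in for the hardcoded topic list
            List<String> allTopics = new ArrayList<>();
            for (int i = 0; i < 3000; i++) {
                allTopics.add(String.format("topic-%019d", i));
            }
            consumer.subscribe(allTopics);

            // consumer.subscribe(Pattern.compile(".*"), listener) would look the same
            // to the coordinator, since the consumer expands the pattern against
            // cluster metadata before sending its subscription
            while (true) {
                consumer.poll(1000); // no processing needed; just keep the group alive
            }
        }
    }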

Results:
The results below show that we exceed the 1000012 byte
KafkaConfig.messageMaxBytes limit relatively quickly (somewhere between 30
and 40 consumers):
1 consumer 54739 bytes
5 consumers 261524 bytes
10 consumers 459804 bytes
20 consumers 702499 bytes
30 consumers 930525 bytes
40 consumers 1115657 bytes * the tipping point
50 consumers 1363112 bytes
60 consumers 1598621 bytes
70 consumers 1837359 bytes
80 consumers 2066934 bytes
90 consumers 2310970 bytes
100 consumers 2542735 bytes

Note that the growth itself is pretty gradual. Plotting the points makes it
look roughly linear w.r.t the number of consumers:
https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)

Also note that these numbers aren't averages or medians or anything like
that; they're just the byte sizes from a single run. I did run the tests a
few times and saw similar results.
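As a quick sanity check on the slope: (2542735 - 54739) / (100 - 1) ≈ 25131
bytes, so in this setup each additional consumer costs roughly 25 KB of
compressed group metadata.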

Impact:
Even after adding gzip to the __consumer_offsets topic with my pending
KAFKA-3718 patch, the AwaitingSync phase of the group fails with a
RecordTooLargeException. This means the combined size of all the members'
subscriptions and assignments exceeds the KafkaConfig.messageMaxBytes limit
of 1000012 bytes. The group ends up dying.

Options:
1. Config change: reduce the number of consumers in the group. This isn't
always a realistic answer in more strenuous use cases like MirrorMaker
clusters or for auditing.
2. Config change: split the group into smaller groups which together get
full coverage of the topics. This gives each group member a smaller
subscription (e.g. g1 subscribes to topics starting with a-m while g2
subscribes to topics starting with n-z). This would be operationally painful
to manage.
3. Config change: split the topics among members of the group. Again this
gives each group member a smaller subscription. This would also be
operationally painful to manage.
4. Config change: bump up KafkaConfig.messageMaxBytes (which can be
overridden per topic via max.message.bytes) and
KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying the
override to just the __consumer_offsets topic seems relatively harmless, but
bumping up the broker-level replicaFetchMaxBytes would probably need more
attention (see the config sketch after this list).
5. Config change: try different compression codecs. Based on 2 minutes of
googling, it seems like lz4 and snappy are faster than gzip but have worse
compression, so this probably won't help.
6. Implementation change: support sending the regex over the wire instead
of the fully expanded topic subscriptions. I think people have said in the
past that different languages have subtle differences in regex semantics, so
this doesn't play nicely with cross-language groups.
7. Implementation change: maybe we can reverse the mapping? Instead of
mapping from member to subscriptions, we can map a subscription to a list
of members.
8. Implementation change: maybe we can try to break apart the subscriptions
and assignments from the same SyncGroupRequest into multiple records? They
can still go into the same message set and get appended together. This way
the limit becomes the segment size, which shouldn't be a problem. This can
be tricky to get right because we're currently keying these messages on the
group id, so I think records from the same rebalance might accidentally
compact one another, but my understanding of compaction isn't that great.
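To make option 4 concrete, this is the kind of override I have in mind (the
sizes below are arbitrary examples, not recommendations): max.message.bytes
is the per-topic override of the broker's message.max.bytes, and
replica.fetch.max.bytes needs to be at least as large so followers can still
replicate the bigger messages.

    # per-topic override for __consumer_offsets
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type topics --entity-name __consumer_offsets \
      --add-config max.message.bytes=5242880

    # broker-level, in server.properties on every broker
    replica.fetch.max.bytes=5242880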

Todo:
It would be interesting to rerun the tests with no compression just to see
how much gzip is helping, but it's getting late. Maybe tomorrow?

- Onur
