Re: [DISCUSS] scalability limits in the coordinator

2016-06-10 Thread Jason Gustafson
Hey Becket, My suggestion was pretty far from a completely thought-out proposal, but the advantages of having your MM cluster maintain subscriptions/assignments in its own topic are the following: 1. It solves the immediate problem of the size of the group metadata. 2. It distributes the subscrip

Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Becket Qin
Hi Jason, I am trying to understand the gain of saving the assignment and metadata in a topic and return the offsets to the consumers. This obviously saves memory footprint as we agreed before. But does it save network bandwidth? The consumers still need to read the same amount of data from the co

Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Jason Gustafson
Hi Onur, I didn't have a specific proposal in mind, I was just thinking analogously with how Connect ensures task configurations are propagated to tasks consistently when it rebalances the cluster. The high level concept is to take the assignment data out of the rebalance protocol itself and repla

Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Onur Karaman
I think the value of adding an "offsets.replica.fetch.max.bytes" config is that we don't break/change the meaning of "replica.fetch.max.bytes". We can also set "offsets.replica.fetch.max.bytes" to be a value safely larger than what we expect to ever allow the __consumer_offsets topic max message si

Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Becket Qin
I think taking the bigger of the fetch size and the message size limit is probably good enough. If we have a separate "offsets.replica.fetch.max.bytes", I guess the value will always be set to the max message size of the __consumer_offsets topic, which does not seem to have much value. On Thu, Jun 9, 2016
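
A minimal sketch of the "take the bigger of the two limits" idea under discussion, with invented names and no relation to the actual broker code paths: when fetching __consumer_offsets, the replica fetcher would use whichever of replica.fetch.max.bytes or the offsets topic's message size limit is larger.

object OffsetsFetchSizeSketch {
  val OffsetsTopic = "__consumer_offsets"

  // Sketch only: pick the larger limit for the offsets topic so an oversized
  // group metadata message can still be replicated.
  def effectiveFetchSize(topic: String,
                         replicaFetchMaxBytes: Int,
                         offsetsTopicMessageMaxBytes: Int): Int =
    if (topic == OffsetsTopic) math.max(replicaFetchMaxBytes, offsetsTopicMessageMaxBytes)
    else replicaFetchMaxBytes

  def main(args: Array[String]): Unit = {
    println(effectiveFetchSize(OffsetsTopic, 1048576, 10485760)) // 10485760
    println(effectiveFetchSize("some-topic", 1048576, 10485760)) // 1048576
  }
}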

Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Onur Karaman
Maybe another approach can be to add a new "offsets.replica.fetch.max.bytes" config on the brokers. On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman wrote: > I made a PR with a tweak to Jun's/Becket's proposal: > https://github.com/apache/kafka/pull/1484 > > It just tweaks the fetch behavior specifi

Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Onur Karaman
I made a PR with a tweak to Jun's/Becket's proposal: https://github.com/apache/kafka/pull/1484 It just tweaks the fetch behavior specifically for replicas fetching from the __consumer_offsets topic when the fetcher's "replica.fetch.max.bytes" is less than the __consumer_offsets leader's "message.ma

Re: [DISCUSS] scalability limits in the coordinator

2016-05-29 Thread Onur Karaman
Sorry I know next to nothing about Kafka Connect. I didn't understand the Kafka Connect / MM idea you brought up. Can you go into more detail? Otherwise I think our remaining options are: - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for __consumer_offsets topic and change the fetc

Re: [DISCUSS] scalability limits in the coordinator

2016-05-26 Thread Jason Gustafson
Hey Onur, Thanks for the investigation. It seems the conclusion is that the compact format helps, but perhaps not enough to justify adding a new assignment schema? I'm not sure there's much more room for savings unless we change something more fundamental in the assignment approach. We spent some

Re: [DISCUSS] scalability limits in the coordinator

2016-05-25 Thread Onur Karaman
I gave the topic index assignment trick a try against the same environment. The implementation just changed the assignment serialization and deserialization logic. It didn't change SyncGroupResponse, meaning it continues to exclude the subscription from the SyncGroupResponse and assumes the member
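
The topic-index trick can be sketched as follows; the encoding below is invented for illustration and is not the ConsumerProtocol schema or the patch being tested above. The leader writes the index of each assigned topic within the member's sorted subscription list instead of the topic name, and the member resolves indices back to names locally.

import java.nio.ByteBuffer

object IndexedAssignmentSketch {
  // Encode (topicIndex, partitions) pairs against the member's sorted subscription.
  def encode(sortedSubscription: IndexedSeq[String],
             assignment: Map[String, Seq[Int]]): ByteBuffer = {
    val size = 4 + assignment.valuesIterator.map(ps => 8 + ps.size * 4).sum
    val buf = ByteBuffer.allocate(size)
    buf.putInt(assignment.size)
    for ((topic, partitions) <- assignment) {
      buf.putInt(sortedSubscription.indexOf(topic)) // 4-byte index instead of the topic name
      buf.putInt(partitions.size)
      partitions.foreach(p => buf.putInt(p))
    }
    buf.flip()
    buf
  }

  // The member maps indices back to topic names using its own subscription list.
  def decode(sortedSubscription: IndexedSeq[String], buf: ByteBuffer): Map[String, Seq[Int]] = {
    val entries = buf.getInt()
    (0 until entries).map { _ =>
      val topic = sortedSubscription(buf.getInt())
      val partitions = Seq.fill(buf.getInt())(buf.getInt())
      topic -> partitions
    }.toMap
  }
}

With thousands of subscribed topics, each assignment entry shrinks from a full topic-name string to a 4-byte index, which is where the savings in the uncompressed case would come from.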

Re: [DISCUSS] scalability limits in the coordinator

2016-05-25 Thread Jason Gustafson
Gwen, Joel: That's correct. The protocol does allow us to give an assignor its own assignment schema, but I think this will require a couple internal changes to the consumer to make use of the full generality. One thing I'm a little uncertain about is whether we should use a different protocol ty

Re: [DISCUSS] scalability limits in the coordinator

2016-05-25 Thread Gwen Shapira
ah, right - we can add as many strategies as we want. On Wed, May 25, 2016 at 10:54 AM, Joel Koshy wrote: > > Yes it would be a protocol bump. > > > > Sorry - I'm officially confused. I think it may not be required - since the > more compact format would be associated with a new assignment strat

Re: [DISCUSS] scalability limits in the coordinator

2016-05-25 Thread Joel Koshy
> Yes it would be a protocol bump. > Sorry - I'm officially confused. I think it may not be required - since the more compact format would be associated with a new assignment strategy - right? > smaller than the plaintext PAL, but the post-compressed binary PAL is just > 25% smaller than the pos

Re: [DISCUSS] scalability limits in the coordinator

2016-05-25 Thread Joel Koshy
Yes it would be a protocol bump. @Jason - on reducing the size of the assignment field, I would be interested to see what savings we can get - but my hunch is that we would end up picking one of either: a compact assignment field format or turn on compression. We actually did a similar investigati

Re: [DISCUSS] scalability limits in the coordinator

2016-05-24 Thread Gwen Shapira
Regarding the change to the assignment field. It would be a protocol bump, otherwise consumers will not know how to parse the bytes the broker is returning, right? Or did I misunderstand the suggestion? On Tue, May 24, 2016 at 2:52 PM, Guozhang Wang wrote: > I think for just solving issue 1), Ju

Re: [DISCUSS] scalability limits in the coordinator

2016-05-24 Thread Guozhang Wang
I think for just solving issue 1), Jun's suggestion is sufficient and simple. So I'd prefer that approach. In addition, Jason's optimization on the assignment field would be good for 2) and 3) as well, and I like that optimization for its simplicity and no format change as well. And in the future

Re: [DISCUSS] scalability limits in the coordinator

2016-05-24 Thread Becket Qin
Hi Jason, There are a few problems we want to solve here: 1. The group metadata is too big to be appended to the log. 2. Reduce the memory footprint on the broker 3. Reduce the bytes transferred over the wire. To solve (1), I like your idea of having separate messages per member. The proposal (On
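
A rough sketch of what "separate messages per member" could look like on the offsets topic; the key/value layout here is invented, since the preview does not spell out the proposal's wire format.

// Sketch only: one record per member instead of one record for the whole group,
// so each record's size is bounded by a single member's metadata.
final case class MemberRecordKey(groupId: String, memberId: String)
final case class MemberRecordValue(generation: Int,
                                   subscription: Array[Byte],
                                   assignment: Array[Byte])

object PerMemberRecordsSketch {
  def recordsFor(groupId: String,
                 generation: Int,
                 members: Map[String, (Array[Byte], Array[Byte])]): Seq[(MemberRecordKey, MemberRecordValue)] =
    members.toSeq.map { case (memberId, (subscription, assignment)) =>
      MemberRecordKey(groupId, memberId) -> MemberRecordValue(generation, subscription, assignment)
    }
}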

Re: [DISCUSS] scalability limits in the coordinator

2016-05-24 Thread Jason Gustafson
Hey Becket, I like your idea to store only the offset for the group metadata in memory. I think it would be safe to keep it in memory for a short time after the rebalance completes, but after that, its only real purpose is to answer DescribeGroup requests, so your proposal makes a lot of sense to

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Becket Qin
It might be worth thinking a little further. We have discussed before that we want to avoid holding all the group metadata in memory. I am thinking about the following end state: 1. Enable compression on the offset topic. 2. Instead of holding the entire group metadata in memory on the brokers,
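
A rough sketch of the "keep only the offset in memory" end state described here, with invented types standing in for the real coordinator internals:

// Sketch only: the coordinator remembers just the log offset of each group's
// latest metadata record and re-reads the record on demand (e.g. DescribeGroup),
// rather than holding the full group metadata in memory.
class OffsetOnlyGroupCache(readRecordAt: Long => Array[Byte],
                           parseGroupMetadata: Array[Byte] => String) {
  private val latestOffsets = scala.collection.mutable.Map.empty[String, Long]

  // Called when a new group metadata record is appended to __consumer_offsets.
  def onMetadataAppended(groupId: String, logOffset: Long): Unit =
    latestOffsets.update(groupId, logOffset)

  // Rare paths such as DescribeGroup pay a log read instead of a memory lookup.
  def describeGroup(groupId: String): Option[String] =
    latestOffsets.get(groupId).map(offset => parseGroupMetadata(readRecordAt(offset)))
}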

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Jason Gustafson
> > Jason, doesn't gzip (or other compression) basically do this? If the topic > is a string and the topic is repeated throughout, won't compression > basically replace all repeated instances of it with an index reference to > the full string? Hey James, yeah, that's probably true, but keep in mi

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Liquan Pei
Would be interesting to see the size with compression on. On Mon, May 23, 2016 at 4:23 PM, Onur Karaman wrote: > When figuring out these optimizations, it's worth keeping in mind the > improvements when the message is uncompressed vs when it's compressed. > > When uncompressed: > Fixing the As

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Onur Karaman
When figuring out these optimizations, it's worth keeping in mind the improvements when the message is uncompressed vs when it's compressed. When uncompressed: Fixing the Assignment serialization to instead be a topic index into the corresponding member's subscription list would usually be a good

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread James Cheng
> On May 23, 2016, at 10:59 AM, Jason Gustafson wrote: > > 2. Maybe there's a better way to lay out the assignment without needing to > explicitly repeat the topic? For example, the leader could sort the topics > for each member and just use an integer to represent the index of each > topic with

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Jason Gustafson
> > Assignments also can be optimized with some tricks like the ones Jason > mentioned, but I think these end up being specific to the assignment > strategy, making it hard to keep a generic ConsumerProtocol. Leaving the protocol generic would be ideal since tools (such as consumer-groups.sh) dep

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Guozhang Wang
Discussed several optimization proposals with Jason, and summarized them here: --- Today the offset topic message value format is: [member subscription assignment] where subscription and assignment are just bytes to the brokers, and consumers know the schema to interpret them; usual

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Onur Karaman
To get a better sense of the limit and what we should be optimizing for, it helps to look at the message format: private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING), new Field("client_id", STRING), new Field("client_host", STRING), new Field("session_timeout", INT32),
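
The preview cuts the schema off. For context, the per-member and group value schemas in the broker source of that era (kafka.coordinator.GroupMetadataManager) looked roughly like the sketch below; it is reconstructed from memory, so treat it as approximate rather than a verbatim quote.

object GroupMetadataSchemasSketch {
  import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema}
  import org.apache.kafka.common.protocol.types.Type.{BYTES, INT32, STRING}

  // Per-member metadata, one entry per consumer in the group.
  val MEMBER_METADATA_V0 = new Schema(
    new Field("member_id", STRING),
    new Field("client_id", STRING),
    new Field("client_host", STRING),
    new Field("session_timeout", INT32),
    new Field("subscription", BYTES), // raw Subscription bytes from the member
    new Field("assignment", BYTES))   // raw Assignment bytes from the leader

  // The whole group is serialized as a single record value on __consumer_offsets,
  // which is what runs into message.max.bytes.
  val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
    new Field("protocol_type", STRING),
    new Field("generation", INT32),
    new Field("protocol", STRING),
    new Field("leader", STRING),
    new Field("members", new ArrayOf(MEMBER_METADATA_V0)))
}

The subscription and assignment byte fields repeat topic names for every member, which is why wildcard subscriptions to thousands of topics blow past the default message size.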

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Ismael Juma
Hi Jason, It would definitely be interesting to try a few of these optimisations on a real world example to quantify the impact. Ismael On Mon, May 23, 2016 at 6:59 PM, Jason Gustafson wrote: > Hey Onur, > > Thanks for the investigation. I agree with Ismael that pushing regex or > some kind of

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Jason Gustafson
Hey Onur, Thanks for the investigation. I agree with Ismael that pushing regex or some kind of patterns into the protocol would help for communicating subscriptions and for avoiding unnecessary overhead when fetching topic metadata, but it doesn't seem like it would address the main issue here sin

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Jay Kreps
I think the concern was just that we didn't want to do java regex for non-java clients, but I do think there are perl regex libraries (which is kind of more the standard) for java. So that might be a solution. -Jay On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang wrote: > The original concern is

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Becket Qin
+1 on Jun's idea. Even without the new consumer, we may currently still face the issue of records being too large in the offset topic if a user commits offsets with a big blob of metadata. A topic pattern would help reduce the group metadata size. However, some use cases may not be able to benefit from it.

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Guozhang Wang
The original concern is that regex may not be efficiently supported across languages, but if there is a neat workaround I would love to learn. Guozhang On Mon, May 23, 2016 at 5:31 AM, Ismael Juma wrote: > +1 to Jun's suggestion. > > Having said that, as a general point, I think we should consi

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Ismael Juma
+1 to Jun's suggestion. Having said that, as a general point, I think we should consider supporting topic patterns in the wire protocol. It requires some thinking for cross-language support, but it seems surmountable and it could make certain operations a lot more efficient (the fact that a basic
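
To make the size argument concrete, here is a back-of-envelope comparison of shipping an explicit topic list versus shipping a pattern; topic names and counts are invented, and real subscriptions carry additional framing not counted here.

import java.nio.charset.StandardCharsets.UTF_8

object SubscriptionSizeSketch {
  def main(args: Array[String]): Unit = {
    val topics = (0 until 3000).map(i => s"mirror-topic-$i")
    // Assume a 2-byte length prefix per string, as in the protocol's STRING type.
    val explicitBytes = topics.map(t => 2 + t.getBytes(UTF_8).length).sum
    val patternBytes = "mirror-topic-.*".getBytes(UTF_8).length

    println(s"explicit list: ~$explicitBytes bytes") // tens of kilobytes
    println(s"pattern: $patternBytes bytes")         // a handful of bytes
  }
}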

Re: [DISCUSS] scalability limits in the coordinator

2016-05-22 Thread Guozhang Wang
I like Jun's suggestion in changing the handling logics of single large message on the consumer side. As for the case of "a single group subscribing to 3000 topics", with 100 consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK, we also have the znode limit which is set to 1Mb by

Re: [DISCUSS] scalability limits in the coordinator

2016-05-21 Thread Onur Karaman
Results without compression: 1 consumer 292383 bytes 5 consumers 1079579 bytes * the tipping point 10 consumers 1855018 bytes 20 consumers 2780220 bytes 30 consumers 3705422 bytes 40 consumers 4630624 bytes 50 consumers 5555826 bytes 60 consumers 6480788 bytes 70 consumers 7405750 bytes 80 consumer

Re: [DISCUSS] scalability limits in the coordinator

2016-05-21 Thread Jun Rao
Onur, Thanks for the investigation. Another option is to just fix how we deal with the case when a message is larger than the fetch size. Today, if the fetch size is smaller than the message size, the consumer will get stuck. Instead, we can simply return the full message if it's larger than the fe
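
The suggestion can be sketched as follows; the types and the in-memory "log" are invented, so this shows the shape of the fix rather than the actual fetch path.

// Sketch only: if the first message at the fetch offset is larger than the fetch
// size, return it anyway instead of returning nothing, which is what leaves the
// consumer permanently stuck today.
final case class LogMessage(offset: Long, sizeInBytes: Int)

object OversizedFetchSketch {
  def selectMessages(log: Seq[LogMessage], fetchOffset: Long, fetchMaxBytes: Int): Seq[LogMessage] = {
    val available = log.dropWhile(_.offset < fetchOffset)
    available.headOption match {
      case Some(first) if first.sizeInBytes > fetchMaxBytes =>
        Seq(first) // oversized: hand back the whole message rather than an empty response
      case _ =>
        var budget = fetchMaxBytes
        available.takeWhile { m => // normal path: fill up to the fetch size
          val fits = m.sizeInBytes <= budget
          if (fits) budget -= m.sizeInBytes
          fits
        }
    }
  }
}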

[DISCUSS] scalability limits in the coordinator

2016-05-21 Thread Onur Karaman
Hey everyone. So I started doing some tests on the new consumer/coordinator to see if it could handle more strenuous use cases like mirroring clusters with thousands of topics and thought I'd share whatever I have so far. The scalability limit: the amount of group metadata we can fit into one mess
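
For a sense of where the limit comes from, a back-of-envelope sketch; the inputs are illustrative placeholders rather than the parameters of the tests reported in this thread, and the real record also carries member ids, client ids, hosts, and framing.

object GroupMetadataEstimate {
  def main(args: Array[String]): Unit = {
    val topics = 3000             // topics matched by the wildcard subscription
    val avgTopicNameBytes = 30    // illustrative average topic name size
    val messageMaxBytes = 1000000 // ~1 MB default record size limit

    // Each member's subscription (and, to a lesser degree, its assignment)
    // repeats topic names, so the group metadata grows linearly with members.
    val perMemberBytes = topics * avgTopicNameBytes
    val membersThatFit = messageMaxBytes / perMemberBytes

    println(s"~$perMemberBytes bytes per member; roughly $membersThatFit members fit in one record")
  }
}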