Hey Becket,

> In that case, the broker side partition assignment would be ideal because it avoids issues like metadata inconsistency / split brain / exploding subscription set propagation.

As per our previous discussions regarding each of those concerns (referring to this email thread, KIP calls and JIRA comments), we are going to run a set of tests using the LinkedIn deployment numbers, which we are waiting for you to share. The purpose is to see whether those concerns are really valid. I'd prefer to see that before making any more changes that will complicate the protocol.

On Wed, Aug 26, 2015 at 4:57 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Hi folks,
>
> After further discussion at LinkedIn, we found that while having a more general group management protocol is very useful, the vast majority of the clients will not use a customized partition assignment strategy. In that case, the broker side partition assignment would be ideal because it avoids issues like metadata inconsistency / split brain / exploding subscription set propagation.
>
> So we have the following proposal that satisfies the majority of the clients' needs without changing the currently proposed binary protocol, i.e., continue to support broker-side assignment if the assignment strategy is recognized by the coordinator.
>
> 1. Keep the binary protocol as currently proposed.
>
> 2. Change the way we interpret ProtocolMetadata:
> 2.1 On the consumer side, change partition.assignment.strategy to partition.assignor.class. Implement something like the following PartitionAssignor interface:
>
>     import java.util.List;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.common.TopicPartition;
>
>     public interface PartitionAssignor {
>         // Protocol types advertised in the JoinGroupRequest, e.g. "range".
>         List<String> protocolTypes();
>         // ProtocolMetadata sent in the JoinGroupRequest.
>         byte[] protocolMetadata();
>         // Return the partitions assigned to this consumer, decoded from
>         // the ProtocolMetadata in the JoinGroupResponse.
>         List<TopicPartition> assignPartitions(String protocolType, byte[] responseProtocolMetadata);
>     }
>
>     public abstract class AbstractPartitionAssignor implements PartitionAssignor {
>         protected final KafkaConsumer<?, ?> consumer;
>
>         protected AbstractPartitionAssignor(KafkaConsumer<?, ?> consumer) {
>             this.consumer = consumer;
>         }
>     }
>
> 2.2 The ProtocolMetadata in the JoinGroupRequest will be partitionAssignor.protocolMetadata(). When partition.assignor.class is "range" or "roundrobin", the ProtocolMetadata in the JoinGroupRequest will be a JSON subscription set. ("range" and "roundrobin" will be reserved words; we can also consider reserving a prefix such as "broker-" to be more clear.)
> 2.3 On the broker side, when ProtocolType is "range" or "roundrobin", the coordinator will parse the ProtocolMetadata in the JoinGroupRequest and assign the partitions for consumers. In the JoinGroupResponse, the ProtocolMetadata will be the global assignment of partitions.
> 2.4 On the client side, after receiving the JoinGroupResponse, partitionAssignor.assignPartitions() will be invoked to return the actual assignment. If the assignor is RangeAssignor or RoundRobinAssignor, it will parse the assignment from the ProtocolMetadata returned by the coordinator.
>
> This approach has a few merits:
> 1. It does not change the proposed binary protocol, which is still general.
> 2. The majority of the consumers will not suffer from inconsistent metadata / split brain / exploding subscription set propagation. This is specifically to deal with the issue that the current proposal caters to a 20% use case while adversely impacting the more common 80% use cases.
> 3. It is easy to implement. The only thing needed is to implement a partition assignor class. For most users, the default range and roundrobin assignors are good enough.
>
> Thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Follow-up from the KIP call:
> >
> > 1.
> > Onur brought up the question of whether this protocol provides enough coordination capabilities to be generally useful in practice (is that accurate, Onur?). If it doesn't, then each use case would probably need a dependence on ZooKeeper anyway, and we haven't really gained anything. The group membership provided by this protocol is a useful primitive for coordination, but it's limited in the sense that everything shared among the group has to be communicated at the time the group is created. If any shared data changes, then the only way the group can ensure agreement is to force a rebalance. This is expensive since all members must stall while the rebalancing takes place. As we have also seen, there is a practical limit on the amount of metadata that can be sent through this protocol when groups get a little larger. This protocol is therefore not suitable for cases which require frequent communication or which require a large amount of data to be communicated. For the use cases listed on the wiki, neither of these appears to be an issue, but there may be other limitations which would limit reuse of the protocol. Perhaps it would be sufficient to sketch how these cases might work?
> >
> > 2. We talked a little bit about the issue of metadata churn. Becket brought up the interesting point that not only do we depend on topic metadata changing relatively infrequently, but we also expect timely agreement among the brokers on what that metadata is. To resolve this, we can have the consumers fetch metadata from the coordinator. We still depend on topic metadata not changing frequently, but this should resolve any disagreement among the brokers themselves. In fact, since we expect that disagreement is relatively rare, we can have the consumers fetch from the coordinator only when a disagreement occurs.
> > The nice thing about this proposal is that it doesn't affect the join group semantics, so the coordinator would remain oblivious to the metadata used by the group for agreement. Also, if metadata churn becomes an issue, it might be possible to have the coordinator provide a snapshot for the group to ensure that a generation would be able to reach agreement (this would probably require adding groupId/generation to the metadata request).
> >
> > 3. We talked briefly about support for multiple protocols in the join group request in order to allow changing the assignment strategy without downtime. I think it's a little doubtful that this would get much use in practice, but I agree it's a nice option to have on the table. An alternative, for the sake of argument, is to have each member provide only one version of the protocol, and to let the coordinator choose the protocol with the largest number of supporters. All members which can't support the selected protocol would be kicked out of the group. The drawback in a rolling upgrade is that the total capacity of the group would be momentarily halved. It would also be a little tricky to handle the case of retrying when a consumer is kicked out of the group. We wouldn't want it to be able to effect a rebalance, for example, if it would just be kicked out again. That would probably complicate the group management logic on the coordinator.
> >
> > Thanks,
> > Jason
> >
> > On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >
> > > Jun,
> > >
> > > Yes, I agree. If the metadata can be synced quickly there should not be an issue. It just occurred to me that there is a proposal to allow consuming from followers in the ISR, which could potentially cause more frequent metadata changes for consumers. Would that be an issue?
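Jason's third point in his Aug 18 follow-up (each member advertises a single protocol, the coordinator picks the one with the most supporters, and everyone else is kicked out) can be sketched as below. This is a minimal illustration under stated assumptions, not coordinator code: the class `ProtocolVote`, its `Outcome` record, and the tie-break by protocol name are all hypothetical, since the thread does not say how ties are resolved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProtocolVote {
    // Hypothetical result type: the chosen protocol plus the members to evict.
    record Outcome(String protocol, List<String> evicted) {}

    // Each member advertises exactly one protocol. Pick the protocol with the
    // most supporters (ties broken by name, an assumption made here so the
    // choice is deterministic) and evict every member that advertised another.
    static Outcome choose(Map<String, String> protocolByMember) {
        Map<String, Integer> votes = new HashMap<>();
        for (String protocol : protocolByMember.values()) {
            votes.merge(protocol, 1, Integer::sum);
        }
        String winner = null;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (winner == null
                    || e.getValue() > votes.get(winner)
                    || (e.getValue().equals(votes.get(winner)) && e.getKey().compareTo(winner) < 0)) {
                winner = e.getKey();
            }
        }
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, String> e : protocolByMember.entrySet()) {
            if (!e.getValue().equals(winner)) {
                evicted.add(e.getKey());
            }
        }
        evicted.sort(null); // natural order, for stable output
        return new Outcome(winner, evicted);
    }

    public static void main(String[] args) {
        // Mid rolling upgrade: two members already run "B", three still run "A".
        Map<String, String> members = Map.of(
            "c1", "A", "c2", "A", "c3", "A", "c4", "B", "c5", "B");
        Outcome out = choose(members);
        System.out.println(out.protocol() + " " + out.evicted()); // prints "A [c4, c5]"
    }
}
```

As the upgrade proceeds, the evicted set flips from the upgraded members to the not-yet-upgraded ones, which is where the temporary loss of group capacity Jason mentions comes from.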
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Answers below:
> > > >
> > > > 1. When there are multiple common protocols in the JoinGroupRequest, which one would the coordinator pick?
> > > >
> > > > I was intending to use the list to indicate preference. If all group members support protocols ["A", "B"] in that order, then we will choose "A." If some support ["B", "A"], then we would either choose based on respective counts or just randomly. The main use case of supporting the list is for rolling upgrades when a change is made to the assignment strategy. In that case, the new assignment strategy would be listed first in the upgraded client. I think it's debatable whether this feature would get much use in practice, so we might consider dropping it.
> > > >
> > > > 2. If the protocols don't agree, the group construction fails. What exactly does it mean? Do we send an error in every JoinGroupResponse and remove all members in the group in the coordinator?
> > > >
> > > > Yes, that is right. It would be handled similarly to inconsistent assignment strategies in the current protocol. The coordinator returns an error in each join group response, and the client propagates the error to the user.
> > > >
> > > > 3. Consumer embedded protocol: The proposal has two different formats of subscription depending on whether wildcards are used or not. This seems a bit complicated. Would it be better to always use the metadata hash? The clients know the subscribed topics already. This way, the client code behaves the same whether wildcards are used or not.
> > > >
> > > > Yeah, I think this is possible (Neha also suggested it).
> > > > I haven't updated the wiki yet, but the patch I started working on uses only the metadata hash. In the case that an explicit topic list is provided, the hash just covers the metadata for those topics.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Jason,
> > > > >
> > > > > Thanks for the writeup. A few comments below.
> > > > >
> > > > > 1. When there are multiple common protocols in the JoinGroupRequest, which one would the coordinator pick?
> > > > > 2. If the protocols don't agree, the group construction fails. What exactly does it mean? Do we send an error in every JoinGroupResponse and remove all members in the group in the coordinator?
> > > > > 3. Consumer embedded protocol: The proposal has two different formats of subscription depending on whether wildcards are used or not. This seems a bit complicated. Would it be better to always use the metadata hash? The clients know the subscribed topics already. This way, the client code behaves the same whether wildcards are used or not.
> > > > >
> > > > > Jiangjie,
> > > > >
> > > > > With respect to rebalance churn due to topics being created/deleted: with the new consumer, the rebalance can probably settle within 200 ms when there is a topic change. So, as long as we are not changing topics more than 5 times per second, there shouldn't be constant churn, right?
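Jason's answer to Jun's third question (members exchange only a hash of the topic metadata, and with an explicit topic list the hash covers just those topics) can be illustrated with the sketch below. The class name `MetadataHash`, the hash input (sorted topic name plus partition count), and the choice of SHA-256 are assumptions for illustration; the thread does not specify what the hash covers beyond "the metadata for those topics". It assumes Java 17+ for `HexFormat`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class MetadataHash {
    // Hashes the (topic, partition count) pairs of the subscribed topics only,
    // so churn in unrelated topics does not change the value. Subscribed topics
    // missing from the local metadata are simply skipped in this sketch.
    static String hash(Map<String, Integer> partitionsPerTopic, Set<String> subscribed) {
        // TreeMap gives a canonical topic order, so members with the same view
        // of the metadata compute the same hash regardless of map iteration order.
        TreeMap<String, Integer> relevant = new TreeMap<>();
        for (String topic : subscribed) {
            Integer count = partitionsPerTopic.get(topic);
            if (count != null) {
                relevant.put(topic, count);
            }
        }
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (Map.Entry<String, Integer> e : relevant.entrySet()) {
                // Kafka topic names cannot contain ':' or a newline,
                // so this per-topic encoding is unambiguous.
                String entry = e.getKey() + ":" + e.getValue() + "\n";
                digest.update(entry.getBytes(StandardCharsets.UTF_8));
            }
            return HexFormat.of().formatHex(digest.digest());
        } catch (NoSuchAlgorithmException ex) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        Set<String> subscribed = Set.of("orders", "payments");
        // Same view of the subscribed topics; the unsubscribed "logs" is ignored.
        String a = hash(Map.of("orders", 8, "payments", 4, "logs", 2), subscribed);
        String b = hash(Map.of("payments", 4, "orders", 8), subscribed);
        // A member with stale metadata for "orders".
        String c = hash(Map.of("orders", 6, "payments", 4), subscribed);
        System.out.println(a.equals(b) + " " + a.equals(c)); // prints "true false"
    }
}
```

A mismatch between members' hashes is the signal for the "fetch metadata from the coordinator only when a disagreement occurs" fallback discussed earlier in the thread.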
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > >
> > > > > > Hi Kafka Devs,
> > > > > >
> > > > > > One of the nagging issues in the current design of the new consumer has been the need to support a variety of assignment strategies. We've encountered this in particular in the design of copycat and the processing framework (KIP-28). From what I understand, Samza also has a number of use cases with custom assignment needs. The new consumer protocol supports new assignment strategies by hooking them into the broker. For many environments, this is a major pain and in some cases, a non-starter. It also challenges the validation that the coordinator can provide. For example, some assignment strategies call for partitions to be assigned multiple times, which means that the coordinator can only check that partitions have been assigned at least once.
> > > > > >
> > > > > > To solve these issues, we'd like to propose moving assignment to the client. I've written a wiki which outlines some protocol changes to achieve this:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > > > > >
> > > > > > To summarize briefly, instead of the coordinator assigning the partitions itself, all subscriptions are forwarded to each member of the group, which then decides independently which partitions it should consume.
> > > > > > The protocol provides a mechanism for the coordinator to validate that all consumers use the same assignment strategy, but it does not ensure that the resulting assignment is "correct." This provides a powerful capability for users to control the full data flow on the client side. They control how data is written to partitions through the Partitioner interface and they control how data is consumed through the assignment strategy, all without touching the server.
> > > > > >
> > > > > > Of course nothing comes for free. In particular, this change removes the ability of the coordinator to validate that commits are made by consumers who were assigned the respective partition. This might not be too bad since we retain the ability to validate the generation id, but it is a potential concern. We have considered alternative protocols which add a second round-trip to the protocol in order to give the coordinator the ability to confirm the assignment. As mentioned above, the coordinator is somewhat limited in what it can actually validate, but this would restore its ability to validate commits. The tradeoff is that it increases the protocol's complexity, which means more ways for the protocol to fail and consequently more edge cases in the code.
> > > > > >
> > > > > > It also misses an opportunity to generalize the group membership protocol for additional use cases. In fact, after you've gone to the trouble of moving assignment to the client, the main thing that is left in this protocol is basically a general group management capability.
> > > > > > This is exactly what is needed for a few cases that are currently under discussion (e.g. copycat or single-writer producer). We've taken this a step further in the proposal and attempted to envision what that general protocol might look like and how it could be used both by the consumer and for some of these other cases.
> > > > > >
> > > > > > Anyway, since time is running out on the new consumer, we have perhaps one last chance to consider a significant change in the protocol like this, so have a look at the wiki and share your thoughts. I've no doubt that some ideas seem clearer in my mind than they do on paper, so ask questions if there is any confusion.
> > > > > >
> > > > > > Thanks!
> > > > > > Jason

--
Thanks,
Neha
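The core of Jason's original Aug 11 proposal, forwarding every subscription to every member and letting each member decide independently which partitions it consumes, only works if every member runs the same deterministic function over the same inputs. The sketch below illustrates that idea with a simple range-style split of one topic's partitions over members sorted by id. The class `ClientSideRange` and method `assign` are hypothetical names, and this is an illustration of the idea, not the consumer's actual range assignor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClientSideRange {
    // Returns the partitions of a single topic that memberId should consume.
    // Every member calls this with the same member list and partition count,
    // so all of them derive the same global assignment without the coordinator.
    static List<Integer> assign(String memberId, List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // canonical order shared by all members
        int index = sorted.indexOf(memberId);
        if (index < 0) {
            return List.of(); // not a member of this generation
        }
        int n = sorted.size();
        int base = numPartitions / n;   // partitions every member gets
        int extra = numPartitions % n;  // the first `extra` members get one more
        int start = index * base + Math.min(index, extra);
        int length = base + (index < extra ? 1 : 0);
        List<Integer> mine = new ArrayList<>();
        for (int p = start; p < start + length; p++) {
            mine.add(p);
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> members = List.of("consumer-b", "consumer-a", "consumer-c");
        for (String m : List.of("consumer-a", "consumer-b", "consumer-c")) {
            System.out.println(m + " -> " + assign(m, members, 8));
        }
        // consumer-a -> [0, 1, 2]
        // consumer-b -> [3, 4, 5]
        // consumer-c -> [6, 7]
    }
}
```

Nothing here stops two members from running different versions of `assign`; that is why, in Jason's proposal, the coordinator still checks that all members advertise the same strategy while the correctness of the resulting assignment is left to the clients.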