Hi folks,

After further discussion at LinkedIn, we found that while having a more
general group management protocol is very useful, the vast majority of
clients will not use a customized partition assignment strategy. In that
case, broker-side partition assignment would be ideal because it avoids
issues like metadata inconsistency, split brain, and exploding subscription
set propagation.

So we have the following proposal, which satisfies the majority of the
clients' needs without changing the currently proposed binary protocol,
i.e., continue to support broker-side assignment if the assignment strategy
is recognized by the coordinator.

1. Keep the binary protocol as currently proposed.

2. Change the way we interpret ProtocolMetadata:
2.1 On the consumer side, change partition.assignment.strategy to
partition.assignor.class. Implement something like the following
PartitionAssignor interface:

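import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public interface PartitionAssignor {
  // Protocol types this assignor supports, e.g. "range" or "roundrobin".
  List<String> protocolTypes();
  // Sent as the ProtocolMetadata in the JoinGroupRequest.
  byte[] protocolMetadata();
  // Returns the partitions assigned to this consumer, derived from the
  // ProtocolMetadata in the JoinGroupResponse.
  List<TopicPartition> assignPartitions(String protocolType,
                                        byte[] responseProtocolMetadata);
}

public abstract class AbstractPartitionAssignor implements PartitionAssignor {
  protected final KafkaConsumer<?, ?> consumer;

  protected AbstractPartitionAssignor(KafkaConsumer<?, ?> consumer) {
    this.consumer = consumer;
  }
}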

2.2 The ProtocolMetadata in the JoinGroupRequest will be
partitionAssignor.protocolMetadata(). When partition.assignor.class is
"range" or "roundrobin", the ProtocolMetadata in the JoinGroupRequest will
be a JSON subscription set. ("range" and "roundrobin" will be reserved
words; we could also consider reserving a prefix such as "broker-" to make
this clearer.)
2.3 On the broker side, when the ProtocolType is "range" or "roundrobin",
the coordinator will parse the ProtocolMetadata in the JoinGroupRequest and
assign partitions to the consumers. In the JoinGroupResponse, the
ProtocolMetadata will be the global assignment of partitions.
2.4 On the client side, after receiving the JoinGroupResponse,
partitionAssignor.assignPartitions() will be invoked to return the actual
assignment. If the assignor is RangeAssignor or RoundRobinAssignor, it
will parse the assignment from the ProtocolMetadata returned by the
coordinator.
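A minimal sketch of the range assignment the coordinator might compute in step 2.3. It assumes the JSON subscription set has already been parsed into a map from member id to subscribed topics, and that the coordinator knows each topic's partition count. The class name and the "topic-partition" string format are hypothetical; the splitting rule follows the familiar range strategy, not any actual broker code.

```java
import java.util.*;

// Hypothetical coordinator-side helper for the "range" protocol type.
final class BrokerRangeAssignor {
    // subscriptions: member id -> subscribed topics (parsed from the JSON subscription set)
    // partitionCounts: topic -> partition count, taken from the coordinator's own metadata
    // returns: member id -> assigned partitions as "topic-partition" strings
    static Map<String, List<String>> assign(Map<String, List<String>> subscriptions,
                                            Map<String, Integer> partitionCounts) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            assignment.put(member, new ArrayList<>());
        }
        // Group members by topic so that each topic is split independently.
        Map<String, List<String>> membersByTopic = new TreeMap<>();
        subscriptions.forEach((member, topics) -> {
            for (String topic : topics) {
                membersByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(member);
            }
        });
        for (Map.Entry<String, List<String>> e : membersByTopic.entrySet()) {
            String topic = e.getKey();
            List<String> members = e.getValue();
            Collections.sort(members); // deterministic member order
            int numPartitions = partitionCounts.getOrDefault(topic, 0);
            int perMember = numPartitions / members.size();
            int extra = numPartitions % members.size();
            int next = 0;
            for (int i = 0; i < members.size(); i++) {
                // The first `extra` members each take one additional partition.
                int count = perMember + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    assignment.get(members.get(i)).add(topic + "-" + p);
                }
                next += count;
            }
        }
        return assignment;
    }
}
```

With two members subscribed to a three-partition topic, the first member in sorted order gets two partitions and the second gets one. Because only the coordinator runs this, against a single copy of the metadata, all members see the same global assignment in the JoinGroupResponse.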

This approach has a few merits:
1. It does not change the proposed binary protocol, which is still general.
2. The majority of consumers will not suffer from inconsistent metadata,
split brain, or exploding subscription set propagation. This specifically
addresses the concern that the current proposal caters to the 20% use case
while adversely impacting the more common 80% use case.
3. It is easy to implement. The only thing needed is to implement an
assignor class. For most users, the default range and roundrobin assignors
are good enough.

Thoughts?

Thanks,

Jiangjie (Becket) Qin

On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Follow-up from the KIP call:
>
> 1. Onur brought up the question of whether this protocol provides enough
> coordination capabilities to be generally useful in practice (is that
> accurate, Onur?). If it doesn't, then each use case would probably need a
> dependency on ZooKeeper anyway, and we haven't really gained anything. The
> group membership provided by this protocol is a useful primitive for
> coordination, but it's limited in the sense that everything shared among
> the group has to be communicated at the time the group is created. If any
> shared data changes, then the only way the group can ensure agreement is to
> force a rebalance. This is expensive since all members must stall while the
> rebalance takes place. As we have also seen, there is a practical limit on
> the amount of metadata that can be sent through this protocol once groups
> get a little larger. This protocol is therefore not suitable for cases
> which require frequent communication or a large amount of data to be
> communicated. For the use cases listed on the wiki, neither of these
> appears to be an issue, but there may be other limitations which would
> restrict reuse of the protocol. Perhaps it would be sufficient to sketch
> how these cases might work?
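To make the size limit concrete: when every member is sent every member's metadata, the bytes the coordinator sends per rebalance grow quadratically with group size. A back-of-the-envelope sketch, assuming a uniform per-member metadata size; the class name and the figures in the note below are illustrative, not measurements.

```java
// Rough estimate of join-response traffic when each of n members is sent
// all n members' metadata (per-member metadata size assumed uniform).
final class JoinResponseSize {
    static long totalBytes(int members, long bytesPerMember) {
        long perResponse = members * bytesPerMember; // one response carries everyone's metadata
        return members * perResponse;                // and every member gets its own response
    }
}
```

For example, 100 members with 1,000 bytes of metadata each gives 10,000,000 bytes per rebalance, and doubling the group to 200 members quadruples that to 40,000,000 bytes.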
>
> 2. We talked a little bit about the issue of metadata churn. Becket brought
> up the interesting point that not only do we depend on topic metadata
> changing relatively infrequently, but we also expect timely agreement among
> the brokers on what that metadata is. To resolve this, we can have the
> consumers fetch metadata from the coordinator. We still depend on topic
> metadata not changing frequently, but this should resolve any disagreement
> among the brokers themselves. In fact, since we expect that disagreement is
> relatively rare, we can have the consumers fetch from the coordinator only
> when a disagreement occurs. The nice thing about this proposal is that
> it doesn't affect the join group semantics, so the coordinator would remain
> oblivious to the metadata used by the group for agreement. Also, if
> metadata churn becomes an issue, it might be possible to have the
> coordinator provide a snapshot for the group to ensure that a generation
> would be able to reach agreement (this would probably require adding
> groupId/generation to the metadata request).
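The "fetch from the coordinator only on disagreement" idea can be sketched as a check over the metadata hashes the members embedded in their join metadata. This assumes each member's hash is visible to the whole group; the class and method names are hypothetical.

```java
import java.util.*;

// Hypothetical agreement check run after a join. If every member reports the
// same metadata hash, the group proceeds; otherwise members should re-fetch
// metadata from the coordinator before rejoining.
final class MetadataAgreement {
    static Optional<String> agreedHash(Map<String, String> metadataHashByMember) {
        Set<String> distinct = new HashSet<>(metadataHashByMember.values());
        // Exactly one distinct value means all members agree.
        return distinct.size() == 1 ? Optional.of(distinct.iterator().next()) : Optional.empty();
    }
}
```

An empty result is the rare disagreement case; only then is the extra round-trip to the coordinator paid.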
>
> 3. We talked briefly about support for multiple protocols in the join group
> request in order to allow changing the assignment strategy without
> downtime. I think it's a little doubtful that this would get much use in
> practice, but I agree it's a nice option to have on the table. An
> alternative, for the sake of argument, is to have each member provide only
> one version of the protocol, and to let the coordinator choose the protocol
> with the largest number of supporters. All members which can't support the
> selected protocol would be kicked out of the group. The drawback in a
> rolling upgrade is that the total capacity of the group would be
> momentarily halved. It would also be a little tricky to handle the case of
> retrying when a consumer is kicked out of the group. We wouldn't want it to
> be able to effect a rebalance, for example, if it would just be kicked out
> again. That would probably complicate the group management logic on the
> coordinator.
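A minimal sketch of the "largest number of supporters" alternative, assuming each member advertises exactly one protocol. Breaking ties by choosing the alphabetically first protocol is an arbitrary assumption made here for determinism; the class and method names are hypothetical.

```java
import java.util.*;

// Each member advertises one protocol; the coordinator picks the most common
// one and removes members that chose anything else.
final class MajorityProtocol {
    static String select(Map<String, String> protocolByMember) {
        Map<String, Integer> votes = new TreeMap<>(); // sorted, so ties go to the first name
        for (String protocol : protocolByMember.values()) {
            votes.merge(protocol, 1, Integer::sum);
        }
        String best = null;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (best == null || e.getValue() > votes.get(best)) best = e.getKey();
        }
        return best; // null only for an empty group
    }

    static Set<String> membersToRemove(Map<String, String> protocolByMember, String selected) {
        Set<String> removed = new TreeSet<>();
        protocolByMember.forEach((member, protocol) -> {
            if (!protocol.equals(selected)) removed.add(member);
        });
        return removed;
    }
}
```

Midway through a rolling upgrade with half the members on each version, one half is removed, which is the momentary halving of capacity described above.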
>
>
> Thanks,
> Jason
>
>
> On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Jun,
> >
> > Yes, I agree. If the metadata can be synced quickly, there should not be
> > an issue. It just occurred to me that there is a proposal to allow
> > consuming from followers in the ISR, which could potentially cause more
> > frequent metadata changes for consumers. Would that be an issue?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Answers below:
> > >
> > > 1. When there are multiple common protocols in the JoinGroupRequest,
> > > which one would the coordinator pick?
> > >
> > > I was intending to use the list to indicate preference. If all group
> > > members support protocols ["A", "B"] in that order, then we will choose
> > > "A." If some support ["B", "A"], then we would either choose based on
> > > respective counts or just randomly. The main use case of supporting the
> > > list is for rolling upgrades when a change is made to the assignment
> > > strategy. In that case, the new assignment strategy would be listed
> > > first in the upgraded client. I think it's debatable whether this
> > > feature would get much use in practice, so we might consider dropping
> > > it.
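A sketch of preference-ordered selection under one reading of the answer above: only protocols supported by every member are eligible, each member votes for its highest-ranked eligible protocol, and the most votes wins ("respective counts"). Alphabetical tie-breaking and the class name are assumptions; an empty result corresponds to the failed group construction discussed next.

```java
import java.util.*;

// Each member lists protocols in preference order.
final class PreferredProtocol {
    static Optional<String> select(Map<String, List<String>> preferencesByMember) {
        Set<String> common = null;
        for (List<String> prefs : preferencesByMember.values()) {
            if (common == null) common = new HashSet<>(prefs);
            else common.retainAll(prefs); // keep only protocols everyone supports
        }
        if (common == null || common.isEmpty()) return Optional.empty(); // no agreement
        Map<String, Integer> votes = new TreeMap<>();
        for (List<String> prefs : preferencesByMember.values()) {
            for (String p : prefs) {
                if (common.contains(p)) { votes.merge(p, 1, Integer::sum); break; }
            }
        }
        String best = null;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (best == null || e.getValue() > votes.get(best)) best = e.getKey();
        }
        return Optional.of(best);
    }
}
```

If every member lists ["A", "B"], the result is "A"; if two of three members prefer "B", the result is "B".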
> > >
> > > 2. If the protocols don't agree, the group construction fails. What
> > > exactly does it mean? Do we send an error in every JoinGroupResponse and
> > > remove all members in the group in the coordinator?
> > >
> > > Yes, that is right. It would be handled similarly to inconsistent
> > > assignment strategies in the current protocol. The coordinator returns
> > > an error in each join group response, and the client propagates the
> > > error to the user.
> > >
> > > 3. Consumer embedded protocol: The proposal has two different formats
> > > of subscription depending on whether wildcards are used or not. This
> > > seems a bit complicated. Would it be better to always use the metadata
> > > hash? The clients know the subscribed topics already. This way, the
> > > client code behaves the same whether wildcards are used or not.
> > >
> > > Yeah, I think this is possible (Neha also suggested it). I haven't
> > > updated the wiki yet, but the patch I started working on uses only the
> > > metadata hash. In the case that an explicit topic list is provided, the
> > > hash just covers the metadata for those topics.
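One way such a metadata hash could be computed, sketched under assumptions: SHA-256 over each subscribed topic's partition count, in sorted topic order so map iteration order does not matter. The real encoding in the patch is not described here, so this format is hypothetical. Kafka topic names cannot contain ':' or a newline, so the encoding below is unambiguous.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

// Hypothetical metadata hash covering only the subscribed topics.
final class MetadataHash {
    static byte[] compute(Set<String> subscribedTopics, Map<String, Integer> partitionCounts) {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform must provide SHA-256
        }
        for (String topic : new TreeSet<>(subscribedTopics)) {
            int partitions = partitionCounts.getOrDefault(topic, -1); // -1 marks an unknown topic
            digest.update((topic + ":" + partitions + "\n").getBytes(StandardCharsets.UTF_8));
        }
        return digest.digest();
    }
}
```

Two consumers with the same view of their subscribed topics produce identical hashes, while changes to unsubscribed topics do not affect the result.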
> > >
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Jason,
> > > >
> > > > Thanks for the writeup. A few comments below.
> > > >
> > > > 1. When there are multiple common protocols in the JoinGroupRequest,
> > > > which one would the coordinator pick?
> > > > 2. If the protocols don't agree, the group construction fails. What
> > > > exactly does it mean? Do we send an error in every JoinGroupResponse
> > > > and remove all members in the group in the coordinator?
> > > > 3. Consumer embedded protocol: The proposal has two different formats
> > > > of subscription depending on whether wildcards are used or not. This
> > > > seems a bit complicated. Would it be better to always use the metadata
> > > > hash? The clients know the subscribed topics already. This way, the
> > > > client code behaves the same whether wildcards are used or not.
> > > >
> > > > Jiangjie,
> > > >
> > > > With respect to rebalance churn due to topics being created/deleted:
> > > > with the new consumer, the rebalance can probably settle within 200ms
> > > > when there is a topic change. So, as long as we are not changing
> > > > topics more than 5 times per second, there shouldn't be constant
> > > > churn, right?
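The arithmetic behind the estimate is 1000 ms / 200 ms = 5 changes per second. A toy model is sketched below; it assumes, as a simplification, that a topic change arriving mid-rebalance restarts it, so a rebalance completes only if the next change comes at least one settle time later. The names are hypothetical.

```java
// Back-of-the-envelope churn model.
final class ChurnEstimate {
    static double maxChangesPerSecond(long settleMillis) {
        return 1000.0 / settleMillis;
    }

    // changeTimesMillis must be sorted; counts rebalances that finish before
    // the next topic change interrupts them.
    static int completedRebalances(long[] changeTimesMillis, long settleMillis) {
        int completed = 0;
        for (int i = 0; i < changeTimesMillis.length; i++) {
            boolean last = i == changeTimesMillis.length - 1;
            if (last || changeTimesMillis[i + 1] - changeTimesMillis[i] >= settleMillis) {
                completed++;
            }
        }
        return completed;
    }
}
```

With a 200 ms settle time, changes at 0, 100 and 400 ms yield two completed rebalances: the first is cut short by the change at 100 ms.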
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <ja...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi Kafka Devs,
> > > > >
> > > > > One of the nagging issues in the current design of the new consumer
> > > > > has been the need to support a variety of assignment strategies.
> > > > > We've encountered this in particular in the design of copycat and
> > > > > the processing framework (KIP-28). From what I understand, Samza also
> > > > > has a number of use cases with custom assignment needs. The new
> > > > > consumer protocol supports new assignment strategies by hooking them
> > > > > into the broker. For many environments, this is a major pain and in
> > > > > some cases, a non-starter. It also challenges the validation that the
> > > > > coordinator can provide. For example, some assignment strategies call
> > > > > for partitions to be assigned multiple times, which means that the
> > > > > coordinator can only check that partitions have been assigned at
> > > > > least once.
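A sketch of that weaker "at least once" check, assuming the coordinator sees each member's assignment as "topic-partition" strings and knows each topic's partition count. The class name and string format are hypothetical.

```java
import java.util.*;

// Hypothetical coordinator-side validation when strategies may assign a
// partition to several members: only coverage can be checked, not exclusivity.
final class AssignmentValidator {
    static Set<String> unassigned(Map<String, Integer> partitionCounts,
                                  Map<String, List<String>> assignmentByMember) {
        Set<String> missing = new TreeSet<>();
        partitionCounts.forEach((topic, n) -> {
            for (int p = 0; p < n; p++) missing.add(topic + "-" + p);
        });
        // Duplicates across members are allowed; removing an entry twice is harmless.
        assignmentByMember.values().forEach(missing::removeAll);
        return missing;
    }
}
```

An empty result passes validation even if, say, t-1 went to two members, which is exactly the limit described above.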
> > > > >
> > > > > To solve these issues, we'd like to propose moving assignment to the
> > > > > client. I've written a wiki which outlines some protocol changes to
> > > > > achieve this:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > > > >
> > > > > To summarize briefly, instead of the coordinator assigning the
> > > > > partitions itself, all subscriptions are forwarded to each member of
> > > > > the group, which then decides independently which partitions it
> > > > > should consume. The protocol provides a mechanism for the coordinator
> > > > > to validate that all consumers use the same assignment strategy, but
> > > > > it does not ensure that the resulting assignment is "correct." This
> > > > > provides a powerful capability for users to control the full data
> > > > > flow on the client side. They control how data is written to
> > > > > partitions through the Partitioner interface, and they control how
> > > > > data is consumed through the assignment strategy, all without
> > > > > touching the server.
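To illustrate the client-side model, here is a minimal sketch in which every member runs the same deterministic function over the forwarded subscriptions and keeps only its own share. Round-robin over sorted member ids and sorted topics is an assumption chosen for brevity, and this variant requires identical subscriptions; the class and method names are hypothetical.

```java
import java.util.*;

// Client-side assignment: same inputs, same function, on every member.
final class ClientSideRoundRobin {
    static List<String> myPartitions(String self,
                                     Map<String, Set<String>> subscriptionsByMember,
                                     Map<String, Integer> partitionCounts) {
        Set<Set<String>> distinct = new HashSet<>(subscriptionsByMember.values());
        if (distinct.size() != 1) throw new IllegalArgumentException("subscriptions differ");
        List<String> members = new ArrayList<>(new TreeSet<>(subscriptionsByMember.keySet()));
        SortedSet<String> topics = new TreeSet<>(distinct.iterator().next());
        List<String> mine = new ArrayList<>();
        int slot = 0; // position in the canonical (sorted topic, partition) order
        for (String topic : topics) {
            int n = partitionCounts.getOrDefault(topic, 0);
            for (int p = 0; p < n; p++, slot++) {
                if (members.get(slot % members.size()).equals(self)) mine.add(topic + "-" + p);
            }
        }
        return mine;
    }
}
```

With topics x (2 partitions) and y (1 partition), member a computes x-0 and y-0 while member b computes x-1. If their metadata disagreed, they could compute overlapping or incomplete shares, and the coordinator would never see the result.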
> > > > >
> > > > > Of course nothing comes for free. In particular, this change removes
> > > > > the ability of the coordinator to validate that commits are made by
> > > > > consumers who were assigned the respective partition. This might not
> > > > > be too bad since we retain the ability to validate the generation id,
> > > > > but it is a potential concern. We have considered alternative
> > > > > protocols which add a second round-trip to the protocol in order to
> > > > > give the coordinator the ability to confirm the assignment. As
> > > > > mentioned above, the coordinator is somewhat limited in what it can
> > > > > actually validate, but this would restore its ability to validate
> > > > > commits. The tradeoff is that it increases the protocol's complexity,
> > > > > which means more ways for the protocol to fail and consequently more
> > > > > edge cases in the code.
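The commit check that remains can be sketched as follows, assuming the coordinator tracks only the current generation id and bumps it on every rebalance. The class and method names are hypothetical.

```java
// Commit validation under client-side assignment: the coordinator does not
// know who owns which partition, so it can only reject stale generations.
final class CommitValidator {
    private int currentGeneration;

    CommitValidator(int generation) { this.currentGeneration = generation; }

    void onRebalance() { currentGeneration++; }

    boolean accept(int commitGeneration, String memberId, String partition) {
        // memberId and partition are received but cannot be checked against an assignment.
        return commitGeneration == currentGeneration;
    }
}
```

A member committing with the current generation id is accepted for any partition, even one it was not assigned; only commits from a previous generation are rejected.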
> > > > >
> > > > > It also misses an opportunity to generalize the group membership
> > > > > protocol for additional use cases. In fact, after you've gone to the
> > > > > trouble of moving assignment to the client, the main thing that is
> > > > > left in this protocol is basically a general group management
> > > > > capability. This is exactly what is needed for a few cases that are
> > > > > currently under discussion (e.g. copycat or single-writer producer).
> > > > > We've taken this a step further in the proposal and attempted to
> > > > > envision what that general protocol might look like and how it could
> > > > > be used both by the consumer and for some of these other cases.
> > > > >
> > > > > Anyway, since time is running out on the new consumer, we have
> > > > > perhaps one last chance to consider a significant change in the
> > > > > protocol like this, so have a look at the wiki and share your
> > > > > thoughts. I've no doubt that some ideas seem clearer in my mind than
> > > > > they do on paper, so ask questions if there is any confusion.
> > > > >
> > > > > Thanks!
> > > > > Jason
> > > > >
> > > >
> > >
> >
>
