Ewen,

Thanks for the explanation.

For (1), I am more concerned about the failure case than the normal case.
What if a consumer was somehow kicked out of a group but is still consuming
and committing offsets? Does that mean the new owner and the old owner might
both be consuming from and committing offsets for the same partition?
In the old consumer, this won't happen because the new consumer will not be
able to start consumption until the previous owner has released its
ownership. Basically, without the ownership guarantee, I don't see how
communication among the consumers alone can solve the problem here.

For (2) and (3), I now understand how the metadata is used. But I still
don't see why we should let the consumers pass the topic information around
instead of letting the coordinator provide it. The single producer use case
does not solve the ownership problem in the abnormal case either, which
seems a little bit vulnerable.

Thanks,

Jiangjie (Becket) Qin


On Tue, Aug 11, 2015 at 11:06 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> On Tue, Aug 11, 2015 at 10:15 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Hi Jason,
> >
> > Thanks for writing this up. It would be useful to generalize the group
> > concept. I have a few questions below.
> >
> > 1. In the old consumer, partition assignment is actually done by the
> > consumers themselves. We used ZooKeeper to guarantee that a partition
> > will only be consumed by the one consumer thread that successfully
> > claimed its ownership. Does the new protocol plan to provide the same
> > guarantee?
> >
>
> Once you have all the metadata from all the consumers, assignment should
> just be a simple function mapping that Map<ConsumerId, Metadata> to
> Map<ConsumerId, List<TopicPartition>>. If everyone is consistent in
> computing that, you don't need ZK involved at all.
>
> In practice, this shouldn't be that hard to ensure for most assignment
> strategies just by having decent unit testing on them. You just have to do
> things like ensure your assignment strategy sorts lists into a consistent
> order.
>
> You do give up the ability to use some techniques (e.g. any randomized
> algorithm if you can't distribute the seed w/ the metadata) and it's true
> that nothing validates the assignment, but if that assignment algorithm
> step is kept simple, small, and well tested, the risk is minimal.
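
The deterministic mapping described above can be sketched roughly as follows. This is only an illustration in Python, not code from the proposal: the function name `assign`, the shape of the metadata (a list of subscribed topics per consumer), and the assumption that all members already agree on `partition_counts` are hypothetical. The point it shows is that sorting every collection before iterating lets each member compute the same result on its own.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def assign(
    subscriptions: Dict[str, List[str]],
    partition_counts: Dict[str, int],
) -> Dict[str, List[TopicPartition]]:
    """Round-robin assignment that depends only on its inputs.

    subscriptions: consumer id -> topics it subscribes to (hypothetical shape).
    partition_counts: topic -> partition count, assumed agreed on by all members.
    """
    # Sort ids and topics so every member iterates in the same order,
    # regardless of the order in which the metadata arrived.
    consumer_ids = sorted(subscriptions)
    topics = sorted({t for subs in subscriptions.values() for t in subs})

    assignment: Dict[str, List[TopicPartition]] = {cid: [] for cid in consumer_ids}
    for topic in topics:
        subscribers = [cid for cid in consumer_ids if topic in subscriptions[cid]]
        for partition in range(partition_counts.get(topic, 0)):
            owner = subscribers[partition % len(subscribers)]
            assignment[owner].append((topic, partition))
    return assignment
```

Each member would run the same function on the metadata it received and keep only its own entry. Nothing here validates that another member actually did the same.
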
>
>
> >
> > 2. It looks like both JoinGroupRequest and JoinGroupResponse have the
> > ProtocolMetadata.AssignmentStrategyMetadata. What metadata would be
> > sent and returned by the coordinator? How will the coordinator handle
> > the metadata?
> >
>
> The coordinator is basically just blindly broadcasting all of it to group
> members so they have a consistent view.
>
> So from the coordinator's perspective, it sees something like:
>
> Consumer 1 -> JoinGroupRequest with GroupProtocols = [ "consumer"
> <Consumer1 opaque byte[]>]
> Consumer 2 -> JoinGroupRequest with GroupProtocols = [ "consumer"
> <Consumer2 opaque byte[]>]
>
> Then the responses would look like:
>
> Consumer 1 <- JoinGroupResponse with GroupProtocol = "consumer" and
> GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>, Consumer 2
> <Consumer2 opaque byte[]>]
> Consumer 2 <- JoinGroupResponse with GroupProtocol = "consumer" and
> GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>, Consumer 2
> <Consumer2 opaque byte[]>]
>
> So all the responses include all the metadata for every member in the
> group, and everyone can use that to consistently decide on assignment. The
> broker doesn't care and cannot even understand the metadata since the data
> format for it is dependent on the assignment strategy being used.
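
The coordinator's role in the exchange above can be sketched like this. It is a simplified Python illustration under stated assumptions: `handle_join_group` and the request/response shapes are hypothetical, and the rule used to choose among several common protocols (first match in the preference list of the lexicographically first member) is an assumption, since the thread does not specify one. The metadata stays opaque bytes throughout.

```python
from typing import Dict, List, Tuple

# member id -> ordered list of (protocol name, opaque metadata bytes)
JoinRequests = Dict[str, List[Tuple[str, bytes]]]
# member id -> (chosen protocol, {member id: opaque metadata bytes})
JoinResponses = Dict[str, Tuple[str, Dict[str, bytes]]]


def handle_join_group(requests: JoinRequests) -> JoinResponses:
    """Pick a protocol all members support and broadcast everyone's metadata."""
    if not requests:
        return {}
    member_ids = sorted(requests)

    # Protocols supported by every member of the group.
    common = set.intersection(
        *[{name for name, _ in protocols} for protocols in requests.values()]
    )
    if not common:
        raise ValueError("no group protocol is supported by all members")

    # Assumed tie-break: first common protocol in the first member's list.
    chosen = next(name for name, _ in requests[member_ids[0]] if name in common)

    # The coordinator never inspects the bytes; it only copies them.
    members = {mid: dict(requests[mid])[chosen] for mid in member_ids}
    return {mid: (chosen, dict(members)) for mid in member_ids}
```

Every member receives the same protocol name and the same member-to-metadata map, which is what lets them compute the assignment independently.
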
>
> As another example that is *not* a consumer, let's say you just want to
> have a single writer in the group which everyone will forward requests to.
> To accomplish this, you could use a very dumb assignment strategy: there is
> no metadata (empty byte[]) and all we care about is who is the first member
> in the group (e.g. when IDs are sorted lexicographically). That member is
> selected as the writer. In that case, we actually just care about the
> membership list, there's no additional info about each member that is
> required to determine who is the writer.
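
The single-writer example above needs very little code. A minimal sketch in Python, with hypothetical function names, assuming member ids are plain strings compared lexicographically:

```python
from typing import Iterable


def select_writer(member_ids: Iterable[str]) -> str:
    """Return the lexicographically first member id; that member is the writer."""
    return min(member_ids)


def is_writer(own_id: str, member_ids: Iterable[str]) -> bool:
    """Each member checks locally whether it was selected."""
    return own_id == select_writer(member_ids)
```

Because the membership list is identical in every response, all members reach the same answer without exchanging any per-member metadata.
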
>
>
> > 3. Do you mean that the number of partitions in JoinGroupResponse will
> > be the max partition count of a topic among all the partition counts
> > reported by consumers? Is there any reason not to just let the
> > coordinator return the number of partitions of a topic from its
> > metadata cache?
> >
>
> Nothing from the embedded protocol is touched by the broker. The broker
> just collects opaque bytes of metadata, does the selection of the strategy
> if multiple are supported by some consumers, and then returns that opaque
> metadata for all the members back to every member. In that way they all
> have a consistent view of the group. For regular consumers, that view of
> the group includes information about how many partitions each consumer
> currently thinks the topics it is subscribed to have. These could be
> inconsistent due to out of date metadata and it would be up to the
> assignment strategy on the *client* to resolve that. As you point out, in
> that case they could just take the max value that any consumer reported
> seeing and use that. The consumers that notice that their metadata had a
> smaller # of partitions should also trigger a metadata update when they see
> someone else observing a larger # of partitions.
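
The client-side resolution of inconsistent partition counts described above could look roughly like this. It is a Python sketch with hypothetical names and a hypothetical metadata shape (topic to partition count as seen by each consumer); the actual encoding is left to the assignment strategy.

```python
from typing import Dict


def resolve_partition_counts(reported: Dict[str, Dict[str, int]]) -> Dict[str, int]:
    """Take, per topic, the largest partition count any consumer reported.

    reported: consumer id -> {topic: partition count that consumer sees}.
    """
    agreed: Dict[str, int] = {}
    for counts in reported.values():
        for topic, count in counts.items():
            agreed[topic] = max(agreed.get(topic, 0), count)
    return agreed


def needs_metadata_refresh(own: Dict[str, int], agreed: Dict[str, int]) -> bool:
    """True if some other member saw more partitions for a topic we subscribe to."""
    return any(count < agreed.get(topic, count) for topic, count in own.items())
```

A consumer whose own view is behind the agreed counts would use the larger value for this round of assignment and trigger a metadata update.
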
>
>
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Kafka Devs,
> > >
> > > One of the nagging issues in the current design of the new consumer has
> > > been the need to support a variety of assignment strategies. We've
> > > encountered this in particular in the design of copycat and the
> > > processing framework (KIP-28). From what I understand, Samza also has a
> > > number of use cases with custom assignment needs. The new consumer
> > > protocol supports new assignment strategies by hooking them into the
> > > broker. For many environments, this is a major pain and in some cases,
> > > a non-starter. It also challenges the validation that the coordinator
> > > can provide. For example, some assignment strategies call for
> > > partitions to be assigned multiple times, which means that the
> > > coordinator can only check that partitions have been assigned at least
> > > once.
> > >
> > > To solve these issues, we'd like to propose moving assignment to the
> > > client. I've written a wiki which outlines some protocol changes to
> > > achieve this:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > >
> > > To summarize briefly, instead of the coordinator assigning the
> > > partitions itself, all subscriptions are forwarded to each member of
> > > the group which then decides independently which partitions it should
> > > consume. The protocol provides a mechanism for the coordinator to
> > > validate that all consumers use the same assignment strategy, but it
> > > does not ensure that the resulting assignment is "correct." This
> > > provides a powerful capability for users to control the full data flow
> > > on the client side. They control how data is written to partitions
> > > through the Partitioner interface and they control how data is consumed
> > > through the assignment strategy, all without touching the server.
> > >
> > > Of course nothing comes for free. In particular, this change removes
> > > the ability of the coordinator to validate that commits are made by
> > > consumers who were assigned the respective partition. This might not be
> > > too bad since we retain the ability to validate the generation id, but
> > > it is a potential concern. We have considered alternative protocols
> > > which add a second round-trip to the protocol in order to give the
> > > coordinator the ability to confirm the assignment. As mentioned above,
> > > the coordinator is somewhat limited in what it can actually validate,
> > > but this would return its ability to validate commits. The tradeoff is
> > > that it increases the protocol's complexity which means more ways for
> > > the protocol to fail and consequently more edge cases in the code.
> > >
> > > It also misses an opportunity to generalize the group membership
> > > protocol for additional use cases. In fact, after you've gone to the
> > > trouble of moving assignment to the client, the main thing that is left
> > > in this protocol is basically a general group management capability.
> > > This is exactly what is needed for a few cases that are currently under
> > > discussion (e.g. copycat or single-writer producer). We've taken this
> > > further step in the proposal and attempted to envision what that
> > > general protocol might look like and how it could be used both by the
> > > consumer and for some of these other cases.
> > >
> > > Anyway, since time is running out on the new consumer, we have perhaps
> > > one last chance to consider a significant change in the protocol like
> > > this, so have a look at the wiki and share your thoughts. I've no doubt
> > > that some ideas seem clearer in my mind than they do on paper, so ask
> > > questions if there is any confusion.
> > >
> > > Thanks!
> > > Jason
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>
