On Tue, Aug 11, 2015 at 10:03 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

> Just to make the conversation a bit easier (I don't think we have really
> established names for these modes yet), basically with the new
> KafkaConsumer today there's:
> - "external management", where the application figures out the group
> management and partition assignment externally
> - "kafka management", where kafka coordinators figure out the group
> management and partition assignment.
>
> With today's design, any sort of custom assignment strategy means you'll
> have to use external management. This proposal adjusts kafka management to
> a place where kafka still provides the group management, but the
> application figures out the partition assignment.
>
> One concern I have regarding the JoinGroupResponse:
> With kafka management today, there's only one thing looking up the
> partitions and figuring out the assignment - the coordinator. All of the
> consumers in the group get a consistent view of the assignment. The
> proposal in the wiki said JoinGroupResponse only contains the member list
> and member metadata. But the consumers still need to find out all the
> partitions for all the topics their group is interested in so that they can
> run the assignment algorithm. You'd probably want to also include all of
> these partitions in the JoinGroupResponse. Otherwise you might run into
> split-brain problems and would require additional coordination steps. I
> don't see how the coordinator can provide these partitions if you put the
> topic subscriptions into the opaque protocol metadata which the coordinator
> never looks at.
>

If you look at the example embedded consumer protocol, you can see that
each client includes the number of partitions it currently thinks exist
for each topic. This does require every client to look those up via
metadata requests (but that's not too bad, and they need that info for
consuming data anyway). However, it also means you can have disagreements
if one consumer's metadata is out of date. There are a couple of options
for resolving that. One is for each consumer to detect the mismatch and
immediately refetch metadata and start a new JoinGroup round. This is a
bit annoying, but it should resolve the issue very quickly; this type of
change should also be relatively rare, so it's not necessarily worth
optimizing. A different option is for all consumers to just assume that
whoever reported the max number of partitions is right and proceed with
the assignment that way.
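
To make that second option concrete, here's a rough Java sketch of the
max-based reconciliation (the class and field names are made up for
illustration; in the real protocol this information would travel in the
opaque member metadata):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PartitionCountReconciler {

        // Hypothetical holder for one member's deserialized protocol
        // metadata: the partition counts it last observed per topic.
        public static class MemberSubscription {
            final String memberId;
            final Map<String, Integer> topicPartitionCounts;

            MemberSubscription(String memberId, Map<String, Integer> counts) {
                this.memberId = memberId;
                this.topicPartitionCounts = counts;
            }
        }

        // Assume whoever reported the largest partition count for a topic
        // is right. Every member runs this same deterministic merge over
        // the same member list from the JoinGroupResponse, so they all
        // converge on one view without an extra coordination round.
        static Map<String, Integer> reconcile(List<MemberSubscription> members) {
            Map<String, Integer> merged = new HashMap<>();
            for (MemberSubscription m : members)
                for (Map.Entry<String, Integer> e : m.topicPartitionCounts.entrySet())
                    merged.merge(e.getKey(), e.getValue(), Math::max);
            return merged;
        }
    }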


>
> Another concern I had was about consumer group rebalances:
> Today, a consumer group can rebalance due to consumer
> joins/failures/leaves (KAFKA-2397), topic partition expansions, or topic
> deletion. I don't see how any of the topic related rebalances can happen if
> you put the topic subscriptions into the opaque protocol metadata which the
> coordinator never looks at.
>
>
Topic partition expansions and deletions can both be picked up by the
consumers as they periodically refresh metadata. At first I thought these
events would be picked up more slowly this way than with the broker
watching for them. However, in practice I don't think they really are.
First of all, even with the broker watching for those events, you still
have to wait at least one heartbeat period for everyone to get notified
(since we can't proactively send notifications, they are tied to the
heartbeat requests). Second, if you have even a few consumers, their
metadata refreshes may be reasonably well distributed in time, so you're
not necessarily waiting a full metadata update period, but rather
something closer to (metadata update period / number of consumers).
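
For illustration, the client-side check could be as simple as the
following sketch (names invented, not from the wiki): after each periodic
metadata refresh, compare partition counts for the subscribed topics and
trigger a rejoin on any change.

    import java.util.Map;

    public class MetadataChangeDetector {

        private Map<String, Integer> lastPartitionCounts;

        MetadataChangeDetector(Map<String, Integer> initialCounts) {
            this.lastPartitionCounts = initialCounts;
        }

        // Called after each periodic metadata refresh. Returns true if any
        // subscribed topic grew, shrank, or disappeared, in which case the
        // consumer should start a new JoinGroup round.
        boolean needsRejoin(Map<String, Integer> freshCounts) {
            boolean changed = !freshCounts.equals(lastPartitionCounts);
            lastPartitionCounts = freshCounts;
            return changed;
        }
    }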

This does require the client implementation to do a bit more, and that
may be a significant consideration since it makes third-party consumers a
bit harder to write. However, since you already need to be updating
metadata, it doesn't seem like a huge additional burden.


> I'm also uncertain about the value of adding a list of SupportedProtocols
> to the JoinGroupRequest as opposed to just one. Adding heuristics to the
> coordinator regarding which protocol to choose seems to add complexity to
> the coordinator and add uncertainty to the consumers over what strategy
> would actually run.
>

It definitely adds a bit of complexity. However, there are a couple of
important use cases centered around zero-downtime upgrades. Consider two
scenarios:

1. I start with the default configuration for my consumers, which gives me
range assignment. Now, I realize that was a poor choice -- it's actually
important to use a sticky assignment strategy. If I want to do a rolling
update so my service continues running while I switch to the new config, I
need to be able to keep the group running in the old mode (range) until
everyone is updated, and then they can all switch over. If the included
metadata differs at all, then at least for some time I'll need to be able
to provide both as options -- only once everyone is updated can the new
sticky assignment approach be used.
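
Here's a small sketch of how protocol selection could work during such a
rolling update (this is just an illustrative heuristic for the
coordinator, not necessarily what the final design will specify): choose
the first protocol in one member's preference order that every member
supports.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ProtocolSelection {

        // Take the first protocol in the first member's preferred order
        // that every member in the group also supports.
        static String selectProtocol(List<List<String>> memberPreferences) {
            if (memberPreferences.isEmpty())
                throw new IllegalArgumentException("empty group");
            for (String candidate : memberPreferences.get(0)) {
                boolean supportedByAll = true;
                for (List<String> prefs : memberPreferences) {
                    if (!prefs.contains(candidate)) {
                        supportedByAll = false;
                        break;
                    }
                }
                if (supportedByAll)
                    return candidate;
            }
            throw new IllegalStateException("no common protocol");
        }

        public static void main(String[] args) {
            // Mid-rolling-update: one member upgraded, one not. The group
            // stays on "range" until everyone also advertises "sticky".
            List<List<String>> midUpgrade = new ArrayList<>();
            midUpgrade.add(Arrays.asList("sticky", "range")); // upgraded member
            midUpgrade.add(Arrays.asList("range"));           // old member
            System.out.println(selectProtocol(midUpgrade));   // prints "range"

            // After the update completes, everyone prefers "sticky".
            List<List<String>> done = new ArrayList<>();
            done.add(Arrays.asList("sticky", "range"));
            done.add(Arrays.asList("sticky", "range"));
            System.out.println(selectProtocol(done));         // prints "sticky"
        }
    }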

2. Suppose I'm using a resource-based assignment strategy. Over time, my
resource model grows more complex. Maybe my first version only includes
CPU. In order to use a newer version that now also adds memory, I'm going
to have to send metadata in a new format (we've added the "memory" field).
If you're careful, you may have planned ahead to handle this and guarantee
that the older assignment strategy code can still decode the metadata from
the new version (e.g. you use JSON and just ignore the new fields) and that
everyone can figure out which version of the assignment strategy code they
should use (i.e. they scan through all metadata returned and find the
minimum format version). However, you can also avoid having to be very
careful about it by simply including both formats of metadata and using two
different protocols, i.e. resource-v1 and resource-v2.
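
A rough sketch of what advertising both formats might look like from the
client's side (the JSON payloads and names are invented for illustration;
the metadata is just opaque bytes as far as the coordinator is concerned):

    import java.nio.charset.StandardCharsets;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class VersionedMetadataExample {

        public static void main(String[] args) {
            // resource-v1 metadata: CPU only.
            String v1Json = "{\"cpu\": 8}";

            // resource-v2 metadata: adds a "memory" field. A v1 decoder
            // that ignores unknown JSON fields could still read this, but
            // getting everyone to agree on which version to run is the
            // hard part.
            String v2Json = "{\"cpu\": 8, \"memory\": 32768}";

            // Instead, advertise both protocols in one JoinGroupRequest,
            // each with metadata in its own format. The coordinator picks
            // whichever protocol the whole group supports, so old and new
            // consumers can coexist during an upgrade without any
            // version-sniffing logic in the assignment code.
            Map<String, byte[]> supportedProtocols = new LinkedHashMap<>();
            supportedProtocols.put("resource-v2", v2Json.getBytes(StandardCharsets.UTF_8));
            supportedProtocols.put("resource-v1", v1Json.getBytes(StandardCharsets.UTF_8));

            for (Map.Entry<String, byte[]> p : supportedProtocols.entrySet())
                System.out.println(p.getKey() + " -> "
                        + new String(p.getValue(), StandardCharsets.UTF_8));
        }
    }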

The expectation is that the overhead here isn't really that high, and it's
only incurred during JoinGroup, which should not be happening constantly.



>
> I have more questions, but I just wanted to get these initial concerns out
> there.
>
> - Onur
>
> On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hi Kafka Devs,
> >
> > One of the nagging issues in the current design of the new consumer has
> > been the need to support a variety of assignment strategies. We've
> > encountered this in particular in the design of copycat and the
> > processing framework (KIP-28). From what I understand, Samza also has a
> > number of use cases with custom assignment needs. The new consumer
> > protocol supports new assignment strategies by hooking them into the
> > broker. For many environments, this is a major pain and in some cases,
> > a non-starter. It also challenges the validation that the coordinator
> > can provide. For example, some assignment strategies call for
> > partitions to be assigned multiple times, which means that the
> > coordinator can only check that partitions have been assigned at least
> > once.
> >
> > To solve these issues, we'd like to propose moving assignment to the
> > client. I've written a wiki which outlines some protocol changes to
> > achieve this:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> >
> > To summarize briefly, instead of the coordinator assigning the
> > partitions itself, all subscriptions are forwarded to each member of
> > the group which then decides independently which partitions it should
> > consume. The protocol provides a mechanism for the coordinator to
> > validate that all consumers use the same assignment strategy, but it
> > does not ensure that the resulting assignment is "correct." This
> > provides a powerful capability for users to control the full data flow
> > on the client side. They control how data is written to partitions
> > through the Partitioner interface and they control how data is consumed
> > through the assignment strategy, all without touching the server.
> >
> > Of course nothing comes for free. In particular, this change removes
> > the ability of the coordinator to validate that commits are made by
> > consumers who were assigned the respective partition. This might not be
> > too bad since we retain the ability to validate the generation id, but
> > it is a potential concern. We have considered alternative protocols
> > which add a second round-trip to the protocol in order to give the
> > coordinator the ability to confirm the assignment. As mentioned above,
> > the coordinator is somewhat limited in what it can actually validate,
> > but this would return its ability to validate commits. The tradeoff is
> > that it increases the protocol's complexity which means more ways for
> > the protocol to fail and consequently more edge cases in the code.
> >
> > It also misses an opportunity to generalize the group membership
> > protocol for additional use cases. In fact, after you've gone to the
> > trouble of moving assignment to the client, the main thing that is left
> > in this protocol is basically a general group management capability.
> > This is exactly what is needed for a few cases that are currently under
> > discussion (e.g. copycat or single-writer producer). We've taken this
> > further step in the proposal and attempted to envision what that
> > general protocol might look like and how it could be used both by the
> > consumer and for some of these other cases.
> >
> > Anyway, since time is running out on the new consumer, we have perhaps
> > one last chance to consider a significant change in the protocol like
> > this, so have a look at the wiki and share your thoughts. I've no doubt
> > that some ideas seem clearer in my mind than they do on paper, so ask
> > questions if there is any confusion.
> >
> > Thanks!
> > Jason
> >
>



-- 
Thanks,
Ewen
