Neha, Ewen and Jason,

Maybe I am overly concerned, and I agree that it does depend on the metadata
change frequency. As Neha said, a few tests will be helpful. We can see how
it goes.

What worries me is that at LinkedIn we are in the process of running Kafka
as a service. That means users will have the ability to create/delete topics
and update their topic configurations programmatically or through a UI using
LinkedIn's internal cloud service platform. Some automated tests will also
be running constantly to create topics, produce/consume some data, and then
clean up the topics. This brings new challenges for mirror maker because we
need to make sure it actually copies data instead of spending too much time
on rebalances. What we see now is that each rebalance will probably take 30
seconds or more to finish, even with some tricks and tuning.

Another related use case I want to bring up is the rolling upgrade of mirror
maker (26 nodes). Bouncing each node triggers two rounds of rebalance, and
each rebalance takes about 30 seconds, so bouncing all 26 nodes takes roughly
half an hour (see the back-of-envelope sketch below). Awkwardly, during the
rolling bounce, because mirror maker keeps rebalancing, it does not really
copy any data, so the pipeline will literally stall for half an hour! Since
we are designing the new protocol, it would also be good if we make sure
this use case is addressed.
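
For illustration only, a minimal back-of-envelope sketch of that stall time
in plain Java (the numbers are just the ones quoted above; the class name is
made up):

    public class RollingBounceStall {
        public static void main(String[] args) {
            int nodes = 26;                // mirror maker instances to bounce
            int rebalancesPerBounce = 2;   // one when a node leaves, one when it rejoins
            int secondsPerRebalance = 30;  // observed per-rebalance time, with tuning

            int stallSeconds = nodes * rebalancesPerBounce * secondsPerRebalance;
            // 26 * 2 * 30 = 1560 seconds, i.e. roughly 26 minutes of stalled copying
            System.out.printf("Pipeline stalled for ~%d minutes%n", stallSeconds / 60);
        }
    }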

Thanks,

Jiangjie (Becket) Qin

On Sat, Aug 15, 2015 at 10:50 AM, Neha Narkhede <n...@confluent.io> wrote:

> Becket,
>
> This is a clever approach to ensure that only one thing communicates
> the metadata so even if it is stale, the entire group has the same view.
> However, the big assumption this makes is that the coordinator is that one
> process that has the ability to know the metadata for group members, which
> does not work for any non-consumer use case.
>
> I wonder if we may be complicating the design for 95% of use cases for the
> sake of the remaining 5%. For instance, how many times do people create and remove
> topics or even add partitions? We operated LI clusters for a long time and
> this wasn't a frequent event that would need us to optimize this design
> for.
>
> Also, this is something we can easily validate by running a few tests on
> the patch and I suggest we wait for that.
>
> Thanks,
> Neha
>
> On Sat, Aug 15, 2015 at 9:14 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Jiangjie,
> >
> > I was thinking about the same problem. When metadata is changing
> > frequently, the clients may not be able to ever find agreement on the
> > current state. The server doesn't have this problem, as you say, because
> it
> > can just take a snapshot and send that to the clients. Adding a dampening
> > setting to the client would help if churn is sporadic, but not if it is
> > steady. I think the metadata would have to be changing really frequently
> > for this to be a problem practically, but this is a case where the
> > server-side approach has an advantage.
> >
> > Including the metadata in the join group response would require making
> the
> > subscription available to the coordinator, right? We lose a little bit of
> > the generality of the protocol, but it might not be too bad since most
> use
> > cases for reusing this protocol have a similar need for metadata (and
> they
> > can always pass an empty subscription if they don't). We've gone back and
> > forth a few times on this, and I'm generally not opposed. It might help
> if
> > we try to quantify the impact of the metadata churn in practice. I think
> > the rate of change would have to be in the low seconds for this to be a
> > real problem. It does seem nice though that we have a way to manage even
> > this much churn when we do this.
> >
> > -Jason
> >
> >
> >
> > On Fri, Aug 14, 2015 at 9:03 PM, Jiangjie Qin <j...@linkedin.com.invalid
> >
> > wrote:
> >
> > > Ewen,
> > >
> > > I agree that if there is churn in metadata, the consumers need several
> > > rounds of rebalances to succeed. The difference I am thinking of is that
> > > with the coordinator as the single source of truth, we can let the
> > > consumers finish one round of rebalance, work for a while, and then start
> > > the next round of rebalance. If we purely depend on the consumers to
> > > synchronize by themselves based on different metadata sources, is it
> > > possible that some groups spend a lot of time rebalancing without being
> > > able to make much progress in consuming?
> > >
> > > I'm thinking: can we let consumers fetch metadata only from their
> > > coordinator? Then the rebalance could be done in the following way:
> > >
> > > 1. Consumers refresh their metadata periodically.
> > > 2. If one of the consumers sees a change in metadata that triggers a
> > > rebalance, it sends a JoinGroupRequest to the coordinator.
> > > 3. Once the coordinator receives the first JoinGroupRequest of a group,
> > > it takes a snapshot of the current metadata and the group enters the
> > > prepare-rebalance state.
> > > 4. The metadata snapshot will be used for this round of rebalance, i.e.
> > > the metadata snapshot will be sent to consumers in the JoinGroupResponse.
> > > 4.1 If the consumers are subscribing to explicit topic lists (not regex),
> > > the JoinGroupResponse only needs to contain the metadata of the topics
> > > the group is interested in.
> > > 4.2 If the consumers are subscribing using regex, all the topic metadata
> > > will be returned to the consumer.
> > > 5. Consumers get the JoinGroupResponse, refresh metadata using the
> > > metadata in the JoinGroupResponse, run the algorithm to assign partitions,
> > > and start consuming.
> > > 6. Go back to 1. (A minimal sketch of steps 3-5 follows below.)
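> > >
> > > For illustration only, here is a minimal sketch of steps 3-5 in Java
> > > (all types and names are hypothetical, not the actual Kafka coordinator
> > > or consumer code):
> > >
> > >     // Hypothetical, simplified types just for this sketch.
> > >     final class MetadataSnapshot {}
> > >     final class MetadataCache {
> > >         MetadataSnapshot snapshot() { return new MetadataSnapshot(); }
> > >     }
> > >     final class JoinGroupRequest { String memberId() { return "member-1"; } }
> > >     final class JoinGroupResponse {
> > >         JoinGroupResponse(String memberId, MetadataSnapshot snapshot) {}
> > >     }
> > >
> > >     // On the first JoinGroupRequest of a round, the coordinator snapshots
> > >     // the metadata once and hands that same snapshot to every member in
> > >     // its JoinGroupResponse, so all members compute the assignment from
> > >     // identical input.
> > >     class CoordinatorSnapshotSketch {
> > >         private MetadataSnapshot snapshot;
> > >         private boolean rebalanceInProgress = false;
> > >
> > >         synchronized JoinGroupResponse handleJoin(JoinGroupRequest request,
> > >                                                   MetadataCache cache) {
> > >             if (!rebalanceInProgress) {
> > >                 snapshot = cache.snapshot();   // step 3: one snapshot per round
> > >                 rebalanceInProgress = true;    // enter prepare-rebalance state
> > >             }
> > >             // step 4: the same snapshot goes back to every member
> > >             return new JoinGroupResponse(request.memberId(), snapshot);
> > >         }
> > >     }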
> > >
> > > The benefit here is that we can let the rebalance finish in one round,
> > > and all the rest of the changes will be captured in the next consumer
> > > metadata refresh - so we get a group commit. One concern is that letting
> > > consumers refresh metadata from the coordinator might cause issues for
> > > big consumer groups. Maybe that is OK because metadata refreshes are
> > > infrequent.
> > >
> > > This approach is actually very similar to what is proposed now: a
> > > rebalance is triggered by a metadata refresh, and the consumer provides a
> > > subscription list to pass around. The only difference is that we don't
> > > need the metadata hash anymore because the metadata is guaranteed to be
> > > the same. Replacing the metadata hash with the actual metadata will not
> > > have too much overhead for small subscription groups. There will be some
> > > overhead for regex subscriptions, but this can save the potential extra
> > > round of metadata fetch and will only occur when consumers see a metadata
> > > change, which is infrequent.
> > >
> > > Any thoughts?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Fri, Aug 14, 2015 at 12:57 PM, Ewen Cheslack-Postava <
> > e...@confluent.io
> > > >
> > > wrote:
> > >
> > > > On Fri, Aug 14, 2015 at 10:59 AM, Jiangjie Qin
> > <j...@linkedin.com.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Neha and Ewen,
> > > > >
> > > > > About the metadata change frequency, I guess it really depends on
> > > > > how frequently the metadata changes occur. If we run Kafka as a
> > > > > service, I can see that happening from time to time, as I can imagine
> > > > > people will create some topic, test, and maybe delete the topic in
> > > > > some automated test. If so, the proposed protocol might be a little
> > > > > bit vulnerable.
> > > > >
> > > > > More specifically, the scenario I am thinking of is:
> > > > > 1. Consumer 0 periodically refreshes metadata and detects a metadata
> > > > > change, so it sends a JoinGroupRequest with metadata_hash_0.
> > > > > 2. Consumer 1 is notified by the controller to start a rebalance, so
> > > > > it refreshes its metadata and sends a JoinGroupRequest with
> > > > > metadata_hash_1, which is different from metadata_hash_0.
> > > > > 3. The rebalance fails and both consumers refresh their metadata
> > > > > again from different brokers.
> > > > > 4. Depending on the metadata change frequency (or some admin
> > > > > operation like partition reassignment), they may or may not get the
> > > > > same metadata back, so they restart from 3 again.
> > > > >
> > > > > I agree that step 4 might not be a big concern if consumers update
> > > > > metadata at almost the same time, but I'm a little bit worried about
> > > > > whether that assumption really stands, because we do not have control
> > > > > over how frequently the metadata can change.
> > > > >
> > > > >
> > > > Is this really that different from what would happen if the
> coordinator
> > > > distributed the metadata to consumers? In that case you would
> trivially
> > > > have everyone in a consistent state, but those metadata changes would
> > > still
> > > > cause churn and require JoinGroup rounds, during which processing is
> > > > stalled for the nodes that are waiting on other members to re-join
> the
> > > > group.
> > > >
> > > > -Ewen
> > > >
> > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Fri, Aug 14, 2015 at 2:03 AM, Ewen Cheslack-Postava <
> > > > e...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > On Thu, Aug 13, 2015 at 11:07 PM, Neha Narkhede <
> n...@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Becket,
> > > > > > >
> > > > > > > As you say, the metadata hash addresses the concern you
> > originally
> > > > > raised
> > > > > > > about large topic subscriptions. Can you please list other
> > problems
> > > > you
> > > > > > are
> > > > > > > raising more clearly? It is more helpful to know problems that
> > the
> > > > > > proposal
> > > > > > > does not address or addresses poorly.
> > > > > > >
> > > > > > > Regarding other things you said -
> > > > > > >
> > > > > > > it is required that each
> > > > > > > > consumer refresh their metadata before sending a
> > JoinGroupRequest
> > > > > > > >
> > > > > > >
> > > > > > > This is required for wildcard topic subscriptions anyway. So
> this
> > > > > > proposal
> > > > > > > does not introduce a regression. We had agreed earlier that it
> > does
> > > > not
> > > > > > > make sense for the server to deserialize regular expressions
> sent
> > > by
> > > > > the
> > > > > > > consumer.
> > > > > > >
> > > > > >
> > > > > > I don't think consumers need to do a metadata refresh before
> > sending
> > > a
> > > > > > JoinGroupRequest. Metadata changes that affect assignment are
> rare
> > --
> > > > it
> > > > > > requires changing the number of partitions in a topic. But you
> > might
> > > > > send a
> > > > > > JoinGroupRequest simply because a new member is trying to join
> the
> > > > group.
> > > > > > That case is presumably much more common.
> > > > > >
> > > > > > I think it's actually a good idea to have the first JoinGroup
> cycle
> > > > fail
> > > > > in
> > > > > > some cases, and has little impact. Lets say the metadata does
> > change
> > > > > > because partitions are added. Then we might fail in the first
> > round,
> > > > but
> > > > > > then all members detect that issue *immediately*, refresh their
> > > > metadata,
> > > > > > and submit a new join group request. This second cycle does not
> > > > require a
> > > > > > full heartbeat cycle. It happens much more quickly because
> everyone
> > > > > > detected the inconsistency based on the first JoinGroupResponse.
> > The
> > > > > > inconsistency should be resolved very quickly (barring other
> > failures
> > > > > like
> > > > > > a member leaving mid-rebalance)
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > > >
> > > > > > >
> > > > > > > > the metadata might still be inconsistent if there is a topic
> or
> > > > > > partition
> > > > > > > > change because the
> > > > > > > > UpdateMetadataRequest from controller might be handled at
> > > different
> > > > > > time.
> > > > > > > >
> > > > > > >
> > > > > > > Topic metadata does not change frequently and even if it did, a
> > > > couple
> > > > > > > rebalance attempts will be needed whether the coordinator
> drives
> > > the
> > > > > > > assignments or the consumer. Because guess how the coordinator
> > > knows
> > > > > > about
> > > > > > > the topic metadata changes -- indirectly through either a zk
> > > callback
> > > > > or
> > > > > > > UpdateMetadataRequest, so it is completely possible the
> > coordinator
> > > > > sees
> > > > > > > the topic metadata changes in batches, not all at once.
> > > > > > >
> > > > > >
> > > > > > > On Thu, Aug 13, 2015 at 10:50 PM, Neha Narkhede <
> > n...@confluent.io
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Ewen/Jason,
> > > > > > > >
> > > > > > > > The metadata hash is a clever approach and certainly
> addresses
> > > the
> > > > > > > problem
> > > > > > > > of large metadata for consumers like mirror maker. Few
> > comments -
> > > > > > > >
> > > > > > > >
> > > > > > > >    1. In the interest of simplifying the format of the
> > consumer's
> > > > > > > >    metadata - Why not just always include only the topic
> names
> > in
> > > > the
> > > > > > > metadata
> > > > > > > >    followed by the metadata hash? If the metadata hash check
> > > > > succeeds,
> > > > > > > each
> > > > > > > >    consumer uses the # of partitions it had fetched. If it
> > > fails, a
> > > > > > > rebalance
> > > > > > > >    happens and the metadata is not used anyway.
> > > > > > >
> > > > > >
> > > > > > Doing this requires that every consumer always fetch the full
> > > metadata.
> > > > > The
> > > > > > most common use case is consumers that just want to consume one
> or
> > a
> > > > > couple
> > > > > > of topics, in which case grabbing all metadata for the entire
> > cluster
> > > > is
> > > > > > wasteful. If I subscribe only to topic A, why make all consumers
> > grab
> > > > > > metadata for the entire topic (and need to rebalance every time
> it
> > > > > > changes!). Including the # of partitions for each topic lets you
> > > avoid
> > > > > > having to grab the global set of metadata.
> > > > > >
> > > > > > So if you're just subscribing to one or a couple of topics, why
> not
> > > > just
> > > > > > compute the hash by filtering out everything but the topics you
> are
> > > > > > subscribed to? The problem there is if you ever add/remove
> > > > subscriptions
> > > > > > and want to support rolling upgrades. If the group was subscribed
> > to
> > > > > topic
> > > > > > A, but later changes require subscribing to A + B, then to
> achieve
> > a
> > > > > > seamless rolling upgrade would require one (old) consumer to be
> > > > > subscribing
> > > > > > to A and one (new) consumer to be subscribing to A+B. If we
> > computed
> > > > > > metadata hashes based on filtered metadata, those two would
> > disagree
> > > > and
> > > > > we
> > > > > > could not perform assignment while the upgrade was in progress.
> > > > > >
> > > > > > The solution is to differentiate between the cases when a very
> > small
> > > > > amount
> > > > > > of the metadata is needed (one or a couple of topic
> subscriptions;
> > > > > > communicate and share this via metadata in the JoinGroup
> protocol)
> > vs
> > > > > when
> > > > > > *all* the metadata is needed (regex subscription; verify
> agreement
> > > via
> > > > > > hash).
> > > > > >
> > > > >
> > > > > >
> > > > > >
> > > > > > > >    2. Do you need a topic list and topic regex to be
> separate?
> > A
> > > > > single
> > > > > > > >    topic or list of topics can be expressed as a regex.
> > > > > > >
> > > > > >
> > > > > > See above note about collecting all metadata when you really only
> > > need
> > > > it
> > > > > > for 1 or 2 topics. There's probably some debate to be had about
> > > whether
> > > > > > this cost would be too high -- every consumer would need to
> request
> > > the
> > > > > > metadata for all topics, and they'd need to request that all
> every
> > > time
> > > > > > they might be out of date.
> > > > > >
> > > > > Are we going to allow consumers in the same group to subscribe to
> > > > different
> > > > > topic set? If we do, we need to let them refresh metadata for all
> the
> > > > > topics a group is consuming from. If we don't then in the protocol
> we
> > > > only
> > > > > need a subscription set hash.
> > > > >
> > > > > >
> > > > > >
> > > > > > > >    3. Let's include a version explicitly at the beginning of
> > the
> > > > > > > >    ProtocolMetadata. The version dictates how to deserialize
> > the
> > > > > > > >    ProtocolMetadata blob and is consistent with the rest of
> > > Kafka.
> > > > > > >
> > > > > >
> > > > > > If I'm understanding correctly, in JoinGroupRequest I would
> change
> > > > > >
> > > > > > GroupProtocols          => [Protocol ProtocolMetadata]
> > > > > >
> > > > > > to
> > > > > >
> > > > > > GroupProtocols          => [Protocol ProtocolVersion
> > > ProtocolMetadata]
> > > > > >
> > > > > > We had been talking about just baking the version into the
> Protocol
> > > > > field,
> > > > > > but making it separate seems perfectly reasonable to me. Jason,
> any
> > > > issue
> > > > > > with splitting the version out into a separate field like this?
> > > > > >
> > > > > > >
> > > > > > > > That can simplify the metadata format to the following:
> > > > > > > >
> > > > > > > > GroupType => "consumer"
> > > > > > > >>
> > > > > > > >> Protocol => AssignmentStrategy
> > > > > > > >>   AssignmentStrategy   => String
> > > > > > > >>
> > > > > > > >> ProtocolMetadata => Version Subscription
> > > > AssignmentStrategyMetadata
> > > > > > > >
> > > > > > > >     Version                    => String
> > > > > > > >
> > > > > > > >   Subscription                 => TopicRegex MetadataHash
> > > > > > > >>     TopicRegex                 => String
> > > > > > > >>     MetadataHash               => String
> > > > > > > >>   AssignmentStrategyMetadata   => bytes
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Aug 13, 2015 at 6:28 PM, Jiangjie Qin
> > > > > > <j...@linkedin.com.invalid
> > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Ewen and Jason,
> > > > > > > >>
> > > > > > > >> Thanks for the reply. Sorry I missed the metadata hash. Yes,
> > > that
> > > > > is a
> > > > > > > >> clever approach and would solve my concern about the data
> > > passing
> > > > > > > around.
> > > > > > > >> I
> > > > > > > >> can see both pros and cons from doing this, though. The
> > > advantage
> > > > is
> > > > > > we
> > > > > > > >> don't need the topic metadata in JoinGroupResponse anymore.
> > The
> > > > > > downside
> > > > > > > >> is
> > > > > > > >> that now rebalance have extra dependency on the consensus of
> > > > > metadata
> > > > > > of
> > > > > > > >> all consumers, which is obtained separately. So it is
> required
> > > > that
> > > > > > each
> > > > > > > >> consumer refresh their metadata before sending a
> > > JoinGroupRequest,
> > > > > > > >> otherwise in some cases (e.g. wildcard consumers) will
> almost
> > > > > > certainly
> > > > > > > >> fail for the first rebalance attempt. Even if we do that,
> > since
> > > > the
> > > > > > > >> consumers are getting metadata from different brokers, the
> > > > metadata
> > > > > > > might
> > > > > > > >> still be inconsistent if there is a topic or partition
> change
> > > > > because
> > > > > > > the
> > > > > > > >> UpdateMetadataRequest from controller might be handled at
> > > > different
> > > > > > > time.
> > > > > > > >> Just want to make sure we think through the cases so the
> > > protocol
> > > > > does
> > > > > > > not
> > > > > > > >> cause us unexpected issues.
> > > > > > > >>
> > > > > > > >> About the number of consumers, I think with the current
> > > liveliness
> > > > > > > >> definition, we can tolerate churns by bumping up the session
> > > > > timeout.
> > > > > > > Also
> > > > > > > >> I guess we will see an increasing number of consumers for
> new
> > > > > > consumer,
> > > > > > > >> because every old consumer thread will probably become a
> > > > > consumer.
> > > > > > > >>
> > > > > > > >> It is a valid concern for consumers that have large
> > subscription
> > > > > set.
> > > > > > > This
> > > > > > > >> might not be avoided though for client side assignment
> > approach.
> > > > One
> > > > > > > >> solution is having topic names associate with a topic ID.
> And
> > > only
> > > > > use
> > > > > > > >> topic ID in JoinGroupRequest and JoinGroupResponse, There
> is a
> > > > > > > discussion
> > > > > > > >> thread about this to solve the topic renaming case but this
> > is a
> > > > > > > >> completely
> > > > > > > >> different discussion.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >>
> > > > > > > >> Jiangjie (Becket) Qin
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Thu, Aug 13, 2015 at 2:14 PM, Jason Gustafson <
> > > > > ja...@confluent.io>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > Thanks Jiangjie, that information helps. I agree the
> > protocol
> > > > must
> > > > > > > >> consider
> > > > > > > >> > scalability. My point was that the synchronization barrier
> > in
> > > > the
> > > > > > > >> current
> > > > > > > >> > protocol already effectively limits the number of
> consumers
> > > > since
> > > > > it
> > > > > > > >> > provides no way to gracefully handle churn. It wouldn't be
> > > worth
> > > > > > > >> worrying
> > > > > > > >> > about scaling up to 100,000 members, for example, because
> > > > there's
> > > > > no
> > > > > > > way
> > > > > > > >> > the group would be stable. So we just need to set some
> clear
> > > > > > > >> expectations
> > > > > > > >> > on the size we can scale to, and that can help inform the
> > > > > discussion
> > > > > > > on
> > > > > > > >> the
> > > > > > > >> > size of messages in this protocol.
> > > > > > > >> >
> > > > > > > >> > Ewen and I were discussing this morning along similar
> lines
> > to
> > > > > what
> > > > > > > >> you're
> > > > > > > >> > suggesting. However, even if the coordinator decides on
> the
> > > > > metadata
> > > > > > > for
> > > > > > > >> > the group, each member still needs to communicate its
> > > > > subscriptions
> > > > > > to
> > > > > > > >> the
> > > > > > > >> > rest of the group. This is nice for the regex case since
> the
> > > > regex
> > > > > > is
> > > > > > > >> > probably small, but if the members have a large topic
> list,
> > > then
> > > > > we
> > > > > > > have
> > > > > > > >> > the same problem. One thing I was thinking about was
> whether
> > > we
> > > > > > really
> > > > > > > >> need
> > > > > > > >> > to handle different subscriptions for every member. If the
> > > > > > coordinator
> > > > > > > >> > could guarantee that all members had the same
> subscription,
> > > then
> > > > > > there
> > > > > > > >> > would be no need for the coordinator to return the
> > > subscriptions
> > > > > for
> > > > > > > >> each
> > > > > > > >> > member. However, this would prevent graceful upgrades. We
> > > might
> > > > be
> > > > > > > able
> > > > > > > >> to
> > > > > > > >> > fix that problem by allowing the consumer to provide two
> > > > > > subscriptions
> > > > > > > >> to
> > > > > > > >> > allowing rolling updates, but that starts to sound pretty
> > > nasty.
> > > > > > > >> >
> > > > > > > >> > -Jason
> > > > > > > >> >
> > > > > > > >> > On Thu, Aug 13, 2015 at 1:41 PM, Jiangjie Qin
> > > > > > > <j...@linkedin.com.invalid
> > > > > > > >> >
> > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > Jason,
> > > > > > > >> > >
> > > > > > > >> > > The protocol has to consider scalability. The protocol in
> > > > > > > >> > > the wiki means the JoinGroupResponse size would be:
> > > > > > > >> > > NumberOfTopics * (AvgTopicNameLength + 4) * (NumberOfConsumers)^2
> > > > > > > >> > >
> > > > > > > >> > > To give some real numbers, we have a 26-node Mirror Maker
> > > > > > > >> > > cluster, each node with 4 consumers. That is 104 consumers
> > > > > > > >> > > using regex ".*". And most of our clusters have around 3000
> > > > > > > >> > > topics, whose topic names are typically around 20 characters.
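> > > > > > > >> > >
> > > > > > > >> > > (A rough worked number, using only the formula and figures
> > > > > > > >> > > above: 3000 topics * (20 + 4) bytes * 104^2 consumers is
> > > > > > > >> > > roughly 0.78 GB of subscription data for the coordinator to
> > > > > > > >> > > fan out in a single rebalance.)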
> > > > > > > >> > >
> > > > > > > >> > > I think the key issue for client-side partition assignment
> > > > > > > >> > > logic is to make sure 1) all the clients run the same
> > > > > > > >> > > algorithm, and 2) all the clients make decisions on the same
> > > > > > > >> > > topic metadata. The second purpose can be achieved by simply
> > > > > > > >> > > letting the coordinator provide the topic metadata and all
> > > > > > > >> > > the member information as the source of truth. Is it
> > > > > > > >> > > necessary to pass the topic metadata of each consumer
> > > > > > > >> > > around? Can we keep the protocol metadata field completely
> > > > > > > >> > > independent of topic metadata? I think in the
> > > > > > > >> > > JoinGroupResponse, we should have only one copy of topic
> > > > > > > >> > > metadata, provided by the coordinator and outside of the
> > > > > > > >> > > protocol metadata. If users decide to put some metadata in
> > > > > > > >> > > the JoinGroupRequest and let the coordinator pass it around,
> > > > > > > >> > > they are responsible for understanding the risk.
> > > > > > > >> > >
> > > > > > > >> > > Thanks,
> > > > > > > >> > >
> > > > > > > >> > > Jiangjie (Becket) Qin
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > On Thu, Aug 13, 2015 at 12:41 PM, Jason Gustafson <
> > > > > > > ja...@confluent.io
> > > > > > > >> >
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hey Onur and Jiangjie,
> > > > > > > >> > > >
> > > > > > > >> > > > I've updated that wiki with a proposal to add regex
> > > > > > subscriptions
> > > > > > > to
> > > > > > > >> > the
> > > > > > > >> > > > consumer metadata. Can you have a look to see if it
> > > > addresses
> > > > > > your
> > > > > > > >> > > concern?
> > > > > > > >> > > > In general, I think we should be a little careful when
> > we
> > > > are
> > > > > > > >> talking
> > > > > > > >> > > about
> > > > > > > >> > > > the scalability of the protocol. Regardless of whether
> > > > > > assignment
> > > > > > > is
> > > > > > > >> > done
> > > > > > > >> > > > on the server or the client, the protocol assumes a
> > > > relatively
> > > > > > > >> stable
> > > > > > > >> > > > configuration. When the number of consumers increases
> > > > beyond a
> > > > > > > >> certain
> > > > > > > >> > > > limit, then membership churn becomes a major concern.
> > > > > Similarly
> > > > > > > >> there
> > > > > > > >> > is
> > > > > > > >> > > a
> > > > > > > >> > > > notion of metadata churn when topics are added,
> deleted,
> > > or
> > > > > > > >> resized. If
> > > > > > > >> > > > either membership or metadata changes, then the
> protocol
> > > > > forces
> > > > > > > all
> > > > > > > >> > > > consumers to stop consumption and rejoin the group. If
> > > this
> > > > > > > happens
> > > > > > > >> > often
> > > > > > > >> > > > enough, then it can severely impact the ability of the
> > > > > consumer
> > > > > > to
> > > > > > > >> make
> > > > > > > >> > > > progress. The point is that the protocol may already
> be
> > > > > unsuited
> > > > > > > to
> > > > > > > >> > cases
> > > > > > > >> > > > where there are either a large number of consumers or
> > > > topics.
> > > > > I
> > > > > > > >> wonder
> > > > > > > >> > if
> > > > > > > >> > > > you guys can share your thoughts about your scaling
> > > > > > expectations?
> > > > > > > >> > > >
> > > > > > > >> > > > -Jason
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > On Wed, Aug 12, 2015 at 12:28 PM, Jason Gustafson <
> > > > > > > >> ja...@confluent.io>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Hey Jiangjie,
> > > > > > > >> > > > >
> > > > > > > >> > > > > That's a great point. In the worst case (the mirror
> > > maker
> > > > > > case I
> > > > > > > >> > > guess),
> > > > > > > >> > > > > the join group response can be massive. This would
> be
> > > > > > especially
> > > > > > > >> > deadly
> > > > > > > >> > > > > when there is a lot of churn in the group (e.g. in a
> > > > rolling
> > > > > > > >> > upgrade).
> > > > > > > >> > > > The
> > > > > > > >> > > > > current protocol is not great for this case either,
> > but
> > > > it's
> > > > > > > >> > > > significantly
> > > > > > > >> > > > > better. Here are a couple ways to deal with the
> size:
> > > > > > > >> > > > >
> > > > > > > >> > > > > 1. First, we could have the coordinator compress the
> > > > > > responses.
> > > > > > > >> This
> > > > > > > >> > > > would
> > > > > > > >> > > > > probably be pretty effective if applied across the
> > > > metadata
> > > > > > from
> > > > > > > >> all
> > > > > > > >> > > > > members.
> > > > > > > >> > > > >
> > > > > > > >> > > > > 2. I think the regex case is the main problem. Is
> that
> > > > > right?
> > > > > > We
> > > > > > > >> > could
> > > > > > > >> > > > > extend the metadata to allow the consumer to embed
> its
> > > > regex
> > > > > > > >> > > subscription
> > > > > > > >> > > > > in the metadata directly (note this might be a good
> > idea
> > > > > > > >> regardless
> > > > > > > >> > of
> > > > > > > >> > > > the
> > > > > > > >> > > > > rest of this proposal). To support regex on the
> > > consumer,
> > > > we
> > > > > > > must
> > > > > > > >> > fetch
> > > > > > > >> > > > > metadata for all topics. Rather than having all
> regex
> > > > > > > subscribers
> > > > > > > >> > embed
> > > > > > > >> > > > all
> > > > > > > >> > > > > of this metadata in their join group requests, they
> > > could
> > > > > > > instead
> > > > > > > >> > > embed a
> > > > > > > >> > > > > hash of it. Then after the join group responses are
> > > > > received,
> > > > > > > they
> > > > > > > >> > just
> > > > > > > >> > > > > need to check that the hashes are the same. If there
> > is
> > > a
> > > > > > > mismatch
> > > > > > > >> > > (which
> > > > > > > >> > > > > should only occur when topics are created, deleted,
> or
> > > > > > resized),
> > > > > > > >> then
> > > > > > > >> > > the
> > > > > > > >> > > > > group members must refetch the metadata and rejoin
> the
> > > > > group.
> > > > > > > >> This is
> > > > > > > >> > > > also
> > > > > > > >> > > > > how the current protocol behaves when there is a
> > change
> > > in
> > > > > the
> > > > > > > >> topic
> > > > > > > >> > > > > metadata affecting the group--someone (either the
> > > > > coordinator
> > > > > > or
> > > > > > > >> the
> > > > > > > >> > > > > consumer) detects the change and forces the group to
> > > > > > rebalance.
> > > > > > > >> > > > >
> > > > > > > >> > > > > What do you think?
> > > > > > > >> > > > >
> > > > > > > >> > > > > (Also I think adding groupId/generationId to fetch
> and
> > > > > produce
> > > > > > > >> > requests
> > > > > > > >> > > > > seems like an interesting line of thought.)
> > > > > > > >> > > > >
> > > > > > > >> > > > > -Jason
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > > On Wed, Aug 12, 2015 at 10:57 AM, Jiangjie Qin
> > > > > > > >> > > <j...@linkedin.com.invalid
> > > > > > > >> > > > >
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > >> Hey Ewen,
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> Onur and I discussed this a little bit more. And we
> > are
> > > > > still
> > > > > > > >> > worrying
> > > > > > > >> > > > >> about passing all the metadata of all consumers
> > around.
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> Let's say I have a cluster has 10,000 topics, the
> > > average
> > > > > > topic
> > > > > > > >> name
> > > > > > > >> > > > >> length
> > > > > > > >> > > > >> is 10 bytes. In this case, the opaque metadata will
> > > have
> > > > > 10 *
> > > > > > > >> > 10,000 =
> > > > > > > >> > > > >> 100KB for topic name, for each topic, there is a
> > 4-byte
> > > > > > integer
> > > > > > > >> of
> > > > > > > >> > > > number
> > > > > > > >> > > > >> of partitions, that's another 40KB. So one global
> > topic
> > > > > > > metadata
> > > > > > > >> > will
> > > > > > > >> > > > have
> > > > > > > >> > > > >> 140KB data. If I have 100 consumers who are using
> > > > wildcard
> > > > > to
> > > > > > > >> > consume
> > > > > > > >> > > > from
> > > > > > > >> > > > >> all the topics. That means the protocol metadata
> end
> > up
> > > > in
> > > > > > the
> > > > > > > >> > > > >> JoinGroupResponse will be 140KB * 100 = 14MB. And
> the
> > > > > > > >> > > JoinGroupResponse
> > > > > > > >> > > > >> will need to be sent to 100 different consumers,
> that
> > > > means
> > > > > > > 14MB
> > > > > > > >> *
> > > > > > > >> > > 100 =
> > > > > > > >> > > > >> 1.4GB need to be sent by the consumer coordinator
> for
> > > one
> > > > > > > >> rebalance.
> > > > > > > >> > > How
> > > > > > > >> > > > >> would that work?
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> Also, having two consumers (old owner and new
> owner)
> > > > > > consuming
> > > > > > > >> from
> > > > > > > >> > > the
> > > > > > > >> > > > >> same partition might also be a problem. e.g. people
> > are
> > > > > > > updating
> > > > > > > >> > > > database.
> > > > > > > >> > > > >> One thing might worth doing is to add GroupId and
> > > > > Generation
> > > > > > ID
> > > > > > > >> to
> > > > > > > >> > > > >> ProducerRequest and FetchRequest as well. This will
> > > also
> > > > > help
> > > > > > > >> with
> > > > > > > >> > the
> > > > > > > >> > > > >> single producer use case. However, this is probably
> > > > > > orthogonal
> > > > > > > to
> > > > > > > >> > this
> > > > > > > >> > > > >> thread given the current new consumer also has this
> > > > problem
> > > > > > > and I
> > > > > > > >> > > > believe
> > > > > > > >> > > > >> we need to fix it.
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> Thanks,
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> Jiangjie (Becket) Qin
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> On Tue, Aug 11, 2015 at 11:43 PM, Ewen
> > > Cheslack-Postava <
> > > > > > > >> > > > >> e...@confluent.io>
> > > > > > > >> > > > >> wrote:
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> > On Tue, Aug 11, 2015 at 11:29 PM, Jiangjie Qin
> > > > > > > >> > > > >> <j...@linkedin.com.invalid>
> > > > > > > >> > > > >> > wrote:
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > > Ewen,
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > > Thanks for the explanation.
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > > For (1), I am more concerned about the failure
> > case
> > > > > > instead
> > > > > > > >> of
> > > > > > > >> > > > normal
> > > > > > > >> > > > >> > case.
> > > > > > > >> > > > >> > > What if a consumer somehow was kick out of a
> > group
> > > > but
> > > > > is
> > > > > > > >> still
> > > > > > > >> > > > >> consuming
> > > > > > > >> > > > >> > > and committing offsets? Does that mean the new
> > > owner
> > > > > and
> > > > > > > old
> > > > > > > >> > owner
> > > > > > > >> > > > >> might
> > > > > > > >> > > > >> > > potentially consuming from and committing
> offsets
> > > for
> > > > > the
> > > > > > > >> same
> > > > > > > >> > > > >> partition?
> > > > > > > >> > > > >> > > In the old consumer, this won't happen because
> > the
> > > > new
> > > > > > > >> consumer
> > > > > > > >> > > will
> > > > > > > >> > > > >> not
> > > > > > > >> > > > >> > be
> > > > > > > >> > > > >> > > able to start consumption unless the previous
> > owner
> > > > has
> > > > > > > >> released
> > > > > > > >> > > its
> > > > > > > >> > > > >> > > ownership. Basically, without the ownership
> > > > guarantee,
> > > > > I
> > > > > > > >> don't
> > > > > > > >> > see
> > > > > > > >> > > > how
> > > > > > > >> > > > >> > the
> > > > > > > >> > > > >> > > communication among consumers themselves alone
> > can
> > > > > solve
> > > > > > > the
> > > > > > > >> > > problem
> > > > > > > >> > > > >> > here.
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > The generation ID check still applies to offset
> > > > commits.
> > > > > If
> > > > > > > >> one of
> > > > > > > >> > > the
> > > > > > > >> > > > >> > consumers is kicked out and misbehaving, it can
> > > > obviously
> > > > > > > still
> > > > > > > >> > > fetch
> > > > > > > >> > > > >> and
> > > > > > > >> > > > >> > process messages, but offset commits will not
> work
> > > > since
> > > > > it
> > > > > > > >> will
> > > > > > > >> > not
> > > > > > > >> > > > >> have
> > > > > > > >> > > > >> > the current generation ID.
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > > For (2) and (3), now I understand how metadata
> > are
> > > > > used.
> > > > > > > But
> > > > > > > >> I
> > > > > > > >> > > still
> > > > > > > >> > > > >> > don't
> > > > > > > >> > > > >> > > see why should we let the consumers to pass the
> > > topic
> > > > > > > >> > information
> > > > > > > >> > > > >> across
> > > > > > > >> > > > >> > > instead of letting coordinator give the
> > > information.
> > > > > The
> > > > > > > >> single
> > > > > > > >> > > > >> producer
> > > > > > > >> > > > >> > > use case does not solve the ownership problem
> in
> > > > > abnormal
> > > > > > > >> case
> > > > > > > >> > > > either,
> > > > > > > >> > > > >> > > which seems to be a little bit vulnerable.
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > One of the goals here was to generalize group
> > > > membership
> > > > > so
> > > > > > > we
> > > > > > > >> > can,
> > > > > > > >> > > > for
> > > > > > > >> > > > >> > example, use it for balancing Copycat tasks
> across
> > > > > workers.
> > > > > > > >> > There's
> > > > > > > >> > > no
> > > > > > > >> > > > >> > topic subscription info in that case. The
> metadata
> > > for
> > > > > > > copycat
> > > > > > > >> > > workers
> > > > > > > >> > > > >> > would instead need to somehow indicate the
> current
> > > set
> > > > of
> > > > > > > tasks
> > > > > > > >> > that
> > > > > > > >> > > > >> need
> > > > > > > >> > > > >> > to be assigned to workers. By making the metadata
> > > > > > completely
> > > > > > > >> > opaque
> > > > > > > >> > > to
> > > > > > > >> > > > >> the
> > > > > > > >> > > > >> > protocol, it becomes more generally useful since
> it
> > > > > focuses
> > > > > > > >> > squarely
> > > > > > > >> > > > on
> > > > > > > >> > > > >> the
> > > > > > > >> > > > >> > group membership problem, allowing for that
> > > additional
> > > > > bit
> > > > > > of
> > > > > > > >> > > metadata
> > > > > > > >> > > > >> so
> > > > > > > >> > > > >> > you don't just get a list of members, but also
> get
> > a
> > > > > little
> > > > > > > >> bit of
> > > > > > > >> > > > info
> > > > > > > >> > > > >> > about each of them.
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > A different option that we explored is to use a
> > sort
> > > of
> > > > > > mixed
> > > > > > > >> > model
> > > > > > > >> > > --
> > > > > > > >> > > > >> > still bake all the topic subscriptions directly
> > into
> > > > the
> > > > > > > >> protocol
> > > > > > > >> > > but
> > > > > > > >> > > > >> also
> > > > > > > >> > > > >> > include metadata. That would allow us to maintain
> > the
> > > > > > > existing
> > > > > > > >> > > > >> > coordinator-driven approach to handling the
> > metadata
> > > > and
> > > > > > > change
> > > > > > > >> > > events
> > > > > > > >> > > > >> like
> > > > > > > >> > > > >> > the ones Onur pointed out. Then something like
> the
> > > > > Copycat
> > > > > > > >> workers
> > > > > > > >> > > > would
> > > > > > > >> > > > >> > just not fill in any topic subscriptions and it
> > would
> > > > be
> > > > > > > >> handled
> > > > > > > >> > as
> > > > > > > >> > > a
> > > > > > > >> > > > >> > degenerate case. Based on the way I explained
> that
> > we
> > > > can
> > > > > > > >> handle
> > > > > > > >> > > those
> > > > > > > >> > > > >> > types of events, I personally feel its cleaner
> and
> > a
> > > > > nicer
> > > > > > > >> > > > >> generalization
> > > > > > > >> > > > >> > to not include the subscriptions in the join
> group
> > > > > > protocol,
> > > > > > > >> > making
> > > > > > > >> > > it
> > > > > > > >> > > > >> part
> > > > > > > >> > > > >> > of the metadata instead.
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > For the single producer case, are you saying it
> > > doesn't
> > > > > > solve
> > > > > > > >> > > > ownership
> > > > > > > >> > > > >> in
> > > > > > > >> > > > >> > the abnormal case because a producer that doesn't
> > > know
> > > > it
> > > > > > has
> > > > > > > >> been
> > > > > > > >> > > > >> kicked
> > > > > > > >> > > > >> > out of the group yet can still produce data even
> > > though
> > > > > it
> > > > > > > >> > shouldn't
> > > > > > > >> > > > be
> > > > > > > >> > > > >> > able to anymore? I definitely agree that that is
> a
> > > risk
> > > > > --
> > > > > > > this
> > > > > > > >> > > > >> provides a
> > > > > > > >> > > > >> > way to get closer to a true single-writer, but
> > there
> > > > are
> > > > > > > >> > definitely
> > > > > > > >> > > > >> still
> > > > > > > >> > > > >> > failure modes that this does not address.
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > -Ewen
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > > Thanks,
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > > Jiangjie (Becket) Qin
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > > On Tue, Aug 11, 2015 at 11:06 PM, Ewen
> > > > > Cheslack-Postava <
> > > > > > > >> > > > >> > e...@confluent.io
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > wrote:
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > > > On Tue, Aug 11, 2015 at 10:15 PM, Jiangjie
> Qin
> > > > > > > >> > > > >> > <j...@linkedin.com.invalid
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > wrote:
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > > Hi Jason,
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > > Thanks for writing this up. It would be
> > useful
> > > to
> > > > > > > >> generalize
> > > > > > > >> > > the
> > > > > > > >> > > > >> > group
> > > > > > > >> > > > >> > > > > concept. I have a few questions below.
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > > 1. In old consumer actually the partition
> > > > > assignment
> > > > > > > are
> > > > > > > >> > done
> > > > > > > >> > > by
> > > > > > > >> > > > >> > > > consumers
> > > > > > > >> > > > >> > > > > themselves. We used zookeeper to guarantee
> > > that a
> > > > > > > >> partition
> > > > > > > >> > > will
> > > > > > > >> > > > >> only
> > > > > > > >> > > > >> > > be
> > > > > > > >> > > > >> > > > > consumed by one consumer thread who
> > > successfully
> > > > > > > claimed
> > > > > > > >> its
> > > > > > > >> > > > >> > ownership.
> > > > > > > >> > > > >> > > > > Does the new protocol plan to provide the
> > same
> > > > > > > guarantee?
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > Once you have all the metadata from all the
> > > > > consumers,
> > > > > > > >> > > assignment
> > > > > > > >> > > > >> > should
> > > > > > > >> > > > >> > > > just be a simple function mapping that
> > > > > Map<ConsumerId,
> > > > > > > >> > Metadata>
> > > > > > > >> > > > to
> > > > > > > >> > > > >> > > > Map<ConsumerId, List<TopicPartition>>. If
> > > everyone
> > > > is
> > > > > > > >> > consistent
> > > > > > > >> > > > in
> > > > > > > >> > > > >> > > > computing that, you don't need ZK involved at
> > > all.
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > In practice, this shouldn't be that hard to
> > > ensure
> > > > > for
> > > > > > > most
> > > > > > > >> > > > >> assignment
> > > > > > > >> > > > >> > > > strategies just by having decent unit testing
> > on
> > > > > them.
> > > > > > > You
> > > > > > > >> > just
> > > > > > > >> > > > >> have to
> > > > > > > >> > > > >> > > do
> > > > > > > >> > > > >> > > > things like ensure your assignment strategy
> > sorts
> > > > > lists
> > > > > > > >> into a
> > > > > > > >> > > > >> > consistent
> > > > > > > >> > > > >> > > > order.
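> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > For illustration only, a hypothetical sketch of such
> > > > > > > >> > > > >> > > > a deterministic, purely client-side assignment
> > > > > > > >> > > > >> > > > function in Java (strings stand in for consumer IDs
> > > > > > > >> > > > >> > > > and topic-partitions; all names are made up):
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > >     import java.util.*;
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > >     class RoundRobinAssignorSketch {
> > > > > > > >> > > > >> > > >         // partitionsPerTopic: topic -> partition count,
> > > > > > > >> > > > >> > > >         // as agreed on by the group members
> > > > > > > >> > > > >> > > >         static Map<String, List<String>> assign(
> > > > > > > >> > > > >> > > >                 Map<String, Integer> partitionsPerTopic,
> > > > > > > >> > > > >> > > >                 List<String> memberIds) {
> > > > > > > >> > > > >> > > >             List<String> members = new ArrayList<>(memberIds);
> > > > > > > >> > > > >> > > >             Collections.sort(members);   // consistent order is the key
> > > > > > > >> > > > >> > > >             List<String> topics = new ArrayList<>(partitionsPerTopic.keySet());
> > > > > > > >> > > > >> > > >             Collections.sort(topics);
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > >             Map<String, List<String>> assignment = new HashMap<>();
> > > > > > > >> > > > >> > > >             members.forEach(m -> assignment.put(m, new ArrayList<>()));
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > >             int i = 0;
> > > > > > > >> > > > >> > > >             for (String topic : topics) {
> > > > > > > >> > > > >> > > >                 for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
> > > > > > > >> > > > >> > > >                     String owner = members.get(i++ % members.size());
> > > > > > > >> > > > >> > > >                     assignment.get(owner).add(topic + "-" + p);
> > > > > > > >> > > > >> > > >                 }
> > > > > > > >> > > > >> > > >             }
> > > > > > > >> > > > >> > > >             return assignment;
> > > > > > > >> > > > >> > > >         }
> > > > > > > >> > > > >> > > >     }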
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > You do give up the ability to use some
> > techniques
> > > > > (e.g.
> > > > > > > any
> > > > > > > >> > > > >> randomized
> > > > > > > >> > > > >> > > > algorithm if you can't distribute the seed w/
> > the
> > > > > > > metadata)
> > > > > > > >> > and
> > > > > > > >> > > > it's
> > > > > > > >> > > > >> > true
> > > > > > > >> > > > >> > > > that nothing validates the assignment, but if
> > > that
> > > > > > > >> assignment
> > > > > > > >> > > > >> algorithm
> > > > > > > >> > > > >> > > > step is kept simple, small, and well tested,
> > the
> > > > risk
> > > > > > is
> > > > > > > >> very
> > > > > > > >> > > > >> minimal.
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > > 2. It looks that both JoinGroupRequest and
> > > > > > > >> JoinGroupResponse
> > > > > > > >> > > has
> > > > > > > >> > > > >> the
> > > > > > > >> > > > >> > > > >
> ProtocolMetadata.AssignmentStrategyMetadata,
> > > what
> > > > > > would
> > > > > > > >> be
> > > > > > > >> > the
> > > > > > > >> > > > >> > metadata
> > > > > > > >> > > > >> > > > be
> > > > > > > >> > > > >> > > > > sent and returned by coordinator? How will
> > the
> > > > > > > >> coordinator
> > > > > > > >> > > > handle
> > > > > > > >> > > > >> the
> > > > > > > >> > > > >> > > > > metadata?
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > The coordinator is basically just blindly
> > > > > broadcasting
> > > > > > > all
> > > > > > > >> of
> > > > > > > >> > it
> > > > > > > >> > > > to
> > > > > > > >> > > > >> > group
> > > > > > > >> > > > >> > > > members so they have a consistent view.
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > So from the coordinators perspective, it sees
> > > > > something
> > > > > > > >> like:
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > Consumer 1 -> JoinGroupRequest with
> > > GroupProtocols
> > > > =
> > > > > [
> > > > > > > >> > > "consumer"
> > > > > > > >> > > > >> > > > <Consumer1 opaque byte[]>]
> > > > > > > >> > > > >> > > > Consumer 2 -> JoinGroupRequest with
> > > GroupProtocols
> > > > =
> > > > > [
> > > > > > > >> > > "consumer"
> > > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > Then, in the responses would look like:
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > Consumer 1 <- JoinGroupResponse with
> > > GroupProtocol
> > > > =
> > > > > > > >> > "consumer"
> > > > > > > >> > > > and
> > > > > > > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque
> > > > > byte[]>,
> > > > > > > >> > Consumer
> > > > > > > >> > > 2
> > > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > > >> > > > >> > > > Consumer 2 <- JoinGroupResponse with
> > > GroupProtocol
> > > > =
> > > > > > > >> > "consumer"
> > > > > > > >> > > > and
> > > > > > > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque
> > > > > byte[]>,
> > > > > > > >> > Consumer
> > > > > > > >> > > 2
> > > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > So all the responses include all the metadata
> > for
> > > > > every
> > > > > > > >> member
> > > > > > > >> > > in
> > > > > > > >> > > > >> the
> > > > > > > >> > > > >> > > > group, and everyone can use that to
> > consistently
> > > > > decide
> > > > > > > on
> > > > > > > >> > > > >> assignment.
> > > > > > > >> > > > >> > > The
> > > > > > > >> > > > >> > > > broker doesn't care and cannot even
> understand
> > > the
> > > > > > > metadata
> > > > > > > >> > > since
> > > > > > > >> > > > >> the
> > > > > > > >> > > > >> > > data
> > > > > > > >> > > > >> > > > format for it is dependent on the assignment
> > > > strategy
> > > > > > > being
> > > > > > > >> > > used.
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > As another example that is *not* a consumer,
> > > let's
> > > > > say
> > > > > > > you
> > > > > > > >> > just
> > > > > > > >> > > > >> want to
> > > > > > > >> > > > >> > > > have a single writer in the group which
> > everyone
> > > > will
> > > > > > > >> forward
> > > > > > > >> > > > >> requests
> > > > > > > >> > > > >> > > to.
> > > > > > > >> > > > >> > > > To accomplish this, you could use a very dumb
> > > > > > assignment
> > > > > > > >> > > strategy:
> > > > > > > >> > > > >> > there
> > > > > > > >> > > > >> > > is
> > > > > > > >> > > > >> > > > no metadata (empty byte[]) and all we care
> > about
> > > is
> > > > > who
> > > > > > > is
> > > > > > > >> the
> > > > > > > >> > > > first
> > > > > > > >> > > > >> > > member
> > > > > > > >> > > > >> > > > in the group (e.g. when IDs are sorted
> > > > > > > lexicographically).
> > > > > > > >> > That
> > > > > > > >> > > > >> member
> > > > > > > >> > > > >> > is
> > > > > > > >> > > > >> > > > selected as the writer. In that case, we
> > actually
> > > > > just
> > > > > > > care
> > > > > > > >> > > about
> > > > > > > >> > > > >> the
> > > > > > > >> > > > >> > > > membership list, there's no additional info
> > about
> > > > > each
> > > > > > > >> member
> > > > > > > >> > > that
> > > > > > > >> > > > >> is
> > > > > > > >> > > > >> > > > required to determine who is the writer.
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > > 3. Do you mean that the number of
> partitions
> > in
> > > > > > > >> > > > JoinGroupResponse
> > > > > > > >> > > > >> > will
> > > > > > > >> > > > >> > > be
> > > > > > > >> > > > >> > > > > the max partition number of a topic among
> all
> > > the
> > > > > > > >> reported
> > > > > > > >> > > > >> partition
> > > > > > > >> > > > >> > > > number
> > > > > > > >> > > > >> > > > > by consumers? Is there any reason not just
> > let
> > > > > > > >> Coordinator
> > > > > > > >> > to
> > > > > > > >> > > > >> return
> > > > > > > >> > > > >> > > the
> > > > > > > >> > > > >> > > > > number of partitions of a topic in its
> > metadata
> > > > > > cache?
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > Nothing from the embedded protocol is touched
> > by
> > > > the
> > > > > > > >> broker.
> > > > > > > >> > The
> > > > > > > >> > > > >> broker
> > > > > > > >> > > > >> > > > just collects opaque bytes of metadata, does
> > the
> > > > > > > selection
> > > > > > > >> of
> > > > > > > >> > > the
> > > > > > > >> > > > >> > > strategy
> > > > > > > >> > > > >> > > > if multiple are supported by some consumers,
> > and
> > > > then
> > > > > > > >> returns
> > > > > > > >> > > that
> > > > > > > >> > > > >> > opaque
> > > > > > > >> > > > >> > > > metadata for all the members back to every
> > > member.
> > > > In
> > > > > > > that
> > > > > > > >> way
> > > > > > > >> > > > they
> > > > > > > >> > > > >> all
> > > > > > > >> > > > >> > > > have a consistent view of the group. For
> > regular
> > > > > > > consumers,
> > > > > > > >> > that
> > > > > > > >> > > > >> view
> > > > > > > >> > > > >> > of
> > > > > > > >> > > > >> > > > the group includes information about how many
> > > > > > partitions
> > > > > > > >> each
> > > > > > > >> > > > >> consumer
> > > > > > > >> > > > >> > > > currently thinks the topics it is subscribed
> to
> > > > has.
> > > > > > > These
> > > > > > > >> > could
> > > > > > > >> > > > be
> > > > > > > >> > > > >> > > > inconsistent due to out of date metadata and
> it
> > > > would
> > > > > > be
> > > > > > > >> up to
> > > > > > > >> > > the
> > > > > > > >> > > > >> > > > assignment strategy on the *client* to
> resolve
> > > > that.
> > > > > As
> > > > > > > you
> > > > > > > >> > > point
> > > > > > > >> > > > >> out,
> > > > > > > >> > > > >> > in
> > > > > > > >> > > > >> > > > that case they could just take the max value
> > that
> > > > any
> > > > > > > >> consumer
> > > > > > > >> > > > >> reported
> > > > > > > >> > > > >> > > > seeing and use that. The consumers that
> notice
> > > that
> > > > > > their
> > > > > > > >> > > metadata
> > > > > > > >> > > > >> had
> > > > > > > >> > > > >> > a
> > > > > > > >> > > > >> > > > smaller # of partitions should also trigger a
> > > > > metadata
> > > > > > > >> update
> > > > > > > >> > > when
> > > > > > > >> > > > >> they
> > > > > > > >> > > > >> > > see
> > > > > > > >> > > > >> > > > someone else observing a larger # of
> > partitions.
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > >
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > > Thanks,
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > > Jiangjie (Becket) Qin
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason
> > > Gustafson
> > > > <
> > > > > > > >> > > > >> ja...@confluent.io
> > > > > > > >> > > > >> > >
> > > > > > > >> > > > >> > > > > wrote:
> > > > > > > >> > > > >> > > > >
> > > > > > > >> > > > >> > > > > > Hi Kafka Devs,
> > > > > > > >> > > > >> > > > > >
> > > > > > > >> > > > >> > > > > > One of the nagging issues in the current
> > > design
> > > > > of
> > > > > > > the
> > > > > > > >> new
> > > > > > > >> > > > >> consumer
> > > > > > > >> > > > >> > > has
> > > > > > > >> > > > >> > > > > > been the need to support a variety of
> > > > assignment
> > > > > > > >> > strategies.
> > > > > > > >> > > > >> We've
> > > > > > > >> > > > >> > > > > > encountered this in particular in the
> > design
> > > of
> > > > > > > copycat
> > > > > > > >> > and
> > > > > > > >> > > > the
> > > > > > > >> > > > >> > > > > processing
> > > > > > > >> > > > >> > > > > > framework (KIP-28). From what I
> understand,
> > > > Samza
> > > > > > > also
> > > > > > > >> > has a
> > > > > > > >> > > > >> number
> > > > > > > >> > > > >> > > of
> > > > > > > >> > > > >> > > > > use
> > > > > > > >> > > > >> > > > > > cases with custom assignment needs. The
> new
> > > > > > consumer
> > > > > > > >> > > protocol
> > > > > > > >> > > > >> > > supports
> > > > > > > >> > > > >> > > > > new
> > > > > > > >> > > > >> > > > > > assignment strategies by hooking them
> into
> > > the
> > > > > > > broker.
> > > > > > > >> For
> > > > > > > >> > > > many
> > > > > > > >> > > > >> > > > > > environments, this is a major pain and in
> > > some
> > > > > > > cases, a
> > > > > > > >> > > > >> > non-starter.
> > > > > > > >> > > > >> > > It
> > > > > > > >> > > > >> > > > > > also challenges the validation that the
> > > > > coordinator
> > > > > > > can
> > > > > > > >> > > > provide.
> > > > > > > >> > > > >> > For
> > > > > > > >> > > > >> > > > > > example, some assignment strategies call
> > for
> > > > > > > >> partitions to
> > > > > > > >> > > be
> > > > > > > >> > > > >> > > assigned
> > > > > > > >> > > > >> > > > > > multiple times, which means that the
> > > > coordinator
> > > > > > can
> > > > > > > >> only
> > > > > > > >> > > > check
> > > > > > > >> > > > >> > that
> > > > > > > >> > > > >> > > > > > partitions have been assigned at least
> > once.
> > > > > > > >> > > > >> > > > > >
> > > To solve these issues, we'd like to propose moving assignment to the
> > > client. I've written a wiki which outlines some protocol changes to
> > > achieve this:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > > To summarize briefly, instead of the coordinator assigning the
> > > partitions itself, all subscriptions are forwarded to each member of the
> > > group which then decides independently which partitions it should
> > > consume. The protocol provides a mechanism for the coordinator to
> > > validate that all consumers use the same assignment strategy, but it
> > > does not ensure that the resulting assignment is "correct." This
> > > provides a powerful capability for users to control the full data flow
> > > on the client side. They control how data is written to partitions
> > > through the Partitioner interface and they control how data is consumed
> > > through the assignment strategy, all without touching the server.
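
For illustration only, here is a minimal sketch (in Java, with made-up class
and method names; none of this is taken from the wiki) of the kind of
deterministic assignor each member could run locally once the coordinator has
forwarded the full set of subscriptions. Because every member sorts the same
member list the same way, all members compute the same global assignment and
then simply keep their own slice:

    import java.util.*;

    // Hypothetical round-robin-style client-side assignor. Every member runs
    // this against the identical, coordinator-provided view of the group, so
    // all members independently arrive at the same global assignment.
    public class ClientSideAssignor {

        // memberSubscriptions: memberId -> subscribed topics (same on every member)
        // partitionCounts:     topic -> partition count (from locally fetched metadata)
        public static Map<String, List<String>> assign(
                Map<String, List<String>> memberSubscriptions,
                Map<String, Integer> partitionCounts) {
            Map<String, List<String>> assignment = new HashMap<>();
            for (String memberId : memberSubscriptions.keySet())
                assignment.put(memberId, new ArrayList<>());

            for (Map.Entry<String, Integer> topicEntry : partitionCounts.entrySet()) {
                String topic = topicEntry.getKey();
                // Members interested in this topic, sorted for determinism.
                List<String> interested = new ArrayList<>();
                for (Map.Entry<String, List<String>> sub : memberSubscriptions.entrySet())
                    if (sub.getValue().contains(topic))
                        interested.add(sub.getKey());
                Collections.sort(interested);
                if (interested.isEmpty())
                    continue;

                // Spread the topic's partitions over interested members round-robin.
                for (int partition = 0; partition < topicEntry.getValue(); partition++) {
                    String owner = interested.get(partition % interested.size());
                    assignment.get(owner).add(topic + "-" + partition);
                }
            }
            return assignment;
        }
    }

The catch, as discussed earlier in the thread, is that determinism also
depends on every member fetching the same topic metadata, which is exactly
where metadata churn becomes a problem.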
> > > Of course nothing comes for free. In particular, this change removes
> > > the ability of the coordinator to validate that commits are made by
> > > consumers who were assigned the respective partition. This might not be
> > > too bad since we retain the ability to validate the generation id, but
> > > it is a potential concern. We have considered alternative protocols
> > > which add a second round-trip in order to give the coordinator the
> > > ability to confirm the assignment. As mentioned above, the coordinator
> > > is somewhat limited in what it can actually validate, but this would
> > > restore its ability to validate commits. The tradeoff is that it
> > > increases the protocol's complexity, which means more ways for the
> > > protocol to fail and consequently more edge cases in the code.
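
To make the generation check concrete, here is a hedged sketch (hypothetical
names, not actual broker code) of what the coordinator could still enforce
even without knowing the assignment: every commit carries the generation id
the member joined with, and anything from a stale generation or an unknown
member is rejected.

    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical sketch of generation-based commit fencing on the
    // coordinator. The coordinator no longer knows which member owns which
    // partition, but it can still fence members of a stale generation.
    public class GroupGenerationState {
        private int currentGeneration = 0;
        private final Set<String> currentMembers = new HashSet<>();

        // Called when a rebalance completes successfully.
        public synchronized void onRebalanceComplete(Set<String> members) {
            currentGeneration++;          // every rebalance bumps the generation
            currentMembers.clear();
            currentMembers.addAll(members);
        }

        // Called for each incoming offset commit.
        public synchronized boolean commitAllowed(String memberId, int generationId) {
            return generationId == currentGeneration && currentMembers.contains(memberId);
        }
    }

What it can no longer do is verify that the committing member actually owned
the partition; that knowledge now lives only on the clients.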
> > > It also misses an opportunity to generalize the group membership
> > > protocol for additional use cases. In fact, after you've gone to the
> > > trouble of moving assignment to the client, the main thing that is left
> > > in this protocol is basically a general group management capability.
> > > This is exactly what is needed for a few cases that are currently under
> > > discussion (e.g. copycat or single-writer producer). We've taken this
> > > further step in the proposal and attempted to envision what that general
> > > protocol might look like and how it could be used both by the consumer
> > > and for some of these other cases.
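
As a purely hypothetical illustration of that general capability, a
single-writer producer could be built on nothing more than group membership
plus a deterministic rule that every member applies to the same member list:

    import java.util.Collections;
    import java.util.List;

    // Hypothetical single-writer election on top of generic group membership:
    // no partitions involved, just a deterministic rule that every member
    // applies to the identical, coordinator-provided member list.
    public class SingleWriterElection {

        // memberIds is the full (non-empty) member list after a rebalance.
        public static String electWriter(List<String> memberIds) {
            return Collections.min(memberIds); // smallest id wins; any deterministic rule works
        }

        public static boolean isWriter(String myMemberId, List<String> memberIds) {
            return myMemberId.equals(electWriter(memberIds));
        }
    }

The non-writers simply stay in the group as standbys, and the generation bump
on the next rebalance fences a deposed writer, much like the commit fencing
above does for consumers.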
> > > Anyway, since time is running out on the new consumer, we have perhaps
> > > one last chance to consider a significant change in the protocol like
> > > this, so have a look at the wiki and share your thoughts. I've no doubt
> > > that some ideas seem clearer in my mind than they do on paper, so ask
> > > questions if there is any confusion.
> > >
> > > Thanks!
> > > Jason
