@Boyang,

* Yes, the registry id should be supplied via a config, similar to what you
currently propose in the KIP for the member id. And "even if multiple
consumers are using the same member id" (I think you meant registry id
here?), we can fence against them.

BTW I'm not married (but cannot speak for Jason :P ) to "registry id", so
any other naming suggestions are welcome. John's suggestion of `member
name` sounds okay to me. Another wild idea I had is to just name it
"consumer id".

* About the metrics: note that when a consumer bounces, it will resume and
come back with an empty member id and a pre-existing registry id. On the
coordinator side, this is considered a "fenced" event, but it is really not
an event that should raise an alert. So I think we should not simply record
a metric whenever the coordinator updates the registryId -> memberId map.

We could, however, consider adding a metric for the other event: whenever
the coordinator rejects a request because of an incorrect member id. Note
that this event covers both static-membership-related fencing and today's
normal cases where members are kicked out of the group. During a rebalance
we may see some spikes in this metric, but during normal operations it
should stay at 0; if it stays non-zero consistently, it means there are
some zombies lurking around.
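
To illustrate the difference between the two candidate metrics, here is a
rough sketch. It is a toy model, not GroupCoordinator code: the class, the
method names and the two counters are made up for illustration, and the
error name is simply the one used earlier in this thread.

```python
import itertools


class GroupCoordinatorSketch:
    """Toy model of the broker-side registry id -> member id cache."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.members = {}           # registry id -> currently valid member id
        self.mapping_updates = 0    # overwrites of an existing mapping
        self.rejected_requests = 0  # requests refused for a bad member id

    def join(self, registry_id):
        """Join with an empty member id: the last comer always wins."""
        if registry_id in self.members:
            # Fires for a harmless bounce and for a duplicate instance alike.
            self.mapping_updates += 1
        member_id = f"member-{next(self._ids)}"
        self.members[registry_id] = member_id
        return member_id

    def request(self, member_id):
        """Any later request (commit, heartbeat) carrying only a member id."""
        if member_id not in self.members.values():
            self.rejected_requests += 1
            return "ILLEGAL_MEMBER_ID"
        return "OK"


coordinator = GroupCoordinatorSketch()
old_id = coordinator.join("registry-1")  # first start
new_id = coordinator.join("registry-1")  # bounce: same registry id, empty member id
print(coordinator.mapping_updates, coordinator.rejected_requests)

print(coordinator.request(new_id))  # the live member is accepted
print(coordinator.request(old_id))  # only a still-running zombie would send this
print(coordinator.rejected_requests)
```

In this model a clean bounce bumps mapping_updates but leaves
rejected_requests untouched, whereas a zombie that keeps using its old
member id bumps rejected_requests on every request it sends.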

* About the code base: GroupCoordinator represents the broker-side
coordinator logic, and ConsumerCoordinator is on the client side,
representing the logic a consumer uses to communicate with the broker-side
coordinator. When we talk about the "coordinator" in this discussion,
unless clearly specified otherwise, it should always refer to the
broker-side logic, i.e. GroupCoordinator.


@John

Note that a consumer can only learn that it has been fenced when it talks
to the coordinator (again). More specifically, once a consumer gets its
assignment, it will keep fetching from the brokers without talking to the
coordinator until the next heartbeat request. Until then, it is unavoidable
that two consumers may be fetching from the same partitions. I used the
commit request just as an example of any request sent to the coordinator;
I was not saying that they will only become aware at commit time. In fact,
when they send a Heartbeat request to the coordinator, they could also be
notified that "you are fenced".
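
To make the timing concrete, here is a small simulation. Again this is only
a sketch under simplifying assumptions: ToyCoordinator and ToyConsumer are
hypothetical names, fetches are modelled as never touching the coordinator,
and the heartbeat is the only coordinator request shown.

```python
class ToyCoordinator:
    """Keeps only the latest member id per registry id (last comer wins)."""

    def __init__(self):
        self.current = {}  # registry id -> valid member id
        self.next_id = 0

    def join(self, registry_id):
        self.next_id += 1
        member_id = f"member-{self.next_id}"
        self.current[registry_id] = member_id
        return member_id

    def heartbeat(self, member_id):
        return "OK" if member_id in self.current.values() else "FENCED"


class ToyConsumer:
    def __init__(self, coordinator, registry_id):
        self.coordinator = coordinator
        self.member_id = coordinator.join(registry_id)
        self.fenced = False
        self.fetched = 0

    def fetch(self):
        # Fetches go to the partition leaders, not to the coordinator,
        # so nothing on this path can tell the consumer it was replaced.
        if not self.fenced:
            self.fetched += 1

    def heartbeat(self):
        # First point at which the consumer can find out it is fenced.
        if self.coordinator.heartbeat(self.member_id) == "FENCED":
            self.fenced = True


coordinator = ToyCoordinator()
client_a = ToyConsumer(coordinator, "registry-1")
client_b = ToyConsumer(coordinator, "registry-1")  # replaces client_a's member id

client_a.fetch()      # overlap window: both clients are fetching
client_b.fetch()
client_a.heartbeat()  # client_a now learns it has been fenced
client_a.fetch()      # no longer fetches
print(client_a.fenced, client_a.fetched, client_b.fetched)
```

The one fetch client_a completes before its heartbeat is exactly the
duplicate-read window described above; shortening the interval between
coordinator requests narrows that window but cannot close it.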


Guozhang


On Wed, Aug 1, 2018 at 10:41 PM, Boyang Chen <bche...@outlook.com> wrote:

> Hey Guozhang,
>
>
> thanks for this detailed proposal! Quickly I want to clarify, where this
> registry id is generated? My understanding is that the registry id is a
> unique id provided by user correct? This way even if multiple consumers are
> using the same member id, we are fencing against polluting the commit
> history. Very thoughtful approach! Furthermore, I feel we should export
> alerting metrics on catching duplicate consumer instances since multiple of
> them could still be reading the same topic partition and executing
> duplicate business logic.
>
>
> Also a minor comment is that our current codebase has two different
> classes: the GroupCoordinator and ConsumerCoordinator. When talking about
> term "coordinator", it would be great to distinguish between the two, or
> explicitly nominate broker coordinator as default "coordinator".
>
>
> Best,
>
> Boyang
>
> ________________________________
> From: Guozhang Wang <wangg...@gmail.com>
> Sent: Thursday, August 2, 2018 1:59 AM
> To: dev
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hello Boyang,
>
> Thanks for the great summary.
>
> Personally I think 1) is still doable, as we do not necessarily have to
> rely on a source of truth to always fence "the right zombie"; in fact, my
> point is that as long as we can fence "someone" when there are zombies
> sharing the same member id so that users are notified of such issues and
> then react to it, it is good enough. Letting the first or last comer win
> falls under that consideration.
>
> I've thought about Jason's "registry id" approach, and I think it may
> actually be suitable for providing fencing while approaching the goal of
> avoiding unnecessary rebalances. Here are some more concrete details
> (@Jason, if it is not the same as what you had originally in mind, let me
> know):
>
>
> a. In JoinGroupRequest, include a "registry id" in addition to the "member
> id" to the coordinator. In all other requests like CommitOffset, etc, still
> only include the "member id".
>
> b. On the coordinator side, when receiving a JoinGroupRequest with a
> registry id and an empty member id, and this registry id is indeed in the
> cache, it still generates a new member id and returns it in the response;
> additionally, it updates the [registry-id, member-id] mapping in its member
> metadata cache (i.e. blind-write to the cache regardless of whether it
> already contains this registry id); if both registry-id and member-id are
> specified, check if they match the cache; if not, or if the cache does
> not include this mapping at all, return an ILLEGAL_MEMBER_ID error to let
> the client reset its member id and rejoin.
>
> ** NOTE ** this logic is actually "letting the last comer win", i.e.
> whenever a new client comes with a registry id and empty member id, always
> treat it as a valid consumer and replace whatever existed under this
> registry id, and the previous member id will be effectively removed from
> the group.
>
> c. On the coordinator side, when receiving any other group requests like
> CommitOffset, make sure the member id is indeed in this cache (i.e. it is
> contained in the [registry-id, member-id] mapping), otherwise reject it as
> we do today.
>
>
> ------------------
>
> So to give an example, if clientA joins the group with registry id 1, it
> will be assigned a member id X, and the coordinator remembers the current
> mapping [1, X] with "X" as the valid member belonging to this group; if
> another clientB joins the group with the same registry id 1 and an empty
> member id, the coordinator assigns it a new member id Y and updates the
> mapping to [1, Y]. Hence the coordinator always blindly trusts the newcomer
> as the valid member and kicks out the previous one. At this time both
> clientA and clientB may be fetching from the same assigned partitions, BUT
> when clientA tries to commit its offsets, sending member id X, it will be
> rejected.
>
> ------------------
>
> And then back to the original goal of 5/6: when a consumer shuts down and
> restarts, it will send a JoinGroupRequest with the registry id and an empty
> member id. As long as the coordinator still remembers this registry id, it
> will send back the originally assigned partitions to the consumer, while
> updating its cache with the newly generated member id. No rebalance will be
> triggered, and if there are other zombies with the same registry id that
> already got the assignment before, although they can still fetch data, they
> will not be able to commit.
>
> ------------------
>
>
>
> Guozhang
>
>
>
> On Wed, Aug 1, 2018 at 8:07 AM, Boyang Chen <bche...@outlook.com> wrote:
>
> > Thanks for everyone's (Guozhang, Jason, Mike, John and someone in case
> I'm
> > missing) input here! Looks like we have got more interesting thoughts on
> > this KIP.
> >
> >
> > When I draft KIP-345, the original proposal is targeted at solving
> "static
> > membership", which is trying to address 1/2/5/6  proposed by Guozhang. In
> > my mindset, the most important detail I'm trying to follow is the owner
> of
> > member id generation or the membership metadata (proposed by John).
> >
> >
> > Guozhang's static membership approach overall LGTM. It would make the
> > whole protocol much clearer, with group members coming and going explicitly in a
> > static setting. The static approach appoints Consumer/Stream app as the
> > owner of member id generation, which brought up multiple validation
> > concerns. As the host:port approach is vetoed, and I have thought for a
> > while for other validation strategies but failed, I think it's time to
> > decide whether we want to focus our next step discussion on
> >
> >
> >   1.  how to let server validate id uniqueness generated by client, or
> >   2.  how to let client materialize the id provided by the server
> >
> >
> > Let me know your favor of the two topics here. Personally I'm inclined to
> > 2 as 1 is intrinsically complex (no source of truth model). Hope we are
> on
> > the same page now.
> >
> >
> > Best,
> >
> > Boyang
> >
> > ________________________________
> > From: John Roesler <j...@confluent.io>
> > Sent: Wednesday, August 1, 2018 5:28 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Hi Boyang,
> >
> > Overall, this seems like a good addition to the consumer.
> >
> > I agree with the others that we should attempt to validate the uniqueness
> > of member.id usage. FWIW, Jason's idea of using a configured logical id
> +
> > assigned unique id seems to be suitably flexible and free of assumptions,
> > as opposed to picking machine characteristics like hostname.
> >
> > It seems like a survey of solutions to the same problem in other software
> > might help to guide this discussion. Kafka Consumer isn't the first
> > stateful distributed system to encounter the need for stable shard
> > assignment. I would think that any mature distributed database and any
> > stateful stream processing framework would have some algorithm to solve
> > this problem, some of which might offer unique advantages over what we've
> > discussed so far.
> >
> > Hopefully it wouldn't pollute the discussion if I threw out an
> alternative
> > idea: It seems like the purpose behind this proposal is to allow stateful
> > consumers to re-acquire their previous partitions when they return to the
> > group. In this scenario, it seems reliable that the consumers would know
> > what partitions they previously had assigned (since they are stateful).
> > Instead of reporting a member.id, they could just include their prior
> list
> > of partitions in the join-group request. This would solve the problem
> just
> > as well as member.id, while allowing more advanced techniques like
> > splitting state between two instances. For example, an instance I1 with
> TP1
> > and TP2 state shuts down, and we copy the state for TP2 to I2 (and delete
> > it from I1); then we start I1 and I2 up, and I1 only reports that it
> wants
> > TP1 while I2 only reports that it wants TP2. Thus, we can scale out or in
> > without incurring costly state reconstruction.
> >
> >
> > On a separate note, what is the impact of changing the default for
> > LEAVE_GROUP_ON_CLOSE_CONFIG? It seems like it could potentially harm
> > applications not using member.id. Can we circumvent the issue without
> > changing that default globally? Such as ignoring that config when
> > member.id
> > is set?
> >
> > Thanks for the proposal!
> > -John
> >
> > On Mon, Jul 30, 2018 at 9:57 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > @Jason,
> > >
> > > Good point about disconnects. And with that I think I agree that a
> > registry
> > > id maybe a better idea to enable fencing than validating on host /
> port.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Jul 30, 2018 at 5:40 PM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > Thanks for the detailed response. Really quick about the fencing
> > issue, I
> > > > think host/port will not be sufficient because it cannot handle
> > > > disconnects. For example, if the coordinator moves to another broker,
> > > then
> > > > there is no way we'd be able to guarantee the same host/port.
> Currently
> > > we
> > > > try to avoid rebalancing when the coordinator moves. That said, I
> agree
> > > in
> > > > principle with the "first comer wins" approach you've suggested.
> > > Basically
> > > > a member is only removed from the group if its session expires or it
> > > leaves
> > > > the group explicitly.
> > > >
> > > > -Jason
> > > >
> > > > On Mon, Jul 30, 2018 at 4:24 PM, Mike Freyberger <
> > > mfreyber...@appnexus.com
> > > > >
> > > > wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > Thanks for giving us a great starting point.
> > > > >
> > > > > A few questions that come to mind right away:
> > > > >
> > > > > 1) What do you think a reasonable group-change-timeout would be? I
> am
> > > > > thinking on the order of minutes (5 minutes?)
> > > > >
> > > > > 2) Will the nodes that are still alive continue to make progress
> > > during a
> > > > > static membership rebalance? I believe during a rebalance today all
> > > > > consumers wait for the SyncGroupResponse before continuing to read
> > data
> > > > > > from the brokers. If that is the case, I think it'd be ideal for all
> > nodes
> > > > that
> > > > > are still alive during a static group membership change to continue
> > to
> > > > make
> > > > > > progress as if nothing happened such that there is no impact to
> > the
> > > > > majority of the group when one node is bounced (quick restart).
> > > > >
> > > > > 3) Do you think an explicit method for forcing a rebalance would be
> > > > > needed? I am thinking of a scenario such as a disk failure on a
> node,
> > > and
> > > > > that node will definitely not come back. Rather than waiting up to
> > the
> > > > > > group-change-timeout, I think it'd be good for an admin to force a
> > > rebalance
> > > > > rather than wait the full group-change-timeout. Maybe this is an
> over
> > > > > optimization, but I think certain use cases would benefit from
> static
> > > > group
> > > > > membership with the ability to force a rebalance at any time.
> > > > >
> > > > > Best,
> > > > >
> > > > > Mike
> > > > >
> > > > > On 7/30/18, 6:57 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> > > > >
> > > > >     Hello Boyang / Jason / Mike,
> > > > >
> > > > >     Thanks for your thoughtful inputs! Regarding the fencing issue,
> > > I've
> > > > >     thought about leveraging the epoch notion from PID of
> > transactional
> > > > >     messaging before, but since in this proposal we do not always
> > > require
> > > > >     member ids from clients, and hence could have a mixed of
> > > > user-specified
> > > > >     member ids with coordinator-generated member ids, the epoch
> idea
> > > may
> > > > > not be
> > > > >     very well suited for this scenario. Of course, we can tighten
> the
> > > > > screws a
> > > > >     bit by requiring that for a given consumer group, all consumers
> > > must
> > > > > either
> > > > >     be giving their member ids or leveraging on consumer
> coordinator
> > to
> > > > > give
> > > > >     member ids, which does not sound a very strict requirement in
> > > > > practice, and
> > > > >     all we need to do is the add a new field in the join group
> > request
> > > > (we
> > > > > are
> > > > >     proposing to bump up its version anyways). And hence I've also
> > > > thought
> > > > >     about another simple fencing approach, aka "first comer wins",
> > that
> > > > is
> > > > > to
> > > > >     pass in the host / port information from KafkaApis to
> > > > GroupCoordinator
> > > > > to
> > > > >     validate if it matches the existing member id's cached host /
> > > > > > port.
> > > > It
> > > > > does
> > > > >     not always guarantee that we fence the right zombies because of
> > > > "first
> > > > >     comer wins" (think of a scenario where the right client gets
> > kicked
> > > > > out,
> > > > >     and then before it re-joins the actual zombie with the same
> > member
> > > id
> > > > > gets
> > > > >     joined), but as I mentioned previously it will poke some leaks
> > into
> > > > the
> > > > >     code hierarchy a bit so I'm also hesitant to do it. If people
> > think
> > > > it
> > > > > is
> > > > >     indeed a must-have than good-to-have, I'd suggest we leverage
> on
> > > > > host-port
> > > > >     than using the epoch mechanism then.
> > > > >
> > > > >     ------------------------------------------
> > > > >
> > > > >     As for the more general idea of having a static membership
> > protocol
> > > > to
> > > > >     better integrated with Cloud environment like k8s, I think the
> > > first
> > > > > idea
> > > > >     may actually be better fit with it.
> > > > >
> > > > >     Just a quick summary of what rebalance issues we face today:
> > > > >
> > > > >     1. Application start: when multi-instance application is
> started,
> > > > > multiple
> > > > >     rebalances are triggered to migrate states to newly started
> > > instances
> > > > > since
> > > > >     not all instances are joining at the same time. NOTE that
> KIP-134
> > > > >     <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 134%3A+Delay+initial+consumer+group+rebalance>
> > > > >     is
> > > > >     targeted for this issue, but as an after-thought it may not be
> > the
> > > > > optimal
> > > > >     solution.
> > > > >     2. Application shutdown: similarly to 1), when multi-instance
> > > > > application
> > > > >     is shutting down, multiple rebalances are triggered.
> > > > >     3. Application scale out: when a new instance is started, one
> > > > > rebalance is
> > > > >     executed to shuffle all assignment, rather than just a
> "partial"
> > > > > shuffling
> > > > >     of some of the members.
> > > > >     4. Application scale in: similarly to 3), when an existing
> > instance
> > > > >     gracefully shutdown, once rebalance is executed to shuffle all
> > > > > assignment.
> > > > >     5. Application instance bounce (upgrade, config change etc):
> one
> > > > > instance
> > > > >     shut down and then restart, it will trigger two rebalances.
> NOTE
> > > that
> > > > >     disabling leave-group is targeted for this issue.
> > > > >     6. Application instance failure: one instance failed, and
> > probably
> > > a
> > > > > new
> > > > >     instance start to take its assignment (e.g. k8s), it will
> trigger
> > > two
> > > > >     rebalances. The different with 3) above is that new instance
> > would
> > > > not
> > > > > have
> > > > >     local cached tasks.
> > > > >
> > > > >
> > > > >     Among them, I think 1/2/5/6 could potentially be grouped
> together
> > > as
> > > > >     "static membership"; 4/5 could be grouped as another category,
> of
> > > > > allowing
> > > > >     "incremental rebalance" or "partial rebalance" than
> > full-rebalance.
> > > > Of
> > > > >     course, having incremental rebalances can help on 1/2/5/6 as
> well
> > > to
> > > > > reduce
> > > > >     the cost of each unnecessary rebalance, but ideally we want NO
> > > > > rebalances
> > > > >     at all for these cases, which will be more true with k8s / etc
> > > > > integrations
> > > > >     or static memberships.
> > > > >
> > > > >     ------------------------------------------
> > > > >
> > > > >     So just to throw in a sketchy idea following this route for
> > 1/2/5/6
> > > > for
> > > > >     brainstorming kick-off:
> > > > >
> > > > >
> > > > >     1. We bump up the JoinGroupRequest with additional fields:
> > > > >
> > > > >       1.a) a flag indicating "static" or "dynamic" membership
> > > protocols.
> > > > >       1.b) with "static" membership, we also add the pre-defined
> > member
> > > > id.
> > > > >       1.c) with "static" membership, we also add an optional
> > > > >     "group-change-timeout" value.
> > > > >
> > > > >     2. On the broker side, we enforce only one of the two protocols
> > for
> > > > all
> > > > >     group members: we accept the protocol on the first joined
> member
> > of
> > > > the
> > > > >     group, and if later joining members indicate a different
> > membership
> > > > >     protocol, we reject it. If the group-change-timeout value was
> > > > > different to
> > > > >     the first joined member, we reject it as well.
> > > > >
> > > > >     3. With dynamic membership, nothing is changed; with static
> > > > > membership, we
> > > > >     do the following:
> > > > >
> > > > >       3.a) never assign member ids, instead always expect the
> joining
> > > > > members
> > > > >     to come with their own member id; we could do the fencing based
> > on
> > > > > host /
> > > > >     port here.
> > > > >       3.b) upon receiving the first join group request, use the
> > > > >     "group-change-timeout" instead of the session-timeout as
> > rebalance
> > > > > timeout
> > > > >     to wait for other members to join. This is for 1) above.
> > > > >       3.c) upon receiving a leave-group request, use the
> > > > > "group-change-timeout"
> > > > >     to wait for more members to leave group as well, or for the
> left
> > > > > members to
> > > > >     re-join. After the timeout we trigger a rebalance with whatever
> > > have
> > > > > left
> > > > >     in the members list. This is for all 2 (expecting other members
> > to
> > > > send
> > > > >     leave-group) and 5/6 (expecting the left member to re-join).
> > > > >
> > > > >     4. As a result, we will deprecate KIP-134 and
> > > disable-on-leave-group
> > > > as
> > > > >     well.
> > > > >
> > > > >
> > > > >     The key idea is that, with "static" membership, groups should
> be
> > > > > created or
> > > > >     terminated as a whole, and dynamic member changes are not
> > expected
> > > > > often.
> > > > >     Hence we would not react to those membership-changing events
> > > > > immediately
> > > > >     but wait for longer specified time expecting some other systems
> > > like
> > > > > k8s
> > > > >     will resume the group members. WDYT?
> > > > >
> > > > >
> > > > >     Guozhang
> > > > >
> > > > >
> > > > >     On Mon, Jul 30, 2018 at 3:05 PM, Mike Freyberger <
> > > > > mfreyber...@appnexus.com>
> > > > >     wrote:
> > > > >
> > > > >     > Jason,
> > > > >     >
> > > > >     > I really appreciate the broader conversation that you are
> > > bringing
> > > > > up here.
> > > > >     >
> > > > >     > I've been working on an application that does streaming joins
> > > for a
> > > > > while
> > > > >     > now, and we face a similar issue with group membership being
> > > > > dynamic. We
> > > > >     > are currently using our own StickyAssignor and take special
> > care
> > > > > during
> > > > >     > rolling restarts to make sure consumer assignments do not
> > change.
> > > > >     >
> > > > >     > I think a feature that allows for group membership to be
> fixed,
> > > > > along with
> > > > > >     > a CLI for adding or removing a node from the group would be ideal.
> > This
> > > > > reminds
> > > > >     > me of some of the work by the DynamoDB team about 10 years
> back
> > > > when
> > > > > they
> > > > >     > differentiated transient failures from permanent failures to
> > deal
> > > > > > with
> > > > >     > problems like this.
> > > > >     >
> > > > >     > Best,
> > > > >     >
> > > > >     > Mike
> > > > >     >
> > > > >     > On 7/30/18, 5:36 PM, "Jason Gustafson" <ja...@confluent.io>
> > > wrote:
> > > > >     >
> > > > >     >     Hi Boyang,
> > > > >     >
> > > > >     >     Thanks for the response. I think the main point I was
> > trying
> > > to
> > > > > make
> > > > >     > is the
> > > > >     >     need for fencing. I am not too concerned about how to
> > > generate
> > > > a
> > > > >     > unique id
> > > > >     >     on the client side. The approach you suggested for
> streams
> > > > seems
> > > > >     >     reasonable. However, any time you reuse an id, you need
> to
> > be
> > > > > careful
> > > > >     > that
> > > > >     >     there is only one instance that can use it at any time.
> We
> > > are
> > > > > always
> > > > >     >     running into problems where a previous instance of an
> > > > > application comes
> > > > >     >     back to life unexpectedly after we had already presumed
> it
> > > was
> > > > > dead.
> > > > >     >     Fencing ensures that even if this happens, it cannot do
> any
> > > > > damage. I
> > > > >     > would
> > > > >     >     say that some protection from zombies is a requirement
> > here.
> > > > >     >
> > > > >     >     The second point was more abstract and mainly meant to
> > > initiate
> > > > > some
> > > > >     >     discussion. We have gone through several iterations of
> > > > > improvements to
> > > > >     > try
> > > > >     >     and reduce the rebalancing in consumer applications. We
> > > started
> > > > > out
> > > > >     > trying
> > > > >     >     to tune the session timeout. We have added an internal
> > config
> > > > to
> > > > > skip
> > > > >     >     leaving the group when streams shuts down. The broker now
> > > has a
> > > > > config
> > > > >     > to
> > > > >     >     delay rebalances in case all consumers join at about the
> > same
> > > > > time. The
> > > > >     >     approach in this KIP is a step in a more principled
> > > direction,
> > > > > but it
> > > > >     > still
> > > > >     >     feels like we are making this unnecessarily hard on
> > ourselves
> > > > by
> > > > >     > insisting
> > > > >     >     that group membership is a dynamic concept. In practice,
> > the
> > > > > number of
> > > > >     >     nodes dedicated to an application tends to remain fixed
> for
> > > > long
> > > > >     > periods of
> > > > >     >     time and only scales up or down when needed. And these
> days
> > > > > you've got
> > > > >     >     frameworks like kubernetes which can automatically
> > provision
> > > > new
> > > > > nodes
> > > > >     > if
> > > > >     >     one fails. So the argument for dynamic membership is
> > becoming
> > > > > weaker
> > > > >     > in my
> > > > >     >     opinion. This KIP is basically trying to impose a small
> > > degree
> > > > of
> > > > >     > static
> > > > >     >     membership anyway so that rolling restarts do not change
> > > > > membership.
> > > > >     >     Anyway, curious to hear some thoughts about this from you
> > and
> > > > the
> > > > >     > others
> > > > >     >     who work on streams.
> > > > >     >
> > > > >     >     Thanks,
> > > > >     >     Jason
> > > > >     >
> > > > >     >
> > > > >     >     On Sat, Jul 28, 2018 at 4:44 PM, Boyang Chen <
> > > > > bche...@outlook.com>
> > > > >     > wrote:
> > > > >     >
> > > > >     >     > Thanks for the replies, James and Jason. Let me try to
> > > > > summarize your
> > > > >     >     > concerns.
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > I think James' question is primarily the severity of
> user
> > > > > using this
> > > > >     >     > config wrongly. The impact would be that the same
> member
> > id
> > > > > being
> > > > >     > used by
> > > > >     >     > multiple or even all of the consumers. The assignment
> > > > protocol
> > > > >     > couldn't
> > > > >     >     > distinguish any of the overlapping consumers, thus
> > > assigning
> > > > > the
> > > > >     > exact same
> > > > >     >     > partitions multiple times to different consumers. I
> would
> > > say
> > > > > the
> > > > >     > processed
> > > > >     >     > result would be including a lot of duplicates and
> > > unnecessary
> > > > > heavy
> > > > >     > load on
> > > > >     >     > the client side, The correctness will depend on the
> user
> > > > logic,
> > > > >     > however I'm
> > > > >     >     > pessimistic.
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > Although the impact is very high, the symptom is not
> hard
> > > to
> > > > > triage,
> > > > >     >     > because user could visualize consumer identity
> > overlapping
> > > > > fairly
> > > > >     > easily by
> > > > >     >     > exported consumer metrics. On the user standpoint, they
> > > would
> > > > > be
> > > > >     > fully
> > > > >     >     > aware of the potential erratic status before enabling "
> > > > > member.id"
> > > > >     >     > configuration IMO. Let me know your thoughts James!
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > Next is Jason's suggestion. Jason shared a higher
> > viewpoint
> > > > and
> > > > >     > pointed
> > > > >     >     > out the problem that we need to solve is to maintain "a
> > > > strong
> > > > > bias
> > > > >     > towards
> > > > >     >     > being able to reuse previous state". The proposed
> > approach
> > > is
> > > > > to
> > > > >     > separate
> > > > >     >     > the notion of consumer membership and consumer
> identity.
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > The original idea of this KIP was on the Stream
> > > application,
> > > > > so I
> > > > >     >     > understand that the identities of multiple consumers
> > belong
> > > > to
> > > > > one
> > > > >     >     > instance, where each Stream thread will be using one
> > > > dedicated
> > > > > main
> > > > >     >     > consumer. So in a Stream use case, we could internally
> > > > generate
> > > > >     > member id
> > > > >     >     > with USER_DEFINED_ID + STREAM_THREAD_ID.
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > In pure consumer use case, this could be a little bit
> > > > > challenging
> > > > >     > since
> > > > >     >     > user could arbitrarily initiate multiple consumers on
> the
> > > > same
> > > > >     > instance
> > > > >     >     > which is out of our library control. This could add up
> > the
> > > > >     > possibility of
> > > > >     >     > member id collision. So instead of making developers
> life
> > > > > easier,
> > > > >     >     > introducing member id config could break the existing
> > code
> > > > > logic and
> > > > >     > take
> > > > >     >     > long time to understand and fix. Although I still
> assume
> > > this
> > > > > is an
> > > > >     >     > advanced config, user may use member id config even
> > before
> > > > > they fully
> > > > >     >     > understand the problem, and use the same set of
> > > > initialization
> > > > > logic
> > > > >     > cross
> > > > >     >     > multiple consumers on the same instance.
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > I hope I have explained my understanding of the pros
> and
> > > cons
> > > > > of
> > > > >     > this KIP
> > > > >     >     > better. Remember the core argument of this KIP: If the
> > > broker
> > > > >     > recognize
> > > > >     >     > this consumer as an existing member, it shouldn't
> trigger
> > > > > rebalance.
> > > > >     > If we
> > > > >     >     > build our discussion on top of this argument, the
> client
> > > > > management
> > > > >     > of
> > > > >     >     > group membership could be tricky at first, but
> > considering
> > > > our
> > > > >     > original
> > > > >     >     > motivation to leader-follower rebalance model, I feel
> > that
> > > > > having
> > > > >     > broker to
> > > > >     >     > create membership info and let client maintain would be
> > > less
> > > > >     > appealing and
> > > > >     >     > fragile. Having client generate membership data could
> > build
> > > > up
> > > > >     >     > source-of-truth model and streamline the current
> > > > architecture.
> > > > > We
> > > > >     > need also
> > > > >     >     > consider flexibility introduced by this KIP for cloud
> > users
> > > > to
> > > > >     > coordinate
> > > > >     >     > consumer/stream instances more freely. Honestly, I'm
> > > > > interested in
> > > > >     > Jason's
> > > > >     >     > registration id proposal and open to more voices, but I
> > > feel
> > > > it
> > > > >     > would be
> > > > >     >     > more complex than the current KIP for user to
> understand.
> > > > Hope
> > > > > this
> > > > >     > makes
> > > > >     >     > sense, Jason.
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > Thank you again for the feedback!
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > Best,
> > > > >     >     >
> > > > >     >     > Boyang
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > ________________________________
> > > > >     >     > From: Jason Gustafson <ja...@confluent.io>
> > > > >     >     > Sent: Saturday, July 28, 2018 6:50 AM
> > > > >     >     > To: dev
> > > > >     >     > Subject: Re: [DISCUSS] KIP-345: Reduce multiple
> consumer
> > > > > rebalances
> > > > >     > by
> > > > >     >     > specifying member id
> > > > >     >     >
> > > > >     >     > Hey Boyang,
> > > > >     >     >
> > > > >     >     > Thanks for the KIP. I think my main question is in the
> > same
> > > > > vein as
> > > > >     > James'.
> > > > >     >     > The problem is that the coordinator needs to be able to
> > > > > identify
> > > > >     > which
> > > > >     >     > instance of a particular memberId is the active one.
> For
> > > EOS,
> > > > > each
> > > > >     >     > transactionalId gets an epoch. When a new producer is
> > > > started,
> > > > > it
> > > > >     > bumps the
> > > > >     >     > epoch which allows the transaction coordinator to fence
> > off
> > > > any
> > > > >     > zombie
> > > > >     >     > instances which may try to continue doing work with the
> > old
> > > > > epoch.
> > > > >     > It seems
> > > > >     >     > like we need a similar protection for consumer members.
> > > > >     >     >
> > > > >     >     > Suppose for example that we distinguish between a
> > > > registration
> > > > > id
> > > > >     > which is
> > > > >     >     > provided by the user and a member id which is assigned
> > > > > uniquely by
> > > > >     > the
> > > > >     >     > coordinator. In the JoinGroup request, both the
> > > registration
> > > > > id and
> > > > >     > the
> > > > >     >     > member id are provided. When a consumer is first
> started,
> > > it
> > > > > doesn't
> > > > >     > know
> > > > >     >     > the memberId, so it provides only the registration
> id.
> > > The
> > > > >     > coordinator
> > > > >     >     > can then assign a new memberId and invalidate the
> > previous
> > > > one
> > > > > that
> > > > >     > was
> > > > >     >     > associated with the registration id. This would then
> > fence
> > > > off
> > > > > the
> > > > >     > previous
> > > > >     >     > instance which was still trying to use the member id.
> > > > >     >     >
> > > > >     >     > Taking a little bit of a step back, I think the main
> > > > > observation in
> > > > >     > this
> > > > >     >     > KIP is that applications with heavy local state need to
> > > have
> > > > a
> > > > >     > strong bias
> > > > >     >     > toward being able to reuse that state. It is a bit like
> > > Kafka
> > > > > itself
> > > > >     > in the
> > > > >     >     > sense that a replica is not moved just because the
> broker
> > > is
> > > > >     > shut down as
> > > > >     >     > the cost of moving the log is extremely high. I'm
> > wondering
> > > > if
> > > > > we
> > > > >     > need to
> > > > >     >     > think about streams applications in a similar way.
> Should
> > > > > there be a
> > > > >     > static
> > > > >     >     > notion of the members of the group so that streams can
> > make
> > > > >     > rebalancing
> > > > >     >     > decisions more easily without depending so heavily on
> > > > transient
> > > > >     > membership?
> > > > >     >     > I feel the hacks we've put in place in some cases to
> > avoid
> > > > >     > rebalances are a
> > > > >     >     > bit brittle. Delaying group joining, for instance, is an
> > > example
> > > > > of
> > > > >     > this. If
> > > > >     >     > you knew ahead of time who the stable members of the
> > group
> > > > > were,
> > > > >     > then this
> > > > >     >     > would not be needed. Anyway, just a thought.
> > > > >     >     >
> > > > >     >     > Thanks,
> > > > >     >     > Jason
> > > > >     >     >
> > > > >     >     >
> > > > >     >     >
> > > > >     >     > On Fri, Jul 27, 2018 at 1:58 PM, James Cheng <
> > > > > wushuja...@gmail.com>
> > > > >     > wrote:
> > > > >     >     >
> > > > >     >     > > When you say that it will "break", what does this
> > > breakage
> > > > > look
> > > > >     > like?
> > > > >     >     > Will
> > > > >     >     > > the consumer-group be non-functional? Will just those
> > > > > instances be
> > > > >     >     > > non-functional? Or will the group be functional, but
> > the
> > > > >     > rebalancing be
> > > > >     >     > > non-optimal and require more
> round-trips/data-transfer?
> > > > > (similar
> > > > >     > to the
> > > > >     >     > > current algorithm)
> > > > >     >     > >
> > > > >     >     > > I'm trying to assess the potential for user-error and
> > the
> > > > > impact of
> > > > >     >     > > user-error.
> > > > >     >     > >
> > > > >     >     > > -James
> > > > >     >     > >
> > > > >     >     > > > On Jul 27, 2018, at 11:25 AM, Boyang Chen <
> > > > > bche...@outlook.com>
> > > > >     > wrote:
> > > > >     >     > > >
> > > > >     >     > > > Hey James,
> > > > >     >     > > >
> > > > >     >     > > >
> > > > >     >     > > > The algorithm relies on the client side to provide
> > > unique
> > > > >     > consumer
> > > > >     >     > > member id. It will break unless we enforce some sort
> of
> > > > > validation
> > > > >     > (host
> > > > >     >     > +
> > > > >     >     > > port) on the server side. To simplify the first
> > version,
> > > we
> > > > > do not
> > > > >     > plan
> > > > >     >     > to
> > > > >     >     > > enforce validation. A good comparison would be the
> EOS
> > > > > producer
> > > > >     > which is
> > > > >     >     > in
> > > > >     >     > > charge of generating unique transaction id sequence.
> > IMO
> > > > for
> > > > > broker
> > > > >     >     > logic,
> > > > >     >     > > the tolerance of client side error is not unlimited.
> > > > >     >     > > >
> > > > >     >     > > >
> > > > >     >     > > > Thank you!
> > > > >     >     > > >
> > > > >     >     > > >
> > > > >     >     > > > ________________________________
> > > > >     >     > > > From: James Cheng <wushuja...@gmail.com>
> > > > >     >     > > > Sent: Saturday, July 28, 2018 1:26 AM
> > > > >     >     > > > To: dev@kafka.apache.org
> > > > >     >     > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple
> > > consumer
> > > > >     > rebalances by
> > > > >     >     > > specifying member id
> > > > >     >     > > >
> > > > >     >     > > >
> > > > >     >     > > >> On Jul 26, 2018, at 11:09 PM, Guozhang Wang <
> > > > > wangg...@gmail.com
> > > > >     > >
> > > > >     >     > wrote:
> > > > >     >     > > >>
> > > > >     >     > > >> Hi Boyang,
> > > > >     >     > > >>
> > > > >     >     > > >> Thanks for the proposed KIP. I made a pass over
> the
> > > wiki
> > > > > and
> > > > >     > here are
> > > > >     >     > > some
> > > > >     >     > > >> comments / questions:
> > > > >     >     > > >>
> > > > >     >     > > >> 1. In order to preserve broker compatibility, we
> > need
> > > to
> > > > > make
> > > > >     > sure the
> > > > >     >     > > >> broker version discovery logic can be integrated
> > with
> > > > > this new
> > > > >     > logic.
> > > > >     >     > > I.e.
> > > > >     >     > > >> if a newer versioned consumer is talking to an
> older
> > > > > versioned
> > > > >     > broker
> > > > >     >     > > who
> > > > >     >     > > >> does not recognize V4, the client needs to
> downgrade
> > > its
> > > > >     >     > > JoinGroupRequest
> > > > >     >     > > >> version to V3 and not set the member-id
> > > > specifically.
> > > > > You
> > > > >     > can
> > > > >     >     > take a
> > > > >     >     > > >> look at the ApiVersionsRequest and see how to work
> > > with
> > > > > it.
> > > > >     >     > > >>
> > > > >     >     > > >> 2. There may be some ways to validate that
> two
> > > > > different
> > > > >     > clients
> > > > >     >     > > do
> > > > >     >     > > >> not send with the same member id, for example if
> we
> > > pass
> > > > > along
> > > > >     > the
> > > > >     >     > > >> host:port information from KafkaApis to the
> > > > > GroupCoordinator
> > > > >     >     > interface.
> > > > >     >     > > But
> > > > >     >     > > >> I think this overly complicates the logic and
> may
> > > not
> > > > >     > be worthwhile
> > > > >     >     > compared to
> > > > >     >     > > >> relying on users to specify unique member ids.
> > > > >     >     > > >
> > > > >     >     > > > Boyang,
> > > > >     >     > > >
> > > > >     >     > > > Thanks for the KIP! How will the algorithm behave
> if
> > > > > multiple
> > > > >     > consumers
> > > > >     >     > > provide the same member id?
> > > > >     >     > > >
> > > > >     >     > > > -James
> > > > >     >     > > >
> > > > >     >     > > >> 3. Minor: you would need to bump up the version
> > of
> > > > >     >     > JoinGroupResponse
> > > > >     >     > > to
> > > > >     >     > > >> V4 as well.
> > > > >     >     > > >>
> > > > >     >     > > >> 4. Minor: in the wiki page, you need to specify
> the
> > > > actual
> > > > >     > string
> > > > >     >     > value
> > > > >     >     > > for
> > > > >     >     > > >> `MEMBER_ID`, for example "member.id".
> > > > >     >     > > >>
> > > > >     >     > > >> 5. When this additional config is specified by
> > users,
> > > we
> > > > > should
> > > > >     >     > consider
> > > > >     >     > > >> setting the default of internal
> > > > > `LEAVE_GROUP_ON_CLOSE_CONFIG` to
> > > > >     >     > false,
> > > > >     >     > > >> since otherwise its effectiveness would be less.
> > > > >     >     > > >>
> > > > >     >     > > >>
> > > > >     >     > > >> Guozhang
> > > > >     >     > > >>
> > > > >     >     > > >>
> > > > >     >     > > >>
> > > > >     >     > > >>> On Thu, Jul 26, 2018 at 9:20 PM, Boyang Chen <
> > > > >     > bche...@outlook.com>
> > > > >     >     > > wrote:
> > > > >     >     > > >>>
> > > > >     >     > > >>> Hey friends,
> > > > >     >     > > >>>
> > > > >     >     > > >>>
> > > > >     >     > > >>> I would like to open a discussion thread on
> > KIP-345:
> > > > >     >     > > >>>
> > > > >     >     > > >>>
> > > > >     >     > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Reduce+multiple+consumer+rebalances+by+specifying+member+id
> > > > >     >     > > >>>
> > > > >     >     > > >>>
> > > > >     >     > > >>> This KIP is trying to resolve multiple rebalances
> > by
> > > > >     > maintaining the
> > > > >     >     > > >>> consumer member id across rebalance generations.
> I
> > > have
> > > > >     > verified the
> > > > >     >     > > theory
> > > > >     >     > > >>> on our internal Stream application, and it could
> > > reduce
> > > > >     > rebalance
> > > > >     >     > time
> > > > >     >     > > to a
> > > > >     >     > > >>> few seconds when the service is doing a rolling restart.
> > > > >     >     > > >>>
> > > > >     >     > > >>>
> > > > >     >     > > >>> Let me know your thoughts, thank you!
> > > > >     >     > > >>>
> > > > >     >     > > >>>
> > > > >     >     > > >>> Best,
> > > > >     >     > > >>>
> > > > >     >     > > >>> Boyang
> > > > >     >     > > >>>
> > > > >     >     > > >>
> > > > >     >     > > >>
> > > > >     >     > > >>
> > > > >     >     > > >> --
> > > > >     >     > > >> -- Guozhang
> > > > >     >     > >
> > > > >     >     > >
> > > > >     >     >
> > > > >     >
> > > > >     >
> > > > >     >
> > > > >
> > > > >
> > > > >     --
> > > > >     -- Guozhang
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
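
For anyone following the thread, the registration id / member id fencing
that Jason describes in the quoted message above can be sketched roughly as
follows. This is only a minimal illustration in Java, not the actual
GroupCoordinator code: the class name RegistryFencingSketch, its methods,
and the use of a random UUID as the coordinator-assigned member id are all
assumptions made up for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of registry-id based fencing; not real Kafka code. */
class RegistryFencingSketch {
    // User-provided registry id -> the one member id currently considered valid.
    private final Map<String, String> activeMember = new HashMap<>();

    /**
     * Models a JoinGroup call. An empty member id means the instance has just
     * (re)started and does not know its member id yet.
     */
    String join(String registryId, String memberId) {
        if (memberId.isEmpty()) {
            // Assign a fresh member id (assumed format) and overwrite the old
            // mapping, which implicitly invalidates any previous instance.
            String fresh = registryId + "-" + UUID.randomUUID();
            activeMember.put(registryId, fresh);
            return fresh;
        }
        if (!memberId.equals(activeMember.get(registryId))) {
            // A zombie still using an invalidated member id gets rejected.
            throw new IllegalStateException("fenced: stale member id for " + registryId);
        }
        return memberId;
    }

    /** True if this member id is the one currently mapped to the registry id. */
    boolean isActive(String registryId, String memberId) {
        return memberId.equals(activeMember.get(registryId));
    }
}
```

With this sketch, a bounced consumer that rejoins with the same registry id
and an empty member id receives a new member id, and a later request from
the old instance carrying the previous member id is rejected.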



-- 
-- Guozhang
