Guozhang, 

Thanks for giving us a great starting point.

A few questions that come to mind right away:

1) What do you think a reasonable group-change-timeout would be? I am thinking 
on the order of minutes (5 minutes?)

2) Will the nodes that are still alive continue to make progress during a 
static membership rebalance? I believe during a rebalance today all consumers 
wait for the SyncGroupResponse before continuing to read data from the brokers. 
If that is the case, I think it'd be ideal for all nodes that are still alive 
during a static group membership change to continue to make progress as if 
nothing happened, so that there is no impact to the majority of the group 
when one node is bounced (quick restart).

3) Do you think an explicit method for forcing a rebalance would be needed? I 
am thinking of a scenario such as a disk failure on a node, where that node 
will definitely not come back. Rather than waiting the full 
group-change-timeout, I think it'd be good for an admin to be able to force a 
rebalance. Maybe this is an over-optimization, but I think certain use cases 
would benefit from static group membership with the ability to force a 
rebalance at any time. 
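
To make questions 2 and 3 concrete, here is a minimal sketch in Python of the 
coordinator-side behavior I have in mind. It is only a toy model under my own 
assumptions: StaticGroup, leave, rejoin, tick and force_rebalance are 
hypothetical names, not anything in Kafka or the KIP, and times are plain 
seconds passed in by the caller.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class StaticGroup:
    """Toy model of a static group on the coordinator (all names hypothetical)."""
    group_change_timeout_s: float
    members: Set[str] = field(default_factory=set)
    # member id -> deadline after which its absence triggers a rebalance
    pending_departures: Dict[str, float] = field(default_factory=dict)
    generation: int = 0

    def leave(self, member_id: str, now: float) -> None:
        """A member left or was bounced: start its timer, do not rebalance yet."""
        if member_id in self.members:
            self.pending_departures[member_id] = now + self.group_change_timeout_s

    def rejoin(self, member_id: str) -> None:
        """The member came back in time: cancel the timer, keep the generation."""
        self.pending_departures.pop(member_id, None)

    def tick(self, now: float) -> bool:
        """Rebalance only if some departed member has exceeded the timeout."""
        expired = {m for m, deadline in self.pending_departures.items()
                   if deadline <= now}
        return self._remove_and_rebalance(expired)

    def force_rebalance(self, member_id: str) -> bool:
        """Admin override: drop a member known to be gone without waiting."""
        if member_id not in self.members:
            return False
        return self._remove_and_rebalance({member_id})

    def _remove_and_rebalance(self, gone: Set[str]) -> bool:
        if not gone:
            return False
        for m in gone:
            self.members.discard(m)
            self.pending_departures.pop(m, None)
        self.generation += 1  # a new generation stands in for a rebalance
        return True


if __name__ == "__main__":
    group = StaticGroup(group_change_timeout_s=300.0, members={"a", "b", "c"})
    group.leave("a", now=0.0)
    group.rejoin("a")                  # quick restart, back within the timeout
    print(group.tick(now=400.0))       # no rebalance was needed
    group.leave("b", now=500.0)
    print(group.force_rebalance("b"))  # admin knows "b" is gone for good
```

The point of the sketch is that a rejoin within the timeout leaves the 
generation untouched, so the rest of the group never sees a rebalance, while 
force_rebalance lets an admin skip the wait when a node is known to be gone.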

Best,

Mike

On 7/30/18, 6:57 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

    Hello Boyang / Jason / Mike,
    
    Thanks for your thoughtful inputs! Regarding the fencing issue, I've
    thought about leveraging the epoch notion from the PID of transactional
    messaging before, but since in this proposal we do not always require
    member ids from clients, and hence could have a mix of user-specified
    member ids and coordinator-generated member ids, the epoch idea may not be
    very well suited for this scenario. Of course, we can tighten the screws a
    bit by requiring that, for a given consumer group, all consumers must
    either provide their own member ids or rely on the consumer coordinator to
    assign member ids, which does not sound like a very strict requirement in
    practice; all we would need to do is add a new field in the join group
    request (we are proposing to bump up its version anyway). I've also thought
    about another simple fencing approach, aka "first comer wins", that is to
    pass in the host / port information from KafkaApis to GroupCoordinator to
    validate whether it matches the existing member id's cached host / port. It
    does not always guarantee that we fence the right zombies because of "first
    comer wins" (think of a scenario where the right client gets kicked out,
    and then before it re-joins the actual zombie with the same member id
    joins), and as I mentioned previously it would leak some details across the
    code hierarchy, so I'm also hesitant to do it. If people think it is
    indeed a must-have rather than a good-to-have, I'd suggest we leverage
    host / port rather than the epoch mechanism.
    
    ------------------------------------------
    
    As for the more general idea of having a static membership protocol to
    better integrate with cloud environments like k8s, I think the first idea
    may actually be a better fit for it.
    
    Just a quick summary of what rebalance issues we face today:
    
    1. Application start: when a multi-instance application is started, multiple
    rebalances are triggered to migrate states to newly started instances since
    not all instances are joining at the same time. NOTE that KIP-134
    <https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance>
    is targeted for this issue, but as an after-thought it may not be the
    optimal solution.
    2. Application shutdown: similarly to 1), when a multi-instance application
    is shutting down, multiple rebalances are triggered.
    3. Application scale out: when a new instance is started, one rebalance is
    executed to shuffle all assignments, rather than just a "partial" shuffling
    of some of the members.
    4. Application scale in: similarly to 3), when an existing instance
    gracefully shuts down, one rebalance is executed to shuffle all assignments.
    5. Application instance bounce (upgrade, config change etc): when one
    instance shuts down and then restarts, it triggers two rebalances. NOTE that
    disabling leave-group is targeted for this issue.
    6. Application instance failure: when one instance fails, and probably a new
    instance starts to take over its assignment (e.g. k8s), it triggers two
    rebalances. The difference from 3) above is that the new instance would not
    have local cached tasks.
    
    
    Among them, I think 1/2/5/6 could potentially be grouped together as
    "static membership"; 4/5 could be grouped as another category, that of
    allowing "incremental rebalance" or "partial rebalance" rather than a full
    rebalance. Of course, having incremental rebalances can help on 1/2/5/6 as
    well to reduce the cost of each unnecessary rebalance, but ideally we want
    NO rebalances at all for these cases, which will be more achievable with
    k8s / etc integrations or static memberships.
    
    ------------------------------------------
    
    So, to kick off brainstorming, here is a rough idea following this route
    for 1/2/5/6:
    
    
    1. We bump up the JoinGroupRequest with additional fields:
    
      1.a) a flag indicating "static" or "dynamic" membership protocols.
      1.b) with "static" membership, we also add the pre-defined member id.
      1.c) with "static" membership, we also add an optional
    "group-change-timeout" value.
    
    2. On the broker side, we enforce only one of the two protocols for all
    group members: we accept the protocol of the first joined member of the
    group, and if later joining members indicate a different membership
    protocol, we reject them. If a member's group-change-timeout value differs
    from that of the first joined member, we reject it as well.
    
    3. With dynamic membership, nothing is changed; with static membership, we
    do the following:
    
      3.a) never assign member ids, instead always expect the joining members
    to come with their own member id; we could do the fencing based on host /
    port here.
      3.b) upon receiving the first join group request, use the
    "group-change-timeout" instead of the session-timeout as the rebalance
    timeout to wait for other members to join. This is for 1) above.
      3.c) upon receiving a leave-group request, use the "group-change-timeout"
    to wait for more members to leave the group as well, or for the departed
    members to re-join. After the timeout we trigger a rebalance with whatever
    is left in the members list. This is for 2) (expecting other members to
    send leave-group) and 5/6 (expecting the departed member to re-join).
    
    4. As a result, we will deprecate KIP-134 and disable-on-leave-group as
    well.
    
    
    The key idea is that, with "static" membership, groups should be created or
    terminated as a whole, and dynamic member changes are not expected often.
    Hence we would not react to those membership-changing events immediately
    but wait for a longer specified time, expecting that some other system like
    k8s will resume the group members. WDYT?
    
    
    Guozhang
    
    
    On Mon, Jul 30, 2018 at 3:05 PM, Mike Freyberger <mfreyber...@appnexus.com>
    wrote:
    
    > Jason,
    >
    > I really appreciate the broader conversation that you are bringing up
    > here.
    >
    > I've been working on an application that does streaming joins for a while
    > now, and we face a similar issue with group membership being dynamic. We
    > are currently using our own StickyAssignor and take special care during
    > rolling restarts to make sure consumer assignments do not change.
    >
    > I think a feature that allows for group membership to be fixed, along with
    > a CLI for adding or removing a node from the group, would be ideal. This
    > reminds me of some of the work by the DynamoDB team about 10 years back,
    > when they differentiated transient failures from permanent failures to
    > deal with problems like this.
    >
    > Best,
    >
    > Mike
    >
    > On 7/30/18, 5:36 PM, "Jason Gustafson" <ja...@confluent.io> wrote:
    >
    >     Hi Boyang,
    >
    >     Thanks for the response. I think the main point I was trying to make
    >     is the need for fencing. I am not too concerned about how to generate
    >     a unique id on the client side. The approach you suggested for streams
    >     seems reasonable. However, any time you reuse an id, you need to be
    >     careful that there is only one instance that can use it at any time.
    >     We are always running into problems where a previous instance of an
    >     application comes back to life unexpectedly after we had already
    >     presumed it was dead. Fencing ensures that even if this happens, it
    >     cannot do any damage. I would say that some protection from zombies is
    >     a requirement here.
    >
    >     The second point was more abstract and mainly meant to initiate some
    >     discussion. We have gone through several iterations of improvements to
    >     try and reduce the rebalancing in consumer applications. We started
    >     out trying to tune the session timeout. We have added an internal
    >     config to skip leaving the group when streams shuts down. The broker
    >     now has a config to delay rebalances in case all consumers join at
    >     about the same time. The approach in this KIP is a step in a more
    >     principled direction, but it still feels like we are making this
    >     unnecessarily hard on ourselves by insisting that group membership is
    >     a dynamic concept. In practice, the number of nodes dedicated to an
    >     application tends to remain fixed for long periods of time and only
    >     scales up or down when needed. And these days you've got frameworks
    >     like kubernetes which can automatically provision new nodes if one
    >     fails. So the argument for dynamic membership is becoming weaker in my
    >     opinion. This KIP is basically trying to impose a small degree of
    >     static membership anyway so that rolling restarts do not change
    >     membership. Anyway, curious to hear some thoughts about this from you
    >     and the others who work on streams.
    >
    >     Thanks,
    >     Jason
    >
    >
    >     On Sat, Jul 28, 2018 at 4:44 PM, Boyang Chen <bche...@outlook.com>
    > wrote:
    >
    >     > Thanks for the replies, James and Jason. Let me try to summarize
    >     > your concerns.
    >     >
    >     >
    >     > I think James' question is primarily about the severity of a user
    >     > using this config wrongly. The impact would be the same member id
    >     > being used by multiple or even all of the consumers. The assignment
    >     > protocol couldn't distinguish any of the overlapping consumers, thus
    >     > assigning the exact same partitions multiple times to different
    >     > consumers. I would say the processed result would include a lot of
    >     > duplicates and put unnecessarily heavy load on the client side. The
    >     > correctness will depend on the user logic; however, I'm pessimistic.
    >     >
    >     >
    >     > Although the impact is very high, the symptom is not hard to triage,
    >     > because users could see overlapping consumer identities fairly
    >     > easily from exported consumer metrics. From the user's standpoint,
    >     > they would be fully aware of the potential erratic status before
    >     > enabling the "member.id" configuration IMO. Let me know your
    >     > thoughts James!
    >     >
    >     >
    >     > Next is Jason's suggestion. Jason shared a higher-level viewpoint
    >     > and pointed out that the problem we need to solve is to maintain "a
    >     > strong bias towards being able to reuse previous state". The
    >     > proposed approach is to separate the notion of consumer membership
    >     > and consumer identity.
    >     >
    >     >
    >     > The original idea of this KIP was on the Stream application, so I
    >     > understand that the identities of multiple consumers belong to one
    >     > instance, where each Stream thread will be using one dedicated main
    >     > consumer. So in a Stream use case, we could internally generate the
    >     > member id with USER_DEFINED_ID + STREAM_THREAD_ID.
    >     >
    >     >
    >     > In a pure consumer use case, this could be a little bit challenging
    >     > since the user could arbitrarily initiate multiple consumers on the
    >     > same instance, which is out of our library's control. This could
    >     > increase the possibility of member id collision. So instead of
    >     > making developers' lives easier, introducing a member id config
    >     > could break the existing code logic and take a long time to
    >     > understand and fix. Although I still assume this is an advanced
    >     > config, users may use the member id config even before they fully
    >     > understand the problem, and use the same set of initialization logic
    >     > across multiple consumers on the same instance.
    >     >
    >     >
    >     > I hope I have explained my understanding of the pros and cons of
    >     > this KIP better. Remember the core argument of this KIP: if the
    >     > broker recognizes this consumer as an existing member, it shouldn't
    >     > trigger a rebalance. If we build our discussion on top of this
    >     > argument, the client management of group membership could be tricky
    >     > at first, but considering our original motivation for the
    >     > leader-follower rebalance model, I feel that having the broker
    >     > create membership info and letting the client maintain it would be
    >     > less appealing and more fragile. Having the client generate
    >     > membership data could build up a source-of-truth model and
    >     > streamline the current architecture. We also need to consider the
    >     > flexibility introduced by this KIP for cloud users to coordinate
    >     > consumer/stream instances more freely. Honestly, I'm interested in
    >     > Jason's registration id proposal and open to more voices, but I feel
    >     > it would be more complex than the current KIP for users to
    >     > understand. Hope this makes sense, Jason.
    >     >
    >     >
    >     > Thank you again for the feedback!
    >     >
    >     >
    >     > Best,
    >     >
    >     > Boyang
    >     >
    >     >
    >     > ________________________________
    >     > From: Jason Gustafson <ja...@confluent.io>
    >     > Sent: Saturday, July 28, 2018 6:50 AM
    >     > To: dev
    >     > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
    > by
    >     > specifying member id
    >     >
    >     > Hey Boyang,
    >     >
    >     > Thanks for the KIP. I think my main question is in the same vein as
    >     > James'. The problem is that the coordinator needs to be able to
    >     > identify which instance of a particular memberId is the active one.
    >     > For EOS, each transactionalId gets an epoch. When a new producer is
    >     > started, it bumps the epoch which allows the transaction coordinator
    >     > to fence off any zombie instances which may try to continue doing
    >     > work with the old epoch. It seems like we need a similar protection
    >     > for consumer members.
    >     >
    >     > Suppose for example that we distinguish between a registration id
    >     > which is provided by the user and a member id which is assigned
    >     > uniquely by the coordinator. In the JoinGroup request, both the
    >     > registration id and the member id are provided. When a consumer is
    >     > first started, it doesn't know the memberId, so it provides only the
    >     > registration id. The coordinator can then assign a new memberId and
    >     > invalidate the previous one that was associated with the
    >     > registration id. This would then fence off the previous instance
    >     > which was still trying to use the member id.
    >     >
    >     > Taking a little bit of a step back, I think the main observation in
    >     > this KIP is that applications with heavy local state need to have a
    >     > strong bias toward being able to reuse that state. It is a bit like
    >     > Kafka itself in the sense that a replica is not moved just because
    >     > the broker is shut down, as the cost of moving the log is extremely
    >     > high. I'm wondering if we need to think about streams applications
    >     > in a similar way. Should there be a static notion of the members of
    >     > the group so that streams can make rebalancing decisions more easily
    >     > without depending so heavily on transient membership? I feel the
    >     > hacks we've put in place in some cases to avoid rebalances are a bit
    >     > brittle. Delaying group joining is an example of this. If you knew
    >     > ahead of time who the stable members of the group were, then this
    >     > would not be needed. Anyway, just a thought.
    >     >
    >     > Thanks,
    >     > Jason
    >     >
    >     >
    >     >
    >     > On Fri, Jul 27, 2018 at 1:58 PM, James Cheng <wushuja...@gmail.com>
    > wrote:
    >     >
    >     > > When you say that it will "break", what does this breakage look
    >     > > like? Will the consumer-group be non-functional? Will just those
    >     > > instances be non-functional? Or will the group be functional, but
    >     > > the rebalancing be non-optimal and require more
    >     > > round-trips/data-transfer? (similar to the current algorithm)
    >     > >
    >     > > I'm trying to assess the potential for user-error and the impact
    >     > > of user-error.
    >     > >
    >     > > -James
    >     > >
    >     > > > On Jul 27, 2018, at 11:25 AM, Boyang Chen <bche...@outlook.com>
    > wrote:
    >     > > >
    >     > > > Hey James,
    >     > > >
    >     > > >
    >     > > > The algorithm relies on the client side to provide a unique
    >     > > > consumer member id. It will break unless we enforce some sort of
    >     > > > validation (host + port) on the server side. To simplify the
    >     > > > first version, we do not plan to enforce validation. A good
    >     > > > comparison would be the EOS producer, which is in charge of
    >     > > > generating a unique transaction id sequence. IMO for broker
    >     > > > logic, the tolerance of client side error is not unlimited.
    >     > > >
    >     > > >
    >     > > > Thank you!
    >     > > >
    >     > > >
    >     > > > ________________________________
    >     > > > From: James Cheng <wushuja...@gmail.com>
    >     > > > Sent: Saturday, July 28, 2018 1:26 AM
    >     > > > To: dev@kafka.apache.org
    >     > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
    > rebalances by
    >     > > specifying member id
    >     > > >
    >     > > >
    >     > > >> On Jul 26, 2018, at 11:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:
    >     > > >>
    >     > > >> Hi Boyang,
    >     > > >>
    >     > > >> Thanks for the proposed KIP. I made a pass over the wiki and
    >     > > >> here are some comments / questions:
    >     > > >>
    >     > > >> 1. In order to preserve broker compatibility, we need to make
    >     > > >> sure the broker version discovery logic can be integrated with
    >     > > >> this new logic. I.e. if a newer versioned consumer is talking
    >     > > >> to an older versioned broker that does not recognize V4, the
    >     > > >> client needs to downgrade its JoinGroupRequest version to V3
    >     > > >> and not set the member-id specifically. You can take a look at
    >     > > >> the ApiVersionsRequest and see how to work with it.
    >     > > >>
    >     > > >> 2. There may exist some ways to validate that two different
    >     > > >> clients do not send the same member id, for example if we pass
    >     > > >> along the host:port information from KafkaApis to the
    >     > > >> GroupCoordinator interface. But I think this overly complicates
    >     > > >> the logic and may not be more worthwhile than relying on users
    >     > > >> to specify unique member ids.
    >     > > >
    >     > > > Boyang,
    >     > > >
    >     > > > Thanks for the KIP! How will the algorithm behave if multiple
    >     > > > consumers provide the same member id?
    >     > > >
    >     > > > -James
    >     > > >
    >     > > >> 3. Minor: you would need to bump up the version of
    >     > > >> JoinGroupResponse to V4 as well.
    >     > > >>
    >     > > >> 4. Minor: in the wiki page, you need to specify the actual
    >     > > >> string value for `MEMBER_ID`, for example "member.id".
    >     > > >>
    >     > > >> 5. When this additional config is specified by users, we should
    >     > > >> consider setting the default of internal
    >     > > >> `LEAVE_GROUP_ON_CLOSE_CONFIG` to false, since otherwise it
    >     > > >> would be less effective.
    >     > > >>
    >     > > >>
    >     > > >> Guozhang
    >     > > >>
    >     > > >>
    >     > > >>
    >     > > >>> On Thu, Jul 26, 2018 at 9:20 PM, Boyang Chen <
    > bche...@outlook.com>
    >     > > wrote:
    >     > > >>>
    >     > > >>> Hey friends,
    >     > > >>>
    >     > > >>>
    >     > > >>> I would like to open a discussion thread on KIP-345:
    >     > > >>>
    >     > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Reduce+multiple+consumer+rebalances+by+specifying+member+id
    >     > > >>>
    >     > > >>>
    >     > > >>> This KIP is trying to resolve multiple rebalances by
    >     > > >>> maintaining the consumer member id across rebalance
    >     > > >>> generations. I have verified the theory on our internal
    >     > > >>> Stream application, and it could reduce rebalance time to a
    >     > > >>> few seconds when the service does a rolling restart.
    >     > > >>>
    >     > > >>>
    >     > > >>> Let me know your thoughts, thank you!
    >     > > >>>
    >     > > >>>
    >     > > >>> Best,
    >     > > >>>
    >     > > >>> Boyang
    >     > > >>>
    >     > > >>
    >     > > >>
    >     > > >>
    >     > > >> --
    >     > > >> -- Guozhang
    >     > >
    >     > >
    >     >
    >
    >
    >
    
    
    -- 
    -- Guozhang
    
