Jason,

I really appreciate the broader conversation that you are bringing up here.

I've been working on an application that does streaming joins for a while now, 
and we face a similar issue with group membership being dynamic. We are 
currently using our own StickyAssignor and take special care during rolling 
restarts to make sure consumer assignments do not change. 
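For context, a plain-consumer configuration along these lines might look like the following (the assignor shown is Kafka's built-in StickyAssignor rather than our custom one, and the timeout value is purely illustrative):

```properties
group.id=streaming-join-app
# Sticky assignment keeps partition-to-consumer mappings stable across rebalances
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor
# Generous session timeout so a restarting instance can rejoin before
# being declared dead (value illustrative)
session.timeout.ms=30000
```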

I think a feature that allows group membership to be fixed, along with a 
CLI for adding or removing a node from the group, would be ideal. This 
reminds me of some of the work by the DynamoDB team about 10 years back, 
when they differentiated transient failures from permanent failures to 
deal with problems like this. 

Best,

Mike

On 7/30/18, 5:36 PM, "Jason Gustafson" <ja...@confluent.io> wrote:

    Hi Boyang,
    
    Thanks for the response. I think the main point I was trying to make is the
    need for fencing. I am not too concerned about how to generate a unique id
    on the client side. The approach you suggested for streams seems
    reasonable. However, any time you reuse an id, you need to be careful that
    there is only one instance that can use it at any time. We are always
    running into problems where a previous instance of an application comes
    back to life unexpectedly after we had already presumed it was dead.
    Fencing ensures that even if this happens, it cannot do any damage. I would
    say that some protection from zombies is a requirement here.
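The epoch-based fencing that EOS producers use, which the same idea would extend to consumer members, can be sketched roughly as follows (a toy model; none of these names are actual Kafka APIs):

```python
# Toy sketch of epoch-based zombie fencing, modeled on the EOS
# transactionalId -> epoch scheme. Illustrative only, not Kafka code.

class Coordinator:
    def __init__(self):
        self.epochs = {}  # id -> latest epoch granted

    def register(self, member_id):
        """A (re)starting instance bumps the epoch, fencing older holders."""
        epoch = self.epochs.get(member_id, 0) + 1
        self.epochs[member_id] = epoch
        return epoch

    def validate(self, member_id, epoch):
        """Requests carrying a stale epoch are rejected as zombies."""
        return self.epochs.get(member_id) == epoch

coord = Coordinator()
e1 = coord.register("app-1")   # original instance
e2 = coord.register("app-1")   # restarted instance bumps the epoch
assert coord.validate("app-1", e2)      # new instance is live
assert not coord.validate("app-1", e1)  # zombie with old epoch is fenced
```

Even if the presumed-dead instance comes back, its requests carry the old epoch and are rejected, so it cannot do any damage.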
    
    The second point was more abstract and mainly meant to initiate some
    discussion. We have gone through several iterations of improvements to try
    and reduce the rebalancing in consumer applications. We started out trying
    to tune the session timeout. We have added an internal config to skip
    leaving the group when streams shuts down. The broker now has a config to
    delay rebalances in case all consumers join at about the same time. The
    approach in this KIP is a step in a more principled direction, but it still
    feels like we are making this unnecessarily hard on ourselves by insisting
    that group membership is a dynamic concept. In practice, the number of
    nodes dedicated to an application tends to remain fixed for long periods of
    time and only scales up or down when needed. And these days you've got
    frameworks like Kubernetes which can automatically provision new nodes if
    one fails. So the argument for dynamic membership is becoming weaker in my
    opinion. This KIP is basically trying to impose a small degree of static
    membership anyway so that rolling restarts do not change membership.
    Anyway, curious to hear some thoughts about this from you and the others
    who work on streams.
    
    Thanks,
    Jason
    
    
    On Sat, Jul 28, 2018 at 4:44 PM, Boyang Chen <bche...@outlook.com> wrote:
    
    > Thanks for the replies, James and Jason. Let me try to summarize your
    > concerns.
    >
    >
    > I think James' question is primarily about the severity of a user using
    > this config wrongly. The impact would be the same member id being used by
    > multiple or even all of the consumers. The assignment protocol couldn't
    > distinguish any of the overlapping consumers, thus assigning the exact
    > same partitions multiple times to different consumers. The processed
    > result would include a lot of duplicates and put unnecessarily heavy load
    > on the client side. Correctness would depend on the user logic, but I'm
    > pessimistic.
    >
    >
    > Although the impact is very high, the symptom is not hard to triage,
    > because the user could spot overlapping consumer identities fairly easily
    > from exported consumer metrics. From the user's standpoint, they would be
    > fully aware of the potential erratic status before enabling the
    > "member.id" configuration, IMO. Let me know your thoughts, James!
    >
    >
    > Next is Jason's suggestion. Jason shared a higher-level viewpoint and
    > pointed out that the problem we need to solve is to maintain "a strong
    > bias towards being able to reuse previous state". The proposed approach
    > is to separate the notion of consumer membership from consumer identity.
    >
    >
    > The original idea of this KIP was focused on Streams applications, so I
    > understand that the identities of multiple consumers belong to one
    > instance, where each Streams thread uses one dedicated main consumer. So
    > in a Streams use case, we could internally generate the member id as
    > USER_DEFINED_ID + STREAM_THREAD_ID.
    >
    >
    > In the pure consumer use case, this could be a little challenging, since
    > the user could arbitrarily initiate multiple consumers on the same
    > instance, which is out of our library's control. This increases the
    > possibility of member id collisions. So instead of making developers'
    > lives easier, introducing a member id config could break existing code
    > logic and take a long time to understand and fix. Although I still assume
    > this is an advanced config, users may use the member id config even
    > before they fully understand the problem, and use the same set of
    > initialization logic across multiple consumers on the same instance.
    >
    >
    > I hope I have explained my understanding of the pros and cons of this
    > KIP better. Remember the core argument of this KIP: if the broker
    > recognizes this consumer as an existing member, it shouldn't trigger a
    > rebalance. If we build our discussion on top of this argument, the client
    > management of group membership could be tricky at first, but considering
    > our original motivation toward the leader-follower rebalance model, I
    > feel that having the broker create membership info and letting the client
    > maintain it would be less appealing and more fragile. Having the client
    > generate membership data builds a source-of-truth model and streamlines
    > the current architecture. We also need to consider the flexibility this
    > KIP introduces for cloud users to coordinate consumer/stream instances
    > more freely. Honestly, I'm interested in Jason's registration id proposal
    > and open to more voices, but I feel it would be more complex than the
    > current KIP for users to understand. Hope this makes sense, Jason.
    >
    >
    > Thank you again for the feedback!
    >
    >
    > Best,
    >
    > Boyang
    >
    >
    > ________________________________
    > From: Jason Gustafson <ja...@confluent.io>
    > Sent: Saturday, July 28, 2018 6:50 AM
    > To: dev
    > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
    > specifying member id
    >
    > Hey Boyang,
    >
    > Thanks for the KIP. I think my main question is in the same vein as James'.
    > The problem is that the coordinator needs to be able to identify which
    > instance of a particular memberId is the active one. For EOS, each
    > transactionalId gets an epoch. When a new producer is started, it bumps
    > the epoch, which allows the transaction coordinator to fence off any
    > zombie instances which may try to continue doing work with the old epoch.
    > It seems like we need a similar protection for consumer members.
    >
    > Suppose for example that we distinguish between a registration id which
    > is provided by the user and a member id which is assigned uniquely by the
    > coordinator. In the JoinGroup request, both the registration id and the
    > member id are provided. When a consumer is first started, it doesn't know
    > the memberId, so it provides only the registration id. The coordinator
    > can then assign a new memberId and invalidate the previous one that was
    > associated with the registration id. This would then fence off the
    > previous instance which was still trying to use the member id.
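The registration id scheme described above can be modeled roughly as follows (Python used purely for illustration; none of these names are real Kafka APIs):

```python
import uuid


class GroupCoordinator:
    """Toy model of the registration-id proposal: the user supplies a stable
    registration id; the coordinator mints a unique member id and invalidates
    the previous one for that registration id. Illustrative only."""

    def __init__(self):
        self.active = {}  # registration_id -> currently valid member_id

    def join_group(self, registration_id, member_id=None):
        # A consumer that doesn't know its member id (or presents a stale
        # one) gets a fresh member id; the previous id for this registration
        # id is invalidated, fencing off any zombie still holding it.
        if member_id is None or self.active.get(registration_id) != member_id:
            member_id = str(uuid.uuid4())
            self.active[registration_id] = member_id
        return member_id

    def is_active(self, registration_id, member_id):
        return self.active.get(registration_id) == member_id


coord = GroupCoordinator()
m1 = coord.join_group("node-1")  # first start: coordinator assigns member id
m2 = coord.join_group("node-1")  # restart without member id: m1 is fenced
assert coord.is_active("node-1", m2)
assert not coord.is_active("node-1", m1)
```

A rejoin that presents the current member id keeps it, so a healthy instance's membership is stable; only a restart (or a zombie) triggers reassignment.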
    >
    > Taking a little bit of a step back, I think the main observation in this
    > KIP is that applications with heavy local state need to have a strong bias
    > toward being able to reuse that state. It is a bit like Kafka itself in 
the
    > sense that a replica is not moved just because the broker is shutdown as
    > the cost of moving the log is extremely high. I'm wondering if we need to
    > think about streams applications in a similar way. Should there be a 
static
    > notion of the members of the group so that streams can make rebalancing
    > decisions more easily without depending so heavily on transient 
membership?
    > I feel the hacks we've put in place in some cases to avoid rebalances are 
a
    > bit brittle. Delaying group joining for example is an example of this. If
    > you knew ahead of time who the stable members of the group were, then this
    > would not be needed. Anyway, just a thought.
    >
    > Thanks,
    > Jason
    >
    >
    >
    > On Fri, Jul 27, 2018 at 1:58 PM, James Cheng <wushuja...@gmail.com> wrote:
    >
    > > When you say that it will "break", what does this breakage look like?
    > > Will the consumer-group be non-functional? Will just those instances be
    > > non-functional? Or will the group be functional, but the rebalancing be
    > > non-optimal and require more round-trips/data-transfer? (similar to the
    > > current algorithm)
    > >
    > > I'm trying to assess the potential for user-error and the impact of
    > > user-error.
    > >
    > > -James
    > >
    > > > On Jul 27, 2018, at 11:25 AM, Boyang Chen <bche...@outlook.com> wrote:
    > > >
    > > > Hey James,
    > > >
    > > >
    > > > The algorithm relies on the client side to provide a unique consumer
    > > > member id. It will break unless we enforce some sort of validation
    > > > (host + port) on the server side. To simplify the first version, we
    > > > do not plan to enforce validation. A good comparison would be the EOS
    > > > producer, which is in charge of generating a unique transaction id
    > > > sequence. IMO, for broker logic, the tolerance of client-side error
    > > > is not unlimited.
    > > >
    > > >
    > > > Thank you!
    > > >
    > > >
    > > > ________________________________
    > > > From: James Cheng <wushuja...@gmail.com>
    > > > Sent: Saturday, July 28, 2018 1:26 AM
    > > > To: dev@kafka.apache.org
    > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
    > > specifying member id
    > > >
    > > >
    > > >> On Jul 26, 2018, at 11:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:
    > > >>
    > > >> Hi Boyang,
    > > >>
    > > >> Thanks for the proposed KIP. I made a pass over the wiki and here
    > > >> are some comments / questions:
    > > >>
    > > >> 1. In order to preserve broker compatibility, we need to make sure
    > > >> the broker version discovery logic can be integrated with this new
    > > >> logic. I.e. if a newer versioned consumer is talking to an older
    > > >> versioned broker who does not recognize V4, the client needs to
    > > >> downgrade its JoinGroupRequest version to V3 and not set the
    > > >> member-id specifically. You can take a look at the
    > > >> ApiVersionsRequest and see how to work with it.
    > > >>
    > > >> 2. There may exist some manner to validate that two different
    > > >> clients do not send the same member id, for example if we pass along
    > > >> the host:port information from KafkaApis to the GroupCoordinator
    > > >> interface. But I think this overly complicates the logic and may not
    > > >> be more worthwhile than relying on users to specify unique member
    > > >> ids.
    > > >
    > > > Boyang,
    > > >
    > > > Thanks for the KIP! How will the algorithm behave if multiple
    > > > consumers provide the same member id?
    > > >
    > > > -James
    > > >
    > > >> 3. Minor: you would need to bump up the version of JoinGroupResponse
    > > >> to V4 as well.
    > > >>
    > > >> 4. Minor: in the wiki page, you need to specify the actual string
    > > >> value for `MEMBER_ID`, for example "member.id".
    > > >>
    > > >> 5. When this additional config is specified by users, we should
    > > >> consider setting the default of the internal
    > > >> `LEAVE_GROUP_ON_CLOSE_CONFIG` to false, since otherwise its
    > > >> effectiveness would be reduced.
    > > >>
    > > >>
    > > >> Guozhang
    > > >>
    > > >>
    > > >>
    > > >>> On Thu, Jul 26, 2018 at 9:20 PM, Boyang Chen <bche...@outlook.com>
    > > wrote:
    > > >>>
    > > >>> Hey friends,
    > > >>>
    > > >>>
    > > >>> I would like to open a discussion thread on KIP-345:
    > > >>>
    > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Reduce+multiple+consumer+rebalances+by+specifying+member+id
    > > >>>
    > > >>>
    > > >>> This KIP is trying to resolve multiple rebalances by maintaining
    > > >>> the consumer member id across rebalance generations. I have
    > > >>> verified the theory on our internal Streams application, and it
    > > >>> could reduce rebalance time to a few seconds during a rolling
    > > >>> restart of the service.
    > > >>>
    > > >>>
    > > >>> Let me know your thoughts, thank you!
    > > >>>
    > > >>>
    > > >>> Best,
    > > >>>
    > > >>> Boyang
    > > >>>
    > > >>
    > > >>
    > > >>
    > > >> --
    > > >> -- Guozhang
    > >
    > >
    >
    
