Hey Guozhang,

I don't have a very detailed design yet, but I have a high-level idea that can
probably work. Here is how it would look:

- When the controller needs to move leadership:
   - The controller sends a request asking the brokers involved in the
migration to use acks=[isr_set], and waits for the response.
   - The controller updates the znodes, sends LeaderAndIsrRequest to migrate
leadership, and waits for the response.
   - The controller sends a request asking the brokers involved in the
migration to switch back to acks=[min_isr_num]. This improves tail latency
when there is no leadership movement in progress (see the sketch below).
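
To make the flow concrete, here is a minimal pseudo-Java sketch of the
controller-side sequence. The request and helper names (SetAckModeRequest,
sendAndAwait, brokersOf, and so on) are made up purely for illustration; they
are not existing Kafka controller APIs.

```java
// Hypothetical controller-side sketch; all names are illustrative.
void moveLeadership(TopicPartition partition, int newLeader) {
    // 1. Ask the brokers involved to temporarily require acks from the full
    //    ISR set, so nothing is acked by fewer replicas during the hand-off.
    sendAndAwait(brokersOf(partition), new SetAckModeRequest(partition, AckMode.ISR_SET));

    // 2. Do the usual leadership movement: update the znodes, then send
    //    LeaderAndIsrRequest and wait for the brokers to respond.
    updateLeaderAndIsrZnode(partition, newLeader);
    sendAndAwait(brokersOf(partition), new LeaderAndIsrRequest(partition, newLeader));

    // 3. Switch the brokers back to acking on min.isr replicas, which gives
    //    the better tail latency whenever no movement is in progress.
    sendAndAwait(brokersOf(partition), new SetAckModeRequest(partition, AckMode.MIN_ISR));
}
```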

Personally I find this approach relatively straightforward. It wraps around
the existing leadership-movement logic, and we can still rely on that logic
to avoid data loss in the case of a network partition. Does this sound
reasonable?

Thanks,
Dong

On Fri, Feb 2, 2018 at 11:26 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Dong,
>
> Could you elaborate a bit more on how the controller could make leaders
> switch between all and quorum?
>
>
> Guozhang
>
>
> On Fri, Feb 2, 2018 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:
>
>> Hey Guozhang,
>>
>> Got it. Thanks for the detailed explanation. I guess my point is that we
>> can probably achieve the best of both worlds, i.e. maintain the existing
>> behavior of ack="all" while improving the tail latency.
>>
>> Thanks,
>> Dong
>>
>>
>>
>> On Fri, Feb 2, 2018 at 8:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi Dong,
>>>
>>> Yes, in terms of fault tolerance "quorum" does not do better than "all";
>>> as I said, with {min.isr} set to X+1, Kafka is able to tolerate only X failures.
>>> So if A and B are partitioned off at the same time, there are two
>>> concurrent failures and we do not guarantee that all acked messages will be
>>> retained.
>>>
>>> The goal of my approach is to maintain the behavior of ack="all", which
>>> happens to do better than what Kafka actually guarantees: when both A and
>>> B are partitioned off, produced records will not be acked, since "all"
>>> requires all replicas (not only ISRs; my previous email used an incorrect
>>> term). This does better than tolerating X failures, and it is the behavior
>>> I was proposing to keep, so that we would not introduce any regression
>>> "surprises" to users who are already using "all". In other words, "quorum"
>>> trades a bit of the failure tolerance that is strictly defined by min.isr
>>> for better tail latency.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <lindon...@gmail.com> wrote:
>>>
>>>> Hey Guozhang,
>>>>
>>>> According to the new proposal, with 3 replicas, min.isr=2 and
>>>> acks="quorum", it seems that acknowledged messages can still be truncated
>>>> in the network partition scenario you mentioned, right? So I guess the goal
>>>> is for some users to achieve better tail latency at the cost of potential
>>>> message loss?
>>>>
>>>> If this is the case, then I think it may be better to adopt an approach
>>>> where the controller dynamically turns this optimization on and off. This
>>>> provides users with peace of mind (i.e. no message loss) while still
>>>> reducing tail latency. What do you think?
>>>>
>>>> Thanks,
>>>> Dong
>>>>
>>>>
>>>> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Litao,
>>>>>
>>>>> Just double-checking on the leader election details: do you have time
>>>>> to complete the proposal for that part?
>>>>>
>>>>> Also, Jun mentioned one caveat related to KIP-250 on the KIP-232
>>>>> discussion thread that Dong is working on; I figured it is worth pointing
>>>>> it out here with a tentative solution:
>>>>>
>>>>>
>>>>> ```
>>>>> Currently, if the producer uses acks=-1, a write will only succeed if
>>>>> the write is received by all in-sync replicas (i.e., committed). This
>>>>> is true even when min.isr is set since we first wait for a message to
>>>>> be committed and then check the min.isr requirement. KIP-250 may
>>>>> change that, but we can discuss the implication there.
>>>>> ```
>>>>>
>>>>> The caveat is that, if we change the acking semantics in KIP-250 so that
>>>>> we only require {min.isr} replicas to acknowledge a produce, then the
>>>>> above scenario has a caveat: imagine you have replicas {A, B, C} of a
>>>>> partition with A as the leader, all in the ISR list, and min.isr is 2.
>>>>>
>>>>> 1. Say there is a network partition and both A and B are fenced off. C
>>>>> is elected as the new leader and shrinks its ISR list to just {C}; from A's
>>>>> point of view, it does not know it has become a "ghost" and is no longer
>>>>> the leader; all it does is shrink its ISR list to {A, B}.
>>>>>
>>>>> 2. At this time, any new writes with acks=-1 to C will not be acked,
>>>>> since from C's point of view there is only one replica in the ISR. This is correct.
>>>>>
>>>>> 3. However, any writes that are sent to A (NOTE this is entirely
>>>>> possible, since producers only refresh metadata periodically; additionally,
>>>>> if they happen to ask A or B they will get the stale metadata saying A is
>>>>> still the leader) can still be acked: since A thinks the ISR list is {A, B},
>>>>> as long as B has replicated the message, A will ack the produce.
>>>>>
>>>>>     This is not correct behavior: when the network heals, A will
>>>>> realize it is not the leader and will truncate its log. As a result, the
>>>>> acked records are lost, violating Kafka's guarantees. And KIP-232 would
>>>>> not help prevent this scenario.
>>>>>
>>>>>
>>>>> Although one can argue that, with 3 replicas and min.isr set to 2,
>>>>> Kafka only guarantees to tolerate one failure, while the above
>>>>> scenario actually involves two concurrent failures (both A and B are
>>>>> considered wedged), this is still a regression compared to the current version.
>>>>>
>>>>> So to resolve this issue, I'd propose we change the semantics in
>>>>> the following way (this is only slightly different from your proposal):
>>>>>
>>>>>
>>>>> 1. Add one more value to the client-side acks config:
>>>>>
>>>>>    0: no acks needed at all.
>>>>>    1: ack from the leader only.
>>>>>    all: acks from ALL the ISR replicas, AND the current number of ISR
>>>>> replicas has to be no smaller than {min.isr} (i.e. this semantic is
>>>>> unchanged).
>>>>>    quorum: this is the new value; it requires acks from a number of ISR
>>>>> replicas that is no smaller than the majority of the replicas AND no
>>>>> smaller than {min.isr} (see the sketch after the scenarios below).
>>>>>
>>>>> 2. Clarify in the docs that if a user wants to tolerate X failures,
>>>>> she needs to set client acks=all or acks=quorum (better tail latency than
>>>>> "all") with broker {min.isr} set to X+1; however, "all" is not necessarily
>>>>> stronger than "quorum":
>>>>>
>>>>> For example, with 3 replicas and {min.isr} set to 2, here is a list of
>>>>> scenarios:
>>>>>
>>>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
>>>>> b. ISR list has 2: "all" and "quorum" both wait for the 2 of them.
>>>>> c. ISR list has 1: "all" and "quorum" would not ack.
>>>>>
>>>>> If {min.isr} is set to 1, interestingly, here is the list of
>>>>> scenarios:
>>>>>
>>>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
>>>>> b. ISR list has 2: "all" and "quorum" both wait for the 2 of them.
>>>>> c. ISR list has 1: "all" waits for the leader to return, while "quorum"
>>>>> would not ack (because it requires that the number be >= {min.isr} AND >=
>>>>> {majority of num.replicas}, so it is actually stronger than "all" here).
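>>>>>
>>>>> To make the comparison concrete, here is a minimal, self-contained sketch
>>>>> (plain Java with made-up names, not actual broker code) of how a leader
>>>>> could compute the required number of acks under this proposal; the main
>>>>> method replays the scenarios above.
>>>>>
>>>>> ```java
>>>>> // Hypothetical sketch of the proposed ack requirement; illustrative only.
>>>>> final class AckRequirement {
>>>>>
>>>>>     /**
>>>>>      * Number of ISR replicas (leader included) that must have the record
>>>>>      * before the produce is acknowledged, or -1 if it cannot be acked
>>>>>      * under the current ISR state.
>>>>>      */
>>>>>     static int requiredAcks(String acks, int isrSize, int replicationFactor, int minIsr) {
>>>>>         switch (acks) {
>>>>>             case "0":
>>>>>                 return 0;                               // fire and forget
>>>>>             case "1":
>>>>>                 return 1;                               // leader only
>>>>>             case "all":
>>>>>                 // unchanged semantics: every current ISR member must have
>>>>>                 // the record, and the ISR must be at least min.isr
>>>>>                 return isrSize >= minIsr ? isrSize : -1;
>>>>>             case "quorum": {
>>>>>                 // proposed: at least a majority of all replicas AND at
>>>>>                 // least min.isr, all of them current ISR members
>>>>>                 int required = Math.max(replicationFactor / 2 + 1, minIsr);
>>>>>                 return isrSize >= required ? required : -1;
>>>>>             }
>>>>>             default:
>>>>>                 throw new IllegalArgumentException("Unknown acks value: " + acks);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public static void main(String[] args) {
>>>>>         // 3 replicas, min.isr=2
>>>>>         System.out.println(requiredAcks("all", 3, 3, 2));    // 3
>>>>>         System.out.println(requiredAcks("quorum", 3, 3, 2)); // 2
>>>>>         System.out.println(requiredAcks("all", 1, 3, 2));    // -1 (no ack)
>>>>>         // 3 replicas, min.isr=1
>>>>>         System.out.println(requiredAcks("all", 1, 3, 1));    // 1 (leader only)
>>>>>         System.out.println(requiredAcks("quorum", 1, 3, 1)); // -1 (no ack)
>>>>>     }
>>>>> }
>>>>> ```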
>>>>>
>>>>>
>>>>> WDYT?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com> wrote:
>>>>>
>>>>>> Hey Litao,
>>>>>>
>>>>>> Not sure there will be an easy way to select the broker with the highest
>>>>>> LEO without losing acknowledged messages. In case it is useful, here is
>>>>>> another idea. Maybe we can have a mechanism to switch between the min.isr
>>>>>> and the ISR set for determining when to acknowledge a message. The
>>>>>> controller can probably use an RPC to request that the current leader use
>>>>>> the ISR set before it sends LeaderAndIsrRequest for the leadership change.
>>>>>>
>>>>>> Regards,
>>>>>> Dong
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng
>>>>>> <litao.d...@airbnb.com.invalid>
>>>>>> wrote:
>>>>>>
>>>>>> > Thanks Jun for the detailed feedback.
>>>>>> >
>>>>>> > Yes, for #1, I mean the live replicas from the ISR.
>>>>>> >
>>>>>> > Actually, I believe for all of the 4 new leader election strategies
>>>>>> > (offline, reassign, preferred replica and controlled shutdown), we
>>>>>> need to
>>>>>> > make corresponding changes. Will document the details in the KIP.
>>>>>> >
>>>>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io> wrote:
>>>>>> >
>>>>>> > > Hi, Litao,
>>>>>> > >
>>>>>> > > Thanks for the KIP. Good proposal. A few comments below.
>>>>>> > >
>>>>>> > > 1. The KIP says "select the live replica with the largest LEO". I
>>>>>> > > guess what you meant is selecting the live replica in the ISR with
>>>>>> > > the largest
>>>>>> > LEO?
>>>>>> > >
>>>>>> > > 2. I agree that we can probably just reuse the current min.isr
>>>>>> > > configuration, but with a slightly different semantics.
>>>>>> Currently, if
>>>>>> > > min.isr is set, a user expects the record to be in at least
>>>>>> min.isr
>>>>>> > > replicas on successful ack. This KIP guarantees this too. Most
>>>>>> people are
>>>>>> > > probably surprised that currently the ack is only sent back after
>>>>>> all
>>>>>> > > replicas in ISR receive the record. This KIP will change the ack
>>>>>> to only
>>>>>> > > wait on min.isr replicas, which matches the user's expectation
>>>>>> and gives
>>>>>> > > better latency. Currently, we guarantee no data loss if there are
>>>>>> fewer
>>>>>> > > than replication factor failures. The KIP changes that to fewer
>>>>>> than
>>>>>> > > min.isr failures. The latter probably matches the user
>>>>>> expectation.
>>>>>> > >
>>>>>> > > 3. I agree that the new leader election process is a bit more
>>>>>> > complicated.
>>>>>> > > The controller now needs to contact all replicas in ISR to
>>>>>> determine who
>>>>>> > > has the longest log. However, this happens infrequently. So, it's
>>>>>> > probably
>>>>>> > > worth doing for the better latency in #2.
>>>>>> > >
>>>>>> > > 4. We have to think through the preferred leader election process.
>>>>>> > > Currently, the first assigned replica is preferred for load
>>>>>> balancing.
>>>>>> > > There is a process to automatically move the leader to the
>>>>>> preferred
>>>>>> > > replica when it's in sync. The issue is that the preferred replica
>>>>>> > > may not be the replica with the longest log. Naively switching to the
>>>>>> preferred
>>>>>> > > replica may cause data loss when there are actually fewer
>>>>>> failures than
>>>>>> > > configured min.isr. One way to address this issue is to do the
>>>>>> following
>>>>>> > > steps during preferred leader election: (a) controller sends an
>>>>>> RPC
>>>>>> > request
>>>>>> > > to the current leader; (b) the current leader stops taking new
>>>>>> writes
>>>>>> > > (sending a new error code to the clients) and returns its LEO
>>>>>> (call it L)
>>>>>> > > to the controller; (c) the controller issues an RPC request to the
>>>>>> > > preferred replica and waits its LEO to reach L; (d) the controller
>>>>>> > changes
>>>>>> > > the leader to the preferred replica.
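>>>>>> > >
>>>>>> > > A rough sketch of that hand-off in pseudo-Java (stopWritesAndGetLeo,
>>>>>> > > fetchLogEndOffset, changeLeader and friends are made-up names, not
>>>>>> > > existing controller APIs):
>>>>>> > >
>>>>>> > > ```java
>>>>>> > > // Hypothetical controller-side preferred-leader hand-off; illustrative only.
>>>>>> > > void movePreferredLeader(TopicPartition tp, int currentLeader, int preferred) {
>>>>>> > >     // (a) + (b): ask the current leader to stop taking new writes
>>>>>> > >     // (clients see a retriable error) and report its LEO, call it L.
>>>>>> > >     long targetLeo = stopWritesAndGetLeo(currentLeader, tp);
>>>>>> > >
>>>>>> > >     // (c): wait until the preferred replica's LEO reaches L.
>>>>>> > >     while (fetchLogEndOffset(preferred, tp) < targetLeo) {
>>>>>> > >         sleepBriefly();
>>>>>> > >     }
>>>>>> > >
>>>>>> > >     // (d): switch leadership to the preferred replica via the usual
>>>>>> > >     // znode update + LeaderAndIsrRequest path.
>>>>>> > >     changeLeader(tp, preferred);
>>>>>> > > }
>>>>>> > > ```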
>>>>>> > >
>>>>>> > > Jun
>>>>>> > >
>>>>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng
>>>>>> > <litao.d...@airbnb.com.invalid
>>>>>> > > >
>>>>>> > > wrote:
>>>>>> > >
>>>>>> > > > Sorry folks, just realized I didn't use the correct thread
>>>>>> format for
>>>>>> > the
>>>>>> > > > discussion. I started this new one and copied all of the
>>>>>> responses from
>>>>>> > > the
>>>>>> > > > old one.
>>>>>> > > >
>>>>>> > > > @Dong
>>>>>> > > > It makes sense to just use the min.insync.replicas instead of
>>>>>> > > introducing a
>>>>>> > > > new config, and we must make this change together with the
>>>>>> LEO-based
>>>>>> > new
>>>>>> > > > leader election.
>>>>>> > > >
>>>>>> > > > @Xi
>>>>>> > > > I thought about embedding the LEO information into the
>>>>>> > > > ControllerContext, but didn't find a way. Using RPC will make the
>>>>>> > > > leader election period longer, and this should happen only in very
>>>>>> > > > rare cases (broker failure, controlled shutdown, preferred leader
>>>>>> > > > election and partition reassignment).
>>>>>> > > >
>>>>>> > > > @Jeff
>>>>>> > > > The current leader election picks the first replica from the AR
>>>>>> > > > that exists in both the live broker set and the ISR. I agree with
>>>>>> > > > you that changing the current/default behavior would cause a lot of
>>>>>> > > > confusion, and that's the reason the title is "Add Support ...". In
>>>>>> > > > this case, we wouldn't break any current promises, and we would
>>>>>> > > > provide a separate option for our users.
>>>>>> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous
>>>>>> > > > Replication" in the MySQL world, and yes it is something
>>>>>> between acks=1
>>>>>> > > and
>>>>>> > > > acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227
>>>>>> are
>>>>>> > > > two orthogonal improvements. KIP-227 is to improve the
>>>>>> replication
>>>>>> > > protocol
>>>>>> > > > (like the introduction of parallel replication in MySQL), and
>>>>>> KIP-250
>>>>>> > is
>>>>>> > > an
>>>>>> > > > enhancement for the replication architecture (sync, semi-sync,
>>>>>> and
>>>>>> > > async).
>>>>>> > > >
>>>>>> > > >
>>>>>> > > > Dong Lin
>>>>>> > > >
>>>>>> > > > > Thanks for the KIP. I have one quick comment before you
>>>>>> provide more
>>>>>> > > > detail
>>>>>> > > > > on how to select the leader with the largest LEO.
>>>>>> > > > > Do you think it would make sense to change the default behavior
>>>>>> > > > > of acks=-1, such that the broker will acknowledge the message once
>>>>>> > > > > it has been replicated to min.insync.replicas brokers? This would
>>>>>> > > > > allow us to keep the same durability guarantee and improve produce
>>>>>> > > > > request latency without adding a new config.
>>>>>> > > >
>>>>>> > > >
>>>>>> > > > Hu Xi
>>>>>> > > >
>>>>>> > > > > Currently, since it holds the assigned replicas (AR) for all
>>>>>> > > > > partitions, the controller is able to elect new leaders by
>>>>>> > > > > selecting the first replica of the AR which occurs in both the
>>>>>> > > > > live replica set and the ISR. If switching to the LEO-based
>>>>>> > > > > strategy, the controller context might need to be enriched or
>>>>>> > > > > augmented to store those values. If retrieving those LEOs in real
>>>>>> > > > > time, several rounds of RPCs are unavoidable, which seems to
>>>>>> > > > > violate the original intention of this KIP.
>>>>>> > > >
>>>>>> > > >
>>>>>> > > > Jeff Widman
>>>>>> > > >
>>>>>> > > > > I agree with Dong, we should see if it's possible to change the
>>>>>> > > > > default behavior so that as soon as min.insync.replicas brokers
>>>>>> > > > > respond, the broker acknowledges the message back to the client
>>>>>> > > > > without waiting for additional brokers that are in the in-sync
>>>>>> > > > > replica list to respond. (I actually thought it already worked
>>>>>> > > > > this way.)
>>>>>> > > > > As you implied in the KIP though, changing this default
>>>>>> introduces a
>>>>>> > > > weird
>>>>>> > > > > state where an in-sync follower broker is not guaranteed to
>>>>>> have a
>>>>>> > > > > message...
>>>>>> > > > > So at a minimum, the leadership failover algorithm would need
>>>>>> to be
>>>>>> > > sure
>>>>>> > > > to
>>>>>> > > > > pick the most up-to-date follower... I thought it already did
>>>>>> this?
>>>>>> > > > > But if multiple brokers fail in quick succession, then a
>>>>>> broker that
>>>>>> > > was
>>>>>> > > > in
>>>>>> > > > > the ISR could become a leader without ever receiving the
>>>>>> message...
>>>>>> > > > > violating the current promises of unclean.leader.election.enable=false...
>>>>>> > > > > so changing the default might not be a tenable solution.
>>>>>> > > > > What also jumped out at me in the KIP was the goal of
>>>>>> reducing p999
>>>>>> > > when
>>>>>> > > > > setting replica lag time at 10 seconds(!!)... I understand
>>>>>> the desire
>>>>>> > > to
>>>>>> > > > > minimize frequent ISR shrink/expansion, as I face this same
>>>>>> issue at
>>>>>> > my
>>>>>> > > > day
>>>>>> > > > > job. But what you're essentially trying to do here is create
>>>>>> an
>>>>>> > > > additional
>>>>>> > > > > replication state that is in-between acks=1 and acks = ISR to
>>>>>> paper
>>>>>> > > over
>>>>>> > > > a
>>>>>> > > > > root problem of ISR shrink/expansion...
>>>>>> > > > > I'm just wary of shipping more features (and more operational
>>>>>> > > confusion)
>>>>>> > > > if
>>>>>> > > > > it's only addressing the symptom rather than the root cause.
>>>>>> For
>>>>>> > > example,
>>>>>> > > > > my day job's problem is we run a very high number of
>>>>>> low-traffic
>>>>>> > > > > partitions-per-broker, so the fetch requests hit many
>>>>>> partitions
>>>>>> > before
>>>>>> > > > > they fill. Solving that requires changing our architecture +
>>>>>> making
>>>>>> > the
>>>>>> > > > > replication protocol more efficient (KIP-227).
>>>>>> > > >
>>>>>> > > >
>>>>>> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <
>>>>>> litao.d...@airbnb.com>
>>>>>> > > > wrote:
>>>>>> > > >
>>>>>> > > > > Hey folks. I would like to add a feature to support quorum-based
>>>>>> > > > > acknowledgment for the producer request. We have been running a
>>>>>> > > > > modified version of Kafka on our testing cluster for weeks; the
>>>>>> > > > > improvement in P999 is significant, with very stable latency.
>>>>>> > > > > Additionally, I have a proposal to achieve data durability similar
>>>>>> > > > > to the insync.replicas-based acknowledgment through LEO-based
>>>>>> > > > > leader election.
>>>>>> > > > >
>>>>>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge
>>>>>> > > > >
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>
>
> --
> -- Guozhang
>
