Hello Tao,

No, I was not proposing to change the mechanism of acks=all. I was only
pointing out that today, even with acks=all, the tolerance of failures is
theoretically still bounded by the min.isr setting, even though we require
all replicas in the ISR (which may be more than min.isr) to replicate
before responding; this is what Jun mentioned may surprise many users
today. I think an additional "acks=quorum" can help resolve this, by
requiring num.acks >= majority (to make sure consistency is guaranteed with
at most (X-1) / 2 failures, where X is the number of replicas) AND
num.acks >= min.isr (to specify whether we want to tolerate more failures
than (X-1) / 2).
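
To make the two conditions concrete, here is a rough Python sketch of the
rule as described in this thread. It is not Kafka code: the names
`majority` and `required_acks` are made up for illustration, and it assumes
the leader counts as one of the acking replicas.

```python
from typing import Optional


def majority(num_replicas: int) -> int:
    """Smallest replica count that is more than half of num_replicas."""
    return num_replicas // 2 + 1


def required_acks(acks: str, num_replicas: int, isr_size: int,
                  min_isr: int) -> Optional[int]:
    """Number of ISR replicas (leader included, by assumption) that must
    hold the record before the produce is acked, or None if the produce
    cannot be acked with the current ISR."""
    if acks == "all":
        # Current semantics: every ISR member must replicate, and the ISR
        # must not be smaller than min.isr.
        return isr_size if isr_size >= min_isr else None
    if acks == "quorum":
        # Proposed semantics: num.acks >= majority AND num.acks >= min.isr.
        needed = max(majority(num_replicas), min_isr)
        return needed if needed <= isr_size else None
    raise ValueError(f"unsupported acks value: {acks!r}")


# 3 replicas, min.isr=1, ISR shrunk to the leader only:
print(required_acks("all", 3, 1, 1))     # 1 (leader alone is enough)
print(required_acks("quorum", 3, 1, 1))  # None (a majority of 2 is needed)
```

With 3 replicas this reproduces the scenario lists from my earlier email:
"quorum" waits for 2 replicas whenever the ISR has at least 2 members, and
refuses to ack with an ISR of 1 even when min.isr is 1.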

The question then is whether acks=all is still useful once "quorum" is
introduced: if it is not, we can just replace the current semantics of
"all" and document it. The example that we gave above demonstrates that
"acks=all" itself may still be useful even with the introduction of
"quorum", since that scenario can be avoided by acks=all but not by
acks=quorum: acks=all requires ALL ISR replicas to replicate even if that
number is larger than {min.isr} and also larger than the majority number
(and if A tries to shrink its ISR from {A,B,C} to {A,B}, it will fail the
ZK write since the epoch has been incremented). Hence my proposal is to add
a new config rather than replace the current semantics of "all".
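
As a rough illustration of that scenario (again a Python sketch, not Kafka
code; `stale_leader_can_ack` and its parameters are hypothetical names), take
the stale leader A, whose ISR view stays at {A, B, C} because its ZK write
to shrink the ISR fails, while only A and B can still reach each other:

```python
def stale_leader_can_ack(acks: str, isr_view: set, reachable: set,
                         num_replicas: int, min_isr: int) -> bool:
    """Would a leader with the given ISR view ack a produce, if only the
    replicas in `reachable` (leader included) can replicate the record?"""
    replicated = isr_view & reachable
    if acks == "all":
        # Every member of the leader's ISR view must have replicated.
        return len(isr_view) >= min_isr and replicated == isr_view
    if acks == "quorum":
        # Enough replicas to cover both the majority and min.isr.
        needed = max(num_replicas // 2 + 1, min_isr)
        return len(replicated) >= needed
    raise ValueError(f"unsupported acks value: {acks!r}")


isr_view = {"A", "B", "C"}   # A could not shrink its ISR in ZK
reachable = {"A", "B"}       # C is on the other side of the partition

print(stale_leader_can_ack("all", isr_view, reachable, 3, 2))     # False
print(stale_leader_can_ack("quorum", isr_view, reachable, 3, 2))  # True
```

Under these assumptions "all" never acks on the stale leader, whereas
"quorum" acks records that A will later truncate.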


Guozhang


On Sat, Feb 3, 2018 at 2:45 AM, tao xiao <xiaotao...@gmail.com> wrote:

> Hi Guozhang,
>
> Are you proposing to change the semantics of acks=all so that a message is
> acknowledged only after all replicas (not all ISRs, which is what Kafka
> currently does) have committed it? This is equivalent to setting
> min.isr=number of replicas, which makes acks=all much stricter than what
> Kafka has right now. I think this may surprise users too, as the producer
> will not succeed in producing a message to Kafka when one of the followers
> is down.
>
> On Sat, 3 Feb 2018 at 15:26 Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Dong,
> >
> > Could you elaborate a bit more how controller could affect leaders to
> > switch between all and quorum?
> >
> >
> > Guozhang
> >
> >
> > On Fri, Feb 2, 2018 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Guozhang,
> > >
> > > Got it. Thanks for the detailed explanation. I guess my point is that we
> > > can probably achieve the best of both worlds, i.e. maintain the existing
> > > behavior of ack="all" while improving the tail latency.
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > > On Fri, Feb 2, 2018 at 8:43 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > >> Hi Dong,
> > >>
> > >> Yes, in terms of fault tolerance "quorum" does not do better than "all";
> > >> as I said, with {min.isr} set to X+1 Kafka is able to tolerate X failures
> > >> only. So if A and B are partitioned off at the same time, then there are
> > >> two concurrent failures and we do not guarantee all acked messages will
> > >> be retained.
> > >>
> > >> The goal of my approach is to maintain the behavior of ack="all", which
> > >> happens to do better than what Kafka actually guarantees: when both A
> > >> and B are partitioned off, produced records will not be acked, since
> > >> "all" requires all replicas (not only ISRs; my previous email used an
> > >> incorrect term). This is doing better than tolerating X failures, and it
> > >> is what I was proposing to keep, so that we would not introduce any
> > >> regression "surprises" to users who are already using "all". In other
> > >> words, "quorum" is trading a bit of failure tolerance that is strictly
> > >> defined on min.isr for better tail latency.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >>
> > >>> Hey Guozhang,
> > >>>
> > >>> According to the new proposal, with 3 replicas, min.isr=2 and
> > >>> acks="quorum", it seems that acknowledged messages can still be
> > >>> truncated in the network partition scenario you mentioned, right? So I
> > >>> guess the goal is for some users to achieve better tail latency at the
> > >>> cost of potential message loss?
> > >>>
> > >>> If this is the case, then I think it may be better to adopt an approach
> > >>> where the controller dynamically turns this optimization on and off.
> > >>> This provides users with peace of mind (i.e. no message loss) while
> > >>> still reducing tail latency. What do you think?
> > >>>
> > >>> Thanks,
> > >>> Dong
> > >>>
> > >>>
> > >>> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <wangg...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hello Litao,
> > >>>>
> > >>>> Just double checking on the leader election details: do you have time
> > >>>> to complete the proposal on that part?
> > >>>>
> > >>>> Also, Jun mentioned one caveat related to KIP-250 on the KIP-232
> > >>>> discussion thread that Dong is working on. I figured it is worth
> > >>>> pointing out here with a tentative solution:
> > >>>>
> > >>>>
> > >>>> ```
> > >>>> Currently, if the producer uses acks=-1, a write will only succeed if
> > >>>> the write is received by all in-sync replicas (i.e., committed). This
> > >>>> is true even when min.isr is set since we first wait for a message to
> > >>>> be committed and then check the min.isr requirement. KIP-250 may
> > >>>> change that, but we can discuss the implication there.
> > >>>> ```
> > >>>>
> > >>>> The caveat is that, if we change the acking semantics in KIP-250 so
> > >>>> that we only require {min.isr} replicas to acknowledge a produce, then
> > >>>> the above scenario has a problem: imagine you have {A, B, C} replicas
> > >>>> of a partition with A as the leader, all in the isr list, and min.isr
> > >>>> is 2.
> > >>>>
> > >>>> 1. Say there is a network partition and both A and B are fenced off. C
> > >>>> is elected as the new leader and shrinks its isr list to only {C}. From
> > >>>> A's point of view, it does not know that it has become a "ghost" and
> > >>>> is no longer the leader; all it does is shrink the isr list to {A, B}.
> > >>>>
> > >>>> 2. At this time, any new writes with ack=-1 to C will not be acked,
> > >>>> since from C's pov there is only one replica. This is correct.
> > >>>>
> > >>>> 3. However, consider any writes that are sent to A (NOTE this is
> > >>>> totally possible, since producers only refresh metadata periodically;
> > >>>> additionally, if they happen to ask A or B they will get the stale
> > >>>> metadata saying that A is still the leader): since A thinks that the
> > >>>> isr list is {A, B}, as long as B has replicated the message, A can ack
> > >>>> the produce.
> > >>>>
> > >>>>     This is not correct behavior, since when the network heals, A will
> > >>>> realize it is not the leader and will truncate its log. As a result,
> > >>>> the acked records are lost, violating Kafka's guarantees. And KIP-232
> > >>>> would not help prevent this scenario.
> > >>>>
> > >>>>
> > >>>> Although one can argue that, with 3 replicas and min.isr set to 2,
> > >>>> Kafka guarantees to tolerate only one failure, while the above
> > >>>> scenario is actually two concurrent failures (both A and B are
> > >>>> considered wedged), this is still a regression from the current
> > >>>> version.
> > >>>>
> > >>>> So to resolve this issue, I'd propose we change the semantics in the
> > >>>> following way (this is only slightly different from your proposal):
> > >>>>
> > >>>>
> > >>>> 1. Add one more value to client-side acks config:
> > >>>>
> > >>>>    0: no acks needed at all.
> > >>>>    1: ack from the leader.
> > >>>>    all: ack from ALL the ISR replicas AND the current number of isr
> > >>>> replicas has to be no smaller than {min.isr} (i.e. not changing this
> > >>>> semantic).
> > >>>>    quorum: this is the new value; it requires acks from a number of ISR
> > >>>> replicas no smaller than the majority of the replicas AND no smaller
> > >>>> than {min.isr}.
> > >>>>
> > >>>> 2. Clarify in the docs that if a user wants to tolerate X failures, she
> > >>>> needs to set client acks=all or acks=quorum (better tail latency than
> > >>>> "all") with broker {min.isr} set to X+1; however, "all" is not
> > >>>> necessarily stronger than "quorum":
> > >>>>
> > >>>> For example, with 3 replicas and {min.isr} set to 2, here is a list of
> > >>>> scenarios:
> > >>>>
> > >>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
> > >>>> b. ISR list has 2: "all" and "quorum" both wait for both of them.
> > >>>> c. ISR list has 1: "all" and "quorum" would not ack.
> > >>>>
> > >>>> If {min.isr} is set to 1, interestingly, here would be the list of
> > >>>> scenarios:
> > >>>>
> > >>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
> > >>>> b. ISR list has 2: "all" and "quorum" both wait for both of them.
> > >>>> c. ISR list has 1: "all" waits for leader to return, while "quorum"
> > >>>> would not ack (because it requires that number >= {min.isr}, AND >=
> > >>>> {majority of num.replicas}, so it's actually stronger than "all").
> > >>>>
> > >>>>
> > >>>> WDYT?
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > >>>>
> > >>>>> Hey Litao,
> > >>>>>
> > >>>>> Not sure there will be an easy way to select the broker with the
> > >>>>> highest LEO without losing acknowledged messages. In case it is
> > >>>>> useful, here is another idea. Maybe we can have a mechanism to switch
> > >>>>> between the min.isr and the isr set for determining when to
> > >>>>> acknowledge a message. The controller can probably use an RPC to
> > >>>>> request the current leader to use the isr set before it sends
> > >>>>> LeaderAndIsrRequest for a leadership change.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Dong
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng
> > >>>>> <litao.d...@airbnb.com.invalid>
> > >>>>> wrote:
> > >>>>>
> > >>>>> > Thanks Jun for the detailed feedback.
> > >>>>> >
> > >>>>> > Yes, for #1, I mean the live replicas from the ISR.
> > >>>>> >
> > >>>>> > Actually, I believe for all of the 4 new leader election strategies
> > >>>>> > (offline, reassign, preferred replica and controlled shutdown), we
> > >>>>> > need to make corresponding changes. Will document the details in the
> > >>>>> > KIP.
> > >>>>> >
> > >>>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io>
> wrote:
> > >>>>> >
> > >>>>> > > Hi, Litao,
> > >>>>> > >
> > >>>>> > > Thanks for the KIP. Good proposal. A few comments below.
> > >>>>> > >
> > >>>>> > > 1. The KIP says "select the live replica with the largest LEO". I
> > >>>>> > > guess what you meant is selecting the live replicas in ISR with the
> > >>>>> > > largest LEO?
> > >>>>> > >
> > >>>>> > > 2. I agree that we can probably just reuse the current min.isr
> > >>>>> > > configuration, but with slightly different semantics. Currently, if
> > >>>>> > > min.isr is set, a user expects the record to be in at least min.isr
> > >>>>> > > replicas on successful ack. This KIP guarantees this too. Most
> > >>>>> > > people are probably surprised that currently the ack is only sent
> > >>>>> > > back after all replicas in ISR receive the record. This KIP will
> > >>>>> > > change the ack to only wait on min.isr replicas, which matches the
> > >>>>> > > user's expectation and gives better latency. Currently, we
> > >>>>> > > guarantee no data loss if there are fewer than replication factor
> > >>>>> > > failures. The KIP changes that to fewer than min.isr failures. The
> > >>>>> > > latter probably matches the user expectation.
> > >>>>> > >
> > >>>>> > > 3. I agree that the new leader election process is a bit more
> > >>>>> > > complicated. The controller now needs to contact all replicas in
> > >>>>> > > ISR to determine who has the longest log. However, this happens
> > >>>>> > > infrequently. So, it's probably worth doing for the better latency
> > >>>>> > > in #2.
> > >>>>> > >
> > >>>>> > > 4. We have to think through the preferred leader election process.
> > >>>>> > > Currently, the first assigned replica is preferred for load
> > >>>>> > > balancing. There is a process to automatically move the leader to
> > >>>>> > > the preferred replica when it's in sync. The issue is that the
> > >>>>> > > preferred replica may not be the replica with the longest log.
> > >>>>> > > Naively switching to the preferred replica may cause data loss when
> > >>>>> > > there are actually fewer failures than the configured min.isr. One
> > >>>>> > > way to address this issue is to do the following steps during
> > >>>>> > > preferred leader election: (a) the controller sends an RPC request
> > >>>>> > > to the current leader; (b) the current leader stops taking new
> > >>>>> > > writes (sending a new error code to the clients) and returns its
> > >>>>> > > LEO (call it L) to the controller; (c) the controller issues an RPC
> > >>>>> > > request to the preferred replica and waits for its LEO to reach L;
> > >>>>> > > (d) the controller changes the leader to the preferred replica.
> > >>>>> > >
> > >>>>> > > Jun
> > >>>>> > >
> > >>>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng
> > >>>>> > <litao.d...@airbnb.com.invalid
> > >>>>> > > >
> > >>>>> > > wrote:
> > >>>>> > >
> > >>>>> > > > Sorry folks, just realized I didn't use the correct thread
> > >>>>> format for
> > >>>>> > the
> > >>>>> > > > discussion. I started this new one and copied all of the
> > >>>>> responses from
> > >>>>> > > the
> > >>>>> > > > old one.
> > >>>>> > > >
> > >>>>> > > > @Dong
> > >>>>> > > > It makes sense to just use the min.insync.replicas instead of
> > >>>>> > > introducing a
> > >>>>> > > > new config, and we must make this change together with the
> > >>>>> LEO-based
> > >>>>> > new
> > >>>>> > > > leader election.
> > >>>>> > > >
> > >>>>> > > > @Xi
> > >>>>> > > > I thought about embedding the LEO information in the
> > >>>>> > > > ControllerContext, but didn't find a way. Using RPC will make the
> > >>>>> > > > leader election period longer, and this should happen in very
> > >>>>> > > > rare cases (broker failure, controlled shutdown, preferred leader
> > >>>>> > > > election and partition reassignment).
> > >>>>> > > >
> > >>>>> > > > @Jeff
> > >>>>> > > > The current leader election is to pick the first replica from AR
> > >>>>> > > > which exists both in the live brokers and ISR sets. I agree with
> > >>>>> > > > you that changing the current/default behavior will cause a lot
> > >>>>> > > > of confusion, and that's the reason the title is "Add Support
> > >>>>> > > > ...". In this case, we wouldn't break any current promises and
> > >>>>> > > > provide a separate option for our user.
> > >>>>> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous
> > >>>>> > > > Replication" in the MySQL world, and yes it is something between
> > >>>>> > > > acks=1 and acks=insync.replicas. Additionally, I feel KIP-250 and
> > >>>>> > > > KIP-227 are two orthogonal improvements. KIP-227 is to improve
> > >>>>> > > > the replication protocol (like the introduction of parallel
> > >>>>> > > > replication in MySQL), and KIP-250 is an enhancement for the
> > >>>>> > > > replication architecture (sync, semi-sync, and async).
> > >>>>> > > >
> > >>>>> > > >
> > >>>>> > > > Dong Lin
> > >>>>> > > >
> > >>>>> > > > > Thanks for the KIP. I have one quick comment before you
> > >>>>> provide more
> > >>>>> > > > detail
> > >>>>> > > > > on how to select the leader with the largest LEO.
> > >>>>> > > > > Do you think it would make sense to change the default
> > >>>>> behavior of
> > >>>>> > > > acks=-1,
> > >>>>> > > > > such that broker will acknowledge the message once the
> > message
> > >>>>> has
> > >>>>> > been
> > >>>>> > > > > replicated to min.insync.replicas brokers? This would allow
> > us
> > >>>>> to
> > >>>>> > keep
> > >>>>> > > > the
> > >>>>> > > > > same durability guarantee, improve produce request latency
> > >>>>> without
> > >>>>> > > > having a
> > >>>>> > > > > new config.
> > >>>>> > > >
> > >>>>> > > >
> > >>>>> > > > Hu Xi
> > >>>>> > > >
> > >>>>> > > > > Currently,  with holding the assigned replicas(AR) for all
> > >>>>> > partitions,
> > >>>>> > > > > controller is now able to elect new leaders by selecting
> the
> > >>>>> first
> > >>>>> > > > replica
> > >>>>> > > > > of AR which occurs in both live replica set and ISR. If
> > >>>>> switching to
> > >>>>> > > the
> > >>>>> > > > > LEO-based strategy, controller context might need to be
> > >>>>> enriched or
> > >>>>> > > > > augmented to store those values.  If retrieving those LEOs
> > >>>>> real-time,
> > >>>>> > > > > several rounds of RPCs are unavoidable which seems to
> violate
> > >>>>> the
> > >>>>> > > > original
> > >>>>> > > > > intention of this KIP.
> > >>>>> > > >
> > >>>>> > > >
> > >>>>> > > > Jeff Widman
> > >>>>> > > >
> > >>>>> > > > > I agree with Dong, we should see if it's possible to change the
> > >>>>> > > > > default behavior so that as soon as min.insync.replicas brokers
> > >>>>> > > > > respond, the broker acknowledges the message back to the client
> > >>>>> > > > > without waiting for additional brokers who are in the in-sync
> > >>>>> > > > > replica list to respond. (I actually thought it already worked
> > >>>>> > > > > this way).
> > >>>>> > > > > As you implied in the KIP though, changing this default
> > >>>>> introduces a
> > >>>>> > > > weird
> > >>>>> > > > > state where an in-sync follower broker is not guaranteed to
> > >>>>> have a
> > >>>>> > > > > message...
> > >>>>> > > > > So at a minimum, the leadership failover algorithm would
> need
> > >>>>> to be
> > >>>>> > > sure
> > >>>>> > > > to
> > >>>>> > > > > pick the most up-to-date follower... I thought it already
> did
> > >>>>> this?
> > >>>>> > > > > But if multiple brokers fail in quick succession, then a broker
> > >>>>> > > > > that was in the ISR could become a leader without ever receiving
> > >>>>> > > > > the message... violating the current promises of
> > >>>>> > > > > unclean.leader.election.enable=False... so changing the default
> > >>>>> > > > > might not be a tenable solution.
> > >>>>> > > > > What also jumped out at me in the KIP was the goal of
> > reducing
> > >>>>> p999
> > >>>>> > > when
> > >>>>> > > > > setting replica lag time at 10 seconds(!!)... I understand
> > the
> > >>>>> desire
> > >>>>> > > to
> > >>>>> > > > > minimize frequent ISR shrink/expansion, as I face this same
> > >>>>> issue at
> > >>>>> > my
> > >>>>> > > > day
> > >>>>> > > > > job. But what you're essentially trying to do here is
> create
> > an
> > >>>>> > > > additional
> > >>>>> > > > > replication state that is in-between acks=1 and acks = ISR
> to
> > >>>>> paper
> > >>>>> > > over
> > >>>>> > > > a
> > >>>>> > > > > root problem of ISR shrink/expansion...
> > >>>>> > > > > I'm just wary of shipping more features (and more
> operational
> > >>>>> > > confusion)
> > >>>>> > > > if
> > >>>>> > > > > it's only addressing the symptom rather than the root
> cause.
> > >>>>> For
> > >>>>> > > example,
> > >>>>> > > > > my day job's problem is we run a very high number of
> > >>>>> low-traffic
> > >>>>> > > > > partitions-per-broker, so the fetch requests hit many
> > >>>>> partitions
> > >>>>> > before
> > >>>>> > > > > they fill. Solving that requires changing our architecture
> +
> > >>>>> making
> > >>>>> > the
> > >>>>> > > > > replication protocol more efficient (KIP-227).
> > >>>>> > > >
> > >>>>> > > >
> > >>>>> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <
> > >>>>> litao.d...@airbnb.com>
> > >>>>> > > > wrote:
> > >>>>> > > >
> > >>>>> > > > > Hey folks. I would like to add a feature to support the
> > >>>>> quorum-based
> > >>>>> > > > > acknowledgment for the producer request. We have been
> > running a
> > >>>>> > > modified
> > >>>>> > > > > version of Kafka on our testing cluster for weeks, the
> > >>>>> improvement of
> > >>>>> > > > P999
> > >>>>> > > > > is significant with very stable latency. Additionally, I
> > have a
> > >>>>> > > proposal
> > >>>>> > > > to
> > >>>>> > > > > achieve a similar data durability as with the
> > >>>>> insync.replicas-based
> > >>>>> > > > > acknowledgment through LEO-based leader election.
> > >>>>> > > > >
> > >>>>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge
> > >>>>> > > > >
> > >>>>> > > >
> > >>>>> > >
> > >>>>> >
> > >>>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
