Thanks Jun. Further replies are in-lined.

On Mon, May 4, 2020 at 11:58 AM Jun Rao <j...@confluent.io> wrote:

> Hi, Guozhang,
>
> Thanks for the reply. A few more replies inlined below.
>
> On Sun, May 3, 2020 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Jun,
> >
> > Thanks for your comments! I'm replying inline below:
> >
> > On Fri, May 1, 2020 at 12:36 PM Jun Rao <j...@confluent.io> wrote:
> >
> > > 101. Bootstrapping related issues.
> > > 101.1 Currently, we support auto broker id generation. Is this
> supported
> > > for bootstrap brokers?
> > >
> >
> > The vote ids would just be the broker ids. "bootstrap.servers" would be
> > similar to what client configs have today, where "quorum.voters" would be
> > pre-defined config values.
> >
> >
> My question was on the auto generated broker id. Currently, the broker can
> choose to have its broker Id auto generated. The generation is done through
> ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id is
> auto generated. "quorum.voters" also can't be set statically if broker ids
> are auto generated.
>
> Jason has explained some ideas that we've discussed so far, the reason we
intentional did not include them so far is that we feel it is out-side the
scope of KIP-595. Under the umbrella of KIP-500 we should definitely
address them though.

On the high-level, our belief is that "joining a quorum" and "joining (or
more specifically, registering brokers in) the cluster" would be
de-coupled a bit, where the former should be completed before we do the
latter. More specifically, assuming the quorum is already up and running,
after the newly started broker found the leader of the quorum it can send a
specific RegisterBroker request including its listener / protocol / etc,
and upon handling it the leader can send back the uniquely generated broker
id to the new broker, while also executing the "startNewBroker" callback as
the controller.


>
> > > 102. Log compaction. One weak spot of log compaction is for the
> consumer
> > to
> > > deal with deletes. When a key is deleted, it's retained as a tombstone
> > > first and then physically removed. If a client misses the tombstone
> > > (because it's physically removed), it may not be able to update its
> > > metadata properly. The way we solve this in Kafka is based on a
> > > configuration (log.cleaner.delete.retention.ms) and we expect a
> consumer
> > > having seen an old key to finish reading the deletion tombstone within
> > that
> > > time. There is no strong guarantee for that since a broker could be
> down
> > > for a long time. It would be better if we can have a more reliable way
> of
> > > dealing with deletes.
> > >
> >
> > We propose to capture this in the "FirstDirtyOffset" field of the quorum
> > record fetch response: the offset is the maximum offset that log
> compaction
> > has reached up to. If the follower has fetched beyond this offset it
> means
> > itself is safe hence it has seen all records up to that offset. On
> getting
> > the response, the follower can then decide if its end offset actually
> below
> > that dirty offset (and hence may miss some tombstones). If that's the
> case:
> >
> > 1) Naively, it could re-bootstrap metadata log from the very beginning to
> > catch up.
> > 2) During that time, it would refrain itself from answering
> MetadataRequest
> > from any clients.
> >
> >
> I am not sure that the "FirstDirtyOffset" field fully addresses the issue.
> Currently, the deletion tombstone is not removed immediately after a round
> of cleaning. It's removed after a delay in a subsequent round of cleaning.
> Consider an example where a key insertion is at offset 200 and a deletion
> tombstone of the key is at 400. Initially, FirstDirtyOffset is at 300. A
> follower/observer fetches from offset 0  and fetches the key at offset 200.
> A few rounds of cleaning happen. FirstDirtyOffset is at 500 and the
> tombstone at 400 is physically removed. The follower/observer continues the
> fetch, but misses offset 400. It catches all the way to FirstDirtyOffset
> and declares its metadata as ready. However, its metadata could be stale
> since it actually misses the deletion of the key.
>
> Yeah good question, I should have put more details in my explanation :)

The idea is that we will adjust the log compaction for this raft based
metadata log: before more details to be explained, since we have two types
of "watermarks" here, whereas in Kafka the watermark indicates where every
replica have replicated up to and in Raft the watermark indicates where the
majority of replicas (here only indicating voters of the quorum, not
counting observers) have replicated up to, let's call them Kafka watermark
and Raft watermark. For this special log, we would maintain both watermarks.

When log compacting on the leader, we would only compact up to the Kafka
watermark, i.e. if there is at least one voter who have not replicated an
entry, it would not be compacted. The "dirty-offset" is the offset that
we've compacted up to and is communicated to other voters, and the other
voters would also compact up to this value --- i.e. the difference here is
that instead of letting each replica doing log compaction independently,
we'll have the leader to decide upon which offset to compact to, and
propagate this value to others to follow, in a more coordinated manner.
Also note when there are new voters joining the quorum who has not
replicated up to the dirty-offset, of because of other issues they
truncated their logs to below the dirty-offset, they'd have to re-bootstrap
from the beginning, and during this period of time the leader learned about
this lagging voter would not advance the watermark (also it would not
decrement it), and hence not compacting either, until the voter(s) has
caught up to that dirty-offset.

So back to your example above, before the bootstrap voter gets to 300 no
log compaction would happen on the leader; and until later when the voter
have got to beyond 400 and hence replicated that tombstone, the log
compaction would possibly get to that tombstone and remove it. Say later it
the leader's log compaction reaches 500, it can send this back to the voter
who can then also compact locally up to 500.


> > > 105. Quorum State: In addition to VotedId, do we need the epoch
> > > corresponding to VotedId? Over time, the same broker Id could be voted
> in
> > > different generations with different epoch.
> > >
> > >
> > Hmm, this is a good point. Originally I think the "LeaderEpoch" field in
> > that file is corresponding to the "latest known leader epoch", not the
> > "current leader epoch". For example, if the current epoch is N, and then
> a
> > vote-request with epoch N+1 is received and the voter granted the vote
> for
> > it, then it means for this voter it knows the "latest epoch" is N + 1
> > although it is unknown if that sending candidate will indeed become the
> new
> > leader (which would only be notified via begin-quorum request). However,
> > when persisting the quorum state, we would encode leader-epoch to N+1,
> > while the leaderId to be the older leader.
> >
> > But now thinking about this a bit more, I feel we should use two separate
> > epochs, one for the "lates known" and one for the "current" to pair with
> > the leaderId. I will update the wiki page.
> >
> >
> Hmm, it's kind of weird to bump up the leader epoch before the new leader
> is actually elected, right.
>
>
> > > 106. "OFFSET_OUT_OF_RANGE: Used in the FetchQuorumRecords API to
> indicate
> > > that the follower has fetched from an invalid offset and should
> truncate
> > to
> > > the offset/epoch indicated in the response." Observers can't truncate
> > their
> > > logs. What should they do with OFFSET_OUT_OF_RANGE?
> > >
> > >
> > I'm not sure if I understand your question? Observers should still be
> able
> > to truncate their logs as well.
> >
> >
> Hmm, I thought only the quorum nodes have local logs and observers don't?
>
> > 107. "The leader will continue sending BeginQuorumEpoch to each known
> > voter
> > > until it has received its endorsement." If a voter is down for a long
> > time,
> > > sending BeginQuorumEpoch seems to add unnecessary overhead. Similarly,
> > if a
> > > follower stops sending FetchQuorumRecords, does the leader keep sending
> > > BeginQuorumEpoch?
> > >
> >
> > Regarding BeginQuorumEpoch: that is a good point. The begin-quorum-epoch
> > request is for voters to quickly get the new leader information; however
> > even if they do not get them they can still eventually learn about that
> > from others via gossiping FindQuorum. I think we can adjust the logic to
> > e.g. exponential back-off or with a limited num.retries.
> >
> > Regarding FetchQuorumRecords: if the follower sends FetchQuorumRecords
> > already, it means that follower already knows that the broker is the
> > leader, and hence we can stop retrying BeginQuorumEpoch; however it is
> > possible that after a follower sends FetchQuorumRecords already, suddenly
> > it stops send it (possibly because it learned about a higher epoch
> leader),
> > and hence this broker may be a "zombie" leader and we propose to use the
> > fetch.timeout to let the leader to try to verify if it has already been
> > stale.
> >
> >
> It just seems that we should handle these two cases in a consistent way?
>
> Yes I agree, on the leader's side, the FetchQuorumRecords from a follower
could mean that we no longer needs to send BeginQuorumEpoch anymore --- and
it is already part of our current implementations in
https://github.com/confluentinc/kafka/commits/kafka-raft


> Thanks,
>
> Jun
>
> >
> > >
> > > Jun
> > >
> > > On Wed, Apr 29, 2020 at 8:56 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Hello Leonard,
> > > >
> > > > Thanks for your comments, I'm relying in line below:
> > > >
> > > > On Wed, Apr 29, 2020 at 1:58 AM Wang (Leonard) Ge <w...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi Kafka developers,
> > > > >
> > > > > It's great to see this proposal and it took me some time to finish
> > > > reading
> > > > > it.
> > > > >
> > > > > And I have the following questions about the Proposal:
> > > > >
> > > > >    - How do we plan to test this design to ensure its correctness?
> Or
> > > > more
> > > > >    broadly, how do we ensure that our new ‘pull’ based model is
> > > > functional
> > > > > and
> > > > >    correct given that it is different from the original RAFT
> > > > implementation
> > > > >    which has formal proof of correctness?
> > > > >
> > > >
> > > > We have two planned verifications on the correctness and liveness of
> > the
> > > > design. One is via model verification (TLA+)
> > > > https://github.com/guozhangwang/kafka-specification
> > > >
> > > > Another is via the concurrent simulation tests
> > > >
> > > >
> > >
> >
> https://github.com/confluentinc/kafka/commit/5c0c054597d2d9f458cad0cad846b0671efa2d91
> > > >
> > > >    - Have we considered any sensible defaults for the configuration,
> > i.e.
> > > > >    all the election timeout, fetch time out, etc.? Or we want to
> > leave
> > > > > this to
> > > > >    a later stage when we do the performance testing, etc.
> > > > >
> > > >
> > > > This is a good question, the reason we did not set any default values
> > for
> > > > the timeout configurations is that we think it may take some
> > benchmarking
> > > > experiments to get these defaults right. Some high-level principles
> to
> > > > consider: 1) the fetch.timeout should be around the same scale with
> zk
> > > > session timeout, which is now 18 seconds by default -- in practice
> > we've
> > > > seen unstable networks having more than 10 secs of transient
> > > connectivity,
> > > > 2) the election.timeout, however, should be smaller than the fetch
> > > timeout
> > > > as is also suggested as a practical optimization in literature:
> > > > https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf
> > > >
> > > > Some more discussions can be found here:
> > > > https://github.com/confluentinc/kafka/pull/301/files#r415420081
> > > >
> > > >
> > > > >    - Have we considered piggybacking `BeginQuorumEpoch` with the `
> > > > >    FetchQuorumRecords`? I might be missing something obvious but I
> am
> > > > just
> > > > >    wondering why don’t we just use the `FindQuorum` and
> > > > > `FetchQuorumRecords`
> > > > >    APIs and remove the `BeginQuorumEpoch` API?
> > > > >
> > > >
> > > > Note that Begin/EndQuorumEpoch is sent from leader -> other voter
> > > > followers, while FindQuorum / Fetch are sent from follower to leader.
> > > > Arguably one can eventually realize the new leader and epoch via
> > > gossiping
> > > > FindQuorum, but that could in practice require a long delay. Having a
> > > > leader -> other voters request helps the new leader epoch to be
> > > propagated
> > > > faster under a pull model.
> > > >
> > > >
> > > > >    - And about the `FetchQuorumRecords` response schema, in the
> > > `Records`
> > > > >    field of the response, is it just one record or all the records
> > > > starting
> > > > >    from the FetchOffset? It seems a lot more efficient if we sent
> all
> > > the
> > > > >    records during the bootstrapping of the brokers.
> > > > >
> > > >
> > > > Yes the fetching is batched: FetchOffset is just the starting offset
> of
> > > the
> > > > batch of records.
> > > >
> > > >
> > > > >    - Regarding the disruptive broker issues, does our pull based
> > model
> > > > >    suffer from it? If so, have we considered the Pre-Vote stage? If
> > > not,
> > > > > why?
> > > > >
> > > > >
> > > > The disruptive broker is stated in the original Raft paper which is
> the
> > > > result of the push model design. Our analysis showed that with the
> pull
> > > > model it is no longer an issue.
> > > >
> > > >
> > > > > Thanks a lot for putting this up, and I hope that my questions can
> be
> > > of
> > > > > some value to make this KIP better.
> > > > >
> > > > > Hope to hear from you soon!
> > > > >
> > > > > Best wishes,
> > > > > Leonard
> > > > >
> > > > >
> > > > > On Wed, Apr 29, 2020 at 1:46 AM Colin McCabe <cmcc...@apache.org>
> > > wrote:
> > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > > It's amazing to see this coming together :)
> > > > > >
> > > > > > I haven't had a chance to read in detail, but I read the outline
> > and
> > > a
> > > > > few
> > > > > > things jumped out at me.
> > > > > >
> > > > > > First, for every epoch that is 32 bits rather than 64, I sort of
> > > wonder
> > > > > if
> > > > > > that's a good long-term choice.  I keep reading about stuff like
> > > this:
> > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-1277 .
> Obviously,
> > > > that
> > > > > > JIRA is about zxid, which increments much faster than we expect
> > these
> > > > > > leader epochs to, but it would still be good to see some rough
> > > > > calculations
> > > > > > about how long 32 bits (or really, 31 bits) will last us in the
> > cases
> > > > > where
> > > > > > we're using it, and what the space savings we're getting really
> is.
> > > It
> > > > > > seems like in most cases the tradeoff may not be worth it?
> > > > > >
> > > > > > Another thing I've been thinking about is how we do
> > bootstrapping.  I
> > > > > > would prefer to be in a world where formatting a new Kafka node
> > was a
> > > > > first
> > > > > > class operation explicitly initiated by the admin, rather than
> > > > something
> > > > > > that happened implicitly when you started up the broker and
> things
> > > > > "looked
> > > > > > blank."
> > > > > >
> > > > > > The first problem is that things can "look blank" accidentally if
> > the
> > > > > > storage system is having a bad day.  Clearly in the non-Raft
> world,
> > > > this
> > > > > > leads to data loss if the broker that is (re)started this way was
> > the
> > > > > > leader for some partitions.
> > > > > >
> > > > > > The second problem is that we have a bit of a chicken and egg
> > problem
> > > > > with
> > > > > > certain configuration keys.  For example, maybe you want to
> > configure
> > > > > some
> > > > > > connection security settings in your cluster, but you don't want
> > them
> > > > to
> > > > > > ever be stored in a plaintext config file.  (For example, SCRAM
> > > > > passwords,
> > > > > > etc.)  You could use a broker API to set the configuration, but
> > that
> > > > > brings
> > > > > > up the chicken and egg problem.  The broker needs to be
> configured
> > to
> > > > > know
> > > > > > how to talk to you, but you need to configure it before you can
> > talk
> > > to
> > > > > > it.  Using an external secret manager like Vault is one way to
> > solve
> > > > > this,
> > > > > > but not everyone uses an external secret manager.
> > > > > >
> > > > > > quorum.voters seems like a similar configuration key.  In the
> > current
> > > > > KIP,
> > > > > > this is only read if there is no other configuration specifying
> the
> > > > > quorum
> > > > > > voter set.  If we had a kafka.mkfs command, we wouldn't need this
> > key
> > > > > > because we could assume that there was always quorum information
> > > stored
> > > > > > locally.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > >
> > > > > > On Thu, Apr 16, 2020, at 16:44, Jason Gustafson wrote:
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I'd like to start a discussion on KIP-595:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum
> > > > > > .
> > > > > > > This proposal specifies a Raft protocol to ultimately replace
> > > > Zookeeper
> > > > > > > as
> > > > > > > documented in KIP-500. Please take a look and share your
> > thoughts.
> > > > > > >
> > > > > > > A few minor notes to set the stage a little bit:
> > > > > > >
> > > > > > > - This KIP does not specify the structure of the messages used
> to
> > > > > > represent
> > > > > > > metadata in Kafka, nor does it specify the internal API that
> will
> > > be
> > > > > used
> > > > > > > by the controller. Expect these to come in later proposals.
> Here
> > we
> > > > are
> > > > > > > primarily concerned with the replication protocol and basic
> > > > operational
> > > > > > > mechanics.
> > > > > > > - We expect many details to change as we get closer to
> > integration
> > > > with
> > > > > > > the controller. Any changes we make will be made either as
> > > amendments
> > > > > to
> > > > > > > this KIP or, in the case of larger changes, as new proposals.
> > > > > > > - We have a prototype implementation which I will put online
> > within
> > > > the
> > > > > > > next week which may help in understanding some details. It has
> > > > > diverged a
> > > > > > > little bit from our proposal, so I am taking a little time to
> > bring
> > > > it
> > > > > in
> > > > > > > line. I'll post an update to this thread when it is available
> for
> > > > > review.
> > > > > > >
> > > > > > > Finally, I want to mention that this proposal was drafted by
> > > myself,
> > > > > > Boyang
> > > > > > > Chen, and Guozhang Wang.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jason
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Leonard Ge
> > > > > Software Engineer Intern - Confluent
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Reply via email to