Hi Unmesh,

I think you're right that we should use a duration here rather than a time.  As 
you said, the clock on the controller will probably not match the one on the 
broker.  I have updated the KIP.

> > It's important to keep in mind that messages may be delayed in the
> > network, or arrive out of order.  When this happens, we will use the start
> > time specified in the request to determine if the request is stale.
> I am assuming that there will be a single TCP connection maintained between
> broker and active controller. So, there won't be any out of order requests?
> There will be a scenario of broker GC pause, which might cause connection
> timeout and broker might need to reestablish the connection. If the pause
> is too long, lease will expire and the heartbeat sent after the pause will
> be treated as a new registration (similar to restart case), and a new epoch
> number will be assigned to the broker.

I agree with the end of this paragraph, but not with the start :)

There can be out-of-order requests, since the broker will simply use a new TCP 
connection if the old one has problems.  This can happen for a variety of 
reasons.  I don't think GC pauses are the most common reason for this to 
happen.  It's more common to see issues issues in the network itself that 
result connections getting dropped from time to time.

So we have to assume that messages may arrive out of order, and possibly be 
delayed.  I added a note that heartbeat requests should be ignored if the 
metadata log offset they contain is smaller than a previous heartbeat.

> When the active controller fails, the new active controller needs to be
> sure that it considers all the known brokers as alive till the lease
> expiration interval.  Because registration.lease.timeout.ms, is configured
> on the controller, the new active controller will extend all the leases by
> registration.lease.timeout.ms. I see that it won't need last heartbeat
> time.

Agreed.

best,
Colin

> 
> Thanks,
> Unmesh
> 
> On Sat, Sep 5, 2020 at 1:28 AM Colin McCabe <cmcc...@apache.org> wrote:
> 
> > > Colin wrote:
> > > > The reason for including LeaseStartTimeMs in the request is to ensure
> > > > that the time required to communicate with the controller gets
> > included in
> > > > the lease time.  Since requests can potentially be delayed in the
> > network
> > > > for a long time, this is important.
> >
> > On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote:
> > > The network time will be added anyway, because the lease timer on the
> > > active controller will start only after the heartbeat request reaches the
> > > server.
> >
> > Hi Unmesh,
> >
> > If the start time is not specified in the request, then the network time
> > is excluded from the heartbeat time.
> >
> > Here's an example:
> > Let's say broker A sends a heartbeat at time 100, and it arrives on the
> > controller at time 200, and the lease duration is 1000.
> >
> > The controller looks at the start time in the request, which is 100, and
> > adds 1000 to it, getting 1100.  On the other hand, if start time is not
> > specified in the request, then the expiration will be at time 1200.
> > That is what I mean by "the network time is included in the expiration
> > time."
> >
> > > And I think, some assumption about network round trip time is
> > > needed anyway to decide on the frequency of the heartbeat (
> > > registration.heartbeat.interval.ms), and lease timeout (
> > > registration.lease.timeout.ms). So I think just having a leaseTTL in the
> > > request is easier to understand and implement.
> >
> > It's important to keep in mind that messages may be delayed in the
> > network, or arrive out of order.  When this happens, we will use the start
> > time specified in the request to determine if the request is stale.
> >
> > > > Yes, I agree that the lease timeout on the controller side should be
> > > > reset in the case of controller failover.  The alternative would be to
> > > > track the lease as hard state rather than soft state, but I think that
> > > > is not really needed, and would result in more log entries.
> > > My interpretation of the mention of BrokerRecord in the KIP was that this
> > > record exists in the Raft log.
> >
> > BrokerRecord does exist in the Raft log, but does not include the last
> > heartbeat time.
> >
> > > By soft state, do you mean the broker
> > > records exist only on the active leader and will not be replicated in the
> > > raft log? If the live brokers list is maintained only on the active
> > > controller (raft leader), then, in case of leader failure, there will be
> > a
> > > window where the new leader does not know about the live brokers, till
> > the
> > > brokers establish the leases again.
> > > I think it will be safer to have leases as a hard state managed by
> > standard
> > > Raft replication.
> >
> > Leases are short, so the need to re-establish them after a controller
> > failover doesn't seem like a big problem.  But this is something we can
> > tweak if it becomes an issue.  One option would be to have a separate log
> > which is only used by the controller nodes for this (since, after all,
> > brokers don't care about registration renewals).
> >
> > > Or am I misunderstanding something? (I assume that with soft state, you
> > > mean something like zookeeper local sessions
> > > https://issues.apache.org/jira/browse/ZOOKEEPER-1147.)
> > >
> > > > Our code is single threaded as well.  I think it makes sense for the
> > > > controller, since otherwise locking becomes very messy.  I'm not sure I
> > > > understand your question about duplicate broker ID detection, though.
> > > There's a section in the KIP about this -- is there a detail we should
> > add
> > > there?
> >
> > This is an implementation detail that doesn't need to be in the KIP.
> >
> > best,
> > Colin
> >
> > > I assumed broker leases are implemented as a hard state. In that case, to
> > > check for broker id conflict, we need to check the broker ids at two
> > places
> > > 1. Pending broker registrations (which are yet to be committed) 2.
> > Already
> > > committed broker registrations.
> > >
> > > Thanks,
> > > Unmesh
> > >
> > >
> > >
> > > On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe <cmcc...@apache.org> wrote:
> > >
> > > > On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
> > > > > >>>Can you repeat your questions about broker leases?
> > > > >
> > > > > >>>>The LeaseStartTimeMs is expected to be the broker's
> > > > > 'System.currentTimeMillis()' at the point of the request. The active
> > > > > controller will add its lease period to this in order >>>>to compute
> > the
> > > > > LeaseEndTimeMs.
> > > > >
> > > > > I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is
> > a
> > > > > bit
> > > > > confusing.  Monotonic Clock (System.nanoTime) on the active
> > controller
> > > > > should be used to track leases.
> > > > > (For example,
> > > > >
> > > >
> > https://issues.apache.org/jira/browse/ZOOKEEPER-1616https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
> > > > > )
> > > > >
> > > > > Then we will not need LeaseStartTimeMs?
> > > > > Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active
> > > > controller
> > > > > can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
> > > > > In this case we might just drop LeaseEndTimeMs from the response, as
> > the
> > > > > broker already knows about the TTL and can send heartbeats at some
> > > > fraction
> > > > > of TTL, say every TTL/4 milliseconds.(elapsed time on the broker
> > measured
> > > > > by System.nanoTime)
> > > > >
> > > >
> > > > Hi Unmesh,
> > > >
> > > > I agree that the monotonic clock is probably a better idea here.  It is
> > > > good to be robust against wall clock changes, although I think a
> > cluster
> > > > which had them might suffer other issues.  I will change it to specify
> > a
> > > > monotonic clock.
> > > >
> > > > The reason for including LeaseStartTimeMs in the request is to ensure
> > that
> > > > the time required to communicate with the controller gets included in
> > the
> > > > lease time.  Since requests can potentially be delayed in the network
> > for a
> > > > long time, this is important.
> > > >
> > > > >
> > > > > I have a prototype built to demonstrate this as following:
> > > > >
> > > >
> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> > > > >
> > > > > The Kip631Controller itself depends on a Consensus module, to
> > demonstrate
> > > > > how possible interactions with the consensus module will look like
> > > > >  (The Consensus can be pluggable really, with an API to allow reading
> > > > > replicated log upto HighWaterMark)
> > > > >
> > > > > It has an implementation of LeaseTracker
> > > > >
> > > >
> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
> > > > > to demonstrate LeaseTracker's interaction with the consensus module.
> > > > >
> > > > > The implementation has the following aspects:
> > > > > 1. The lease tracking happens only on the active controller (raft
> > > > > leader)
> > > > > 2. Once the lease expires, it needs to propose and commit a
> > FenceBroker
> > > > > record for that lease.
> > > > > 3. In case of active controller failure, the lease will be tracked by
> > > > > the
> > > > > newly raft leader. The new raft leader starts the lease timer again,
> > (as
> > > > > implemented in onBecomingLeader method of
> > > > >
> > > >
> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> > > > > )
> > > > > in effect extending the lease by the time spent in the leader
> > election
> > > > > and
> > > > > whatever time was elapsed on the old leader.
> > > >
> > > > Yes, I agree that the lease timeout on the controller side should be
> > reset
> > > > in the case of controller failover.  The alternative would be to track
> > the
> > > > lease as hard state rather than soft state, but I think that is not
> > really
> > > > needed, and would result in more log entries.
> > > >
> > > > >
> > > > > There are working tests for this implementation here.
> > > > >
> > > >
> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
> > > > > and an end to end test here
> > > > >
> > > >
> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
> > > > > <
> > > >
> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
> > > > >
> > > > > >>'m not sure what you mean by "de-duplication of the broker."  Can
> > you
> > > > > give a little more context?
> > > > > Apologies for using the confusing term deduplication. I meant broker
> > id
> > > > > conflict.
> > > > > As you can see in the prototype handleRequest of KIP631Controller
> > > > > <
> > > >
> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> > > > >,
> > > > > the duplicate broker id needs to be detected before the BrokerRecord
> > is
> > > > > submitted to the raft module.
> > > > > Also as implemented in the prototype, the KIP631Controller is single
> > > > > threaded, handling requests one at a time. (an example of
> > > > >
> > > >
> > https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html
> > > > > )
> > > >
> > > > Our code is single threaded as well.  I think it makes sense for the
> > > > controller, since otherwise locking becomes very messy.  I'm not sure I
> > > > understand your question about duplicate broker ID detection, though.
> > > > There's a section in the KIP about this -- is there a detail we should
> > add
> > > > there?
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > >
> > > > > Thanks,
> > > > > Unmesh
> > > > >
> > > > > On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe <co...@cmccabe.xyz>
> > wrote:
> > > > >
> > > > > > On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
> > > > > > > Hi Colin,
> > > > > > >
> > > > > > > There were a few of questions I had..
> > > > > >
> > > > > > Hi Unmesh,
> > > > > >
> > > > > > Thanks for the response.
> > > > > >
> > > > > > >
> > > > > > > 1. Were my comments on the broker lease implementation (and
> > > > corresponding
> > > > > > > prototype) appropriate and do we need to change the KIP
> > > > > > > description accordingly?.
> > > > > > >
> > > > > >
> > > > > > Can you repeat your questions about broker leases?
> > > > > >
> > > > > > >
> > > > > > > 2. How will broker epochs be generated? I am assuming it can be
> > the
> > > > > > > committed log offset (like zxid?)
> > > > > > >
> > > > > >
> > > > > > There isn't any need to use a log offset.  We can just look at an
> > > > > > in-memory hash table and see what the latest number is, and add
> > one, to
> > > > > > generate a new broker epoch.
> > > > > >
> > > > > > >
> > > > > > > 3. How will producer registration happen? I am assuming it
> > should be
> > > > > > > similar to broker registration, with a similar way to generate
> > > > producer
> > > > > > id.
> > > > > > >
> > > > > >
> > > > > > For the EOS stuff, we will need a few new RPCs to the controller.
> > I
> > > > think
> > > > > > we should do that in a follow-on KIP, though, since this one is
> > already
> > > > > > pretty big.
> > > > > >
> > > > > > >
> > > > > > > 4. Because we expose Raft log to all the brokers, any
> > de-duplication
> > > > of
> > > > > > the
> > > > > > > broker needs to happen before the requests are proposed to Raft.
> > For
> > > > this
> > > > > > > the controller needs to be single threaded, and should do
> > validation
> > > > > > > against the in-process or pending requests and the final state. I
> > > > read a
> > > > > > > mention of this, in the responses in this thread.Will it be
> > useful to
> > > > > > > mention this in the KIP?
> > > > > > >
> > > > > >
> > > > > > I'm not sure what you mean by "de-duplication of the broker."  Can
> > you
> > > > > > give a little more context?
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Unmesh
> > > > > > >
> > > > > > > On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe <cmcc...@apache.org
> > >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I'm thinking of calling a vote on KIP-631 on Monday.  Let me
> > know
> > > > if
> > > > > > > > there's any more comments I should address before I start the
> > vote.
> > > > > > > >
> > > > > > > > cheers,
> > > > > > > > Colin
> > > > > > > >
> > > > > > > > On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi wrote:
> > > > > > > > > >>Hi Unmesh,
> > > > > > > > > >>Thanks, I'll take a look.
> > > > > > > > > Thanks. I will be adding more to the prototype and will be
> > happy
> > > > to
> > > > > > help
> > > > > > > > > and collaborate.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Unmesh
> > > > > > > > >
> > > > > > > > > On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe <
> > > > cmcc...@apache.org>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jose,
> > > > > > > > > >
> > > > > > > > > > That'a s good point that I hadn't considered.  It's
> > probably
> > > > worth
> > > > > > > > having
> > > > > > > > > > a separate leader change message, as you mentioned.
> > > > > > > > > >
> > > > > > > > > > Hi Unmesh,
> > > > > > > > > >
> > > > > > > > > > Thanks, I'll take a look.
> > > > > > > > > >
> > > > > > > > > > best,
> > > > > > > > > > Colin
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
> > > > > > > > > > > Hi Unmesh,
> > > > > > > > > > >
> > > > > > > > > > > Very cool prototype!
> > > > > > > > > > >
> > > > > > > > > > > Hi Colin,
> > > > > > > > > > >
> > > > > > > > > > > The KIP proposes a record called IsrChange which
> > includes the
> > > > > > > > > > > partition, topic, isr, leader and leader epoch. During
> > normal
> > > > > > > > > > > operation ISR changes do not result in leader changes.
> > > > Similarly,
> > > > > > > > > > > leader changes do not necessarily involve ISR changes.
> > The
> > > > > > controller
> > > > > > > > > > > implementation that uses ZK modeled them together because
> > > > > > > > > > > 1. All of this information is stored in one znode.
> > > > > > > > > > > 2. ZK's optimistic lock requires that you specify the new
> > > > value
> > > > > > > > > > completely
> > > > > > > > > > > 3. The change to that znode was being performed by both
> > the
> > > > > > > > controller
> > > > > > > > > > > and the leader.
> > > > > > > > > > >
> > > > > > > > > > > None of these reasons are true in KIP-500. Have we
> > considered
> > > > > > having
> > > > > > > > > > > two different records? For example
> > > > > > > > > > >
> > > > > > > > > > > 1. IsrChange record which includes topic, partition, isr
> > > > > > > > > > > 2. LeaderChange record which includes topic, partition,
> > > > leader
> > > > > > and
> > > > > > > > > > leader epoch.
> > > > > > > > > > >
> > > > > > > > > > > I suspect that making this change will also require
> > changing
> > > > the
> > > > > > > > > > > message AlterIsrRequest introduced in KIP-497: Add
> > > > inter-broker
> > > > > > API
> > > > > > > > to
> > > > > > > > > > > alter ISR.
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > > -Jose
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to