On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
> >>>Can you repeat your questions about broker leases?
> 
> >>>>The LeaseStartTimeMs is expected to be the broker's
> 'System.currentTimeMillis()' at the point of the request. The active
> controller will add its lease period to this in order to compute the
> LeaseEndTimeMs.
> 
> I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a bit
> confusing.  A monotonic clock (System.nanoTime) on the active controller
> should be used to track leases.
> (For example,
> https://issues.apache.org/jira/browse/ZOOKEEPER-1616
> https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
> )
> 
> Then we will not need LeaseStartTimeMs?
> Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active controller
> can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
> In this case we might just drop LeaseEndTimeMs from the response, as the
> broker already knows about the TTL and can send heartbeats at some fraction
> of TTL, say every TTL/4 milliseconds (with elapsed time on the broker
> measured by System.nanoTime).
> 

Hi Unmesh,

I agree that the monotonic clock is probably a better idea here.  It is good to 
be robust against wall clock changes, although I think a cluster which had them 
might suffer other issues.  I will change it to specify a monotonic clock.
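
As a rough sketch of what tracking leases against a monotonic clock on the 
active controller could look like (this is not code from the KIP or from 
Kafka; the class and method names are made up for illustration, and the clock 
is injected so that it can be faked in a test):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch, not the KIP-631 implementation.
final class MonotonicLeaseTracker {
    // Source of monotonic time in nanoseconds, e.g. System::nanoTime.
    private final LongSupplier nanoClock;
    // Broker id -> lease deadline, expressed in the controller's nanoTime frame.
    private final Map<Integer, Long> deadlines = new HashMap<>();

    MonotonicLeaseTracker(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    // Called when a heartbeat arrives: the lease now ends one TTL from "now".
    void renew(int brokerId, long ttlMs) {
        deadlines.put(brokerId,
            nanoClock.getAsLong() + TimeUnit.MILLISECONDS.toNanos(ttlMs));
    }

    // A broker with no lease on record is treated as expired.
    boolean isExpired(int brokerId) {
        Long deadline = deadlines.get(brokerId);
        // nanoTime values must be compared by subtraction, since the raw
        // values can be negative or wrap around.
        return deadline == null || nanoClock.getAsLong() - deadline >= 0;
    }

    // Heartbeat at a fraction of the TTL (TTL/4, as suggested in the thread).
    static long heartbeatIntervalMs(long ttlMs) {
        return Math.max(1, ttlMs / 4);
    }
}
```

In production the tracker would be built with System::nanoTime.  Wall clock 
adjustments then have no effect on when a lease expires.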

The reason for including LeaseStartTimeMs in the request is to ensure that the 
time required to communicate with the controller gets included in the lease 
time.  Since requests can potentially be delayed in the network for a long 
time, this is important.
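
To make the arithmetic concrete, here is a small sketch of the broker's side, 
assuming the broker anchors the lease at the moment it sent the request 
rather than at the moment the response arrived.  The class name and fields 
are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the broker's view of its own lease.
final class BrokerLeaseView {
    // Lease end in the broker's own monotonic time frame.
    private final long leaseEndNanos;

    // requestSentNanos is the broker's clock reading when the request was sent.
    BrokerLeaseView(long requestSentNanos, long leasePeriodMs) {
        this.leaseEndNanos =
            requestSentNanos + TimeUnit.MILLISECONDS.toNanos(leasePeriodMs);
    }

    // Time left on the lease; network delay has already been charged
    // against it because the lease started at send time.
    long remainingMs(long nowNanos) {
        return Math.max(0, TimeUnit.NANOSECONDS.toMillis(leaseEndNanos - nowNanos));
    }
}
```

With a 1000 ms lease period and a response that arrives 300 ms after the 
request was sent, the broker sees 700 ms remaining, not 1000 ms.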

>
> I have built a prototype to demonstrate this:
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> 
> The Kip631Controller itself depends on a Consensus module, to demonstrate
> what possible interactions with the consensus module will look like.
> (The Consensus module can be pluggable really, with an API to allow reading
> the replicated log up to the HighWaterMark.)
> 
> It has an implementation of LeaseTracker
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
> to demonstrate LeaseTracker's interaction with the consensus module.
> 
> The implementation has the following aspects:
> 1. The lease tracking happens only on the active controller (raft leader).
> 2. Once the lease expires, it needs to propose and commit a FenceBroker
> record for that lease.
> 3. In case of active controller failure, the lease will be tracked by the
> new raft leader. The new raft leader starts the lease timer again (as
> implemented in the onBecomingLeader method of
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> ),
> in effect extending the lease by the time spent in the leader election and
> whatever time had elapsed on the old leader.

Yes, I agree that the lease timeout on the controller side should be reset in 
the case of controller failover.  The alternative would be to track the lease 
as hard state rather than soft state, but I think that is not really needed, 
and would result in more log entries.
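
A minimal sketch of the soft-state approach, assuming the set of registered 
brokers can be rebuilt from the log and only the deadlines live in memory 
(all names here are hypothetical, not taken from the KIP or the prototype):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch: lease deadlines are soft state, never written to the log.
final class SoftStateLeases {
    private final LongSupplier nanoClock;  // e.g. System::nanoTime
    private final long ttlNanos;
    private final Map<Integer, Long> deadlines = new HashMap<>();

    SoftStateLeases(LongSupplier nanoClock, long ttlMs) {
        this.nanoClock = nanoClock;
        this.ttlNanos = TimeUnit.MILLISECONDS.toNanos(ttlMs);
    }

    // On failover the new leader has no record of the old deadlines, so
    // every registered broker gets a fresh, full lease.
    void onBecomingLeader(Set<Integer> registeredBrokerIds) {
        deadlines.clear();
        long deadline = nanoClock.getAsLong() + ttlNanos;
        for (Integer id : registeredBrokerIds) {
            deadlines.put(id, deadline);
        }
    }

    void onHeartbeat(int brokerId) {
        deadlines.put(brokerId, nanoClock.getAsLong() + ttlNanos);
    }

    // Brokers whose lease has run out; the caller would propose a fencing
    // record for each of them.
    List<Integer> expiredBrokers() {
        long now = nanoClock.getAsLong();
        List<Integer> expired = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : deadlines.entrySet()) {
            if (now - e.getValue() >= 0) {
                expired.add(e.getKey());
            }
        }
        Collections.sort(expired);
        return expired;
    }
}
```

The cost of this choice is that a broker which stopped heartbeating just 
before the failover gets up to one extra lease period before it is fenced.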

> 
> There are working tests for this implementation here:
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
> and an end-to-end test here:
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
>
> >>I'm not sure what you mean by "de-duplication of the broker."  Can you
> give a little more context?
> Apologies for using the confusing term deduplication. I meant broker id
> conflict.
> As you can see in the prototype handleRequest of KIP631Controller
> <https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala>,
> the duplicate broker id needs to be detected before the BrokerRecord is
> submitted to the raft module.
> Also as implemented in the prototype, the KIP631Controller is single
> threaded, handling requests one at a time. (an example of
> https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html
> )

Our code is single threaded as well.  I think it makes sense for the 
controller, since otherwise locking becomes very messy.  I'm not sure I 
understand your question about duplicate broker ID detection, though.  There's 
a section in the KIP about this -- is there a detail we should add there?
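
For concreteness, the kind of check described in the quoted text above might 
look roughly like the following.  This is only a sketch: the names are 
hypothetical, and it assumes the methods are called from the controller's 
single thread, which is why there is no locking.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; assumes all calls come from one controller thread.
final class BrokerRegistrationValidator {
    // Broker ids whose registration record has been committed.
    private final Set<Integer> committed = new HashSet<>();
    // Broker ids whose registration was proposed but is not yet committed.
    private final Set<Integer> pending = new HashSet<>();

    // Returns true if the registration may be proposed to the raft module.
    boolean tryPropose(int brokerId) {
        if (committed.contains(brokerId) || pending.contains(brokerId)) {
            return false;  // id conflict: reject before anything is proposed
        }
        pending.add(brokerId);
        return true;
    }

    // Called once the registration record has been committed.
    void onCommitted(int brokerId) {
        if (pending.remove(brokerId)) {
            committed.add(brokerId);
        }
    }

    // Called if the proposal did not commit, so the id becomes free again.
    void onProposalFailed(int brokerId) {
        pending.remove(brokerId);
    }
}
```

Checking the pending set as well as the committed set is what catches two 
registrations for the same id that arrive before either has been committed.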

best,
Colin


> 
> Thanks,
> Unmesh
> 
> On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe <co...@cmccabe.xyz> wrote:
> 
> > On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
> > > Hi Colin,
> > >
> > > There were a few questions I had.
> >
> > Hi Unmesh,
> >
> > Thanks for the response.
> >
> > >
> > > 1. Were my comments on the broker lease implementation (and corresponding
> > > prototype) appropriate and do we need to change the KIP
> > > description accordingly?
> > >
> >
> > Can you repeat your questions about broker leases?
> >
> > >
> > > 2. How will broker epochs be generated? I am assuming it can be the
> > > committed log offset (like zxid?)
> > >
> >
> > There isn't any need to use a log offset.  We can just look at an
> > in-memory hash table and see what the latest number is, and add one, to
> > generate a new broker epoch.
> >
> > >
> > > 3. How will producer registration happen? I am assuming it should be
> > > similar to broker registration, with a similar way to generate the
> > > producer id.
> > >
> >
> > For the EOS stuff, we will need a few new RPCs to the controller.  I think
> > we should do that in a follow-on KIP, though, since this one is already
> > pretty big.
> >
> > >
> > > 4. Because we expose the Raft log to all the brokers, any de-duplication
> > > of the broker needs to happen before the requests are proposed to Raft.
> > > For this the controller needs to be single threaded, and should do
> > > validation against the in-process or pending requests and the final
> > > state. I read a mention of this in the responses in this thread. Will it
> > > be useful to mention this in the KIP?
> > >
> >
> > I'm not sure what you mean by "de-duplication of the broker."  Can you
> > give a little more context?
> >
> > best,
> > Colin
> >
> > >
> > > Thanks,
> > > Unmesh
> > >
> > > On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe <cmcc...@apache.org> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'm thinking of calling a vote on KIP-631 on Monday.  Let me know if
> > > > there's any more comments I should address before I start the vote.
> > > >
> > > > cheers,
> > > > Colin
> > > >
> > > > On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi wrote:
> > > > > >>Hi Unmesh,
> > > > > >>Thanks, I'll take a look.
> > > > > > Thanks. I will be adding more to the prototype and will be happy to
> > > > > > help and collaborate.
> > > > >
> > > > > Thanks,
> > > > > Unmesh
> > > > >
> > > > > > On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe <cmcc...@apache.org> wrote:
> > > > >
> > > > > > Hi Jose,
> > > > > >
> > > > > > > That's a good point that I hadn't considered.  It's probably worth
> > > > > > > having a separate leader change message, as you mentioned.
> > > > > >
> > > > > > Hi Unmesh,
> > > > > >
> > > > > > Thanks, I'll take a look.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
> > > > > > > Hi Unmesh,
> > > > > > >
> > > > > > > Very cool prototype!
> > > > > > >
> > > > > > > Hi Colin,
> > > > > > >
> > > > > > > The KIP proposes a record called IsrChange which includes the
> > > > > > > partition, topic, isr, leader and leader epoch. During normal
> > > > > > > operation ISR changes do not result in leader changes. Similarly,
> > > > > > > > leader changes do not necessarily involve ISR changes. The controller
> > > > > > > > implementation that uses ZK modeled them together because:
> > > > > > > > 1. All of this information is stored in one znode.
> > > > > > > > 2. ZK's optimistic lock requires that you specify the new value
> > > > > > > > completely.
> > > > > > > > 3. The change to that znode was being performed by both the controller
> > > > > > > > and the leader.
> > > > > > >
> > > > > > > > None of these reasons are true in KIP-500. Have we considered having
> > > > > > > > two different records? For example:
> > > > > > >
> > > > > > > 1. IsrChange record which includes topic, partition, isr
> > > > > > > > 2. LeaderChange record which includes topic, partition, leader and
> > > > > > > > leader epoch.
> > > > > > >
> > > > > > > > I suspect that making this change will also require changing the
> > > > > > > > message AlterIsrRequest introduced in KIP-497: Add inter-broker API
> > > > > > > > to alter ISR.
> > > > > > >
> > > > > > > Thanks
> > > > > > > -Jose
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
