>>The reason for including LeaseStartTimeMs in the request is to ensure
that the time required to communicate with the controller gets included in
>>the lease time.  Since requests can potentially be delayed in the network
for a long time, this is important.
The network time will be added anyway, because the lease timer on the
active controller will start only after the heartbeat request reaches the
server. And I think, some assumption about network round trip time is
needed anyway to decide on the frequency of the heartbeat (
registration.heartbeat.interval.ms), and lease timeout (
registration.lease.timeout.ms). So I think just having a leaseTTL in the
request is easier to understand and implement.
>>>Yes, I agree that the lease timeout on the controller side should be
reset in the case of controller failover.  The alternative would be to
track the >>>lease as hard state rather than soft state, but I think that
is not really needed, and would result in more log entries.
My interpretation of the mention of BrokerRecord in the KIP was that this
record exists in the Raft log. By soft state, do you mean the broker
records exist only on the active leader and will not be replicated in the
raft log? If the live brokers list is maintained only on the active
controller (raft leader), then, in case of leader failure, there will be a
window where the new leader does not know about the live brokers, till the
brokers establish the leases again.
I think it will be safer to have leases as a hard state managed by standard
Raft replication.
Or am I misunderstanding something? (I assume that with soft state, you
mean something like zookeeper local sessions
https://issues.apache.org/jira/browse/ZOOKEEPER-1147.)

>>Our code is single threaded as well.  I think it makes sense for the
controller, since otherwise locking becomes very messy.  I'm not sure I
>>understand your question about duplicate broker ID detection, though.
There's a section in the KIP about this -- is there a detail we should add
?>>there?
I assumed broker leases are implemented as a hard state. In that case, to
check for broker id conflict, we need to check the broker ids at two places
1. Pending broker registrations (which are yet to be committed) 2. Already
committed broker registrations.

Thanks,
Unmesh



On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe <cmcc...@apache.org> wrote:

> On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
> > >>>Can you repeat your questions about broker leases?
> >
> > >>>>The LeaseStartTimeMs is expected to be the broker's
> > 'System.currentTimeMillis()' at the point of the request. The active
> > controller will add its lease period to this in order >>>>to compute the
> > LeaseEndTimeMs.
> >
> > I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a
> > bit
> > confusing.  Monotonic Clock (System.nanoTime) on the active controller
> > should be used to track leases.
> > (For example,
> >
> https://issues.apache.org/jira/browse/ZOOKEEPER-1616https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
> > )
> >
> > Then we will not need LeaseStartTimeMs?
> > Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active
> controller
> > can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
> > In this case we might just drop LeaseEndTimeMs from the response, as the
> > broker already knows about the TTL and can send heartbeats at some
> fraction
> > of TTL, say every TTL/4 milliseconds.(elapsed time on the broker measured
> > by System.nanoTime)
> >
>
> Hi Unmesh,
>
> I agree that the monotonic clock is probably a better idea here.  It is
> good to be robust against wall clock changes, although I think a cluster
> which had them might suffer other issues.  I will change it to specify a
> monotonic clock.
>
> The reason for including LeaseStartTimeMs in the request is to ensure that
> the time required to communicate with the controller gets included in the
> lease time.  Since requests can potentially be delayed in the network for a
> long time, this is important.
>
> >
> > I have a prototype built to demonstrate this as following:
> >
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> >
> > The Kip631Controller itself depends on a Consensus module, to demonstrate
> > how possible interactions with the consensus module will look like
> >  (The Consensus can be pluggable really, with an API to allow reading
> > replicated log upto HighWaterMark)
> >
> > It has an implementation of LeaseTracker
> >
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
> > to demonstrate LeaseTracker's interaction with the consensus module.
> >
> > The implementation has the following aspects:
> > 1. The lease tracking happens only on the active controller (raft
> > leader)
> > 2. Once the lease expires, it needs to propose and commit a FenceBroker
> > record for that lease.
> > 3. In case of active controller failure, the lease will be tracked by
> > the
> > newly raft leader. The new raft leader starts the lease timer again, (as
> > implemented in onBecomingLeader method of
> >
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> > )
> > in effect extending the lease by the time spent in the leader election
> > and
> > whatever time was elapsed on the old leader.
>
> Yes, I agree that the lease timeout on the controller side should be reset
> in the case of controller failover.  The alternative would be to track the
> lease as hard state rather than soft state, but I think that is not really
> needed, and would result in more log entries.
>
> >
> > There are working tests for this implementation here.
> >
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
> > and an end to end test here
> >
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
> > <
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
> >
> > >>'m not sure what you mean by "de-duplication of the broker."  Can you
> > give a little more context?
> > Apologies for using the confusing term deduplication. I meant broker id
> > conflict.
> > As you can see in the prototype handleRequest of KIP631Controller
> > <
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> >,
> > the duplicate broker id needs to be detected before the BrokerRecord is
> > submitted to the raft module.
> > Also as implemented in the prototype, the KIP631Controller is single
> > threaded, handling requests one at a time. (an example of
> >
> https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html
> > )
>
> Our code is single threaded as well.  I think it makes sense for the
> controller, since otherwise locking becomes very messy.  I'm not sure I
> understand your question about duplicate broker ID detection, though.
> There's a section in the KIP about this -- is there a detail we should add
> there?
>
> best,
> Colin
>
>
> >
> > Thanks,
> > Unmesh
> >
> > On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe <co...@cmccabe.xyz> wrote:
> >
> > > On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
> > > > Hi Colin,
> > > >
> > > > There were a few of questions I had..
> > >
> > > Hi Unmesh,
> > >
> > > Thanks for the response.
> > >
> > > >
> > > > 1. Were my comments on the broker lease implementation (and
> corresponding
> > > > prototype) appropriate and do we need to change the KIP
> > > > description accordingly?.
> > > >
> > >
> > > Can you repeat your questions about broker leases?
> > >
> > > >
> > > > 2. How will broker epochs be generated? I am assuming it can be the
> > > > committed log offset (like zxid?)
> > > >
> > >
> > > There isn't any need to use a log offset.  We can just look at an
> > > in-memory hash table and see what the latest number is, and add one, to
> > > generate a new broker epoch.
> > >
> > > >
> > > > 3. How will producer registration happen? I am assuming it should be
> > > > similar to broker registration, with a similar way to generate
> producer
> > > id.
> > > >
> > >
> > > For the EOS stuff, we will need a few new RPCs to the controller.  I
> think
> > > we should do that in a follow-on KIP, though, since this one is already
> > > pretty big.
> > >
> > > >
> > > > 4. Because we expose Raft log to all the brokers, any de-duplication
> of
> > > the
> > > > broker needs to happen before the requests are proposed to Raft. For
> this
> > > > the controller needs to be single threaded, and should do validation
> > > > against the in-process or pending requests and the final state. I
> read a
> > > > mention of this, in the responses in this thread.Will it be useful to
> > > > mention this in the KIP?
> > > >
> > >
> > > I'm not sure what you mean by "de-duplication of the broker."  Can you
> > > give a little more context?
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Unmesh
> > > >
> > > > On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe <cmcc...@apache.org>
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I'm thinking of calling a vote on KIP-631 on Monday.  Let me know
> if
> > > > > there's any more comments I should address before I start the vote.
> > > > >
> > > > > cheers,
> > > > > Colin
> > > > >
> > > > > On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi wrote:
> > > > > > >>Hi Unmesh,
> > > > > > >>Thanks, I'll take a look.
> > > > > > Thanks. I will be adding more to the prototype and will be happy
> to
> > > help
> > > > > > and collaborate.
> > > > > >
> > > > > > Thanks,
> > > > > > Unmesh
> > > > > >
> > > > > > On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe <
> cmcc...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Jose,
> > > > > > >
> > > > > > > That'a s good point that I hadn't considered.  It's probably
> worth
> > > > > having
> > > > > > > a separate leader change message, as you mentioned.
> > > > > > >
> > > > > > > Hi Unmesh,
> > > > > > >
> > > > > > > Thanks, I'll take a look.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
> > > > > > > > Hi Unmesh,
> > > > > > > >
> > > > > > > > Very cool prototype!
> > > > > > > >
> > > > > > > > Hi Colin,
> > > > > > > >
> > > > > > > > The KIP proposes a record called IsrChange which includes the
> > > > > > > > partition, topic, isr, leader and leader epoch. During normal
> > > > > > > > operation ISR changes do not result in leader changes.
> Similarly,
> > > > > > > > leader changes do not necessarily involve ISR changes. The
> > > controller
> > > > > > > > implementation that uses ZK modeled them together because
> > > > > > > > 1. All of this information is stored in one znode.
> > > > > > > > 2. ZK's optimistic lock requires that you specify the new
> value
> > > > > > > completely
> > > > > > > > 3. The change to that znode was being performed by both the
> > > > > controller
> > > > > > > > and the leader.
> > > > > > > >
> > > > > > > > None of these reasons are true in KIP-500. Have we considered
> > > having
> > > > > > > > two different records? For example
> > > > > > > >
> > > > > > > > 1. IsrChange record which includes topic, partition, isr
> > > > > > > > 2. LeaderChange record which includes topic, partition,
> leader
> > > and
> > > > > > > leader epoch.
> > > > > > > >
> > > > > > > > I suspect that making this change will also require changing
> the
> > > > > > > > message AlterIsrRequest introduced in KIP-497: Add
> inter-broker
> > > API
> > > > > to
> > > > > > > > alter ISR.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > -Jose
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to