An advanced implementation could maintain lists of [{node: n1, ratio: 0.2}, 
{node: n2, ratio: 0.4}, {node: n3, ratio: 0.4}] where nodes or admins can 
signify the ratio of jobs any given node should be handling. Of course this 
gets complicated quickly if the sum of ratio is <1 or >1, but I’m sure there 
are clever ways out of that. It might also be too complicated to get started 
with this, but it might be worth using a structured list, to allow extensions 
later.

* * *

Nick, I like your proposal. I was briefly worrying about Adam’s suggestion to 
add a fast-path for the replicator, because I use a similar owner model for the 
db-per-user server, and thought I wanted to advocate for a more general 
framework that can be used for various types of jobs (something that the Roles 
variable in the model already allows), but I now agree that the importance of 
making replication as efficient as possible is a worthwhile optimisation vs. 
the other jobs we already have, or could imagine running in the future.

Best
Jan
—

> On 11. Apr 2019, at 01:51, Robert Newson <rnew...@apache.org> wrote:
> 
> As long as any given replicator node can grab as much work as it can handle, 
> It doesn't need to be 'fair' in the way we currently do it. The notion of an 
> 'owner' node drops away imo. As nick mentions, the fun part is recognising 
> when jobs become unowned due to a resource failure somewhere but this is a 
> very standard thing, a pool of workers competing over jobs.
> 
> -- 
>  Robert Samuel Newson
>  rnew...@apache.org
> 
> On Thu, 11 Apr 2019, at 00:43, Adam Kocoloski wrote:
>> Hi Nick,
>> 
>> Good stuff. On the first topic, I wonder if it makes sense to use a 
>> dedicated code path for updates to _replicator DB docs, one that would 
>> automatically register these replication jobs in a queue in FDB as part 
>> of the update transaction. That’d save the overhead and latency of 
>> listening to _db_updates and then each _replicator DB’s _changes feed 
>> just to discover these updates (and then presumably create jobs in a 
>> job queue anyway).
>> 
>> On the second topic — is it important for each node declaring the 
>> replicator role to receive an equal allotment of jobs and manage its 
>> own queue, or can the replication worker processes on each node just 
>> grab the next job off the global queue in FDB whenever they free up? I 
>> could see the latter approach decreasing the tail latency for job 
>> execution, and I think there are good patterns for managing high 
>> contention dequeue operations in the case where we’ve got more worker 
>> processes than jobs to run.
>> 
>> Regardless, you make a good point about paying special attention to 
>> liveness checking now that we’re not relying on Erlang distribution for 
>> that purpose. I didn’t grok all the details of approach you have in 
>> mind for that yet because I wanted to bring up these two points above 
>> and get your perspective.
>> 
>> Adam
>> 
>>> On Apr 10, 2019, at 6:21 PM, Nick Vatamaniuc <vatam...@apache.org> wrote:
>>> 
>>> I was thinking how replication would work with FDB and so far there are two
>>> main issues I believe would need to be addressed. One deals with how we
>>> monitor _replicator db docs for changes, and other one is how replication
>>> jobs coordinate so we don't run multiple replication jobs for the same
>>> replication document in a cluster.
>>> 
>>> 1) Shard-level vs fabric-level notifications for _replicator db docs
>>> 
>>> Currently replicator is monitoring and updating individual _replicator
>>> shards. Change notifications are done via change feeds (normal,
>>> non-continuous) and couch event server callbacks.
>>> https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L180,
>>> https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L246.
>>> With fdb we'd have to get these updates via a fabric changes feeds and rely
>>> on the global _db_updates. That could result in a performance impact and
>>> would be something to keep an eye on.
>>> 
>>> 2) Replicator job coordination
>>> 
>>> Replicator has a basic constraint that there should be only one replication
>>> job running for each replicator doc per cluster.
>>> 
>>> Each replication currently has a single "owner" node. The owner is picked
>>> to be one of 3 nodes were the _replicator doc shards live. If nodes connect
>>> or disconnect, replicator will reshuffle replication jobs and some nodes
>>> will stop running jobs that they don't "own" anymore and then proceed to
>>> "rescan" all the replicator docs to possibly start new ones. However, with
>>> fdb, there are no connected erlang nodes and no shards. All coordination
>>> happens via fdb, so we'd have to somehow coordinate replication job
>>> ownership through there.
>>> 
>>> For discussion, here is a proposal for a worker registration layer do that
>>> job coordination:
>>> 
>>> The basic idea is erlang fabric nodes would declare, by writing to fdb,
>>> that they can take on certain "roles". "replicator" would be one such role.
>>> And so, for each role there is a list of nodes. Each node picks a fraction
>>> of jobs based on how many other nodes of the same role are in the list.
>>> When membership changes, nodes which are alive might have to pick up new
>>> jobs or stop running existing jobs since they'd be started by other nodes.
>>> 
>>> For example, there are 3 nodes with "replicator" roles: [n1, n2, n3]. n1 is
>>> currently down so the membership list is [n2, n3]. If there are 60
>>> replication jobs then n2 might run 30, and n3 another 30. n1 comes online
>>> and adds itself to the roles list, which now looks like [n1, n2, n3]. n1
>>> then picks 20 replication jobs. At about the same time n2 and n3 notice n1
>>> is online and decide to stop running the jobs that n1 would pick up and
>>> they each would end up running roughly 20 jobs.
>>> 
>>> The difficulty here comes from maintaining liveliness. A node could stop at
>>> any time without removing itself from the membership list of its roles.
>>> That means all of the sudden a subset of jobs would stop running without
>>> anyone picking them up. So, the idea is to have nodes periodically update
>>> their health status in fdb to indicate they are alive. Once a node doesn't
>>> update its status often enough it will be considered dead and others can
>>> pick up its share of jobs.
>>> 
>>> To start the discussion, I sketched this data layout and pseudocode:
>>> 
>>> Data layout:
>>> 
>>> ("couch_workers", Role, "members") = (MembersVersionStamp, [WId1, WId2,
>>> ...])
>>> ("couch_workers", Role, "health", WId) = (WorkerVersionStamp, Timeout,
>>> Timestamp)
>>> 
>>> Role : In our case it would be "replicator", but it could be any other role.
>>> 
>>> WId : are workers IDs. These should unique identify workers. It would be
>>> nice if it could be persisted, such that a worker doing a quick restart
>>> will end up with the same id and the membership list won't change. However,
>>> a random UUID would work as well.
>>> 
>>> Timeout : This is the timeout declared by the nodes themselves. These need
>>> not be the same for all node. Some nodes might decide they run slower so
>>> their timeouts would be larger. But they essentially promise to update
>>> their health status at least that often.
>>> 
>>> Timestamp: The time of the last health report from that node. Timestamps
>>> technically might not be needed as neighbor monitors could remember the
>>> time delta between when it saw changes to the health values' version stamp.
>>> 
>>> Pseudocode:
>>> 
>>> init(Role) ->
>>>   Members = tx(add_to_members(self(), Role)
>>>   spawn health_ping(Members, Role)
>>>   spawn neighbor_check(Members, Role)
>>>   loop()
>>> 
>>> terminate() ->
>>>   tx(remove_self_from_members_and_health_list())
>>> 
>>> loop() ->
>>>   {Members, Watch} = tx(add_members(self(), Role), get_watch())
>>>   receive
>>>   {Watch, NewMembers} ->
>>>       case diff(Members, NewMembers) of
>>>       no_diff ->
>>>           ok;
>>>       {Added, Removed} ->
>>>           update_neighbor_check(NewMembers)
>>>           fire_callbacks(Added, Removed)
>>>   end,
>>>   loop()
>>> 
>>> health_ping(Members, Role) ->
>>>  tx(update_health(Role, self(), Timestamp))
>>>  sleep(Timeout / 3)
>>>  health_ping(Members, Role)
>>> 
>>> neighbor_check(Members, Role) ->
>>> Neighbor = next_in_list(self(), Members)
>>> {Timeout, Timestamp} = tx(read_timestamp(Neighbor, Role))
>>> case now() - Timestamp > Timeout of
>>> true ->
>>>     NewMembers = Tx(remove_neighbor(Neighbor, Role))
>>>     neighbor_check(NewMembers, Role)
>>> false ->
>>>     sleep(Timeout)
>>>     neighbor_check(Members, Role)
>>> end
>>> 
>>> 
>>> Description:
>>> 
>>> Nodes add themselves to a membership list for each role they participate
>>> in. The membership list has a version stamp. It's there to ensure that the
>>> watch that is created during the update would find any change occurring
>>> since their update.
>>> 
>>> tx(...) is pseudocode for "runs in a transaction"
>>> 
>>> neighbor_check() is how entries for dead workers are cleaned up. Each node
>>> will monitor its neighbor's status. If it sees the neighbor has stopped
>>> responding it will remove it from the list. That will update the membership
>>> list and will fire the watch. Everyone will notice and rescan their
>>> replicator docs.
>>> 
>>> fire_callbacks() is just reporting to the replicator app that membership
>>> has changed it and might need to rescan. On top of this code currently
>>> there is a cluster stability logic that waits a bit before rescanning in
>>> case there is a flurry of node membership changes. Like say on rolling node
>>> reboots or cluster startup.
>>> 
>>> I am not entirely sure on the semantics of watches and how lightweight or
>>> heavyweight they are. Creating a watch and a version stamp will hopefully
>>> not lose updates. That is, all updates after that transaction's watch will
>>> fire the watch. Watches seem to have limits and then I think we'd need to
>>> revert to polling
>>> https://github.com/apple/foundationdb/blob/8472f1046957849a97538abb2f47b299e0ae2b2d/fdbserver/storageserver.actor.cpp#L789
>>> which make sense but wondering if we should just start with polling first
>>> and larger poll intervals. I guess it depends on how many other places we'd
>>> have use watches and if we'd ever come close the even needing to handle
>>> that error case.
>>> 
>>> 
>>> What does everyone think? The idea would to be turn the proposal from 2)
>>> into an RFC but wanted to open it for a general discussion and see what
>>> everythone thought about it.
>> 
>> 

-- 
Professional Support for Apache CouchDB:
https://neighbourhood.ie/couchdb-support/

Reply via email to