An advanced implementation could maintain lists like [{node: n1, ratio: 0.2}, {node: n2, ratio: 0.4}, {node: n3, ratio: 0.4}], where nodes or admins can specify the fraction of jobs any given node should handle. Of course this gets complicated quickly if the sum of the ratios is <1 or >1, but I’m sure there are clever ways out of that, e.g. normalising the ratios before assigning jobs. It might be too complicated to start with this, but it could be worth using a structured list from the beginning, to allow extensions later.
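As a minimal sketch of one such way out: normalise the ratios first, then hand out the rounding remainder to the nodes with the largest fractional shares. The module and function names here are made up for illustration, not a proposed API:

    %% Split NumJobs jobs across nodes according to their ratios.
    %% Ratios that don't sum to 1 are normalised first; leftover jobs
    %% after rounding down go to the nodes with the largest fractional
    %% shares (largest-remainder allocation).
    -module(weighted_assign).
    -export([split/2]).

    split(NumJobs, NodeRatios) ->
        Total = lists:sum([R || {_Node, R} <- NodeRatios]),
        Exact = [{Node, NumJobs * R / Total} || {Node, R} <- NodeRatios],
        Floors = [{Node, trunc(E)} || {Node, E} <- Exact],
        Remainder = NumJobs - lists:sum([C || {_Node, C} <- Floors]),
        %% Nodes sorted by descending fractional part each get one extra job.
        ByFrac = lists:reverse(lists:keysort(2, [{Node, E - trunc(E)} || {Node, E} <- Exact])),
        Extra = [Node || {Node, _Frac} <- lists:sublist(ByFrac, Remainder)],
        [{Node, Count + extra(Node, Extra)} || {Node, Count} <- Floors].

    extra(Node, Extra) ->
        case lists:member(Node, Extra) of true -> 1; false -> 0 end.

For example, split(60, [{n1, 0.2}, {n2, 0.4}, {n3, 0.4}]) gives [{n1, 12}, {n2, 24}, {n3, 24}], and a call with ratios summing to 0.9 still assigns all 60 jobs.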
* * *

Nick, I like your proposal. I was briefly worried about Adam’s suggestion to add a fast path for the replicator, because I use a similar owner model for the db-per-user server and wanted to advocate for a more general framework that can be used for various types of jobs (something the Roles variable in the model already allows). But I now agree that making replication as efficient as possible is a worthwhile optimisation compared to the other jobs we already have, or could imagine running in the future.

Best
Jan
—

> On 11. Apr 2019, at 01:51, Robert Newson <rnew...@apache.org> wrote:
>
> As long as any given replicator node can grab as much work as it can handle,
> it doesn't need to be 'fair' in the way we currently do it. The notion of an
> 'owner' node drops away, imo. As Nick mentions, the fun part is recognising
> when jobs become unowned due to a resource failure somewhere, but this is a
> very standard thing: a pool of workers competing over jobs.
>
> --
> Robert Samuel Newson
> rnew...@apache.org
>
> On Thu, 11 Apr 2019, at 00:43, Adam Kocoloski wrote:
>> Hi Nick,
>>
>> Good stuff. On the first topic, I wonder if it makes sense to use a
>> dedicated code path for updates to _replicator DB docs, one that would
>> automatically register these replication jobs in a queue in FDB as part
>> of the update transaction. That’d save the overhead and latency of
>> listening to _db_updates and then each _replicator DB’s _changes feed
>> just to discover these updates (and then presumably create jobs in a
>> job queue anyway).
>>
>> On the second topic — is it important for each node declaring the
>> replicator role to receive an equal allotment of jobs and manage its
>> own queue, or can the replication worker processes on each node just
>> grab the next job off the global queue in FDB whenever they free up? I
>> could see the latter approach decreasing the tail latency for job
>> execution, and I think there are good patterns for managing high
>> contention dequeue operations in the case where we’ve got more worker
>> processes than jobs to run.
>>
>> Regardless, you make a good point about paying special attention to
>> liveness checking now that we’re not relying on Erlang distribution for
>> that purpose. I didn’t grok all the details of the approach you have in
>> mind for that yet because I wanted to bring up these two points above
>> and get your perspective.
>>
>> Adam
>>
>>> On Apr 10, 2019, at 6:21 PM, Nick Vatamaniuc <vatam...@apache.org> wrote:
>>>
>>> I was thinking about how replication would work with FDB, and so far
>>> there are two main issues I believe need to be addressed. One is how we
>>> monitor _replicator db docs for changes, and the other is how replication
>>> jobs coordinate so we don't run multiple replication jobs for the same
>>> replication document in a cluster.
>>>
>>> 1) Shard-level vs fabric-level notifications for _replicator db docs
>>>
>>> Currently the replicator monitors and updates individual _replicator
>>> shards. Change notifications are done via changes feeds (normal,
>>> non-continuous) and couch event server callbacks:
>>> https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L180,
>>> https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L246.
>>> With fdb we'd have to get these updates via fabric changes feeds and rely
>>> on the global _db_updates. That could result in a performance impact and
>>> would be something to keep an eye on.
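For illustration, the fast path Adam describes above could look roughly like the sketch below, assuming the erlfdb binding (erlfdb:transactional/2, erlfdb:set/3, erlfdb_tuple:pack/1). write_doc/3 and the key layout are hypothetical placeholders, not an actual CouchDB API:

    %% Update a _replicator doc and register the replication job in the
    %% same FDB transaction, so no _db_updates/_changes scan is needed
    %% to discover it. DocBody is assumed to be an already-encoded binary.
    enqueue_on_doc_update(Db, DocId, DocBody) ->
        erlfdb:transactional(Db, fun(Tx) ->
            %% Whatever the normal doc-update path does, inside this Tx.
            write_doc(Tx, DocId, DocBody),
            %% Register a pending replication job atomically with the doc write.
            JobKey = erlfdb_tuple:pack({<<"replication_jobs">>, <<"pending">>, DocId}),
            erlfdb:set(Tx, JobKey, DocBody)
        end).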
>>>
>>> 2) Replicator job coordination
>>>
>>> The replicator has a basic constraint that there should be only one
>>> replication job running for each replicator doc per cluster.
>>>
>>> Each replication currently has a single "owner" node. The owner is picked
>>> to be one of the 3 nodes where the _replicator doc shards live. If nodes
>>> connect or disconnect, the replicator will reshuffle replication jobs:
>>> some nodes will stop running jobs they don't "own" anymore and then
>>> proceed to "rescan" all the replicator docs to possibly start new ones.
>>> However, with fdb there are no connected erlang nodes and no shards. All
>>> coordination happens via fdb, so we'd have to somehow coordinate
>>> replication job ownership through there.
>>>
>>> For discussion, here is a proposal for a worker registration layer to do
>>> that job coordination:
>>>
>>> The basic idea is that erlang fabric nodes would declare, by writing to
>>> fdb, that they can take on certain "roles". "replicator" would be one such
>>> role. And so, for each role there is a list of nodes. Each node picks a
>>> fraction of jobs based on how many other nodes of the same role are in the
>>> list. When membership changes, nodes which are alive might have to pick up
>>> new jobs or stop running existing jobs, since those would be started by
>>> other nodes.
>>>
>>> For example, say there are 3 nodes with the "replicator" role: [n1, n2,
>>> n3]. n1 is currently down, so the membership list is [n2, n3]. If there
>>> are 60 replication jobs then n2 might run 30, and n3 the other 30. n1
>>> comes online and adds itself to the roles list, which now looks like [n1,
>>> n2, n3]. n1 then picks 20 replication jobs. At about the same time n2 and
>>> n3 notice n1 is online and stop running the jobs that n1 would pick up, so
>>> each ends up running roughly 20 jobs.
>>>
>>> The difficulty here comes from maintaining liveness. A node could stop at
>>> any time without removing itself from the membership list of its roles.
>>> That means all of a sudden a subset of jobs would stop running without
>>> anyone picking them up. So the idea is to have nodes periodically update
>>> their health status in fdb to indicate they are alive. Once a node fails
>>> to update its status often enough, it will be considered dead and others
>>> can pick up its share of jobs.
>>>
>>> To start the discussion, I sketched this data layout and pseudocode:
>>>
>>> Data layout:
>>>
>>> ("couch_workers", Role, "members") = (MembersVersionStamp, [WId1, WId2, ...])
>>> ("couch_workers", Role, "health", WId) = (WorkerVersionStamp, Timeout, Timestamp)
>>>
>>> Role: In our case it would be "replicator", but it could be any other role.
>>>
>>> WId: Worker IDs. These should uniquely identify workers. It would be nice
>>> if they could be persisted, such that a worker doing a quick restart ends
>>> up with the same id and the membership list doesn't change. However, a
>>> random UUID would work as well.
>>>
>>> Timeout: The timeout declared by the nodes themselves. These need not be
>>> the same for all nodes. Some nodes might decide they run slower, so their
>>> timeouts would be larger. But they essentially promise to update their
>>> health status at least that often.
>>>
>>> Timestamp: The time of the last health report from that node. Timestamps
>>> technically might not be needed, as a neighbor monitor could instead
>>> remember the time elapsed since it last saw that worker's version stamp
>>> change.
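As a rough illustration of writing the health entry in the layout above, assuming the erlfdb binding and its tuple layer (erlfdb:transactional/2, erlfdb:set/3, erlfdb_tuple:pack/1); the version stamp handling is elided here and a plain wall-clock timestamp is written instead:

    %% Write ("couch_workers", Role, "health", WId) = (Timeout, Timestamp).
    %% A real implementation would also use a versionstamped value so that
    %% neighbors can detect changes without trusting wall clocks.
    update_health(Db, Role, WId, TimeoutSec) ->
        erlfdb:transactional(Db, fun(Tx) ->
            Key = erlfdb_tuple:pack({<<"couch_workers">>, Role, <<"health">>, WId}),
            Val = erlfdb_tuple:pack({TimeoutSec, erlang:system_time(second)}),
            erlfdb:set(Tx, Key, Val)
        end).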
>>>
>>> Pseudocode:
>>>
>>> init(Role) ->
>>>     Members = tx(add_to_members(self(), Role))
>>>     spawn health_ping(Members, Role)
>>>     spawn neighbor_check(Members, Role)
>>>     loop()
>>>
>>> terminate() ->
>>>     tx(remove_self_from_members_and_health_list())
>>>
>>> loop() ->
>>>     {Members, Watch} = tx(add_to_members(self(), Role), get_watch())
>>>     receive
>>>         {Watch, NewMembers} ->
>>>             case diff(Members, NewMembers) of
>>>                 no_diff ->
>>>                     ok;
>>>                 {Added, Removed} ->
>>>                     update_neighbor_check(NewMembers)
>>>                     fire_callbacks(Added, Removed)
>>>             end
>>>     end,
>>>     loop()
>>>
>>> health_ping(Members, Role) ->
>>>     tx(update_health(Role, self(), Timestamp))
>>>     sleep(Timeout / 3)
>>>     health_ping(Members, Role)
>>>
>>> neighbor_check(Members, Role) ->
>>>     Neighbor = next_in_list(self(), Members)
>>>     {Timeout, Timestamp} = tx(read_timestamp(Neighbor, Role))
>>>     case now() - Timestamp > Timeout of
>>>         true ->
>>>             NewMembers = tx(remove_neighbor(Neighbor, Role))
>>>             neighbor_check(NewMembers, Role);
>>>         false ->
>>>             sleep(Timeout)
>>>             neighbor_check(Members, Role)
>>>     end
>>>
>>> Description:
>>>
>>> Nodes add themselves to a membership list for each role they participate
>>> in. The membership list has a version stamp. It's there to ensure that the
>>> watch created during the update will see any change occurring after that
>>> update.
>>>
>>> tx(...) is pseudocode for "runs in a transaction".
>>>
>>> neighbor_check() is how entries for dead workers are cleaned up. Each node
>>> monitors its neighbor's status. If it sees the neighbor has stopped
>>> responding, it removes it from the list. That updates the membership list
>>> and fires the watch. Everyone notices and rescans their replicator docs.
>>>
>>> fire_callbacks() is just reporting to the replicator app that membership
>>> has changed and it might need to rescan. On top of this, the current code
>>> has cluster-stability logic that waits a bit before rescanning in case
>>> there is a flurry of node membership changes, say on rolling node reboots
>>> or cluster startup.
>>>
>>> I am not entirely sure about the semantics of watches and how lightweight
>>> or heavyweight they are. Creating a watch and a version stamp in the same
>>> transaction will hopefully not lose updates; that is, any update after
>>> that transaction should fire the watch. Watches seem to have limits
>>> (https://github.com/apple/foundationdb/blob/8472f1046957849a97538abb2f47b299e0ae2b2d/fdbserver/storageserver.actor.cpp#L789),
>>> in which case I think we'd need to fall back to polling. That makes sense,
>>> but I wonder if we should just start with polling and larger poll
>>> intervals. I guess it depends on how many other places we'd use watches
>>> and whether we'd ever come close to even needing to handle that error
>>> case.
>>>
>>> What does everyone think? The idea would be to turn the proposal from 2)
>>> into an RFC, but I wanted to open it up for general discussion first and
>>> see what everyone thought about it.

--
Professional Support for Apache CouchDB: https://neighbourhood.ie/couchdb-support/
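To illustrate the watch-versus-polling fallback discussed in Nick's last paragraph above, here is a hedged sketch assuming erlfdb's watch API (erlfdb:watch/2, erlfdb:wait/1); the exact error raised when watch limits are exhausted, and the poll interval, are guesses:

    %% Wait for the members key to change, using a watch when possible
    %% and degrading to a poll sleep when watch creation fails.
    wait_for_members_change(Db, Key, LastVal) ->
        try
            Watch = erlfdb:transactional(Db, fun(Tx) -> erlfdb:watch(Tx, Key) end),
            erlfdb:wait(Watch)
        catch
            error:{erlfdb_error, _Code} ->
                %% e.g. too many outstanding watches; fall back to polling
                timer:sleep(30000)
        end,
        case read_members(Db, Key) of
            LastVal -> wait_for_members_change(Db, Key, LastVal);
            NewVal -> {changed, NewVal}
        end.

    read_members(Db, Key) ->
        erlfdb:transactional(Db, fun(Tx) -> erlfdb:wait(erlfdb:get(Tx, Key)) end).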