I quite like the idea Adam brought up of a global queue that workers pull replication jobs from. It means that on a cluster with a lot of replications we could spin up many workers for a short period to complete all the replications, and then scale the number of workers back down again.
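Roughly, I'd picture each worker doing something like the loop below. This is only a sketch, reusing the tx(...) pseudocode convention from Nick's proposal; the key layout and helper names are invented:

worker_loop() ->
    case tx(claim_next_pending_job()) of
        {ok, Job} ->
            run_replication(Job);
        empty ->
            sleep(PollInterval)
    end,
    worker_loop()

%% claim_next_pending_job() would, inside a single transaction, read the
%% first key under ("replication_jobs", "pending") and move it to
%% ("replication_jobs", "running", WorkerId). FDB's transaction conflict
%% detection guarantees that only one worker can claim any given job.

Scaling up is then just starting more worker_loop() processes, and scaling down is letting them drain and exit.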
On Thu, Apr 11, 2019 at 10:16 AM Jan Lehnardt <j...@apache.org> wrote:

> An advanced implementation could maintain lists of [{node: n1, ratio: 0.2}, {node: n2, ratio: 0.4}, {node: n3, ratio: 0.4}] where nodes or admins can specify the ratio of jobs any given node should be handling. Of course this gets complicated quickly if the ratios sum to less or more than 1, but I'm sure there are clever ways out of that. It might also be too complicated to get started with this, but it might be worth using a structured list, to allow extensions later.
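> 
> One simple way out might be to treat the ratios as weights and normalise them by their sum, so [0.2, 0.4, 0.4] and [1, 2, 2] would mean the same thing. Just to show the shape (plain Erlang):
> 
> normalise(Weights) ->
>     Sum = lists:sum(Weights),
>     [W / Sum || W <- Weights]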
> 
> * * *
> 
> Nick, I like your proposal. I briefly worried about Adam's suggestion to add a fast path for the replicator, because I use a similar owner model for the db-per-user server and wanted to advocate for a more general framework that can be used for various types of jobs (something the Roles variable in the model already allows). But I now agree that making replication as efficient as possible is a worthwhile optimisation vs. the other jobs we already have, or could imagine running in the future.
> 
> Best
> Jan
> —
> 
> > On 11. Apr 2019, at 01:51, Robert Newson <rnew...@apache.org> wrote:
> > 
> > As long as any given replicator node can grab as much work as it can handle, it doesn't need to be 'fair' in the way we currently do it. The notion of an 'owner' node drops away imo. As Nick mentions, the fun part is recognising when jobs become unowned due to a resource failure somewhere, but this is a very standard thing: a pool of workers competing over jobs.
> > 
> > --
> > Robert Samuel Newson
> > rnew...@apache.org
> > 
> > On Thu, 11 Apr 2019, at 00:43, Adam Kocoloski wrote:
> >> Hi Nick,
> >> 
> >> Good stuff. On the first topic, I wonder if it makes sense to use a dedicated code path for updates to _replicator DB docs, one that would automatically register these replication jobs in a queue in FDB as part of the update transaction. That'd save the overhead and latency of listening to _db_updates and then each _replicator DB's _changes feed just to discover these updates (and then presumably create jobs in a job queue anyway).
> >> 
> >> On the second topic — is it important for each node declaring the replicator role to receive an equal allotment of jobs and manage its own queue, or can the replication worker processes on each node just grab the next job off the global queue in FDB whenever they free up? I could see the latter approach decreasing the tail latency for job execution, and I think there are good patterns for managing high-contention dequeue operations in the case where we've got more worker processes than jobs to run.
> >> 
> >> Regardless, you make a good point about paying special attention to liveness checking now that we're not relying on Erlang distribution for that purpose. I didn't grok all the details of the approach you have in mind for that yet, because I wanted to bring up the two points above and get your perspective first.
> >> 
> >> Adam
> >> 
> >>> On Apr 10, 2019, at 6:21 PM, Nick Vatamaniuc <vatam...@apache.org> wrote:
> >>> 
> >>> I was thinking about how replication would work with FDB, and so far there are two main issues I believe need to be addressed. One deals with how we monitor _replicator db docs for changes; the other is how replication jobs coordinate so we don't run multiple replication jobs for the same replication document in a cluster.
> >>> 
> >>> 1) Shard-level vs fabric-level notifications for _replicator db docs
> >>> 
> >>> Currently the replicator monitors and updates individual _replicator shards. Change notifications are done via changes feeds (normal, non-continuous) and couch event server callbacks:
> >>> https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L180
> >>> https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L246
> >>> With fdb we'd have to get these updates via a fabric changes feed and rely on the global _db_updates. That could have a performance impact and would be something to keep an eye on.
> >>> 
> >>> 2) Replicator job coordination
> >>> 
> >>> The replicator has a basic constraint that there should be only one replication job running per replicator doc per cluster.
> >>> 
> >>> Each replication currently has a single "owner" node. The owner is picked to be one of the 3 nodes where the _replicator doc shards live. If nodes connect or disconnect, the replicator reshuffles replication jobs: some nodes stop running jobs they don't "own" anymore and then proceed to "rescan" all the replicator docs to possibly start new ones. However, with fdb there are no connected erlang nodes and no shards. All coordination happens via fdb, so we'd have to somehow coordinate replication job ownership through there.
> >>> 
> >>> For discussion, here is a proposal for a worker registration layer to do that job coordination:
> >>> 
> >>> The basic idea is that erlang fabric nodes would declare, by writing to fdb, that they can take on certain "roles"; "replicator" would be one such role. So for each role there is a list of nodes. Each node picks a fraction of the jobs based on how many other nodes with the same role are in the list (a simple way to pick that fraction is sketched below). When membership changes, nodes which are alive might have to pick up new jobs, or stop running existing jobs since those would be started by other nodes.
> >>> 
> >>> For example, say there are 3 nodes with the "replicator" role: [n1, n2, n3]. n1 is currently down, so the membership list is [n2, n3]. If there are 60 replication jobs, then n2 might run 30 and n3 the other 30. n1 comes online and adds itself to the roles list, which now looks like [n1, n2, n3]. n1 then picks 20 replication jobs. At about the same time n2 and n3 notice n1 is online, stop running the jobs that n1 would pick up, and each ends up running roughly 20 jobs.
> >>> 
> >>> The difficulty here comes from maintaining liveness. A node could stop at any time without removing itself from the membership list of its roles, and all of a sudden a subset of jobs would stop running without anyone picking them up. So the idea is to have nodes periodically update their health status in fdb to indicate they are alive. Once a node doesn't update its status often enough, it is considered dead and others can pick up its share of jobs.
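> >>> 
> >>> To make "picks a fraction" concrete, here is one simple deterministic scheme, sketched in plain Erlang (self_wid() is a stand-in for however a node learns its own worker id): hash each doc id over the sorted membership list.
> >>> 
> >>> owner(DocId, Members) ->
> >>>     Sorted = lists:sort(Members),
> >>>     lists:nth(erlang:phash2(DocId, length(Sorted)) + 1, Sorted)
> >>> 
> >>> mine(DocId, Members) ->
> >>>     owner(DocId, Members) =:= self_wid()
> >>> 
> >>> Every node computes the same owner for a given doc, so beyond agreeing on the membership list no extra coordination is needed.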
> >>> 
> >>> To start the discussion, I sketched this data layout and pseudocode:
> >>> 
> >>> Data layout:
> >>> 
> >>> ("couch_workers", Role, "members") = (MembersVersionStamp, [WId1, WId2, ...])
> >>> ("couch_workers", Role, "health", WId) = (WorkerVersionStamp, Timeout, Timestamp)
> >>> 
> >>> Role : In our case it would be "replicator", but it could be any other role.
> >>> 
> >>> WId : Worker IDs. These should uniquely identify workers. It would be nice if they could be persisted, so that a worker doing a quick restart ends up with the same id and the membership list doesn't change. However, a random UUID would work as well.
> >>> 
> >>> Timeout : The timeout declared by the nodes themselves. These need not be the same for all nodes; some nodes might decide they run slower, so their timeouts would be larger. But they essentially promise to update their health status at least that often.
> >>> 
> >>> Timestamp : The time of the last health report from that node. Timestamps technically might not be needed, as neighbor monitors could remember the time delta from when they last saw the health value's version stamp change.
> >>> 
> >>> Pseudocode:
> >>> 
> >>> init(Role) ->
> >>>     Members = tx(add_to_members(self(), Role)),
> >>>     spawn(health_ping(Members, Role)),
> >>>     spawn(neighbor_check(Members, Role)),
> >>>     loop()
> >>> 
> >>> terminate() ->
> >>>     tx(remove_self_from_members_and_health_list())
> >>> 
> >>> loop() ->
> >>>     {Members, Watch} = tx(add_to_members(self(), Role), get_watch()),
> >>>     receive
> >>>         {Watch, NewMembers} ->
> >>>             case diff(Members, NewMembers) of
> >>>                 no_diff ->
> >>>                     ok;
> >>>                 {Added, Removed} ->
> >>>                     update_neighbor_check(NewMembers),
> >>>                     fire_callbacks(Added, Removed)
> >>>             end,
> >>>             loop()
> >>>     end
> >>> 
> >>> health_ping(Members, Role) ->
> >>>     tx(update_health(Role, self(), now())),
> >>>     sleep(Timeout / 3),
> >>>     health_ping(Members, Role)
> >>> 
> >>> neighbor_check(Members, Role) ->
> >>>     Neighbor = next_in_list(self(), Members),
> >>>     {Timeout, Timestamp} = tx(read_timestamp(Neighbor, Role)),
> >>>     case now() - Timestamp > Timeout of
> >>>         true ->
> >>>             NewMembers = tx(remove_neighbor(Neighbor, Role)),
> >>>             neighbor_check(NewMembers, Role);
> >>>         false ->
> >>>             sleep(Timeout),
> >>>             neighbor_check(Members, Role)
> >>>     end
> >>> 
> >>> Description:
> >>> 
> >>> Nodes add themselves to a membership list for each role they participate in. The membership list has a version stamp; it's there to ensure that the watch created during the update will catch any change occurring after that update.
> >>> 
> >>> tx(...) is pseudocode for "runs in a transaction".
> >>> 
> >>> neighbor_check() is how entries for dead workers get cleaned up. Each node monitors its neighbor's status. If it sees that the neighbor has stopped responding, it removes it from the list. That updates the membership list and fires the watch; everyone notices and rescans their replicator docs.
> >>> 
> >>> fire_callbacks() just reports to the replicator app that membership has changed and it might need to rescan. On top of this, the current code also has cluster-stability logic that waits a bit before rescanning in case there is a flurry of node membership changes, say on rolling node reboots or cluster startup.
> >>> 
> >>> I am not entirely sure about the semantics of watches and how lightweight or heavyweight they are. Hopefully creating the watch and reading the version stamp in the same transaction means no updates are lost; that is, any update after that transaction will fire the watch. Watches also seem to have limits, past which we'd need to revert to polling:
> >>> https://github.com/apple/foundationdb/blob/8472f1046957849a97538abb2f47b299e0ae2b2d/fdbserver/storageserver.actor.cpp#L789
> >>> That makes sense, but I'm wondering if we should just start with polling and larger poll intervals. I guess it depends on how many other places we'd use watches and whether we'd ever come close to even needing to handle that error case.
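> >>> 
> >>> For reference, the fallback could look roughly like this (still the tx(...) convention; try_get_watch(), get_members(), and the too_many_watches error are stand-ins for whatever the binding actually exposes):
> >>> 
> >>> wait_for_members_change(Role) ->
> >>>     case tx(get_members(Role), try_get_watch()) of
> >>>         {_Members, {ok, Watch}} ->
> >>>             %% watch created: block until it fires with the new list
> >>>             receive {Watch, NewMembers} -> NewMembers end;
> >>>         {_Members, {error, too_many_watches}} ->
> >>>             %% watch limit hit: fall back to polling
> >>>             sleep(PollInterval),
> >>>             tx(get_members(Role))
> >>>     end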
> >>> 
> >>> What does everyone think? The idea would be to turn the proposal from 2) into an RFC, but I wanted to open it up for a general discussion first and see what everyone thought.
> >> 
> > 
> 
> -- 
> Professional Support for Apache CouchDB:
> https://neighbourhood.ie/couchdb-support/