I quite like the idea that Adam brought up of having a global queue that
allows workers to pull replication jobs from. That means that on a cluster
with a lot of replications we could spin up a lot of workers for a short
period of time to complete all replications before decreasing the workers
again.
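
To make the idea concrete, here is a minimal sketch of workers draining one shared queue. It is only an in-memory illustration: `GlobalJobQueue`, `claim_next` and `run_workers` are made-up names, and the lock merely stands in for whatever transactional dequeue we would end up using in FDB.

```python
import threading
from collections import deque


class GlobalJobQueue:
    """In-memory stand-in for a shared job queue (hypothetical, not an FDB API)."""

    def __init__(self, jobs):
        self._pending = deque(jobs)
        self._lock = threading.Lock()  # plays the role of a transaction

    def claim_next(self):
        """Atomically take the next pending job, or None if the queue is empty."""
        with self._lock:
            return self._pending.popleft() if self._pending else None


def run_workers(queue, worker_count):
    """Start worker_count workers and return {worker_id: [claimed jobs]}."""
    claimed = {w: [] for w in range(worker_count)}

    def work(worker_id):
        while (job := queue.claim_next()) is not None:
            claimed[worker_id].append(job)  # a real worker would replicate here

    threads = [threading.Thread(target=work, args=(w,)) for w in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return claimed
```

Changing `worker_count` is the only knob here: more workers drain the same queue faster, and each job is still claimed exactly once.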

On Thu, Apr 11, 2019 at 10:16 AM Jan Lehnardt <j...@apache.org> wrote:

> An advanced implementation could maintain lists of [{node: n1, ratio:
> 0.2}, {node: n2, ratio: 0.4}, {node: n3, ratio: 0.4}] where nodes or admins
> can signify the ratio of jobs any given node should be handling. Of course
> this gets complicated quickly if the sum of ratios is <1 or >1, but I’m sure
> there are clever ways out of that. It might also be too complicated to get
> started with this, but it might be worth using a structured list, to allow
> extensions later.
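
One possible way out of the "sum is <1 or >1" problem is to treat the ratios as relative weights and normalise them. The sketch below is only an assumption about how that could look; `allocate_by_ratio` is a hypothetical helper, and the largest-remainder rounding is my choice, not something proposed in the thread.

```python
def allocate_by_ratio(weights, job_count):
    """Split job_count jobs across nodes in proportion to their ratios.

    weights: list of {"node": str, "ratio": float} entries, as in the
    structured list above. Ratios are normalised, so their sum need not be 1.
    """
    total = sum(w["ratio"] for w in weights)
    if total <= 0:
        raise ValueError("at least one node needs a positive ratio")

    # Ideal (fractional) share per node, then round down.
    shares = [(w["node"], job_count * w["ratio"] / total) for w in weights]
    allocation = {node: int(share) for node, share in shares}

    # Hand the leftover jobs to the nodes with the largest remainders.
    leftover = job_count - sum(allocation.values())
    by_remainder = sorted(shares, key=lambda s: s[1] - int(s[1]), reverse=True)
    for node, _ in by_remainder[:leftover]:
        allocation[node] += 1
    return allocation
```

With this reading, ratios of 0.2/0.4/0.4 and weights of 1/2/2 describe the same split, so admins would not have to keep the sum at exactly 1.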
>
> * * *
>
> Nick, I like your proposal. I was briefly worrying about Adam’s suggestion
> to add a fast-path for the replicator, because I use a similar owner model
> for the db-per-user server, and thought I wanted to advocate for a more
> general framework that can be used for various types of jobs (something
> that the Roles variable in the model already allows), but I now agree that
> the importance of making replication as efficient as possible is a
> worthwhile optimisation vs. the other jobs we already have, or could
> imagine running in the future.
>
> Best
> Jan
> —
>
> > On 11. Apr 2019, at 01:51, Robert Newson <rnew...@apache.org> wrote:
> >
> > As long as any given replicator node can grab as much work as it can
> > handle, it doesn't need to be 'fair' in the way we currently do it. The
> > notion of an 'owner' node drops away imo. As Nick mentions, the fun part is
> > recognising when jobs become unowned due to a resource failure somewhere,
> > but this is a very standard thing: a pool of workers competing over jobs.
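
A common pattern for spotting unowned jobs is an expiring lease: a worker owns a job only as long as it keeps renewing. The sketch below is an in-memory illustration under that assumption; `LeasedJobs` and its methods are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow.

```python
class LeasedJobs:
    """In-memory sketch of job ownership with expiring leases (hypothetical)."""

    def __init__(self, job_ids, lease_seconds):
        self.lease_seconds = lease_seconds
        # job_id -> (owner, lease_expiry); None means the job is unowned.
        self.leases = {job_id: None for job_id in job_ids}

    def claim(self, worker_id, now):
        """Claim one unowned or expired job for worker_id; return its id or None."""
        for job_id, lease in self.leases.items():
            if lease is None or lease[1] <= now:
                self.leases[job_id] = (worker_id, now + self.lease_seconds)
                return job_id
        return None

    def renew(self, worker_id, job_id, now):
        """Extend the lease if worker_id still owns the job; return success."""
        lease = self.leases.get(job_id)
        if lease is None or lease[0] != worker_id or lease[1] <= now:
            return False
        self.leases[job_id] = (worker_id, now + self.lease_seconds)
        return True
```

A worker that dies simply stops renewing, and its jobs become claimable again once the lease runs out, without anyone having to track an owner node.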
> >
> > --
> >  Robert Samuel Newson
> >  rnew...@apache.org
> >
> > On Thu, 11 Apr 2019, at 00:43, Adam Kocoloski wrote:
> >> Hi Nick,
> >>
> >> Good stuff. On the first topic, I wonder if it makes sense to use a
> >> dedicated code path for updates to _replicator DB docs, one that would
> >> automatically register these replication jobs in a queue in FDB as part
> >> of the update transaction. That’d save the overhead and latency of
> >> listening to _db_updates and then each _replicator DB’s _changes feed
> >> just to discover these updates (and then presumably create jobs in a
> >> job queue anyway).
> >>
> >> On the second topic — is it important for each node declaring the
> >> replicator role to receive an equal allotment of jobs and manage its
> >> own queue, or can the replication worker processes on each node just
> >> grab the next job off the global queue in FDB whenever they free up? I
> >> could see the latter approach decreasing the tail latency for job
> >> execution, and I think there are good patterns for managing high
> >> contention dequeue operations in the case where we’ve got more worker
> >> processes than jobs to run.
> >>
> >> Regardless, you make a good point about paying special attention to
> >> liveness checking now that we’re not relying on Erlang distribution for
> >> that purpose. I didn’t grok all the details of the approach you have in
> >> mind for that yet because I wanted to bring up these two points above
> >> and get your perspective.
> >>
> >> Adam
> >>
> >>> On Apr 10, 2019, at 6:21 PM, Nick Vatamaniuc <vatam...@apache.org> wrote:
> >>>
> >>> I was thinking how replication would work with FDB and so far there are
> >>> two main issues I believe would need to be addressed. One deals with how
> >>> we monitor _replicator db docs for changes, and the other is how
> >>> replication jobs coordinate so we don't run multiple replication jobs for
> >>> the same replication document in a cluster.
> >>>
> >>> 1) Shard-level vs fabric-level notifications for _replicator db docs
> >>>
> >>> Currently the replicator monitors and updates individual _replicator
> >>> shards. Change notifications are done via change feeds (normal,
> >>> non-continuous) and couch event server callbacks:
> >>> https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L180
> >>> https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L246
> >>> With fdb we'd have to get these updates via fabric changes feeds and rely
> >>> on the global _db_updates. That could result in a performance impact and
> >>> would be something to keep an eye on.
> >>>
> >>> 2) Replicator job coordination
> >>>
> >>> Replicator has a basic constraint that there should be only one
> >>> replication job running for each replicator doc per cluster.
> >>>
> >>> Each replication currently has a single "owner" node. The owner is picked
> >>> to be one of the 3 nodes where the _replicator doc shards live. If nodes
> >>> connect or disconnect, the replicator will reshuffle replication jobs:
> >>> some nodes will stop running jobs that they don't "own" anymore and then
> >>> proceed to "rescan" all the replicator docs to possibly start new ones.
> >>> However, with fdb, there are no connected erlang nodes and no shards. All
> >>> coordination happens via fdb, so we'd have to somehow coordinate
> >>> replication job ownership through there.
> >>>
> >>> For discussion, here is a proposal for a worker registration layer to do
> >>> that job coordination:
> >>>
> >>> The basic idea is erlang fabric nodes would declare, by writing to fdb,
> >>> that they can take on certain "roles". "replicator" would be one such
> >>> role. And so, for each role there is a list of nodes. Each node picks a
> >>> fraction of jobs based on how many other nodes of the same role are in the
> >>> list. When membership changes, nodes which are alive might have to pick up
> >>> new jobs or stop running existing jobs since they'd be started by other
> >>> nodes.
> >>>
> >>> For example, there are 3 nodes with "replicator" roles: [n1, n2, n3]. n1
> >>> is currently down so the membership list is [n2, n3]. If there are 60
> >>> replication jobs then n2 might run 30, and n3 another 30. n1 comes online
> >>> and adds itself to the roles list, which now looks like [n1, n2, n3]. n1
> >>> then picks 20 replication jobs. At about the same time n2 and n3 notice n1
> >>> is online and decide to stop running the jobs that n1 would pick up and
> >>> they each would end up running roughly 20 jobs.
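
The proposal does not say how each node decides which jobs are "its" jobs. One technique with the property described in the example (when n1 joins, n2 and n3 only give up the jobs n1 takes over) is rendezvous hashing. The sketch below uses it purely as an assumption; `owner` and `assign` are hypothetical helpers, and the split is only roughly even, not exactly 20 jobs each.

```python
import hashlib


def owner(job_id, members):
    """Pick the member with the highest hash score for this job."""
    def score(member):
        digest = hashlib.sha256(f"{member}:{job_id}".encode()).digest()
        return int.from_bytes(digest[:8], "big")
    return max(members, key=score)


def assign(job_ids, members):
    """Return {member: [job_ids it should run]} for the current membership."""
    assignment = {m: [] for m in members}
    for job_id in job_ids:
        assignment[owner(job_id, members)].append(job_id)
    return assignment
```

Every node can compute `owner` locally from the membership list alone, so no extra coordination is needed beyond agreeing on the list itself.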
> >>>
> >>> The difficulty here comes from maintaining liveness. A node could stop at
> >>> any time without removing itself from the membership list of its roles.
> >>> That means all of a sudden a subset of jobs would stop running without
> >>> anyone picking them up. So, the idea is to have nodes periodically update
> >>> their health status in fdb to indicate they are alive. Once a node doesn't
> >>> update its status often enough it will be considered dead and others can
> >>> pick up its share of jobs.
> >>>
> >>> To start the discussion, I sketched this data layout and pseudocode:
> >>>
> >>> Data layout:
> >>>
> >>> ("couch_workers", Role, "members") = (MembersVersionStamp, [WId1, WId2, ...])
> >>> ("couch_workers", Role, "health", WId) = (WorkerVersionStamp, Timeout, Timestamp)
> >>>
> >>> Role : In our case it would be "replicator", but it could be any other
> >>> role.
> >>>
> >>> WId : Worker IDs. These should uniquely identify workers. It would be
> >>> nice if they could be persisted, such that a worker doing a quick restart
> >>> will end up with the same id and the membership list won't change.
> >>> However, a random UUID would work as well.
> >>>
> >>> Timeout : This is the timeout declared by the nodes themselves. These need
> >>> not be the same for all nodes. Some nodes might decide they run slower so
> >>> their timeouts would be larger. But they essentially promise to update
> >>> their health status at least that often.
> >>>
> >>> Timestamp : The time of the last health report from that node. Timestamps
> >>> technically might not be needed, as a neighbor monitor could remember the
> >>> time delta between the changes it saw to the health value's version stamp.
> >>>
> >>> Pseudocode:
> >>>
> >>> init(Role) ->
> >>>   Members = tx(add_to_members(self(), Role)),
> >>>   spawn health_ping(Members, Role),
> >>>   spawn neighbor_check(Members, Role),
> >>>   loop(Role)
> >>>
> >>> terminate() ->
> >>>   tx(remove_self_from_members_and_health_list())
> >>>
> >>> loop(Role) ->
> >>>   {Members, Watch} = tx(add_to_members(self(), Role), get_watch()),
> >>>   receive
> >>>   {Watch, NewMembers} ->
> >>>       case diff(Members, NewMembers) of
> >>>       no_diff ->
> >>>           ok;
> >>>       {Added, Removed} ->
> >>>           update_neighbor_check(NewMembers),
> >>>           fire_callbacks(Added, Removed)
> >>>       end
> >>>   end,
> >>>   loop(Role)
> >>>
> >>> health_ping(Members, Role) ->
> >>>   % Timeout is the value this node declared in its health entry
> >>>   tx(update_health(Role, self(), now())),
> >>>   sleep(Timeout / 3),
> >>>   health_ping(Members, Role)
> >>>
> >>> neighbor_check(Members, Role) ->
> >>>   Neighbor = next_in_list(self(), Members),
> >>>   {Timeout, Timestamp} = tx(read_timestamp(Neighbor, Role)),
> >>>   case now() - Timestamp > Timeout of
> >>>   true ->
> >>>       NewMembers = tx(remove_neighbor(Neighbor, Role)),
> >>>       neighbor_check(NewMembers, Role);
> >>>   false ->
> >>>       sleep(Timeout),
> >>>       neighbor_check(Members, Role)
> >>>   end
> >>>
> >>>
> >>> Description:
> >>>
> >>> Nodes add themselves to a membership list for each role they participate
> >>> in. The membership list has a version stamp. It's there to ensure that the
> >>> watch that is created during the update would find any change occurring
> >>> since their update.
> >>>
> >>> tx(...) is pseudocode for "runs in a transaction"
> >>>
> >>> neighbor_check() is how entries for dead workers are cleaned up. Each node
> >>> will monitor its neighbor's status. If it sees the neighbor has stopped
> >>> responding it will remove it from the list. That will update the
> >>> membership list and will fire the watch. Everyone will notice and rescan
> >>> their replicator docs.
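
A single pass of the neighbor check could look roughly like the sketch below. It is an in-memory illustration only: `MemoryStore` is a hypothetical stand-in for the two fdb keys in the data layout, each method would be one transaction in practice, and time is passed in as `now` rather than read from a clock.

```python
class MemoryStore:
    """In-memory stand-in for the proposed fdb keys (hypothetical)."""

    def __init__(self):
        self.members = {}  # role -> ordered list of worker ids
        self.health = {}   # (role, worker_id) -> (timeout, timestamp)

    def join(self, role, worker_id, timeout, now):
        members = self.members.setdefault(role, [])
        if worker_id not in members:
            members.append(worker_id)
        self.health[(role, worker_id)] = (timeout, now)

    def ping(self, role, worker_id, now):
        timeout, _ = self.health[(role, worker_id)]
        self.health[(role, worker_id)] = (timeout, now)

    def remove(self, role, worker_id):
        self.members[role].remove(worker_id)
        self.health.pop((role, worker_id), None)


def next_in_list(worker_id, members):
    """The neighbor is the next member in the list, wrapping around."""
    return members[(members.index(worker_id) + 1) % len(members)]


def neighbor_check(store, role, worker_id, now):
    """Run one check; return the id of the removed neighbor, or None."""
    members = store.members[role]
    neighbor = next_in_list(worker_id, members)
    if neighbor == worker_id:
        return None  # alone in the list, nothing to monitor
    timeout, timestamp = store.health[(role, neighbor)]
    if now - timestamp > timeout:
        store.remove(role, neighbor)
        return neighbor
    return None
```

Because each worker only watches the next entry in the list, the number of health reads per check stays constant no matter how many workers share the role.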
> >>>
> >>> fire_callbacks() is just reporting to the replicator app that membership
> >>> has changed and it might need to rescan. On top of this code there is
> >>> currently a cluster stability logic that waits a bit before rescanning in
> >>> case there is a flurry of node membership changes, say on rolling node
> >>> reboots or cluster startup.
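
The "wait a bit before rescanning" behaviour is essentially a debounce. The sketch below shows that idea in isolation; `StabilityTimer` is a hypothetical name and this is not how the existing replicator code is written, just an illustration with time passed in explicitly.

```python
class StabilityTimer:
    """Delay a rescan until membership has been quiet for quiet_seconds."""

    def __init__(self, quiet_seconds):
        self.quiet_seconds = quiet_seconds
        self.last_change = None  # time of the most recent membership change

    def membership_changed(self, now):
        """Record a change; each new change restarts the quiet period."""
        self.last_change = now

    def should_rescan(self, now):
        """Return True once, after the quiet period has elapsed."""
        if self.last_change is None:
            return False
        if now - self.last_change < self.quiet_seconds:
            return False
        self.last_change = None  # the pending rescan has been consumed
        return True
```

During a rolling reboot every membership change pushes the rescan further out, so the cluster rescans once at the end instead of once per node.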
> >>>
> >>> I am not entirely sure on the semantics of watches and how lightweight or
> >>> heavyweight they are. Creating a watch and a version stamp will hopefully
> >>> not lose updates. That is, all updates after that transaction's watch will
> >>> fire the watch. Watches seem to have limits and then I think we'd need to
> >>> revert to polling
> >>> https://github.com/apple/foundationdb/blob/8472f1046957849a97538abb2f47b299e0ae2b2d/fdbserver/storageserver.actor.cpp#L789
> >>> which makes sense, but I'm wondering if we should just start with polling
> >>> first and larger poll intervals. I guess it depends on how many other
> >>> places we'd have to use watches and if we'd ever come close to even
> >>> needing to handle that error case.
> >>>
> >>>
> >>> What does everyone think? The idea would be to turn the proposal from 2)
> >>> into an RFC but I wanted to open it for a general discussion and see what
> >>> everyone thought about it.
> >>
> >>
>
> --
> Professional Support for Apache CouchDB:
> https://neighbourhood.ie/couchdb-support/
>
>
