I was thinking about how replication would work with FDB and so far there
are two main issues I believe would need to be addressed. One deals with
how we monitor _replicator db docs for changes, and the other is how
replication jobs coordinate so we don't run multiple replication jobs for
the same replication document in a cluster.

1) Shard-level vs fabric-level notifications for _replicator db docs

Currently the replicator monitors and updates individual _replicator
shards. Change notifications are done via changes feeds (normal,
non-continuous) and couch event server callbacks:
https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L180,
https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L246.
With fdb we'd have to get these updates via fabric changes feeds and rely
on the global _db_updates. That could result in a performance impact and
would be something to keep an eye on.
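
For reference, the current shard-level hookup is roughly the following
shape (a heavily simplified sketch of what the linked couch_multidb_changes
module does; treat the exact registration call, the '$couch_event' message
shape and the placeholder scan_shard_changes/1 as approximations):

-module(replicator_shard_listener).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

init([]) ->
    %% Ask couch_event for callbacks about every db event on this node
    couch_event:register_all(self()),
    {ok, #{}}.

handle_call(_Msg, _From, St) ->
    {reply, ok, St}.

handle_cast(_Msg, St) ->
    {noreply, St}.

%% React only to updates of _replicator shards; a real implementation then
%% runs a normal (non-continuous) changes feed against that shard.
handle_info({'$couch_event', DbName, updated}, St) ->
    case binary:match(DbName, <<"_replicator">>) of
        nomatch -> ok;
        _ -> scan_shard_changes(DbName)
    end,
    {noreply, St};
handle_info(_Msg, St) ->
    {noreply, St}.

scan_shard_changes(_DbName) ->
    %% placeholder for the per-shard changes scan
    ok.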

2) Replicator job coordination

Replicator has a basic constraint that there should be only one replication
job running for each replicator doc per cluster.

Each replication currently has a single "owner" node. The owner is picked
to be one of the 3 nodes where the _replicator doc shards live. If nodes connect
or disconnect, replicator will reshuffle replication jobs and some nodes
will stop running jobs that they don't "own" anymore and then proceed to
"rescan" all the replicator docs to possibly start new ones. However, with
fdb, there are no connected erlang nodes and no shards. All coordination
happens via fdb, so we'd have to somehow coordinate replication job
ownership through there.

For discussion, here is a proposal for a worker registration layer to do
that job coordination:

The basic idea is erlang fabric nodes would declare, by writing to fdb,
that they can take on certain "roles". "replicator" would be one such role.
And so, for each role there is a list of nodes. Each node picks a fraction
of jobs based on how many other nodes of the same role are in the list.
When membership changes, nodes which are alive might have to pick up new
jobs or stop running existing ones, since those jobs would now be started
by other nodes.

For example, say there are 3 nodes with the "replicator" role: [n1, n2, n3]. n1 is
currently down so the membership list is [n2, n3]. If there are 60
replication jobs then n2 might run 30, and n3 another 30. n1 comes online
and adds itself to the roles list, which now looks like [n1, n2, n3]. n1
then picks 20 replication jobs. At about the same time n2 and n3 notice n1
is online and decide to stop running the jobs that n1 would pick up and
they each would end up running roughly 20 jobs.
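
As a sketch of how a node could compute its fraction of the jobs (just one
possible scheme, assuming jobs are keyed by their _replicator doc ids and
that everyone sees the same membership list), each node could hash the job
id over the sorted member list and only run the jobs that hash to itself:

%% Deterministic job assignment: every node computes the same owner for a
%% given job id, so no coordination is needed beyond agreeing on the
%% membership list.
owner(JobId, Members) ->
    Sorted = lists:sort(Members),
    Index = erlang:phash2(JobId, length(Sorted)) + 1,
    lists:nth(Index, Sorted).

%% The subset of jobs this node should be running.
my_jobs(Self, JobIds, Members) ->
    [Id || Id <- JobIds, owner(Id, Members) =:= Self].

With [n2, n3] each node ends up with roughly half of the 60 jobs; once n1
is added back, about a third of the job ids start hashing to n1 and the
other two nodes drop them.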

The difficulty here comes from maintaining liveness. A node could stop at
any time without removing itself from the membership list of its roles.
That means all of a sudden a subset of jobs would stop running without
anyone picking them up. So, the idea is to have nodes periodically update
their health status in fdb to indicate they are alive. Once a node doesn't
update its status often enough it will be considered dead and others can
pick up its share of jobs.

To start the discussion, I sketched this data layout and pseudocode:

Data layout:

("couch_workers", Role, "members") = (MembersVersionStamp, [WId1, WId2,
...])
("couch_workers", Role, "health", WId) = (WorkerVersionStamp, Timeout,
Timestamp)

Role : In our case it would be "replicator", but it could be any other role.

WId : worker IDs. These should uniquely identify workers. It would be
nice if they could be persisted, such that a worker doing a quick restart
will end up with the same id and the membership list won't change. However,
a random UUID would work as well.

Timeout : This is the timeout declared by the nodes themselves. These need
not be the same for all nodes. Some nodes might decide they run slower so
their timeouts would be larger. But they essentially promise to update
their health status at least that often.

Timestamp : The time of the last health report from that node. Timestamps
technically might not be needed, since a neighbor monitor could instead
track how long it has been since it last saw that worker's health version
stamp change.
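
To make the layout a bit more concrete, here is roughly how those keys
could be built with the tuple layer in the Erlang bindings (the
erlfdb_tuple usage, the "couch_workers" prefix and the value encoding are
illustrative assumptions, not a settled design):

members_key(Role) when is_binary(Role) ->
    %% ("couch_workers", Role, "members")
    erlfdb_tuple:pack({<<"couch_workers">>, Role, <<"members">>}).

health_key(Role, WId) when is_binary(Role), is_binary(WId) ->
    %% ("couch_workers", Role, "health", WId)
    erlfdb_tuple:pack({<<"couch_workers">>, Role, <<"health">>, WId}).

%% Values are sketched as plain serialized terms here; the real thing would
%% presumably use versionstamped values so readers and watches can tell
%% updates apart.
members_val(MembersVersionStamp, WIds) ->
    term_to_binary({MembersVersionStamp, WIds}).

health_val(WorkerVersionStamp, Timeout, Timestamp) ->
    term_to_binary({WorkerVersionStamp, Timeout, Timestamp}).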

Pseudocode:

init(Role, Timeout) ->
    %% Register this worker, then start the health ping and the neighbor
    %% monitor before entering the watch loop
    Members = tx(add_to_members(self(), Role)),
    spawn(fun() -> health_ping(Role, Timeout) end),
    spawn(fun() -> neighbor_check(Members, Role) end),
    loop(Role).

terminate(Role) ->
    tx(remove_from_members_and_health(self(), Role)).

loop(Role) ->
    %% Re-register and set a watch on the members key in one transaction
    {Members, Watch} = tx(add_to_members(self(), Role), get_watch()),
    receive
    {Watch, NewMembers} ->
        case diff(Members, NewMembers) of
        no_diff ->
            ok;
        {Added, Removed} ->
            update_neighbor_check(NewMembers),
            fire_callbacks(Added, Removed)
        end
    end,
    loop(Role).

health_ping(Role, Timeout) ->
    %% Promise to check in at least every Timeout; ping more often than that
    tx(update_health(Role, self(), Timeout, now())),
    sleep(Timeout div 3),
    health_ping(Role, Timeout).

neighbor_check(Members, Role) ->
    %% Monitor the next worker in the list; remove it if its health entry
    %% goes stale, which updates the members key and fires everyone's watch
    Neighbor = next_in_list(self(), Members),
    {Timeout, Timestamp} = tx(read_health(Neighbor, Role)),
    case now() - Timestamp > Timeout of
    true ->
        NewMembers = tx(remove_from_members_and_health(Neighbor, Role)),
        neighbor_check(NewMembers, Role);
    false ->
        sleep(Timeout),
        neighbor_check(Members, Role)
    end.


Description:

Nodes add themselves to a membership list for each role they participate
in. The membership list has a version stamp. It's there to ensure that the
watch created during the update will catch any change occurring after that
update.

tx(...) is pseudocode for "runs in a transaction"

neighbor_check() is how entries for dead workers are cleaned up. Each node
will monitor its neighbor's status. If it sees the neighbor has stopped
responding it will remove it from the list. That will update the membership
list and will fire the watch. Everyone will notice and rescan their
replicator docs.

fire_callbacks() is just reporting to the replicator app that membership
has changed and it might need to rescan. On top of this, the current code
has cluster stability logic that waits a bit before rescanning in case
there is a flurry of node membership changes, say on rolling node reboots
or cluster startup.
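
That stability wait is essentially a debounce. A minimal sketch of what it
could look like on top of fire_callbacks (QuietPeriod and
rescan_replicator_docs/0 are placeholders):

%% Debounce membership changes: a flurry of changes results in a single
%% rescan once things have been quiet for QuietPeriod milliseconds.
stability_loop(QuietPeriod, Timer) ->
    receive
        membership_changed ->
            %% restart the quiet-period timer on every change
            cancel(Timer),
            NewTimer = erlang:send_after(QuietPeriod, self(), stable),
            stability_loop(QuietPeriod, NewTimer);
        stable ->
            %% no changes for QuietPeriod ms; rescan once
            rescan_replicator_docs(),
            stability_loop(QuietPeriod, undefined)
    end.

cancel(undefined) -> ok;
cancel(Ref) -> erlang:cancel_timer(Ref).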

I am not entirely sure about the semantics of watches and how lightweight
or heavyweight they are. Creating the watch and reading the version stamp
in the same transaction will hopefully not lose updates. That is, all
updates after that transaction will fire the watch. Watches seem to have
limits and then I think we'd need to revert to polling
https://github.com/apple/foundationdb/blob/8472f1046957849a97538abb2f47b299e0ae2b2d/fdbserver/storageserver.actor.cpp#L789
which makes sense, but I am wondering if we should just start with polling
first, with larger poll intervals. I guess it depends on how many other
places we'd use watches and whether we'd ever come close to even needing
to handle that error case.
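
One way to hedge on that would be to treat the watch purely as a latency
optimization over polling: wait for the watch to fire, but with an upper
bound after which we re-read anyway. Since the loop above re-reads the
membership list in its own transaction each time around, falling back to a
poll would not lose updates, just delay them. A sketch (assuming the watch
is delivered to the waiting process as a message, which is itself one of
the things to verify):

%% Wait for the members key watch to fire, but never longer than
%% PollInterval ms, so exhausting or losing watches degrades to polling.
wait_for_members_change(Watch, PollInterval) ->
    receive
        {Watch, _} ->
            changed
    after PollInterval ->
        poll
    end.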


What does everyone think? The idea would be to turn the proposal from 2)
into an RFC, but I wanted to open it up for general discussion first and
see what everyone thought about it.
