rnewson commented on code in PR #5558:
URL: https://github.com/apache/couchdb/pull/5558#discussion_r2157093052


##########
src/fabric/src/fabric_drop_seq.erl:
##########
@@ -0,0 +1,1016 @@
+-module(fabric_drop_seq).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([go/1]).
+
+-export([
+    create_peer_checkpoint_doc_if_missing/5,
+    update_peer_checkpoint_doc/5,
+    cleanup_peer_checkpoint_docs/3,
+    peer_checkpoint_doc/4,
+    peer_id_from_sig/2
+]).
+
+%% rpc
+-export([gather_drop_seq_info_rpc/1]).
+
+-type range() :: [non_neg_integer()].
+
+-type uuid() :: binary().
+
+-type seq() :: non_neg_integer().
+
+-type uuid_map() :: #{{Range :: range(), Node :: node()} => uuid()}.
+
+-type peer_checkpoints() :: #{{range(), Node :: node()} => {Uuid :: uuid(), Seq :: seq()}}.
+
+-type history_item() :: {
+    SourceUuid :: uuid(), SourceSeq :: seq(), TargetUuid :: uuid(), TargetSeq :: seq()
+}.
+
+-type shard_sync_history() :: #{
+    {Range :: range(), SourceNode :: node(), TargetNode :: node()} => [history_item()]
+}.
+
+go(DbName) ->
+    Shards0 = mem3:shards(DbName),
+    case gather_drop_seq_info(Shards0) of
+        {error, Reason} ->
+            {error, Reason};
+        {ok, #{
+            uuid_map := UuidMap,
+            peer_checkpoints := PeerCheckpoints,
+            shard_sync_history := ShardSyncHistory
+        }} ->
+            Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
+            DropSeqs = calculate_drop_seqs(
+                Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory
+            ),
+            Workers = lists:filtermap(
+                fun(Shard) ->
+                    #shard{range = Range, node = Node, name = ShardName} = Shard,
+                    case maps:find({Range, Node}, DropSeqs) of
+                        {ok, {_UuidPrefix, 0}} ->
+                            false;
+                        {ok, {UuidPrefix, DropSeq}} ->
+                            Ref = rexi:cast(
+                                Node,
+                                {fabric_rpc, set_drop_seq, [
+                                    ShardName, UuidPrefix, DropSeq, [?ADMIN_CTX]
+                                ]}
+                            ),
+                            {true, Shard#shard{ref = Ref, opts = [{drop_seq, DropSeq}]}};
+                        error ->
+                            false
+                    end
+                end,
+                Shards1
+            ),
+            if
+                Workers == [] ->
+                    %% nothing to do
+                    {ok, #{}};
+                true ->
+                    RexiMon = fabric_util:create_monitors(Shards1),
+                    Acc0 = {#{}, length(Workers) - 1},
+                    try
+                        case
+                            fabric_util:recv(
+                                Workers, #shard.ref, fun handle_set_drop_seq_reply/3, Acc0
+                            )
+                        of
+                            {ok, Results} ->
+                                {ok, Results};
+                            {timeout, {WorkersDict, _}} ->
+                                DefunctWorkers = fabric_util:remove_done_workers(
+                                    WorkersDict,
+                                    nil
+                                ),
+                                fabric_util:log_timeout(
+                                    DefunctWorkers,
+                                    "set_drop_seq"
+                                ),
+                                {error, timeout};
+                            {error, Reason} ->
+                                {error, Reason}
+                        end
+                    after
+                        rexi_monitor:stop(RexiMon)
+                    end
+            end
+    end.
+
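+%% Combine the peer checkpoints with the shard sync history: substitute split
+%% shards, cross-reference checkpoints onto the other replicas, then merge in
+%% the latest shard sync checkpoints, keeping the minimum seq per {Range, Node}.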
+-spec calculate_drop_seqs([#shard{}], uuid_map(), peer_checkpoints(), shard_sync_history()) ->
+    peer_checkpoints().
+calculate_drop_seqs(Shards, UuidMap, PeerCheckpoints0, ShardSyncHistory) ->
+    PeerCheckpoints1 = substitute_splits(Shards, UuidMap, PeerCheckpoints0),
+    PeerCheckpoints2 = crossref(PeerCheckpoints1, ShardSyncHistory),
+    ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory),
+    maps:merge_with(fun merge_peers/3, PeerCheckpoints2, ShardSyncCheckpoints).
+
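+%% Collect the per-range, per-node drop seqs reported by the workers; the
+%% second element of the accumulator counts down the outstanding replies.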
+handle_set_drop_seq_reply(ok, Worker, {Results0, Waiting}) ->
+    DropSeq = proplists:get_value(drop_seq, Worker#shard.opts),
+    [B, E] = Worker#shard.range,
+    BHex = couch_util:to_hex(<<B:32/integer>>),
+    EHex = couch_util:to_hex(<<E:32/integer>>),
+    Range = list_to_binary([BHex, "-", EHex]),
+    Results1 = maps:merge_with(
+        fun(_Key, Val1, Val2) ->
+            maps:merge(Val1, Val2)
+        end,
+        Results0,
+        #{Range => #{Worker#shard.node => DropSeq}}
+    ),
+    if
+        Waiting == 0 ->
+            {stop, Results1};
+        true ->
+            {ok, {Results1, Waiting - 1}}
+    end;
+handle_set_drop_seq_reply(Error, _, _Acc) ->
+    {error, Error}.
+
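+%% For every peer checkpoint on a source replica, consult the shard sync
+%% history from that replica and translate the checkpoint into the equivalent
+%% {Uuid, Seq} on each target replica it has synced to.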
+crossref(PeerCheckpoints0, ShardSyncHistory) ->
+    PeerCheckpoints1 = maps:fold(
+        fun({Range, Node}, {Uuid, Seq}, Acc1) ->
+            Others = maps:filter(
+                fun({R, S, _T}, _History) -> R == Range andalso S == Node end, ShardSyncHistory
+            ),
+            if
+                Seq == 0 ->
+                    %% propagate any 0 checkpoint as it would not be
+                    %% matched in the shard sync history.
+                    maps:fold(
+                        fun({R, _S, T}, _History, Acc2) ->
+                            maps:merge_with(fun merge_peers/3, #{{R, T} => {<<>>, 0}}, Acc2)
+                        end,
+                        Acc1,
+                        Others
+                    );
+                true ->
+                    maps:fold(
+                        fun({R, _S, T}, History, Acc2) ->
+                            case
+                                lists:search(
+                                    fun({SU, SS, _TU, _TS}) ->
+                                        uuids_match([Uuid, SU]) andalso SS =< Seq
+                                    end,
+                                    History
+                                )
+                            of
+                                {value, {_SU, _SS, TU, TS}} ->
+                                    maps:merge_with(fun merge_peers/3, #{{R, T} => {TU, TS}}, Acc2);
+                                false ->
+                                    Acc2
+                            end
+                        end,
+                        Acc1,
+                        Others
+                    )
+            end
+        end,
+        PeerCheckpoints0,
+        PeerCheckpoints0
+    ),
+
+    %% mem3 sync is not hub-and-spoke; each iteration of crossref adds some
+    %% cross-references, so we call it again whenever the map changes, as new
+    %% cross-references may then be possible.
+    if
+        PeerCheckpoints0 == PeerCheckpoints1 ->
+            PeerCheckpoints1;
+        true ->
+            crossref(PeerCheckpoints1, ShardSyncHistory)
+    end.
+
+%% return only the shards that have been synced to by every other replica
+fully_replicated_shards_only(Shards, ShardSyncHistory) ->
+    lists:filter(
+        fun(#shard{range = Range, node = Node}) ->
+            ExpectedPeers = [
+                S#shard.node
+             || S <- Shards, S#shard.range == Range, S#shard.node /= Node
+            ],
+            ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers],
+            lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, ExpectedKeys)
+        end,
+        Shards
+    ).
+
+-spec gather_drop_seq_info(Shards :: [#shard{}]) ->
+    {ok, #{
+        uuid_map := uuid_map(), peer_checkpoints := peer_checkpoints(), shard_sync_history := shard_sync_history()
+    }} | {error, term()}.
+gather_drop_seq_info([#shard{} | _] = Shards) ->
+    Workers = fabric_util:submit_jobs(
+        Shards, ?MODULE, gather_drop_seq_info_rpc, []
+    ),
+    RexiMon = fabric_util:create_monitors(Workers),
+    Acc0 = #{uuid_map => #{}, peer_checkpoints => #{}, shard_sync_history => #{}},
+    try
+        case
+            rexi_utils:recv(
+                Workers,
+                #shard.ref,
+                fun gather_drop_seq_info_cb/3,
+                {Acc0, length(Workers) - 1},
+                fabric_util:request_timeout(),
+                infinity
+            )
+        of
+            {ok, Result} ->
+                {ok, Result};
+            {timeout, _State} ->
+                {error, timeout};
+            {error, Reason} ->
+                {error, Reason}
+        end
+    after
+        rexi_monitor:stop(RexiMon),
+        fabric_streams:cleanup(Workers)
+    end.
+
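+%% Worker side: report this shard's uuid and committed update seq, then fold
+%% over the _local docs to collect peer checkpoints and shard sync history.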
+gather_drop_seq_info_rpc(DbName) ->
+    case couch_db:open_int(DbName, []) of
+        {ok, Db} ->
+            try
+                Uuid = couch_db:get_uuid(Db),
+                Seq = couch_db:get_committed_update_seq(Db),
+                Range = mem3:range(DbName),
+                Acc0 = {#{{Range, node()} => {Uuid, Seq}}, #{}},
+                {ok, {PeerCheckpoints, ShardSyncHistory}} = couch_db:fold_local_docs(
+                    Db, fun gather_drop_seq_info_fun/2, Acc0, []
+                ),
+                rexi:reply(
+                    {ok, #{
+                        uuid => Uuid,
+                        seq => Seq,
+                        peer_checkpoints => PeerCheckpoints,
+                        shard_sync_history => ShardSyncHistory
+                    }}
+                )
+            after
+                couch_db:close(Db)
+            end;
+        Else ->
+            rexi:reply(Else)
+    end.
+
+gather_drop_seq_info_fun(
+    #doc{id = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", _/binary>>} = Doc,
+    {PeerCheckpoints0, ShardSyncHistory} = Acc
+) ->
+    {Props} = Doc#doc.body,
+    case couch_util:get_value(<<"update_seq">>, Props) of
+        undefined ->
+            {ok, Acc};
+        UpdateSeq ->
+            PeerCheckpoints1 = maps:merge_with(
+                fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0
+            ),
+            {ok, {PeerCheckpoints1, ShardSyncHistory}}
+    end;
+gather_drop_seq_info_fun(
+    #doc{id = <<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>>} = Doc,
+    {PeerCheckpoints, ShardSyncHistory0} = Acc
+) ->
+    {Props} = Doc#doc.body,
+    case couch_util:get_value(<<"dbname">>, Props) of
+        undefined ->
+            %% not yet upgraded with the new property
+            {ok, Acc};
+        DbName ->
+            Range = mem3:range(DbName),
+            {[{_SrcNode, History}]} = couch_util:get_value(<<"history">>, Props),
+            KeyFun = fun({Item}) ->
+                {Range, binary_to_existing_atom(couch_util:get_value(<<"source_node">>, Item)),
+                    binary_to_existing_atom(couch_util:get_value(<<"target_node">>, Item))}
+            end,
+            ValueFun = fun({Item}) ->
+                {
+                    couch_util:get_value(<<"source_uuid">>, Item),
+                    couch_util:get_value(<<"source_seq">>, Item),
+                    couch_util:get_value(<<"target_uuid">>, Item),
+                    couch_util:get_value(<<"target_seq">>, Item)
+                }
+            end,
+            ShardSyncHistory1 = maps:merge(
+                maps:groups_from_list(KeyFun, ValueFun, History), ShardSyncHistory0
+            ),
+            {ok, {PeerCheckpoints, ShardSyncHistory1}}
+    end;
+gather_drop_seq_info_fun(#doc{}, Acc) ->
+    %% ignored
+    {ok, Acc}.
+
+gather_drop_seq_info_cb({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({ok, Info}, Worker, {Acc, Count}) ->
+    MergedInfo = merge_info(Worker, Info, Acc),
+    if
+        Count == 0 ->
+            {stop, MergedInfo};
+        true ->
+            {ok, {MergedInfo, Count - 1}}
+    end;
+gather_drop_seq_info_cb(_Error, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}}.
+
+merge_info(#shard{} = Shard, Info, Acc) ->
+    #{
+        uuid_map =>
+            maps:put(
+                {Shard#shard.range, Shard#shard.node}, maps:get(uuid, Info), maps:get(uuid_map, Acc)
+            ),
+        peer_checkpoints => maps:merge_with(
+            fun merge_peers/3,
+            maps:get(peer_checkpoints, Info),
+            maps:get(peer_checkpoints, Acc)
+        ),
+        shard_sync_history => maps:merge(
+            maps:get(shard_sync_history, Info), maps:get(shard_sync_history, Acc)
+        )
+    }.
+
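+%% When two sources report a checkpoint for the same {Range, Node} keep the
+%% lower (safer) seq; the uuids must agree on their common prefix, e.g.
+%% uuids_match([<<"abcdef">>, <<"abc">>]) is true whereas
+%% uuids_match([<<"abc">>, <<"abd">>]) is false.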
+merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
+    is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2)
+->
+    true = uuids_match([Uuid1, Uuid2]),
+    {Uuid1, min(Val1, Val2)}.
+
+uuids_match(Uuids) when is_list(Uuids) ->
+    PrefixLen = lists:min([byte_size(Uuid) || Uuid <- Uuids]),
+    binary:longest_common_prefix(Uuids) == PrefixLen.
+
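+%% Decode an opaque update seq into a #{{Range, Node} => {Uuid, Seq}} map,
+%% dropping any entries that fail validation.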
+decode_seq(OpaqueSeq) ->
+    Decoded = fabric_view_changes:decode_seq(OpaqueSeq),
+    lists:foldl(
+        fun
+            ({_Node, [S, E], {Seq, Uuid, Node}}, Acc) when
+                is_integer(S),
+                is_integer(E),
+                is_integer(Seq),
+                S >= 0,
+                E > S,
+                Seq >= 0,
+                is_binary(Uuid),
+                is_atom(Node)
+            ->
+                Acc#{{[S, E], Node} => {Uuid, Seq}};
+            (_Else, Acc) ->
+                Acc
+        end,
+        #{},
+        Decoded
+    ).
+
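+%% Take the head (most recent entry) of each sync history as the checkpoint
+%% the source replica is known to have replicated to that target.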
+latest_shard_sync_checkpoints(ShardSyncHistory) ->
+    maps:fold(
+        fun({R, SN, _TN}, History, Acc) ->
+            {SU, SS, _TU, _TS} = hd(History),
+            maps:merge_with(fun merge_peers/3, #{{R, SN} => {SU, SS}}, Acc)
+        end,
+        #{},
+        ShardSyncHistory
+    ).
+
+%% A shard may have been split since a peer saw it.
+-spec substitute_splits([#shard{}], uuid_map(), peer_checkpoints()) -> peer_checkpoints().
+substitute_splits(Shards, UuidMap, PeerCheckpoints) ->
+    maps:fold(
+        fun({[PS, PE], Node}, {Uuid, Seq}, Acc) ->
+            ShardsInRange = [
+                S
+             || #shard{range = [SS, SE]} = S <- Shards,
+                Node == S#shard.node,
+                SS >= PS andalso SE =< PE
+            ],
+            %% look up the uuid from the uuid map if the shard was substituted
+            AsMap = maps:from_list(
+                lists:filtermap(
+                    fun(#shard{} = Shard) ->
+                        Key = {Shard#shard.range, Shard#shard.node},
+                        if
+                            [PS, PE] == Shard#shard.range ->
+                                {true, {Key, {Uuid, Seq}}};
+                            true ->
+                                case maps:find(Key, UuidMap) of
+                                    {ok, SubstUuid} ->
+                                        {true, {Key, {SubstUuid, Seq}}};
+                                    error ->
+                                        false
+                                end
+                        end
+                    end,
+                    ShardsInRange
+                )
+            ),
+            maps:merge_with(fun merge_peers/3, AsMap, Acc)
+        end,
+        #{},
+        PeerCheckpoints
+    ).
+
+create_peer_checkpoint_doc_if_missing(
+    <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName), is_binary(PeerId), is_integer(UpdateSeq)
+->
+    create_peer_checkpoint_doc_if_missing(
+        DbName, Subtype, Source, PeerId, pack_seq(DbName, UpdateSeq)
+    );
+create_peer_checkpoint_doc_if_missing(
+    DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_integer(UpdateSeq)
+->
+    ok;
+create_peer_checkpoint_doc_if_missing(
+    <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_binary(UpdateSeq)
+->
+    {_, Ref} = spawn_monitor(fun() ->
+        case
+            fabric:open_doc(mem3:dbname(DbName), peer_checkpoint_id(Subtype, PeerId), [?ADMIN_CTX])
+        of
+            {ok, _} ->
+                ok;
+            {not_found, _} ->
+                update_peer_checkpoint_doc(DbName, Subtype, Source, PeerId, UpdateSeq);
+            {error, Reason} ->
+                throw({checkpoint_commit_failure, Reason})
+        end
+    end),
+    receive
+        {'DOWN', Ref, _, _, ok} ->
+            ok;
+        {'DOWN', Ref, _, _, Else} ->
+            Else
+    end.
+
+update_peer_checkpoint_doc(
+    <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_integer(UpdateSeq)
+->
+    update_peer_checkpoint_doc(DbName, Subtype, Source, PeerId, pack_seq(DbName, UpdateSeq));
+update_peer_checkpoint_doc(
+    DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_integer(UpdateSeq)
+->
+    ok;
+update_peer_checkpoint_doc(
+    <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_binary(UpdateSeq)
+->
+    {_, OpenRef} = spawn_monitor(fun() ->
+        case
+            fabric:open_doc(mem3:dbname(DbName), peer_checkpoint_id(Subtype, PeerId), [?ADMIN_CTX])
+        of
+            {ok, ExistingDoc} ->
+                exit(ExistingDoc#doc.revs);
+            {not_found, _Reason} ->
+                exit({0, []});
+            {error, Reason} ->
+                throw({checkpoint_fetch_failure, Reason})
+        end
+    end),
+    receive
+        {'DOWN', OpenRef, _, _, Revs} ->
+            NewDoc0 = peer_checkpoint_doc(PeerId, Subtype, Source, UpdateSeq),
+            NewDoc1 = NewDoc0#doc{revs = Revs},
+            {_, UpdateRef} = spawn_monitor(fun() ->
+                case fabric:update_doc(mem3:dbname(DbName), NewDoc1, [?ADMIN_CTX]) of
+                    {ok, _} ->
+                        couch_log:notice(
+                            "updated peer checkpoint for db:~s, subtype:~s, source:~s, peer_id:~s, update_seq:~s",
+                            [
+                                DbName, Subtype, Source, PeerId, UpdateSeq
+                            ]
+                        ),
+                        ok;
+                    {error, Reason} ->
+                        throw({checkpoint_commit_failure, Reason})
+                end
+            end),
+            receive
+                {'DOWN', UpdateRef, _, _, ok} ->
+                    ok;
+                {'DOWN', UpdateRef, _, _, Else} ->
+                    Else
+            end;
+        {'DOWN', OpenRef, _, _, not_found} ->
+            ok;
+        {'DOWN', OpenRef, _, _, Else} ->
+            Else
+    end.
+
+peer_checkpoint_doc(PeerId, Subtype, Source, UpdateSeq) when
+    is_binary(PeerId), is_binary(Subtype), is_binary(Source), is_binary(UpdateSeq)
+->
+    #doc{
+        id = peer_checkpoint_id(Subtype, PeerId),
+        body =
+            {[
+                {<<"type">>, <<"peer-checkpoint">>},
+                {<<"subtype">>, Subtype},
+                {<<"source">>, Source},
+                {<<"update_seq">>, UpdateSeq},
+                {<<"last_updated">>, ?l2b(couch_log_util:iso8601_timestamp())}
+            ]}
+    }.
+
+peer_checkpoint_id(Subtype, PeerId) ->
+    <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", Subtype/binary, "-", PeerId/binary>>.
+
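+%% Build a peer id from a view signature plus a hash of this node and the db
+%% name, so the same signature yields a distinct peer id on each node.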
+peer_id_from_sig(DbName, Sig) when is_binary(DbName), is_binary(Sig) ->
+    Hash = couch_util:encodeBase64Url(
+        crypto:hash(sha256, [atom_to_binary(node()), $0, DbName])
+    ),
+    <<Sig/binary, "$", Hash/binary>>.
+
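+%% Pack a single-shard update seq into the opaque clustered seq format,
+%% tagging it with this node and a prefix of the shard's uuid.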
+pack_seq(DbName, UpdateSeq) ->
+    PrefixLen = fabric_util:get_uuid_prefix_len(),
+    DbUuid = couch_util:with_db(DbName, fun(Db) -> couch_db:get_uuid(Db) end),
+    fabric_view_changes:pack_seqs(
+        [
+            {
+                #shard{node = node(), range = mem3:range(DbName)},
+                {UpdateSeq, binary:part(DbUuid, {0, PrefixLen}), node()}
+            }
+        ]
+    ).
+
+cleanup_peer_checkpoint_docs(DbName, SubType, KeepSigs) when
+    is_binary(DbName), is_binary(SubType), is_list(KeepSigs)
+->
+    MrArgs = #mrargs{
+        view_type = map,
+        include_docs = true,
+        start_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, "-">>,

Review Comment:
   yes, I like that.


