iilyak commented on code in PR #5558:
URL: https://github.com/apache/couchdb/pull/5558#discussion_r2159520903


##########
src/fabric/src/fabric_drop_seq.erl:
##########
@@ -0,0 +1,1020 @@
+-module(fabric_drop_seq).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-export([go/1]).
+
+-export([
+    create_peer_checkpoint_doc_if_missing/5,
+    update_peer_checkpoint_doc/5,
+    cleanup_peer_checkpoint_docs/3,
+    peer_checkpoint_doc/4,
+    peer_id_from_sig/2
+]).
+
+%% rpc
+-export([gather_drop_seq_info_rpc/1]).
+
+-type range() :: [non_neg_integer()].
+
+-type uuid() :: binary().
+
+-type seq() :: non_neg_integer().
+
+-type uuid_map() :: #{{Range :: range(), Node :: node()} => uuid()}.
+
+-type peer_checkpoints() :: #{{range(), Node :: node()} => {Uuid :: uuid(), Seq :: seq()}}.
+
+-type history_item() :: {
+    SourceUuid :: uuid(), SourceSeq :: seq(), TargetUuid :: uuid(), TargetSeq :: seq()
+}.
+
+-type shard_sync_history() :: #{
+    {Range :: range(), SourceNode :: node(), TargetNode :: node()} => [history_item()]
+}.
+
+-define(START_KEY(SubType), <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, "-">>).
+-define(END_KEY(SubType), <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, ".">>).
+
+go(DbName) ->
+    Shards0 = mem3:shards(DbName),
+    case gather_drop_seq_info(Shards0) of
+        {error, Reason} ->
+            {error, Reason};
+        {ok, #{
+            uuid_map := UuidMap,
+            peer_checkpoints := PeerCheckpoints,
+            shard_sync_history := ShardSyncHistory
+        }} ->
+            Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
+            DropSeqs = calculate_drop_seqs(
+                Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory
+            ),
+            Workers = lists:filtermap(
+                fun(Shard) ->
+                    #shard{range = Range, node = Node, name = ShardName} = Shard,
+                    case maps:find({Range, Node}, DropSeqs) of
+                        {ok, {_UuidPrefix, 0}} ->
+                            false;
+                        {ok, {UuidPrefix, DropSeq}} ->
+                            Ref = rexi:cast(
+                                Node,
+                                {fabric_rpc, set_drop_seq, [
+                                    ShardName, UuidPrefix, DropSeq, [?ADMIN_CTX]
+                                ]}
+                            ),
+                            {true, Shard#shard{ref = Ref, opts = [{drop_seq, DropSeq}]}};
+                        error ->
+                            false
+                    end
+                end,
+                Shards1
+            ),
+            if
+                Workers == [] ->
+                    %% nothing to do
+                    {ok, #{}};
+                true ->
+                    RexiMon = fabric_util:create_monitors(Shards1),
+                    Acc0 = {#{}, length(Workers) - 1},
+                    try
+                        case
+                            fabric_util:recv(
+                                Workers, #shard.ref, fun handle_set_drop_seq_reply/3, Acc0
+                            )
+                        of
+                            {ok, Results} ->
+                                {ok, Results};
+                            {timeout, {WorkersDict, _}} ->
+                                DefunctWorkers = fabric_util:remove_done_workers(
+                                    WorkersDict,
+                                    nil
+                                ),
+                                fabric_util:log_timeout(
+                                    DefunctWorkers,
+                                    "set_drop_seq"
+                                ),
+                                {error, timeout};
+                            {error, Reason} ->
+                                {error, Reason}
+                        end
+                    after
+                        rexi_monitor:stop(RexiMon)
+                    end
+            end
+    end.
+
+-spec calculate_drop_seqs([#shard{}], uuid_map(), peer_checkpoints(), shard_sync_history()) ->
+    peer_checkpoints().
+calculate_drop_seqs(Shards, UuidMap, PeerCheckpoints0, ShardSyncHistory) ->
+    PeerCheckpoints1 = substitute_splits(Shards, UuidMap, PeerCheckpoints0),
+    PeerCheckpoints2 = crossref(PeerCheckpoints1, ShardSyncHistory),
+    ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory),
+    maps:merge_with(fun merge_peers/3, PeerCheckpoints2, ShardSyncCheckpoints).
+
+handle_set_drop_seq_reply(ok, Worker, {Results0, Waiting}) ->
+    DropSeq = proplists:get_value(drop_seq, Worker#shard.opts),
+    [B, E] = Worker#shard.range,
+    BHex = couch_util:to_hex_bin(<<B:32/integer>>),
+    EHex = couch_util:to_hex_bin(<<E:32/integer>>),
+    Range = <<BHex/binary, "-", EHex/binary>>,
+    Results1 = maps:merge_with(
+        fun(_Key, Val1, Val2) ->
+            maps:merge(Val1, Val2)
+        end,
+        Results0,
+        #{Range => #{Worker#shard.node => DropSeq}}
+    ),
+    if
+        Waiting == 0 ->
+            {stop, Results1};
+        true ->
+            {ok, {Results1, Waiting - 1}}
+    end;
+handle_set_drop_seq_reply(Error, _, _Acc) ->
+    {error, Error}.
+
+crossref(PeerCheckpoints0, ShardSyncHistory) ->
+    PeerCheckpoints1 = maps:fold(
+        fun({Range, Node}, {Uuid, Seq}, Acc1) ->
+            Others = maps:filter(
+                fun({R, S, _T}, _History) -> R == Range andalso S == Node end, ShardSyncHistory
+            ),
+            if
+                Seq == 0 ->
+                    %% propagate any 0 checkpoint, as it would not be
+                    %% matched in the shard sync history.
+                    maps:fold(
+                        fun({R, _S, T}, _History, Acc2) ->
+                            maps:merge_with(fun merge_peers/3, #{{R, T} => {<<>>, 0}}, Acc2)
+                        end,
+                        Acc1,
+                        Others
+                    );
+                true ->
+                    maps:fold(
+                        fun({R, _S, T}, History, Acc2) ->
+                            case
+                                lists:search(
+                                    fun({SU, SS, _TU, _TS}) ->
+                                        uuids_match([Uuid, SU]) andalso SS =< Seq
+                                    end,
+                                    History
+                                )
+                            of
+                                {value, {_SU, _SS, TU, TS}} ->
+                                    maps:merge_with(fun merge_peers/3, #{{R, T} => {TU, TS}}, Acc2);
+                                false ->
+                                    Acc2
+                            end
+                        end,
+                        Acc1,
+                        Others
+                    )
+            end
+        end,
+        PeerCheckpoints0,
+        PeerCheckpoints0
+    ),
+
+    %% mem3 sync is not hub-and-spoke; each iteration of crossref adds some
+    %% cross-references, so we call it again whenever the map changes, as new
+    %% cross-references may then become possible.
+    if
+        PeerCheckpoints0 == PeerCheckpoints1 ->
+            PeerCheckpoints1;
+        true ->
+            crossref(PeerCheckpoints1, ShardSyncHistory)
+    end.
+
+%% return only the shards that have been synced to by every other replica
+fully_replicated_shards_only(Shards, ShardSyncHistory) ->
+    lists:filter(
+        fun(#shard{range = Range, node = Node}) ->
+            ExpectedPeers = [
+                S#shard.node
+             || S <- Shards, S#shard.range == Range, S#shard.node /= Node
+            ],
+            ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers],
+            lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, ExpectedKeys)
+        end,
+        Shards
+    ).
+
+-spec gather_drop_seq_info(Shards :: [#shard{}]) ->
+    {ok, #{
+        uuid_map := uuid_map(),
+        peer_checkpoints := peer_checkpoints(),
+        shard_sync_history := shard_sync_history()
+    }}
+    | {error, term()}.
+gather_drop_seq_info([#shard{} | _] = Shards) ->
+    Workers = fabric_util:submit_jobs(
+        Shards, ?MODULE, gather_drop_seq_info_rpc, []
+    ),
+    RexiMon = fabric_util:create_monitors(Workers),
+    Acc0 = #{uuid_map => #{}, peer_checkpoints => #{}, shard_sync_history => #{}},
+    try
+        case
+            rexi_utils:recv(
+                Workers,
+                #shard.ref,
+                fun gather_drop_seq_info_cb/3,
+                {Acc0, length(Workers) - 1},
+                fabric_util:request_timeout(),
+                infinity
+            )
+        of
+            {ok, Result} ->
+                {ok, Result};
+            {timeout, _State} ->
+                {error, timeout};
+            {error, Reason} ->
+                {error, Reason}
+        end
+    after
+        rexi_monitor:stop(RexiMon),
+        fabric_streams:cleanup(Workers)
+    end.
+
+gather_drop_seq_info_rpc(DbName) ->
+    case couch_db:open_int(DbName, []) of
+        {ok, Db} ->
+            try
+                Uuid = couch_db:get_uuid(Db),
+                Seq = couch_db:get_committed_update_seq(Db),
+                Range = mem3:range(DbName),
+                Acc0 = {#{{Range, node()} => {Uuid, Seq}}, #{}},
+                {ok, {PeerCheckpoints, ShardSyncHistory}} = couch_db:fold_local_docs(
+                    Db, fun gather_drop_seq_info_fun/2, Acc0, []
+                ),
+                rexi:reply(
+                    {ok, #{
+                        uuid => Uuid,
+                        seq => Seq,
+                        peer_checkpoints => PeerCheckpoints,
+                        shard_sync_history => ShardSyncHistory
+                    }}
+                )
+            after
+                couch_db:close(Db)
+            end;
+        Else ->
+            rexi:reply(Else)
+    end.
+
+gather_drop_seq_info_fun(
+    #doc{id = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", _/binary>>} = Doc,
+    {PeerCheckpoints0, ShardSyncHistory} = Acc
+) ->
+    {Props} = Doc#doc.body,
+    case couch_util:get_value(<<"update_seq">>, Props) of
+        undefined ->
+            {ok, Acc};
+        UpdateSeq ->
+            PeerCheckpoints1 = maps:merge_with(
+                fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0
+            ),
+            {ok, {PeerCheckpoints1, ShardSyncHistory}}
+    end;
+gather_drop_seq_info_fun(
+    #doc{id = <<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>>} = Doc,
+    {PeerCheckpoints, ShardSyncHistory0} = Acc
+) ->
+    {Props} = Doc#doc.body,
+    case couch_util:get_value(<<"dbname">>, Props) of
+        undefined ->
+            %% not yet upgraded with new property
+            {ok, Acc};
+        DbName ->
+            Range = mem3:range(DbName),
+            {[{_SrcNode, History}]} = couch_util:get_value(<<"history">>, Props),
+            KeyFun = fun({Item}) ->
+                {Range, binary_to_existing_atom(couch_util:get_value(<<"source_node">>, Item)),
+                    binary_to_existing_atom(couch_util:get_value(<<"target_node">>, Item))}
+            end,
+            ValueFun = fun({Item}) ->
+                {
+                    couch_util:get_value(<<"source_uuid">>, Item),
+                    couch_util:get_value(<<"source_seq">>, Item),
+                    couch_util:get_value(<<"target_uuid">>, Item),
+                    couch_util:get_value(<<"target_seq">>, Item)
+                }
+            end,
+            ShardSyncHistory1 = maps:merge(
+                maps:groups_from_list(KeyFun, ValueFun, History), ShardSyncHistory0
+            ),
+            {ok, {PeerCheckpoints, ShardSyncHistory1}}
+    end;
+gather_drop_seq_info_fun(#doc{}, Acc) ->
+    %% ignored
+    {ok, Acc}.
+
+gather_drop_seq_info_cb({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({ok, Info}, Worker, {Acc, Count}) ->
+    MergedInfo = merge_info(Worker, Info, Acc),
+    if
+        Count == 0 ->
+            {stop, MergedInfo};
+        true ->
+            {ok, {MergedInfo, Count - 1}}
+    end;
+gather_drop_seq_info_cb(_Error, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}}.
+
+merge_info(#shard{} = Shard, Info, Acc) ->
+    #{
+        uuid_map =>
+            maps:put(
+                {Shard#shard.range, Shard#shard.node}, maps:get(uuid, Info), maps:get(uuid_map, Acc)
+            ),
+        peer_checkpoints => maps:merge_with(
+            fun merge_peers/3,
+            maps:get(peer_checkpoints, Info),
+            maps:get(peer_checkpoints, Acc)
+        ),
+        shard_sync_history => maps:merge(
+            maps:get(shard_sync_history, Info), maps:get(shard_sync_history, Acc)
+        )
+    }.
+
+merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
+    is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2)
+->
+    ?assert(uuids_match([Uuid1, Uuid2]), "UUIDs belong to different shard files"),
+    {Uuid1, min(Val1, Val2)}.
+
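+%% UUIDs "match" when they share a common prefix as long as the shortest of
+%% them, i.e. each is a (possibly truncated) copy of the same UUID.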
+uuids_match(Uuids) when is_list(Uuids) ->
+    PrefixLen = lists:min([byte_size(Uuid) || Uuid <- Uuids]),
+    binary:longest_common_prefix(Uuids) == PrefixLen.
+
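+%% Decode an opaque clustered update seq into #{{Range, Node} => {Uuid, Seq}},
+%% skipping any entries that do not carry uuid/node information.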
+decode_seq(OpaqueSeq) ->
+    Decoded = fabric_view_changes:decode_seq(OpaqueSeq),
+    lists:foldl(
+        fun
+            ({_Node, [S, E], {Seq, Uuid, Node}}, Acc) when
+                is_integer(S),
+                is_integer(E),
+                is_integer(Seq),
+                S >= 0,
+                E > S,
+                Seq >= 0,
+                is_binary(Uuid),
+                is_atom(Node)
+            ->
+                Acc#{{[S, E], Node} => {Uuid, Seq}};
+            (_Else, Acc) ->
+                Acc
+        end,
+        #{},
+        Decoded
+    ).
+
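+%% The head of each internal replication history list is its most recent
+%% entry; record it as a checkpoint for the source side of that history.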
+latest_shard_sync_checkpoints(ShardSyncHistory) ->
+    maps:fold(
+        fun({R, SN, _TN}, History, Acc) ->
+            {SU, SS, _TU, _TS} = hd(History),
+            maps:merge_with(fun merge_peers/3, #{{R, SN} => {SU, SS}}, Acc)
+        end,
+        #{},
+        ShardSyncHistory
+    ).
+
+%% A shard may have been split since a peer saw it.
+-spec substitute_splits([#shard{}], uuid_map(), peer_checkpoints()) -> peer_checkpoints().
+substitute_splits(Shards, UuidMap, PeerCheckpoints) ->
+    maps:fold(
+        fun({[PS, PE], Node}, {Uuid, Seq}, Acc) ->
+            ShardsInRange = [

Review Comment:
   We could re-use this shard-range filtering functionality from mem3:
   
   ```erlang
   %% mem3.erl
   
   filter_shards_by_range(Range, Shards) ->
       mem3_shards:filter_shards_by_range(Range, Shards).
   
   %% mem3_shards.erl
   
   -export([filter_shards_by_range/2]).
   ```
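
   For illustration, a minimal sketch of how the call site in `substitute_splits/3` might
   then look, assuming the hypothetical `mem3:filter_shards_by_range/2` wrapper above is
   added and keeps range-overlap semantics (the comprehension it would replace is truncated
   in the hunk, so this is only an approximation):

   ```erlang
   %% Hypothetical call site, not the PR's actual code: select the shards whose
   %% range overlaps the peer checkpoint's [PS, PE] range via the proposed helper.
   ShardsInRange = mem3:filter_shards_by_range([PS, PE], Shards),
   ```

   That would keep the range-filtering logic in one place rather than re-deriving it in
   fabric_drop_seq.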



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
