This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit e284b6e780bd1010be83b32c8fbae770a7841133
Author: Robert Newson <[email protected]>
AuthorDate: Tue Apr 1 18:36:54 2025 +0100

    peer checkpoints for map/reduce including cleanup
---
 src/couch_mrview/src/couch_mrview_cleanup.erl | 36 ++++++++++++++-
 src/couch_mrview/src/couch_mrview_index.erl   | 21 ++++++++-
 src/couch_mrview/src/couch_mrview_util.erl    |  8 ++++
 src/fabric/src/fabric.erl                     | 12 +++--
 src/fabric/src/fabric_drop_seq.erl            | 64 +++++++++++++++++++++++++++
 5 files changed, 135 insertions(+), 6 deletions(-)

diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl
index 5b5afbdce..25c887f46 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -15,10 +15,12 @@
 -export([
     run/1,
     cleanup_purges/3,
-    cleanup_indices/2
+    cleanup_indices/2,
+    cleanup_peer_checkpoints/2
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
 
 run(Db) ->
     Indices = couch_mrview_util:get_index_files(Db),
@@ -43,6 +45,38 @@ cleanup_indices(#{} = Sigs, #{} = IndexMap) ->
     maps:map(Fun, maps:without(maps:keys(Sigs), IndexMap)),
     ok.
 
+%% @doc Delete this database's map/reduce peer checkpoint docs
+%% (`_local/peer-checkpoint-mrview-...`) whose view signature is not a key of
+%% the map `Sigs`.
+%%
+%% Returns `ok`. Callers such as fabric:cleanup_index_files/1 assert
+%% `ok = ...`. Returning the raw `{ok, _}` from fabric:update_docs/3 would
+%% make that match fail.
+cleanup_peer_checkpoints(DbName, Sigs) ->
+    MrArgs = #mrargs{
+        view_type = map,
+        include_docs = true,
+        % "." sorts directly after "-", so this key range covers the ids
+        % starting with "_local/peer-checkpoint-mrview-".
+        start_key = <<"_local/peer-checkpoint-mrview-">>,
+        end_key = <<"_local/peer-checkpoint-mrview.">>,
+        extra = [
+            {include_system, true},
+            {namespace, <<"_local">>}
+        ]
+    },
+    {ok, {Sigs, DocsToDelete}} = fabric:all_docs(
+        DbName, fun cleanup_peer_checkpoints_cb/2, {Sigs, []}, MrArgs
+    ),
+    case DocsToDelete of
+        [] ->
+            % Nothing stale: skip a clustered write that would do no work.
+            ok;
+        [_ | _] ->
+            {ok, _} = fabric:update_docs(DbName, DocsToDelete, [?ADMIN_CTX]),
+            ok
+    end.
+
+%% fabric:all_docs/4 callback. For every row, if the view signature embedded
+%% in the doc id is not a key of KeepSigs, prepend a deletion stub of the doc
+%% to the accumulator. Any other callback message leaves the accumulator as is.
+cleanup_peer_checkpoints_cb({row, Row}, {KeepSigs, Stale} = Acc) ->
+    {doc, DocJson} = lists:keyfind(doc, 1, Row),
+    #doc{id = DocId} = Doc = couch_doc:from_json_obj(DocJson),
+    <<?LOCAL_DOC_PREFIX, "peer-checkpoint-mrview-", Suffix/binary>> = DocId,
+    % binary:split/2 (no `global` option) splits at the first "-" only. This
+    % assumes the encoded signature at the start of Suffix contains no "-".
+    [Sig, _] = binary:split(Suffix, <<"-">>),
+    Tombstone = Doc#doc{deleted = true, body = {[]}},
+    NewAcc =
+        case maps:is_key(Sig, KeepSigs) of
+            true -> Acc;
+            false -> {KeepSigs, [Tombstone | Stale]}
+        end,
+    {ok, NewAcc};
+cleanup_peer_checkpoints_cb(_Other, Acc) ->
+    {ok, Acc}.
+
 delete_file(File) ->
     RootDir = couch_index_util:root_dir(),
     couch_log:debug("~p : deleting inactive index : ~s", [?MODULE, File]),
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 51777480c..dbcbf8d1d 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -129,12 +129,14 @@ open(Db, State0) ->
                     NewSt = init_and_upgrade_state(Db, Fd, State, Header),
                     ok = commit(NewSt),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 % end of upgrade code for <= 2.x
                 {ok, {Sig, Header}} ->
                     % Matching view signatures.
                     NewSt = init_and_upgrade_state(Db, Fd, State, Header),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     check_collator_versions(DbName, NewSt),
                     {ok, NewSt};
                 {ok, {WrongSig, _}} ->
@@ -144,6 +146,7 @@ open(Db, State0) ->
                     ),
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 {ok, Else} ->
                     couch_log:error(
@@ -152,10 +155,12 @@ open(Db, State0) ->
                     ),
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 no_valid_header ->
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt}
             end;
         {error, Reason} = Error ->
@@ -210,7 +215,14 @@ finish_update(State) ->
 
+%% Durably write this index's header, then record the index's update_seq in
+%% its clustered peer checkpoint doc. Returns whatever
+%% fabric_drop_seq:update_peer_checkpoint_doc/3 returns.
 commit(State) ->
     Header = {State#mrst.sig, couch_mrview_util:make_header(State)},
-    couch_file:write_header(State#mrst.fd, Header).
+    % Sync before the header write (presumably so the data the header refers
+    % to is on disk first) and after it, so the header itself is durable
+    % before the peer checkpoint is advanced.
+    ok = couch_file:sync(State#mrst.fd),
+    ok = couch_file:write_header(State#mrst.fd, Header),
+    ok = couch_file:sync(State#mrst.fd),
+    % NOTE(review): open/2 asserts `ok = commit(NewSt)`, so any non-`ok`
+    % result from this clustered write now fails the index open as well.
+    % Confirm that is intended.
+    fabric_drop_seq:update_peer_checkpoint_doc(
+        State#mrst.db_name,
+        couch_mrview_util:peer_checkpoint_id(State),
+        State#mrst.update_seq
+    ).

 compact(Db, State, Opts) ->
     couch_mrview_compactor:compact(Db, State, Opts).
@@ -295,6 +307,13 @@ ensure_local_purge_doc(Db, #mrst{} = State) ->
             ok
     end.
 
+%% Make sure this index has a peer checkpoint doc, seeding it with the
+%% index's current update_seq when it is missing. open/2 calls this only for
+%% its side effect and discards the result, so failures are logged here
+%% instead of vanishing. Always returns `ok`.
+ensure_peer_checkpoint_doc(#mrst{} = State) ->
+    Result = fabric_drop_seq:create_peer_checkpoint_doc_if_missing(
+        State#mrst.db_name,
+        couch_mrview_util:peer_checkpoint_id(State),
+        State#mrst.update_seq
+    ),
+    case Result of
+        ok ->
+            ok;
+        normal ->
+            % `normal` is the exit reason of a worker process whose fun
+            % returned without calling exit/1. It did not fail.
+            ok;
+        Error ->
+            couch_log:warning(
+                "~p : failed to ensure peer checkpoint doc for ~s : ~p",
+                [?MODULE, State#mrst.db_name, Error]
+            ),
+            ok
+    end.
+
 create_local_purge_doc(Db, State) ->
     PurgeSeq = couch_db:get_purge_seq(Db),
     update_local_purge_doc(Db, State, PurgeSeq).
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index a478685da..c20a6a6ce 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -36,6 +36,7 @@
 -export([get_collator_versions/1]).
 -export([compact_on_collator_upgrade/0]).
 -export([commit_on_header_upgrade/0]).
+-export([peer_checkpoint_id/1]).
 
 -define(MOD, couch_mrview_index).
 -define(GET_VIEW_RETRY_COUNT, 1).
@@ -1355,3 +1356,10 @@ compact_on_collator_upgrade() ->
 
 commit_on_header_upgrade() ->
     config:get_boolean("view_upgrade", "commit_on_header_upgrade", true).
+
+%% Peer id for this index's peer checkpoint doc: "mrview-<Sig>-<Hash>".
+%% Sig is the view signature as lowercase hex. Hash is the base64url SHA-256
+%% of this node's name and State#mrst.db_name.
+%%
+%% Sig is hex rather than base64url because couch_mrview_cleanup recovers it
+%% by splitting the id suffix at the first "-", and "-" belongs to the
+%% base64url alphabet. A base64url Sig containing "-" would be truncated,
+%% never match a live signature, and have its checkpoint deleted.
+%% NOTE(review): cleanup compares Sig with the keys returned by
+%% couch_mrview_util:get_signatures/1. Confirm those are lowercase hex too.
+peer_checkpoint_id(#mrst{} = State) ->
+    Sig = iolist_to_binary([io_lib:format("~2.16.0b", [B]) || <<B>> <= State#mrst.sig]),
+    % $0 (the character "0") separates node name and db name in the hash input.
+    Hash = couch_util:encodeBase64Url(
+        crypto:hash(sha256, [atom_to_binary(node()), $0, State#mrst.db_name])
+    ),
+    <<"mrview-", Sig/binary, "-", Hash/binary>>.
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 92f30e4e8..595580742 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -586,18 +586,19 @@ cleanup_index_files() ->
 cleanup_index_files(DbName) ->
     try
         ShardNames = [mem3:name(S) || S <- mem3:local_shards(dbname(DbName))],
-        cleanup_local_indices_and_purge_checkpoints(ShardNames)
+        case ShardNames of
+            [] ->
+                % No local shards: there are no index files or purge
+                % checkpoints here, and no shard to read signatures from.
+                % Returning early also avoids hd([]) raising badarg, which
+                % the catch below does not handle.
+                ok;
+            [FirstShard | _] ->
+                % Compute the live signatures once and share them between
+                % the local and the clustered (peer checkpoint) cleanup.
+                Sigs = couch_mrview_util:get_signatures(FirstShard),
+                cleanup_local_indices_and_purge_checkpoints(Sigs, ShardNames),
+                ok = cleanup_peer_checkpoints(DbName, Sigs)
+        end
     catch
         error:database_does_not_exist ->
             ok
     end.
 
+%% Remove index files and purge checkpoints on the local shard files Dbs
+%% whose view signature is not a key of Sigs. Sigs is computed once by the
+%% caller, which also passes it to the peer checkpoint cleanup.
-cleanup_local_indices_and_purge_checkpoints([]) ->
+cleanup_local_indices_and_purge_checkpoints(_Sigs, []) ->
     ok;
-cleanup_local_indices_and_purge_checkpoints([_ | _] = Dbs) ->
+cleanup_local_indices_and_purge_checkpoints(Sigs, [_ | _] = Dbs) ->
     AllIndices = lists:map(fun couch_mrview_util:get_index_files/1, Dbs),
     AllPurges = lists:map(fun couch_mrview_util:get_purge_checkpoints/1, Dbs),
-    Sigs = couch_mrview_util:get_signatures(hd(Dbs)),
     ok = cleanup_purges(Sigs, AllPurges, Dbs),
     ok = cleanup_indices(Sigs, AllIndices).
 
@@ -614,6 +615,9 @@ cleanup_indices(Sigs, AllIndices) ->
     end,
     lists:foreach(Fun, AllIndices).
 
+%% Remove peer checkpoint docs for view signatures absent from the map
+%% KeepSigs. The work is done by couch_mrview_cleanup.
+cleanup_peer_checkpoints(Name, KeepSigs) ->
+    couch_mrview_cleanup:cleanup_peer_checkpoints(Name, KeepSigs).
+
 %% @doc clean up index files for a specific db on all nodes
 -spec cleanup_index_files_all_nodes(dbname()) -> [reference()].
 cleanup_index_files_all_nodes(DbName) ->
diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index c00ea24ea..cc5907605 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -6,6 +6,11 @@
 
 -export([go/1]).
 
+-export([
+    create_peer_checkpoint_doc_if_missing/3,
+    update_peer_checkpoint_doc/3
+]).
+
 -type range() :: [non_neg_integer()].
 
 -type uuid() :: binary().
@@ -222,3 +227,62 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
         #{},
         ShardSyncHistory
     ).
+
+%% Ensure the peer checkpoint doc for PeerId exists, creating it with
+%% UpdateSeq if it does not. An integer UpdateSeq is first packed with
+%% pack_seq/2, which opens DbName locally. mem3:dbname/1 is applied to DbName
+%% before the clustered calls.
+%%
+%% Returns `ok` on success. Otherwise returns the exit reason of the worker
+%% process.
+create_peer_checkpoint_doc_if_missing(DbName, PeerId, UpdateSeq) when
+    is_binary(DbName), is_binary(PeerId), is_integer(UpdateSeq)
+->
+    create_peer_checkpoint_doc_if_missing(DbName, PeerId, pack_seq(DbName, UpdateSeq));
+create_peer_checkpoint_doc_if_missing(DbName, PeerId, UpdateSeq) when
+    is_binary(DbName), is_binary(PeerId), is_binary(UpdateSeq)
+->
+    % The clustered calls run in a short-lived monitored process, presumably
+    % to keep fabric's reply messages out of the caller's mailbox.
+    {_, Ref} = spawn_monitor(fun() ->
+        DocId = peer_checkpoint_id(PeerId),
+        Result =
+            case fabric:open_doc(mem3:dbname(DbName), DocId, [?ADMIN_CTX]) of
+                {ok, _} ->
+                    ok;
+                {not_found, _} ->
+                    update_peer_checkpoint_doc(DbName, PeerId, UpdateSeq);
+                {error, Reason} ->
+                    throw({checkpoint_commit_failure, Reason})
+            end,
+        % A fun that simply returns makes its process exit with reason
+        % `normal`, never with the return value. Hand the result to exit/1
+        % so the `ok` clause below can match and inner failures are reported.
+        exit(Result)
+    end),
+    receive
+        {'DOWN', Ref, _, _, ok} ->
+            ok;
+        {'DOWN', Ref, _, _, Else} ->
+            Else
+    end.
+
+%% Write the peer checkpoint doc for PeerId with UpdateSeq. An integer
+%% UpdateSeq is first packed with pack_seq/2, which opens DbName locally.
+%%
+%% Returns `ok` on success. Otherwise returns the exit reason of the worker
+%% process.
+update_peer_checkpoint_doc(DbName, PeerId, UpdateSeq) when
+    is_binary(DbName), is_binary(PeerId), is_integer(UpdateSeq)
+->
+    update_peer_checkpoint_doc(DbName, PeerId, pack_seq(DbName, UpdateSeq));
+update_peer_checkpoint_doc(DbName, PeerId, UpdateSeq) when
+    is_binary(DbName), is_binary(PeerId), is_binary(UpdateSeq)
+->
+    Doc = peer_checkpoint_doc(PeerId, UpdateSeq),
+    {_, Ref} = spawn_monitor(fun() ->
+        % NOTE(review): any other reply from fabric:update_doc/3 (for example
+        % `{accepted, _}`, if it can occur here) raises case_clause in this
+        % worker and is reported to the caller as a failure.
+        Result =
+            case fabric:update_doc(mem3:dbname(DbName), Doc, [?ADMIN_CTX]) of
+                {ok, _} ->
+                    ok;
+                {error, Reason} ->
+                    throw({checkpoint_commit_failure, Reason})
+            end,
+        % If the fun just returned, the process would exit with `normal`.
+        % Pass the result to exit/1 so the 'DOWN' clause below sees `ok` on
+        % success.
+        exit(Result)
+    end),
+    receive
+        {'DOWN', Ref, _, _, ok} ->
+            ok;
+        {'DOWN', Ref, _, _, Else} ->
+            Else
+    end.
+
+%% Build the _local peer checkpoint doc for PeerId, storing the binary
+%% sequence UpdateSeq under the "update_seq" key.
+peer_checkpoint_doc(PeerId, UpdateSeq) when is_binary(PeerId), is_binary(UpdateSeq) ->
+    Body = {[{<<"update_seq">>, UpdateSeq}]},
+    #doc{id = peer_checkpoint_id(PeerId), body = Body}.
+
+%% Doc id of the peer checkpoint for PeerId: ?LOCAL_DOC_PREFIX followed by
+%% "peer-checkpoint-" and PeerId.
+peer_checkpoint_id(PeerId) ->
+    Prefix = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-">>,
+    <<Prefix/binary, PeerId/binary>>.
+
+%% Encode UpdateSeq for DbName as "<UpdateSeq>-<base64url payload>". The
+%% payload is the compressed external term format of
+%% [node(), mem3:range(DbName), {UpdateSeq, DbUuid, node()}].
+%% DbName is opened locally and passed to mem3:range/1, so it is presumably
+%% a shard name.
+%% NOTE(review): this resembles fabric's packed changes sequence, which (to
+%% my understanding) encodes a list of {Node, Range, Seq} tuples rather than
+%% this flat three-element list. Confirm that whatever decodes these
+%% checkpoints expects the flat layout.
+pack_seq(DbName, UpdateSeq) ->
+    Uuid = couch_util:with_db(DbName, fun couch_db:get_uuid/1),
+    Node = node(),
+    Payload = [Node, mem3:range(DbName), {UpdateSeq, Uuid, Node}],
+    Encoded = couch_util:encodeBase64Url(?term_to_bin(Payload, [compressed])),
+    iolist_to_binary([integer_to_binary(UpdateSeq), $-, Encoded]).

Reply via email to