This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch auto-delete-3 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit e284b6e780bd1010be83b32c8fbae770a7841133
Author: Robert Newson <[email protected]>
AuthorDate: Tue Apr 1 18:36:54 2025 +0100

    peer checkpoints for map/reduce including cleanup
---
 src/couch_mrview/src/couch_mrview_cleanup.erl | 54 ++++++++++++++++++++-
 src/couch_mrview/src/couch_mrview_index.erl   | 21 +++++++-
 src/couch_mrview/src/couch_mrview_util.erl    | 10 ++++
 src/fabric/src/fabric.erl                     | 18 +++++--
 src/fabric/src/fabric_drop_seq.erl            | 70 +++++++++++++++++++++++++++
 5 files changed, 167 insertions(+), 6 deletions(-)

diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl
index 5b5afbdce..25c887f46 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -15,10 +15,12 @@
 -export([
     run/1,
     cleanup_purges/3,
-    cleanup_indices/2
+    cleanup_indices/2,
+    cleanup_peer_checkpoints/2
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
 
 run(Db) ->
     Indices = couch_mrview_util:get_index_files(Db),
@@ -43,6 +45,56 @@ cleanup_indices(#{} = Sigs, #{} = IndexMap) ->
     maps:map(Fun, maps:without(maps:keys(Sigs), IndexMap)),
     ok.
 
+%% Delete the `_local/peer-checkpoint-mrview-<Sig>-<Hash>` docs of clustered
+%% database DbName whose <Sig> is not a key of Sigs. Returns ok.
+cleanup_peer_checkpoints(DbName, Sigs) ->
+    MrArgs = #mrargs{
+        view_type = map,
+        include_docs = true,
+        start_key = <<"_local/peer-checkpoint-mrview-">>,
+        end_key = <<"_local/peer-checkpoint-mrview.">>,
+        extra = [
+            {include_system, true},
+            {namespace, <<"_local">>}
+        ]
+    },
+    {ok, {Sigs, DocsToDelete}} = fabric:all_docs(
+        DbName, fun cleanup_peer_checkpoints_cb/2, {Sigs, []}, MrArgs
+    ),
+    delete_peer_checkpoints(DbName, DocsToDelete).
+
+%% fabric:update_docs/3 answers {ok, _} or {accepted, _}; callers want plain
+%% ok. Skip the cluster write when there is nothing to delete.
+delete_peer_checkpoints(_DbName, []) ->
+    ok;
+delete_peer_checkpoints(DbName, [_ | _] = Docs) ->
+    case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+        {ok, _} -> ok;
+        {accepted, _} -> ok
+    end.
+
+cleanup_peer_checkpoints_cb({row, Row}, {KeepSigs, DocsToDelete} = Acc) ->
+    {doc, JsonDoc} = lists:keyfind(doc, 1, Row),
+    Doc = couch_doc:from_json_obj(JsonDoc),
+    case peer_checkpoint_sig(Doc#doc.id) of
+        {ok, Sig} when not is_map_key(Sig, KeepSigs) ->
+            {ok, {KeepSigs, [Doc#doc{deleted = true, body = {[]}} | DocsToDelete]}};
+        _ ->
+            % Signature still in use, or an id we cannot parse: keep it.
+            {ok, Acc}
+    end;
+cleanup_peer_checkpoints_cb(_Else, Acc) ->
+    {ok, Acc}.
+
+%% Extract <Sig> from `_local/peer-checkpoint-mrview-<Sig>-<Hash>`.
+peer_checkpoint_sig(<<?LOCAL_DOC_PREFIX, "peer-checkpoint-mrview-", SigHash/binary>>) ->
+    case binary:split(SigHash, <<"-">>) of
+        [Sig, _Hash] -> {ok, Sig};
+        [_] -> error
+    end;
+peer_checkpoint_sig(_DocId) ->
+    error.
+
 delete_file(File) ->
     RootDir = couch_index_util:root_dir(),
     couch_log:debug("~p : deleting inactive index : ~s", [?MODULE, File]),
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 51777480c..dbcbf8d1d 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -129,12 +129,14 @@ open(Db, State0) ->
                     NewSt = init_and_upgrade_state(Db, Fd, State, Header),
                     ok = commit(NewSt),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 % end of upgrade code for <= 2.x
                 {ok, {Sig, Header}} ->
                     % Matching view signatures.
                     NewSt = init_and_upgrade_state(Db, Fd, State, Header),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     check_collator_versions(DbName, NewSt),
                     {ok, NewSt};
                 {ok, {WrongSig, _}} ->
@@ -144,6 +146,7 @@ open(Db, State0) ->
                     ),
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 {ok, Else} ->
                     couch_log:error(
@@ -152,10 +155,12 @@ open(Db, State0) ->
                     ),
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 no_valid_header ->
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt}
             end;
         {error, Reason} = Error ->
@@ -210,7 +215,14 @@ finish_update(State) ->
 
 commit(State) ->
     Header = {State#mrst.sig, couch_mrview_util:make_header(State)},
-    couch_file:write_header(State#mrst.fd, Header).
+    ok = couch_file:sync(State#mrst.fd),
+    ok = couch_file:write_header(State#mrst.fd, Header),
+    ok = couch_file:sync(State#mrst.fd),
+    fabric_drop_seq:update_peer_checkpoint_doc(
+        State#mrst.db_name,
+        couch_mrview_util:peer_checkpoint_id(State),
+        State#mrst.update_seq
+    ).
 
 compact(Db, State, Opts) ->
     couch_mrview_compactor:compact(Db, State, Opts).
@@ -295,6 +307,13 @@ ensure_local_purge_doc(Db, #mrst{} = State) ->
             ok
     end.
 
+ensure_peer_checkpoint_doc(#mrst{} = State) ->
+    fabric_drop_seq:create_peer_checkpoint_doc_if_missing(
+        State#mrst.db_name,
+        couch_mrview_util:peer_checkpoint_id(State),
+        State#mrst.update_seq
+    ).
+
 create_local_purge_doc(Db, State) ->
     PurgeSeq = couch_db:get_purge_seq(Db),
     update_local_purge_doc(Db, State, PurgeSeq).
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index a478685da..c20a6a6ce 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -36,6 +36,7 @@
 -export([get_collator_versions/1]).
 -export([compact_on_collator_upgrade/0]).
 -export([commit_on_header_upgrade/0]).
+-export([peer_checkpoint_id/1]).
 
 -define(MOD, couch_mrview_index).
 -define(GET_VIEW_RETRY_COUNT, 1).
@@ -1355,3 +1356,12 @@ compact_on_collator_upgrade() ->
 
 commit_on_header_upgrade() ->
     config:get_boolean("view_upgrade", "commit_on_header_upgrade", true).
+
+peer_checkpoint_id(#mrst{} = State) ->
+    % Hex, not base64url: cleanup compares this to the (hex) keys of
+    % get_signatures/1 and splits the doc id on "-", which base64url can emit.
+    Sig = list_to_binary(couch_index_util:hexsig(State#mrst.sig)),
+    Hash = couch_util:encodeBase64Url(
+        crypto:hash(sha256, [atom_to_binary(node()), $0, State#mrst.db_name])
+    ),
+    <<"mrview-", Sig/binary, "-", Hash/binary>>.
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 92f30e4e8..595580742 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -586,18 +586,25 @@ cleanup_index_files() ->
 cleanup_index_files(DbName) ->
     try
         ShardNames = [mem3:name(S) || S <- mem3:local_shards(dbname(DbName))],
-        cleanup_local_indices_and_purge_checkpoints(ShardNames)
+        case ShardNames of
+            [] ->
+                % No local shard copies: no shard to read signatures from.
+                ok;
+            [FirstShard | _] ->
+                Sigs = couch_mrview_util:get_signatures(FirstShard),
+                cleanup_local_indices_and_purge_checkpoints(Sigs, ShardNames),
+                ok = cleanup_peer_checkpoints(DbName, Sigs)
+        end
     catch
         error:database_does_not_exist ->
             ok
     end.
 
-cleanup_local_indices_and_purge_checkpoints([]) ->
+cleanup_local_indices_and_purge_checkpoints(_Sigs, []) ->
     ok;
-cleanup_local_indices_and_purge_checkpoints([_ | _] = Dbs) ->
+cleanup_local_indices_and_purge_checkpoints(Sigs, [_ | _] = Dbs) ->
     AllIndices = lists:map(fun couch_mrview_util:get_index_files/1, Dbs),
     AllPurges = lists:map(fun couch_mrview_util:get_purge_checkpoints/1, Dbs),
-    Sigs = couch_mrview_util:get_signatures(hd(Dbs)),
     ok = cleanup_purges(Sigs, AllPurges, Dbs),
     ok = cleanup_indices(Sigs, AllIndices).
 
@@ -614,6 +621,9 @@ cleanup_indices(Sigs, AllIndices) ->
     end,
     lists:foreach(Fun, AllIndices).
 
+cleanup_peer_checkpoints(DbName, Sigs) ->
+    couch_mrview_cleanup:cleanup_peer_checkpoints(DbName, Sigs).
+
 %% @doc clean up index files for a specific db on all nodes
 -spec cleanup_index_files_all_nodes(dbname()) -> [reference()].
 cleanup_index_files_all_nodes(DbName) ->
diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index c00ea24ea..cc5907605 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -6,6 +6,11 @@
 
 -export([go/1]).
 
+-export([
+    create_peer_checkpoint_doc_if_missing/3,
+    update_peer_checkpoint_doc/3
+]).
+
 -type range() :: [non_neg_integer()].
 -type uuid() :: binary().
 
@@ -222,3 +227,68 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
         #{},
         ShardSyncHistory
     ).
+
+%% Ensure the `_local/peer-checkpoint-<PeerId>` doc exists in the clustered
+%% database of shard DbName, creating it at UpdateSeq if it is missing. An
+%% integer UpdateSeq is packed with pack_seq/2 first. Returns ok or a failure.
+create_peer_checkpoint_doc_if_missing(DbName, PeerId, UpdateSeq) when
+    is_binary(DbName), is_binary(PeerId), is_integer(UpdateSeq)
+->
+    create_peer_checkpoint_doc_if_missing(DbName, PeerId, pack_seq(DbName, UpdateSeq));
+create_peer_checkpoint_doc_if_missing(DbName, PeerId, UpdateSeq) when
+    is_binary(DbName), is_binary(PeerId), is_binary(UpdateSeq)
+->
+    run_checkpoint_worker(fun() ->
+        case fabric:open_doc(mem3:dbname(DbName), peer_checkpoint_id(PeerId), [?ADMIN_CTX]) of
+            {ok, _} ->
+                ok;
+            {not_found, _} ->
+                update_peer_checkpoint_doc(DbName, PeerId, UpdateSeq);
+            {error, Reason} ->
+                {checkpoint_commit_failure, Reason}
+        end
+    end).
+
+%% Write the `_local/peer-checkpoint-<PeerId>` doc in the clustered database
+%% of shard DbName with the given UpdateSeq. An integer UpdateSeq is packed
+%% with pack_seq/2 first. Returns ok or a failure.
+update_peer_checkpoint_doc(DbName, PeerId, UpdateSeq) when
+    is_binary(DbName), is_binary(PeerId), is_integer(UpdateSeq)
+->
+    update_peer_checkpoint_doc(DbName, PeerId, pack_seq(DbName, UpdateSeq));
+update_peer_checkpoint_doc(DbName, PeerId, UpdateSeq) when
+    is_binary(DbName), is_binary(PeerId), is_binary(UpdateSeq)
+->
+    Doc = peer_checkpoint_doc(PeerId, UpdateSeq),
+    run_checkpoint_worker(fun() ->
+        case fabric:update_doc(mem3:dbname(DbName), Doc, [?ADMIN_CTX]) of
+            {ok, _} ->
+                ok;
+            {accepted, _} ->
+                ok;
+            {error, Reason} ->
+                {checkpoint_commit_failure, Reason}
+        end
+    end).
+
+%% Run Fun in a monitored process and return its result, or the crash reason
+%% if it fails. A fun that simply returns makes the process exit with reason
+%% `normal`, so the result is handed back as the exit reason via exit/1.
+run_checkpoint_worker(Fun) ->
+    {_, Ref} = spawn_monitor(fun() -> exit(Fun()) end),
+    receive
+        {'DOWN', Ref, _, _, Result} ->
+            Result
+    end.
+
+peer_checkpoint_doc(PeerId, UpdateSeq) when is_binary(PeerId), is_binary(UpdateSeq) ->
+    #doc{id = peer_checkpoint_id(PeerId), body = {[{<<"update_seq">>, UpdateSeq}]}}.
+
+peer_checkpoint_id(PeerId) ->
+    <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", PeerId/binary>>.
+
+pack_seq(DbName, UpdateSeq) ->
+    DbUuid = couch_util:with_db(DbName, fun(Db) -> couch_db:get_uuid(Db) end),
+    Seq0 = [node(), mem3:range(DbName), {UpdateSeq, DbUuid, node()}],
+    Seq1 = couch_util:encodeBase64Url(?term_to_bin(Seq0, [compressed])),
+    <<(integer_to_binary(UpdateSeq))/binary, $-, Seq1/binary>>.
