This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-purge
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit afa2e94796036f9b192887188eae9b7ebbca181b
Author: Robert Newson <rnew...@apache.org>
AuthorDate: Fri May 19 16:42:17 2023 +0100

    support clustered purge in nouveau
---
 src/nouveau/src/nouveau_api.erl             |  13 ++-
 src/nouveau/src/nouveau_epi.erl             |   5 +-
 src/nouveau/src/nouveau_index_updater.erl   | 119 +++++++++++++++++++++-------
 src/nouveau/src/nouveau_plugin_couch_db.erl |  24 ++++++
 src/nouveau/src/nouveau_util.erl            | 101 +++++++++++++++++++++++
 5 files changed, 233 insertions(+), 29 deletions(-)

diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 7744eb161..c78fb37be 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -24,6 +24,7 @@
     delete_path/1,
     delete_path/2,
     delete_doc/3,
+    purge_doc/3,
     update_doc/5,
     search/2
 ]).
@@ -97,7 +98,17 @@ delete_path(Path, Exclusions) when
 delete_doc(#index{} = Index, DocId, UpdateSeq) when
     is_binary(DocId), is_integer(UpdateSeq)
 ->
-    ReqBody = {[{<<"seq">>, UpdateSeq}]},
+    delete_doc(Index, DocId, UpdateSeq, false).
+
+purge_doc(#index{} = Index, DocId, PurgeSeq) when
+    is_binary(DocId), is_integer(PurgeSeq)
+->
+    delete_doc(Index, DocId, PurgeSeq, true).
+
+delete_doc(#index{} = Index, DocId, Seq, IsPurge) when
+    is_binary(DocId), is_integer(Seq), is_boolean(IsPurge)
+->
+    ReqBody = #{seq => Seq, purge => IsPurge},
     Resp = send_if_enabled(
         doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody)
     ),
diff --git a/src/nouveau/src/nouveau_epi.erl b/src/nouveau/src/nouveau_epi.erl
index f42e17970..9f34ae579 100644
--- a/src/nouveau/src/nouveau_epi.erl
+++ b/src/nouveau/src/nouveau_epi.erl
@@ -31,7 +31,10 @@ app() ->
     nouveau.
 
 providers() ->
-    [{chttpd_handlers, nouveau_httpd_handlers}].
+    [
+        {couch_db, nouveau_plugin_couch_db},
+        {chttpd_handlers, nouveau_httpd_handlers}
+    ].
 
 services() ->
     [].
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
index a78b1ff15..8115ddfd5 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -28,9 +28,10 @@
 
 outdated(#index{} = Index) ->
     case open_or_create_index(Index) of
-        {ok, IndexSeq} ->
-            DbSeq = get_db_seq(Index),
-            DbSeq > IndexSeq;
+        {ok, #{} = Info} ->
+            #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
+            {DbUpdateSeq, DbPurgeSeq} = get_db_info(Index),
+            DbUpdateSeq > IndexUpdateSeq orelse DbPurgeSeq > IndexPurgeSeq;
         {error, Reason} ->
             {error, Reason}
     end.
@@ -38,11 +39,14 @@ outdated(#index{} = Index) ->
 update(#index{} = Index) ->
     {ok, Db} = couch_db:open_int(Index#index.dbname, []),
     try
-        case open_or_create_index(Index) of
+        case open_or_create_index(Db, Index) of
             {error, Reason} ->
                 exit({error, Reason});
-            {ok, CurSeq} ->
-                TotalChanges = couch_db:count_changes_since(Db, CurSeq),
+            {ok, #{} = Info} ->
+                #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
+                ChangesSince = couch_db:count_changes_since(Db, IndexUpdateSeq),
+                PurgesSince = couch_db:get_purge_seq(Db) - IndexPurgeSeq,
+                TotalChanges = ChangesSince + PurgesSince,
                 couch_task_status:add_task([
                     {type, search_indexer},
                     {database, Index#index.dbname},
@@ -56,11 +60,13 @@ update(#index{} = Index) ->
                 %% update status every half second
                 couch_task_status:set_update_frequency(500),
 
+                {ok, ExcludeIdRevs} = purge_index(Db, Index, IndexPurgeSeq),
+
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
-                    Acc0 = {Db, Index, Proc, 0, TotalChanges},
-                    {ok, _} = couch_db:fold_changes(Db, CurSeq, fun load_docs/2, Acc0, [])
+                    Acc0 = {Db, Index, Proc, 0, TotalChanges, ExcludeIdRevs},
+                    {ok, _} = couch_db:fold_changes(Db, IndexUpdateSeq, fun load_docs/2, Acc0, [])
                 after
                     ret_os_process(Proc)
                 end
@@ -71,14 +77,20 @@ update(#index{} = Index) ->
load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) -> {ok, Acc}; -load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges}) -> +load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges, ExcludeIdRevs}) -> couch_task_status:update([ {changes_done, ChangesDone}, {progress, (ChangesDone * 100) div TotalChanges} ]), - DI = couch_doc:to_doc_info(FDI), - #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, + #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI, + case lists:member({Id, Rev}, ExcludeIdRevs) of + true -> ok; + false -> update_or_delete_index(Db, Index, DI, Proc) + end, + {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges, ExcludeIdRevs}}. +update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) -> + #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> ok = nouveau_api:delete_doc(Index, Id, Seq); @@ -104,17 +116,16 @@ load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges}) -> exit({error, Reason}) end end - end, - {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges}}. + end. open_or_create_index(#index{} = Index) -> - case get_index_seq(Index) of - {ok, UpdateSeq} -> - {ok, UpdateSeq}; + case nouveau_api:index_info(Index) of + {ok, #{} = Info} -> + {ok, Info}; {error, {not_found, _}} -> case nouveau_api:create_index(Index, index_definition(Index)) of ok -> - {ok, 0}; + nouveau_api:index_info(Index); {error, Reason} -> {error, Reason} end; @@ -122,24 +133,78 @@ open_or_create_index(#index{} = Index) -> {error, Reason} end. -get_db_seq(#index{} = Index) -> +open_or_create_index(Db, #index{} = Index) -> + case open_or_create_index(Index) of + {ok, #{} = Info} -> + nouveau_util:maybe_create_local_purge_doc(Db, Index), + {ok, Info}; + Else -> + Else + end. 
+
+get_db_info(#index{} = Index) ->
     {ok, Db} = couch_db:open_int(Index#index.dbname, []),
     try
-        couch_db:get_update_seq(Db)
+        UpdateSeq = couch_db:get_update_seq(Db),
+        PurgeSeq = couch_db:get_purge_seq(Db),
+        {UpdateSeq, PurgeSeq}
     after
         couch_db:close(Db)
     end.
 
-get_index_seq(#index{} = Index) ->
-    case nouveau_api:index_info(Index) of
-        {ok, #{<<"update_seq">> := Seq}} ->
-            {ok, Seq};
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
 index_definition(#index{} = Index) ->
     #{
         <<"default_analyzer">> => Index#index.default_analyzer,
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
+
+purge_index(Db, Index, IndexPurgeSeq) ->
+    Proc = get_os_process(Index#index.def_lang),
+    try
+        true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
+        FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) ->
+            Acc0 =
+                case couch_db:get_full_doc_info(Db, Id) of
+                    not_found ->
+                        ok = nouveau_api:purge_doc(Index, Id, PurgeSeq),
+                        Acc;
+                    FDI ->
+                        DI = couch_doc:to_doc_info(FDI),
+                        #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
+                        case lists:member({Id, Rev}, Acc) of
+                            true ->
+                                Acc;
+                            false ->
+                                update_or_delete_index(Db, Index, DI, Proc),
+                                [{Id, Rev} | Acc]
+                        end
+                end,
+            update_task(1),
+            {ok, {Acc0, PurgeSeq}}
+        end,
+
+        {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos(
+            Db, IndexPurgeSeq, FoldFun, {[], 0}, []
+        ),
+        update_local_doc(Db, Index, NewPurgeSeq),
+        {ok, ExcludeList}
+    after
+        ret_os_process(Proc)
+    end.
+
+update_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress =
+        case Total of
+            0 ->
+                0;
+            _ ->
+                (Changes2 * 100) div Total
+        end,
+    couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).
+
+update_local_doc(Db, #index{} = Index, PurgeSeq) ->
+    DocId = nouveau_util:get_local_purge_doc_id(Index#index.sig),
+    DocContent = nouveau_util:get_local_purge_doc_body(DocId, PurgeSeq, Index),
+    couch_db:update_doc(Db, DocContent, []).
diff --git a/src/nouveau/src/nouveau_plugin_couch_db.erl b/src/nouveau/src/nouveau_plugin_couch_db.erl
new file mode 100644
index 000000000..dcd3ae1f1
--- /dev/null
+++ b/src/nouveau/src/nouveau_plugin_couch_db.erl
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(nouveau_plugin_couch_db).
+
+-export([
+    is_valid_purge_client/2,
+    on_compact/2
+]).
+
+is_valid_purge_client(DbName, Props) ->
+    nouveau_util:verify_index_exists(DbName, Props).
+
+on_compact(DbName, DDocs) ->
+    nouveau_util:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index b7da7e802..148c42c36 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -22,6 +22,11 @@
     index_name/1,
     design_doc_to_indexes/2,
     design_doc_to_index/3,
+    verify_index_exists/2,
+    ensure_local_purge_docs/2,
+    maybe_create_local_purge_doc/2,
+    get_local_purge_doc_id/1,
+    get_local_purge_doc_body/3,
     nouveau_url/0
 ]).
 
@@ -93,5 +98,101 @@ design_doc_to_index(DbName, #doc{id = Id, body = {Fields}}, IndexName) ->
             {error, InvalidDDocError}
     end.
 
+verify_index_exists(DbName, Props) ->
+    try
+        Type = couch_util:get_value(<<"type">>, Props),
+        if
+            Type =/= <<"nouveau">> ->
+                false;
+            true ->
+                DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+                IndexName = couch_util:get_value(<<"indexname">>, Props),
+                Sig = couch_util:get_value(<<"signature">>, Props),
+                couch_util:with_db(DbName, fun(Db) ->
+                    case couch_db:get_design_doc(Db, DDocId) of
+                        {ok, #doc{} = DDoc} ->
+                            {ok, IdxState} = design_doc_to_index(
+                                DbName, DDoc, IndexName
+                            ),
+                            IdxState#index.sig == Sig;
+                        {not_found, _} ->
+                            false
+                    end
+                end)
+        end
+    catch
+        _:_ ->
+            false
+    end.
+
+ensure_local_purge_docs(DbName, DDocs) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        lists:foreach(
+            fun(DDoc) ->
+                #doc{body = {Props}} = DDoc,
+                case couch_util:get_value(<<"indexes">>, Props) of
+                    undefined ->
+                        false;
+                    _ ->
+                        try design_doc_to_indexes(DbName, DDoc) of
+                            SIndexes -> ensure_local_purge_doc(Db, SIndexes)
+                        catch
+                            _:_ ->
+                                ok
+                        end
+                end
+            end,
+            DDocs
+        )
+    end).
+
+ensure_local_purge_doc(Db, SIndexes) ->
+    if
+        SIndexes =/= [] ->
+            lists:map(
+                fun(SIndex) ->
+                    maybe_create_local_purge_doc(Db, SIndex)
+                end,
+                SIndexes
+            );
+        true ->
+            ok
+    end.
+
+maybe_create_local_purge_doc(Db, Index) ->
+    DocId = get_local_purge_doc_id(Index#index.sig),
+    case couch_db:open_doc(Db, DocId) of
+        {not_found, _} ->
+            DbPurgeSeq = couch_db:get_purge_seq(Db),
+            DocContent = get_local_purge_doc_body(
+                DocId, DbPurgeSeq, Index
+            ),
+            couch_db:update_doc(Db, DocContent, []);
+        _ ->
+            ok
+    end.
+
+get_local_purge_doc_id(Sig) ->
+    iolist_to_binary([?LOCAL_DOC_PREFIX, "purge-", "nouveau-", Sig]).
+
+get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
+    #index{
+        name = IdxName,
+        ddoc_id = DDocId,
+        sig = Sig
+    } = Index,
+    NowSecs = os:system_time(second),
+    JsonList =
+        {[
+            {<<"_id">>, LocalDocId},
+            {<<"purge_seq">>, PurgeSeq},
+            {<<"updated_on">>, NowSecs},
+            {<<"indexname">>, IdxName},
+            {<<"ddoc_id">>, DDocId},
+            {<<"signature">>, Sig},
+            {<<"type">>, <<"nouveau">>}
+        ]},
+    couch_doc:from_json_obj(JsonList).
+
 nouveau_url() ->
     config:get("nouveau", "url", "http://127.0.0.1:8080").