This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor-2 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit d16071de595270b401100a2c395148896c9459b7 Author: Paul J. Davis <paul.joseph.da...@gmail.com> AuthorDate: Tue Apr 24 12:24:10 2018 -0500 Implement new node local purge APIs Rewrite purge logic to use the new couch_db_engine purge APIs. This work will allow for the new purge behaviors to enable clustered purge. --- src/couch/priv/stats_descriptions.cfg | 12 +++ src/couch/src/couch_db.erl | 153 +++++++++++++++++++++++++++++-- src/couch/src/couch_db_updater.erl | 164 +++++++++++++++++----------------- src/couch/src/couch_httpd_db.erl | 23 +++-- 4 files changed, 258 insertions(+), 94 deletions(-) diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg index f091978..bceb0ce 100644 --- a/src/couch/priv/stats_descriptions.cfg +++ b/src/couch/priv/stats_descriptions.cfg @@ -34,6 +34,10 @@ {type, counter}, {desc, <<"number of times a document was read from a database">>} ]}. +{[couchdb, database_purges], [ + {type, counter}, + {desc, <<"number of times a database was purged">>} +]}. {[couchdb, db_open_time], [ {type, histogram}, {desc, <<"milliseconds required to open a database">>} @@ -46,6 +50,10 @@ {type, counter}, {desc, <<"number of document write operations">>} ]}. +{[couchdb, document_purges], [ + {type, counter}, + {desc, <<"number of document purge operations">>} +]}. {[couchdb, local_document_writes], [ {type, counter}, {desc, <<"number of _local document write operations">>} @@ -74,6 +82,10 @@ {type, counter}, {desc, <<"number of clients for continuous _changes">>} ]}. +{[couchdb, httpd, purge_requests], [ + {type, counter}, + {desc, <<"number of purge requests">>} +]}. 
{[couchdb, httpd_request_methods, 'COPY'], [ {type, counter}, {desc, <<"number of HTTP COPY requests">>} diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 3449274..8592193 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -43,7 +43,6 @@ get_epochs/1, get_filepath/1, get_instance_start_time/1, - get_last_purged/1, get_pid/1, get_revs_limit/1, get_security/1, @@ -51,12 +50,15 @@ get_user_ctx/1, get_uuid/1, get_purge_seq/1, + get_oldest_purge_seq/1, + get_purge_infos_limit/1, is_db/1, is_system_db/1, is_clustered/1, set_revs_limit/2, + set_purge_infos_limit/2, set_security/2, set_user_ctx/2, @@ -75,6 +77,9 @@ get_full_doc_infos/2, get_missing_revs/2, get_design_docs/1, + get_purge_infos/2, + + get_minimum_purge_seq/1, update_doc/3, update_doc/4, @@ -84,6 +89,7 @@ delete_doc/3, purge_docs/2, + purge_docs/3, with_stream/3, open_write_stream/2, @@ -97,6 +103,8 @@ fold_changes/4, fold_changes/5, count_changes_since/2, + fold_purge_infos/4, + fold_purge_infos/5, calculate_start_seq/3, owner_of/2, @@ -369,8 +377,132 @@ get_full_doc_info(Db, Id) -> get_full_doc_infos(Db, Ids) -> couch_db_engine:open_docs(Db, Ids). -purge_docs(#db{main_pid=Pid}, IdsRevs) -> - gen_server:call(Pid, {purge_docs, IdsRevs}). +purge_docs(Db, IdRevs) -> + purge_docs(Db, IdRevs, []). + +-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) -> + {ok, [Reply]} when + UUId :: binary(), + Id :: binary(), + Rev :: {non_neg_integer(), binary()}, + PurgeOption :: interactive_edit | replicated_changes, + Reply :: {ok, []} | {ok, [Rev]}. +purge_docs(#db{main_pid = Pid} = Db, UUIdsIdsRevs, Options) -> + increment_stat(Db, [couchdb, database_purges]), + gen_server:call(Pid, {purge_docs, UUIdsIdsRevs, Options}). + +-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when + UUId :: binary(), + PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found, + PurgeSeq :: non_neg_integer(), + Id :: binary(), + Rev :: {non_neg_integer(), binary()}. 
+get_purge_infos(Db, UUIDs) -> + couch_db_engine:load_purge_infos(Db, UUIDs). + + +get_minimum_purge_seq(#db{} = Db) -> + PurgeSeq = couch_db_engine:get_purge_seq(Db), + OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db), + PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db), + + FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) -> + case DocId of + <<"_local/purge-", _/binary>> -> + ClientSeq = couch_util:get_value(<<"purge_seq">>, Props), + case ClientSeq of + CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit -> + {ok, SeqAcc}; + CS when is_integer(CS) -> + case purge_client_exists(Db, DocId, Props) of + true -> {ok, erlang:min(CS, SeqAcc)}; + false -> {ok, SeqAcc} + end; + _ -> + % If there's a broken doc we have to keep every + % purge info until the doc is fixed or removed. + Fmt = "Invalid purge doc '~s' with purge_seq '~w'", + couch_log:error(Fmt, [DocId, ClientSeq]), + {ok, erlang:min(OldestPurgeSeq, SeqAcc)} + end; + _ -> + {stop, SeqAcc} + end + end, + InitMinSeq = PurgeSeq - PurgeInfosLimit, + Opts = [ + {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")} + ], + {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts), + FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of + true -> MinIdxSeq; + false -> erlang:max(0, PurgeSeq - PurgeInfosLimit) + end, + % Log a warning if we've got a purge sequence exceeding the + % configured threshold. + if FinalSeq >= (PurgeSeq - PurgeInfosLimit) -> ok; true -> + Fmt = "The purge sequence for '~s' exceeds configured threshold", + couch_log:warning(Fmt, [couch_db:name(Db)]) + end, + FinalSeq. 
+ + +purge_client_exists(DbName, DocId, Props) -> + % Warn about clients that have not updated their purge + % checkpoints in the last "index_lag_warn_seconds" + LagWindow = config:get_integer( + "purge", "index_lag_warn_seconds", 86400), % Default 24 hours + + {Mega, Secs, _} = os:timestamp(), + NowSecs = Mega * 1000000 + Secs, + LagThreshold = NowSecs - LagWindow, + + try + CheckFun = get_purge_client_fun(DocId, Props), + Exists = CheckFun(DbName, DocId, Props), + if not Exists -> ok; true -> + Updated = couch_util:get_value(<<"updated_on">>, Props), + if is_integer(Updated) andalso Updated > LagThreshold -> ok; true -> + Diff = NowSecs - Updated, + Fmt = "Purge checkpoint '~s' not updated in ~p seconds", + couch_log:error(Fmt, [DocId, Diff]) + end + end, + Exists + catch _:_ -> + % If we fail to check for a client we have to assume that + % it exists. + true + end. + + +get_purge_client_fun(DocId, Props) -> + M0 = couch_util:get_value(<<"verify_module">>, Props), + M = try + binary_to_existing_atom(M0, latin1) + catch error:badarg -> + Fmt1 = "Missing index module '~s' for purge checkpoint '~s'", + couch_log:error(Fmt1, [M0, DocId]), + throw(failed) + end, + + F0 = couch_util:get_value(<<"verify_function">>, Props), + try + F = binary_to_existing_atom(F0, latin1), + fun M:F/2 + catch error:badarg -> + Fmt2 = "Missing function '~s' in '~s' for purge checkpoint '~s'", + couch_log:error(Fmt2, [F0, M0, DocId]), + throw(failed) + end. + + +set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 -> + check_is_admin(Db), + gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity); +set_purge_infos_limit(_Db, _Limit) -> + throw(invalid_purge_infos_limit). + get_after_doc_read_fun(#db{after_doc_read = Fun}) -> Fun. @@ -392,8 +524,11 @@ get_user_ctx(?OLD_DB_REC = Db) -> get_purge_seq(#db{}=Db) -> {ok, couch_db_engine:get_purge_seq(Db)}. -get_last_purged(#db{}=Db) -> - {ok, couch_db_engine:get_last_purged(Db)}. 
+get_oldest_purge_seq(#db{}=Db) -> + {ok, couch_db_engine:get_oldest_purge_seq(Db)}. + +get_purge_infos_limit(#db{}=Db) -> + couch_db_engine:get_purge_infos_limit(Db). get_pid(#db{main_pid = Pid}) -> Pid. @@ -1406,6 +1541,14 @@ fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) -> couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts). +fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) -> + fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []). + + +fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) -> + couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts). + + count_changes_since(Db, SinceSeq) -> couch_db_engine:count_changes_since(Db, SinceSeq). diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 6a30d65..315f737 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -94,79 +94,41 @@ handle_call({set_revs_limit, Limit}, _From, Db) -> ok = gen_server:call(couch_server, {db_updated, Db3}, infinity), {reply, ok, Db3, idle_limit()}; -handle_call({purge_docs, _IdRevs}, _From, - #db{compactor_pid=Pid}=Db) when Pid /= nil -> - {reply, {error, purge_during_compaction}, Db, idle_limit()}; -handle_call({purge_docs, IdRevs}, _From, Db) -> - DocIds = [Id || {Id, _Revs} <- IdRevs], - OldDocInfos = couch_db_engine:open_docs(Db, DocIds), - - NewDocInfos = lists:flatmap(fun - ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) -> - case couch_key_tree:remove_leafs(Tree, Revs) of - {_, [] = _RemovedRevs} -> % no change - []; - {NewTree, RemovedRevs} -> - NewFDI = FDI#full_doc_info{rev_tree = NewTree}, - [{FDI, NewFDI, RemovedRevs}] - end; - ({_, not_found}) -> - [] - end, lists:zip(IdRevs, OldDocInfos)), - - InitUpdateSeq = couch_db_engine:get_update_seq(Db), - InitAcc = {InitUpdateSeq, [], []}, - FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) -> - #full_doc_info{ - id = Id, - rev_tree = OldTree - } = OldFDI, - {SeqAcc0, FDIAcc, IdRevsAcc} = 
Acc, - - {NewFDIAcc, NewSeqAcc} = case OldTree of - [] -> - % If we purged every #leaf{} in the doc record - % then we're removing it completely from the - % database. - {FDIAcc, SeqAcc0}; - _ -> - % Its possible to purge the #leaf{} that contains - % the update_seq where this doc sits in the update_seq - % sequence. Rather than do a bunch of complicated checks - % we just re-label every #leaf{} and reinsert it into - % the update_seq sequence. - {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun - (_RevId, Leaf, leaf, InnerSeqAcc) -> - {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1}; - (_RevId, Value, _Type, InnerSeqAcc) -> - {Value, InnerSeqAcc} - end, SeqAcc0, OldTree), - - NewFDI = OldFDI#full_doc_info{ - update_seq = SeqAcc1, - rev_tree = NewTree - }, - - {[NewFDI | FDIAcc], SeqAcc1} - end, - NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc], - {NewSeqAcc, NewFDIAcc, NewIdRevsAcc} - end, InitAcc, NewDocInfos), - - {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc, +handle_call({set_purge_infos_limit, Limit}, _From, Db) -> + {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {reply, ok, Db2, idle_limit()}; + +handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) -> + % Filter out any previously applied updates during + % internal replication + IsRepl = lists:member(replicated_changes, Options), + PurgeReqs = if not IsRepl -> PurgeReqs0; true -> + UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0], + PurgeInfos = couch_db_engine:load_purge_infos(Db, UUIDs), + lists:flatmap(fun + ({not_found, PReq}) -> [PReq]; + ({{_, _, _, _}, _}) -> [] + end, lists:zip(PurgeInfos, PurgeReqs0)) + end, - % We need to only use the list of #full_doc_info{} records - % that we have actually changed due to a purge. 
- PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos], - Pairs = pair_purge_info(PreviousFDIs, FDIs), + Ids = lists:map(fun({_UUID, Id, _Revs}) -> Id end, PurgeReqs), + DocInfos = couch_db_engine:open_docs(Db, Ids), + UpdateSeq = couch_db_engine:get_update_seq(Db), + PurgeSeq = couch_db_engine:get_purge_seq(Db), - {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs), - Db3 = commit_data(Db2), - ok = gen_server:call(couch_server, {db_updated, Db3}, infinity), - couch_event:notify(Db#db.name, updated), + InitAcc = {[], [], []}, + {Pairs, PInfos, Replies} = purge_docs( + PurgeReqs, DocInfos, UpdateSeq, PurgeSeq, InitAcc), - PurgeSeq = couch_db_engine:get_purge_seq(Db3), - {reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()}; + Db3 = if Pairs == [] -> Db; true -> + {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos), + Db2 = commit_data(Db1), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + couch_event:notify(Db2#db.name, updated), + Db2 + end, + {reply, {ok, Replies}, Db3, idle_limit()}; handle_call(Msg, From, Db) -> case couch_db_engine:handle_db_updater_call(Msg, From, Db) of @@ -646,7 +608,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) -> Pairs = pair_write_info(OldDocLookups, IndexFDIs), LocalDocs2 = update_local_doc_revs(LocalDocs), - {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []), + {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2), WriteCount = length(IndexFDIs), couch_stats:increment_counter([couchdb, document_inserts], @@ -692,6 +654,57 @@ update_local_doc_revs(Docs) -> end, Docs). 
+purge_docs([], [], _USeq, _PSeq, {Pairs, PInfos, Replies}) -> + {lists:reverse(Pairs), lists:reverse(PInfos), lists:reverse(Replies)}; + +purge_docs([Req | RestReqs], [FDI | RestInfos], USeq, PSeq, Acc) -> + {UUID, DocId, Revs} = Req, + {Pair, RemovedRevs, NewUSeq} = case FDI of + #full_doc_info{rev_tree = Tree} -> + case couch_key_tree:remove_leafs(Tree, Revs) of + {_, []} -> + % No change + {no_change, [], USeq}; + {[], Removed} -> + % Completely purged + {{FDI, not_found}, Removed, USeq}; + {NewTree, Removed} -> + % Its possible to purge the #leaf{} that contains + % the update_seq where this doc sits in the + % update_seq sequence. Rather than do a bunch of + % complicated checks we just re-label every #leaf{} + % and reinsert it into the update_seq sequence. + {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun + (_RevId, Leaf, leaf, SeqAcc) -> + {Leaf#leaf{seq = SeqAcc + 1}, + SeqAcc + 1}; + (_RevId, Value, _Type, SeqAcc) -> + {Value, SeqAcc} + end, USeq, NewTree), + + NewFDI = FDI#full_doc_info{ + update_seq = NewUpdateSeq, + rev_tree = NewTree2 + }, + {{FDI, NewFDI}, Removed, NewUpdateSeq} + end; + not_found -> + % Not found means nothing to change + {no_change, [], USeq} + end, + {Pairs, PInfos, Replies} = Acc, + NewPairs = case Pair of + no_change -> Pairs; + _ -> [Pair | Pairs] + end, + NewAcc = { + NewPairs, + [{PSeq + 1, UUID, DocId, Revs} | PInfos], + [{ok, RemovedRevs} | Replies] + }, + purge_docs(RestReqs, RestInfos, NewUSeq, PSeq + 1, NewAcc). + + commit_data(Db) -> commit_data(Db, false). @@ -721,15 +734,6 @@ pair_write_info(Old, New) -> end, New). -pair_purge_info(Old, New) -> - lists:map(fun(OldFDI) -> - case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of - #full_doc_info{} = NewFDI -> {OldFDI, NewFDI}; - false -> {OldFDI, not_found} - end - end, Old). - - get_meta_body_size(Meta) -> {ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta), ExternalSize. 
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl index 99b1192..27f7d70 100644 --- a/src/couch/src/couch_httpd_db.erl +++ b/src/couch/src/couch_httpd_db.erl @@ -376,17 +376,22 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) -> + couch_stats:increment_counter([couchdb, httpd, purge_requests]), couch_httpd:validate_ctype(Req, "application/json"), - {IdsRevs} = couch_httpd:json_body_obj(Req), - IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs], + {IdRevs} = couch_httpd:json_body_obj(Req), + PurgeReqs = lists:map(fun({Id, JsonRevs}) -> + {couch_uuids:new(), Id, couch_doc:parse_revs(JsonRevs)} + end, IdRevs), - case couch_db:purge_docs(Db, IdsRevs2) of - {ok, PurgeSeq, PurgedIdsRevs} -> - PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs], - send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]}); - Error -> - throw(Error) - end; + {ok, Replies} = couch_db:purge_docs(Db, PurgeReqs), + + Results = lists:zipwith(fun({Id, _}, {ok, PRevs}) -> + {Id, couch_doc:revs_to_strs(PRevs)} + end, IdRevs, Replies), + + {ok, Db2} = couch_db:reopen(Db), + {ok, PurgeSeq} = couch_db:get_purge_seq(Db2), + send_json(Req, 200, {[{purge_seq, PurgeSeq}, {purged, {Results}}]}); db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); -- To stop receiving notification emails like this one, please contact dav...@apache.org.