This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d16071de595270b401100a2c395148896c9459b7
Author: Paul J. Davis <paul.joseph.da...@gmail.com>
AuthorDate: Tue Apr 24 12:24:10 2018 -0500

    Implement new node local purge APIs
    
    Rewrite purge logic to use the new couch_db_engine purge APIs. This work
    provides the new purge behaviors needed to enable clustered purge.
---
 src/couch/priv/stats_descriptions.cfg |  12 +++
 src/couch/src/couch_db.erl            | 153 +++++++++++++++++++++++++++++--
 src/couch/src/couch_db_updater.erl    | 164 +++++++++++++++++-----------------
 src/couch/src/couch_httpd_db.erl      |  23 +++--
 4 files changed, 258 insertions(+), 94 deletions(-)

diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index f091978..bceb0ce 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -34,6 +34,10 @@
     {type, counter},
     {desc, <<"number of times a document was read from a database">>}
 ]}.
+{[couchdb, database_purges], [
+    {type, counter},
+    {desc, <<"number of times a database was purged">>}
+]}.
 {[couchdb, db_open_time], [
     {type, histogram},
     {desc, <<"milliseconds required to open a database">>}
@@ -46,6 +50,10 @@
     {type, counter},
     {desc, <<"number of document write operations">>}
 ]}.
+{[couchdb, document_purges], [
+    {type, counter},
+    {desc, <<"number of document purge operations">>}
+]}.
 {[couchdb, local_document_writes], [
     {type, counter},
     {desc, <<"number of _local document write operations">>}
@@ -74,6 +82,10 @@
     {type, counter},
     {desc, <<"number of clients for continuous _changes">>}
 ]}.
+{[couchdb, httpd, purge_requests], [
+    {type, counter},
+    {desc, <<"number of purge requests">>}
+]}.
 {[couchdb, httpd_request_methods, 'COPY'], [
     {type, counter},
     {desc, <<"number of HTTP COPY requests">>}
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 3449274..8592193 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -43,7 +43,6 @@
     get_epochs/1,
     get_filepath/1,
     get_instance_start_time/1,
-    get_last_purged/1,
     get_pid/1,
     get_revs_limit/1,
     get_security/1,
@@ -51,12 +50,15 @@
     get_user_ctx/1,
     get_uuid/1,
     get_purge_seq/1,
+    get_oldest_purge_seq/1,
+    get_purge_infos_limit/1,
 
     is_db/1,
     is_system_db/1,
     is_clustered/1,
 
     set_revs_limit/2,
+    set_purge_infos_limit/2,
     set_security/2,
     set_user_ctx/2,
 
@@ -75,6 +77,9 @@
     get_full_doc_infos/2,
     get_missing_revs/2,
     get_design_docs/1,
+    get_purge_infos/2,
+
+    get_minimum_purge_seq/1,
 
     update_doc/3,
     update_doc/4,
@@ -84,6 +89,7 @@
     delete_doc/3,
 
     purge_docs/2,
+    purge_docs/3,
 
     with_stream/3,
     open_write_stream/2,
@@ -97,6 +103,8 @@
     fold_changes/4,
     fold_changes/5,
     count_changes_since/2,
+    fold_purge_infos/4,
+    fold_purge_infos/5,
 
     calculate_start_seq/3,
     owner_of/2,
@@ -369,8 +377,160 @@ get_full_doc_info(Db, Id) ->
 get_full_doc_infos(Db, Ids) ->
     couch_db_engine:open_docs(Db, Ids).
 
-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
-    gen_server:call(Pid, {purge_docs, IdsRevs}).
+% Purge revisions from documents. Each request is {UUId, DocId, Revs}
+% and the db updater process replies {ok, Replies}.
+purge_docs(Db, IdRevs) ->
+    purge_docs(Db, IdRevs, []).
+
+-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) ->
+    {ok, [Reply]} when
+    UUId :: binary(),
+    Id :: binary(),
+    Rev :: {non_neg_integer(), binary()},
+    PurgeOption :: interactive_edit | replicated_changes,
+    Reply :: {ok, []} | {ok, [Rev]}.
+purge_docs(#db{main_pid = Pid} = Db, UUIdsIdsRevs, Options) ->
+    increment_stat(Db, [couchdb, database_purges]),
+    gen_server:call(Pid, {purge_docs, UUIdsIdsRevs, Options}).
+
+% Look up stored purge infos by UUID; not_found for unknown UUIDs.
+-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
+    UUId :: binary(),
+    PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
+    PurgeSeq :: non_neg_integer(),
+    Id :: binary(),
+    Rev :: {non_neg_integer(), binary()}.
+get_purge_infos(Db, UUIDs) ->
+    couch_db_engine:load_purge_infos(Db, UUIDs).
+
+
+% Return the lowest purge sequence that purge clients still need, based
+% on the _local/purge-* checkpoint docs. A checkpoint older than
+% PurgeSeq - PurgeInfosLimit only lowers the result while its client
+% still exists; a checkpoint without an integer purge_seq lowers the
+% result to at most the oldest purge sequence.
+get_minimum_purge_seq(#db{} = Db) ->
+    DbName = couch_db:name(Db),
+    PurgeSeq = couch_db_engine:get_purge_seq(Db),
+    OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
+    PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db),
+
+    FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
+        case DocId of
+            <<"_local/purge-", _/binary>> ->
+                ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                case ClientSeq of
+                    CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit ->
+                        {ok, SeqAcc};
+                    CS when is_integer(CS) ->
+                        case purge_client_exists(DbName, DocId, Props) of
+                            true -> {ok, erlang:min(CS, SeqAcc)};
+                            false -> {ok, SeqAcc}
+                        end;
+                    _ ->
+                        % If there's a broken doc we have to keep every
+                        % purge info until the doc is fixed or removed.
+                        Fmt = "Invalid purge doc '~s' with purge_seq '~w'",
+                        couch_log:error(Fmt, [DocId, ClientSeq]),
+                        {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
+                end;
+            _ ->
+                % Past the _local/purge- prefix; assumes the fold
+                % visits local docs in id order.
+                {stop, SeqAcc}
+        end
+    end,
+    InitMinSeq = PurgeSeq - PurgeInfosLimit,
+    Opts = [
+        {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
+    ],
+    {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+    FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
+        true -> MinIdxSeq;
+        false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
+    end,
+    % Log a warning if we've got a purge sequence exceeding the
+    % configured threshold.
+    if FinalSeq >= (PurgeSeq - PurgeInfosLimit) -> ok; true ->
+        Fmt = "The purge sequence for '~s' exceeds configured threshold",
+        couch_log:warning(Fmt, [DbName])
+    end,
+    FinalSeq.
+
+
+% Ask the checkpoint's verify function whether the purge client behind
+% DocId still exists, logging an error if its checkpoint has not been
+% updated within purge/index_lag_warn_seconds. The verify function is
+% called as M:F(DbName, Props). Any failure to check counts as the
+% client existing, so its purge infos are kept.
+purge_client_exists(DbName, DocId, Props) ->
+    % Warn about clients that have not updated their purge
+    % checkpoints in the last "index_lag_warn_seconds"
+    LagWindow = config:get_integer(
+            "purge", "index_lag_warn_seconds", 86400), % Default 24 hours
+
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    LagThreshold = NowSecs - LagWindow,
+
+    try
+        CheckFun = get_purge_client_fun(DocId, Props),
+        Exists = CheckFun(DbName, Props),
+        Updated = couch_util:get_value(<<"updated_on">>, Props),
+        if
+            not Exists ->
+                ok;
+            is_integer(Updated) andalso Updated > LagThreshold ->
+                ok;
+            is_integer(Updated) ->
+                Diff = NowSecs - Updated,
+                Fmt = "Purge checkpoint '~s' not updated in ~p seconds",
+                couch_log:error(Fmt, [DocId, Diff]);
+            true ->
+                Fmt = "Purge checkpoint '~s' has no valid updated_on",
+                couch_log:error(Fmt, [DocId])
+        end,
+        Exists
+    catch _:_ ->
+        % If we fail to check for a client we have to assume that
+        % it exists.
+        true
+    end.
+
+
+% Build fun M:F/2 from the checkpoint's verify_module and
+% verify_function fields. Both must name existing atoms; otherwise an
+% error is logged and failed is thrown.
+get_purge_client_fun(DocId, Props) ->
+    M0 = couch_util:get_value(<<"verify_module">>, Props),
+    M = try
+        binary_to_existing_atom(M0, latin1)
+    catch error:badarg ->
+        Fmt1 = "Missing index module '~s' for purge checkpoint '~s'",
+        couch_log:error(Fmt1, [M0, DocId]),
+        throw(failed)
+    end,
+
+    F0 = couch_util:get_value(<<"verify_function">>, Props),
+    try
+        F = binary_to_existing_atom(F0, latin1),
+        fun M:F/2
+    catch error:badarg ->
+        Fmt2 = "Missing function '~s' in '~s' for purge checkpoint '~s'",
+        couch_log:error(Fmt2, [F0, M0, DocId]),
+        throw(failed)
+    end.
+
+
+% Set the purge infos limit used by get_minimum_purge_seq/1. Admin
+% only; anything other than a positive integer is rejected.
+set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit)
+        when is_integer(Limit), Limit > 0 ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
+set_purge_infos_limit(_Db, _Limit) ->
+    throw(invalid_purge_infos_limit).
+
 
 get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
     Fun.
@@ -392,8 +524,14 @@ get_user_ctx(?OLD_DB_REC = Db) ->
 get_purge_seq(#db{}=Db) ->
     {ok, couch_db_engine:get_purge_seq(Db)}.
 
-get_last_purged(#db{}=Db) ->
-    {ok, couch_db_engine:get_last_purged(Db)}.
+% Return the engine's oldest purge sequence wrapped as {ok, Seq}.
+get_oldest_purge_seq(#db{} = Db) ->
+    OldestSeq = couch_db_engine:get_oldest_purge_seq(Db),
+    {ok, OldestSeq}.
+
+% Return the engine's purge infos limit as a bare value.
+get_purge_infos_limit(#db{} = Db) ->
+    couch_db_engine:get_purge_infos_limit(Db).
 
 get_pid(#db{main_pid = Pid}) ->
     Pid.
@@ -1406,6 +1541,20 @@ fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
     couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
 
 
+% Fold Fun over the purge infos starting from StartPurgeSeq, with no
+% extra options.
+fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
+    NoOpts = [],
+    fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, NoOpts).
+
+
+% Fold UserFun over the purge infos starting from StartPurgeSeq; Opts
+% are handed to the storage engine unchanged.
+fold_purge_infos(Db, StartPurgeSeq, UserFun, UserAcc, Opts) ->
+    couch_db_engine:fold_purge_infos(
+        Db, StartPurgeSeq, UserFun, UserAcc, Opts).
+
+
 count_changes_since(Db, SinceSeq) ->
     couch_db_engine:count_changes_since(Db, SinceSeq).
 
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 6a30d65..315f737 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -94,79 +94,49 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
     ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
     {reply, ok, Db3, idle_limit()};
 
-handle_call({purge_docs, _IdRevs}, _From,
-        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
-    {reply, {error, purge_during_compaction}, Db, idle_limit()};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
-    DocIds = [Id || {Id, _Revs} <- IdRevs],
-    OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
-    NewDocInfos = lists:flatmap(fun
-        ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
-            case couch_key_tree:remove_leafs(Tree, Revs) of
-                {_, [] = _RemovedRevs} -> % no change
-                    [];
-                {NewTree, RemovedRevs} ->
-                    NewFDI = FDI#full_doc_info{rev_tree = NewTree},
-                    [{FDI, NewFDI, RemovedRevs}]
-            end;
-        ({_, not_found}) ->
-            []
-    end, lists:zip(IdRevs, OldDocInfos)),
-
-    InitUpdateSeq = couch_db_engine:get_update_seq(Db),
-    InitAcc = {InitUpdateSeq, [], []},
-    FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
-        #full_doc_info{
-            id = Id,
-            rev_tree = OldTree
-        } = OldFDI,
-        {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
-        {NewFDIAcc, NewSeqAcc} = case OldTree of
-            [] ->
-                % If we purged every #leaf{} in the doc record
-                % then we're removing it completely from the
-                % database.
-                {FDIAcc, SeqAcc0};
-            _ ->
-                % Its possible to purge the #leaf{} that contains
-                % the update_seq where this doc sits in the update_seq
-                % sequence. Rather than do a bunch of complicated checks
-                % we just re-label every #leaf{} and reinsert it into
-                % the update_seq sequence.
-                {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
-                    (_RevId, Leaf, leaf, InnerSeqAcc) ->
-                        {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
-                    (_RevId, Value, _Type, InnerSeqAcc) ->
-                        {Value, InnerSeqAcc}
-                end, SeqAcc0, OldTree),
-
-                NewFDI = OldFDI#full_doc_info{
-                    update_seq = SeqAcc1,
-                    rev_tree = NewTree
-                },
-
-                {[NewFDI | FDIAcc], SeqAcc1}
-        end,
-        NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
-        {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
-    end, InitAcc, NewDocInfos),
-
-    {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
+handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    {reply, ok, Db2, idle_limit()};
+
+% Purge a batch of {UUID, DocId, Revs} requests. Each request that is
+% applied gets its own purge info and an {ok, RemovedRevs} reply.
+handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
+    % Filter out any previously applied updates during
+    % internal replication
+    IsRepl = lists:member(replicated_changes, Options),
+    PurgeReqs = if not IsRepl -> PurgeReqs0; true ->
+        UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0],
+        % One result per UUID: a stored purge info tuple, or not_found
+        PurgeInfos = couch_db_engine:load_purge_infos(Db, UUIDs),
+        lists:flatmap(fun
+            ({not_found, PReq}) -> [PReq];
+            ({{_, _, _, _}, _}) -> []
+        end, lists:zip(PurgeInfos, PurgeReqs0))
+    end,
 
-    % We need to only use the list of #full_doc_info{} records
-    % that we have actually changed due to a purge.
-    PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
-    Pairs = pair_purge_info(PreviousFDIs, FDIs),
+    Ids = lists:map(fun({_UUID, Id, _Revs}) -> Id end, PurgeReqs),
+    DocInfos = couch_db_engine:open_docs(Db, Ids),
+    UpdateSeq = couch_db_engine:get_update_seq(Db),
+    PurgeSeq = couch_db_engine:get_purge_seq(Db),
 
-    {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
-    Db3 = commit_data(Db2),
-    ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
-    couch_event:notify(Db#db.name, updated),
+    InitAcc = {[], [], []},
+    {Pairs, PInfos, Replies} = purge_docs(
+            PurgeReqs, DocInfos, UpdateSeq, PurgeSeq, InitAcc),
+    couch_stats:increment_counter([couchdb, document_purges],
+        length(PurgeReqs)),
 
-    PurgeSeq = couch_db_engine:get_purge_seq(Db3),
-    {reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()};
+    % Write whenever there is any purge request, even if no document
+    % changed: every request was given a purge sequence above, and
+    % whether it is recorded must not depend on the rest of the batch.
+    Db3 = if PInfos == [] -> Db; true ->
+        {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
+        Db2 = commit_data(Db1),
+        ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+        couch_event:notify(Db2#db.name, updated),
+        Db2
+    end,
+    {reply, {ok, Replies}, Db3, idle_limit()};
 
 handle_call(Msg, From, Db) ->
     case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
@@ -646,7 +608,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
     Pairs = pair_write_info(OldDocLookups, IndexFDIs),
     LocalDocs2 = update_local_doc_revs(LocalDocs),
 
-    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
 
     WriteCount = length(IndexFDIs),
     couch_stats:increment_counter([couchdb, document_inserts],
@@ -692,6 +654,85 @@ update_local_doc_revs(Docs) ->
     end, Docs).
 
 
+% Apply a batch of purge requests to the doc infos loaded for them.
+%
+% Reqs is a list of {UUID, DocId, Revs} and FDIs holds the matching
+% open_docs/2 results (#full_doc_info{} or not_found) in the same
+% order. USeq and PSeq are the current update and purge sequences.
+% Returns {Pairs, PInfos, Replies}:
+%   Pairs   - {OldFDI, NewFDI | not_found} for each changed document,
+%             at most one pair per DocId
+%   PInfos  - {PurgeSeq, UUID, DocId, Revs} for every request, with
+%             purge sequences counting up from PSeq + 1
+%   Replies - {ok, RemovedRevs} for every request, in request order
+%
+% A DocId may appear in several requests of one batch. Each request is
+% applied to the tree left by the earlier ones rather than to the FDI
+% loaded before the batch, so no removal is lost and the engine gets a
+% single {Old, New} pair per document.
+purge_docs(Reqs, FDIs, USeq, PSeq, {Pairs0, PInfos0, Replies0}) ->
+    InitAcc = {[], USeq, PSeq, PInfos0, Replies0},
+    {Docs, _, _, PInfos, Replies} = lists:foldl(
+            fun apply_purge_req/2, InitAcc, lists:zip(Reqs, FDIs)),
+    % Only documents whose state differs from what was loaded are written
+    Pairs = [{Old, New} || {_DocId, Old, New} <- Docs, Old =/= New],
+    {lists:reverse(Pairs0, Pairs), lists:reverse(PInfos),
+        lists:reverse(Replies)}.
+
+
+% Apply one {UUID, DocId, Revs} request. Docs holds one
+% {DocId, LoadedFDI, CurrentFDI} entry per DocId seen so far.
+apply_purge_req({{UUID, DocId, Revs}, LoadedFDI}, Acc) ->
+    {Docs, USeq, PSeq, PInfos, Replies} = Acc,
+    {OldFDI, CurFDI} = case lists:keyfind(DocId, 1, Docs) of
+        {DocId, Loaded, Current} -> {Loaded, Current};
+        false -> {LoadedFDI, LoadedFDI}
+    end,
+    {NewFDI, RemovedRevs, NewUSeq} = purge_doc_revs(CurFDI, Revs, USeq),
+    NewDocs = lists:keystore(DocId, 1, Docs, {DocId, OldFDI, NewFDI}),
+    NewPSeq = PSeq + 1,
+    {
+        NewDocs,
+        NewUSeq,
+        NewPSeq,
+        [{NewPSeq, UUID, DocId, Revs} | PInfos],
+        [{ok, RemovedRevs} | Replies]
+    }.
+
+
+% Remove Revs from one document. Returns {NewFDI, RemovedRevs, NewUSeq},
+% with NewFDI = not_found once no leaf is left.
+purge_doc_revs(not_found, _Revs, USeq) ->
+    % Nothing to change
+    {not_found, [], USeq};
+purge_doc_revs(#full_doc_info{rev_tree = Tree} = FDI, Revs, USeq) ->
+    case couch_key_tree:remove_leafs(Tree, Revs) of
+        {_, []} ->
+            % No change
+            {FDI, [], USeq};
+        {[], Removed} ->
+            % Completely purged
+            {not_found, Removed, USeq};
+        {NewTree, Removed} ->
+            % It's possible to purge the #leaf{} that contains
+            % the update_seq where this doc sits in the
+            % update_seq sequence. Rather than do a bunch of
+            % complicated checks we just re-label every #leaf{}
+            % and reinsert it into the update_seq sequence.
+            {NewTree2, NewUSeq} = couch_key_tree:mapfold(fun
+                (_RevId, Leaf, leaf, SeqAcc) ->
+                    {Leaf#leaf{seq = SeqAcc + 1}, SeqAcc + 1};
+                (_RevId, Value, _Type, SeqAcc) ->
+                    {Value, SeqAcc}
+            end, USeq, NewTree),
+            NewFDI = FDI#full_doc_info{
+                update_seq = NewUSeq,
+                rev_tree = NewTree2
+            },
+            {NewFDI, Removed, NewUSeq}
+    end.
+
+
 commit_data(Db) ->
     commit_data(Db, false).
 
@@ -721,15 +734,6 @@ pair_write_info(Old, New) ->
     end, New).
 
 
-pair_purge_info(Old, New) ->
-    lists:map(fun(OldFDI) ->
-        case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
-            #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
-            false -> {OldFDI, not_found}
-        end
-    end, Old).
-
-
 get_meta_body_size(Meta) ->
     {ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
     ExternalSize.
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index 99b1192..27f7d70 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -376,17 +376,27 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
 
 db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
+    couch_stats:increment_counter([couchdb, httpd, purge_requests]),
     couch_httpd:validate_ctype(Req, "application/json"),
-    {IdsRevs} = couch_httpd:json_body_obj(Req),
-    IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
+    % The body maps each doc id to the revs to purge. Every entry
+    % becomes one purge request tagged with a fresh UUID.
+    {IdRevs} = couch_httpd:json_body_obj(Req),
+    PurgeReqs = lists:map(fun({Id, JsonRevs}) ->
+        {couch_uuids:new(), Id, couch_doc:parse_revs(JsonRevs)}
+    end, IdRevs),
 
-    case couch_db:purge_docs(Db, IdsRevs2) of
-    {ok, PurgeSeq, PurgedIdsRevs} ->
-        PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
-        send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
-    Error ->
-        throw(Error)
-    end;
+    {ok, Replies} = couch_db:purge_docs(Db, PurgeReqs),
+
+    % lists:zipwith/3 calls a 2-arity fun with one element from each
+    % list; each reply is {ok, RemovedRevs} for the matching request.
+    Results = lists:zipwith(fun({Id, _}, {ok, RemovedRevs}) ->
+        {Id, couch_doc:revs_to_strs(RemovedRevs)}
+    end, IdRevs, Replies),
+
+    % Reopen to read the purge sequence after the purge was applied.
+    {ok, Db2} = couch_db:reopen(Db),
+    {ok, PurgeSeq} = couch_db:get_purge_seq(Db2),
+    send_json(Req, 200, {[{purge_seq, PurgeSeq}, {purged, {Results}}]});
 
 db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");

-- 
To stop receiving notification emails like this one, please contact
dav...@apache.org.

Reply via email to