This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-purge
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6f9a7d447f295e58a6d2f87fa41b66bbe3dc3861
Author: Robert Newson <rnew...@apache.org>
AuthorDate: Fri May 19 16:42:17 2023 +0100

    support clustered purge in nouveau
---
 src/dreyfus/src/dreyfus_plugin_couch_db.erl        |   4 +-
 src/nouveau/src/nouveau_api.erl                    |  13 ++-
 src/nouveau/src/nouveau_epi.erl                    |   5 +-
 src/nouveau/src/nouveau_index_updater.erl          | 119 ++++++++++++++++-----
 .../src/nouveau_plugin_couch_db.erl}               |   6 +-
 src/nouveau/src/nouveau_util.erl                   | 100 +++++++++++++++++
 6 files changed, 213 insertions(+), 34 deletions(-)

diff --git a/src/dreyfus/src/dreyfus_plugin_couch_db.erl b/src/dreyfus/src/dreyfus_plugin_couch_db.erl
index a55c26373..6b6bbd438 100644
--- a/src/dreyfus/src/dreyfus_plugin_couch_db.erl
+++ b/src/dreyfus/src/dreyfus_plugin_couch_db.erl
@@ -18,7 +18,7 @@
 ]).
 
 is_valid_purge_client(DbName, Props) ->
-    dreyfus_util:verify_index_exists(DbName, Props).
+    nouveau_util:verify_index_exists(DbName, Props).
 
 on_compact(DbName, DDocs) ->
-    dreyfus_util:ensure_local_purge_docs(DbName, DDocs).
+    nouveau_util:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 7744eb161..c78fb37be 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -24,6 +24,7 @@
     delete_path/1,
     delete_path/2,
     delete_doc/3,
+    purge_doc/3,
     update_doc/5,
     search/2
 ]).
@@ -97,7 +98,17 @@ delete_path(Path, Exclusions) when
 delete_doc(#index{} = Index, DocId, UpdateSeq) when
     is_binary(DocId), is_integer(UpdateSeq)
 ->
-    ReqBody = {[{<<"seq">>, UpdateSeq}]},
+    delete_doc(Index, DocId, UpdateSeq, false).
+
+purge_doc(#index{} = Index, DocId, PurgeSeq) when
+    is_binary(DocId), is_integer(PurgeSeq)
+->
+    delete_doc(Index, DocId, PurgeSeq, true).
+
+delete_doc(#index{} = Index, DocId, Seq, IsPurge) when
+    is_binary(DocId), is_integer(Seq), is_boolean(IsPurge)
+->
+    ReqBody = #{seq => Seq, purge => IsPurge},
     Resp = send_if_enabled(
         doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody)
     ),
diff --git a/src/nouveau/src/nouveau_epi.erl b/src/nouveau/src/nouveau_epi.erl
index f42e17970..9f34ae579 100644
--- a/src/nouveau/src/nouveau_epi.erl
+++ b/src/nouveau/src/nouveau_epi.erl
@@ -31,7 +31,10 @@ app() ->
     nouveau.
 
 providers() ->
-    [{chttpd_handlers, nouveau_httpd_handlers}].
+    [
+        {couch_db, nouveau_plugin_couch_db},
+        {chttpd_handlers, nouveau_httpd_handlers}
+    ].
 
 services() ->
     [].
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
index a78b1ff15..6f3cb0d08 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -28,9 +28,10 @@
 
 outdated(#index{} = Index) ->
     case open_or_create_index(Index) of
-        {ok, IndexSeq} ->
-            DbSeq = get_db_seq(Index),
-            DbSeq > IndexSeq;
+        {ok, #{} = Info} ->
+            #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
+            {DbUpdateSeq, DbPurgeSeq} = get_db_info(Index),
+            DbUpdateSeq > IndexUpdateSeq orelse DbPurgeSeq > IndexPurgeSeq;
         {error, Reason} ->
             {error, Reason}
     end.
@@ -38,11 +39,14 @@ outdated(#index{} = Index) ->
 update(#index{} = Index) ->
     {ok, Db} = couch_db:open_int(Index#index.dbname, []),
     try
-        case open_or_create_index(Index) of
+        case open_or_create_index(Db, Index) of
             {error, Reason} ->
                 exit({error, Reason});
-            {ok, CurSeq} ->
-                TotalChanges = couch_db:count_changes_since(Db, CurSeq),
+            {ok, #{} = Info} ->
+                #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
+                ChangesSince = couch_db:count_changes_since(Db, IndexUpdateSeq),
+                PurgesSince = couch_db:get_purge_seq(Db) - IndexPurgeSeq,
+                TotalChanges = ChangesSince + PurgesSince,
                 couch_task_status:add_task([
                     {type, search_indexer},
                     {database, Index#index.dbname},
@@ -56,11 +60,13 @@ update(#index{} = Index) ->
                 %% update status every half second
                 couch_task_status:set_update_frequency(500),
 
+                {ok, ExcludeIdRevs} = purge_index(Db, Index, IndexPurgeSeq),
+
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
-                    Acc0 = {Db, Index, Proc, 0, TotalChanges},
-                    {ok, _} = couch_db:fold_changes(Db, CurSeq, fun load_docs/2, Acc0, [])
+                    Acc0 = {Db, Index, Proc, 0, TotalChanges, ExcludeIdRevs},
+                    {ok, _} = couch_db:fold_changes(Db, IndexUpdateSeq, fun load_docs/2, Acc0, [])
                 after
                     ret_os_process(Proc)
                 end
@@ -71,14 +77,20 @@ update(#index{} = Index) ->
 
 load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
     {ok, Acc};
-load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges}) ->
+load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges, ExcludeIdRevs}) ->
     couch_task_status:update([
         {changes_done, ChangesDone}, {progress, (ChangesDone * 100) div TotalChanges}
     ]),
-
     DI = couch_doc:to_doc_info(FDI),
-    #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
+    #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
+    case lists:member({Id, Rev}, ExcludeIdRevs) of
+        true -> ok;
+        false -> update_or_delete_index(Db, Index, DI, Proc)
+    end,
+    {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges, ExcludeIdRevs}}.
 
+update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) ->
+    #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
     case Del of
         true ->
             ok = nouveau_api:delete_doc(Index, Id, Seq);
@@ -104,17 +116,16 @@ load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges}) ->
                             exit({error, Reason})
                     end
             end
-    end,
-    {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges}}.
+    end.
 
 open_or_create_index(#index{} = Index) ->
-    case get_index_seq(Index) of
-        {ok, UpdateSeq} ->
-            {ok, UpdateSeq};
+    case nouveau_api:index_info(Index) of
+        {ok, #{} = Info} ->
+            {ok, Info};
         {error, {not_found, _}} ->
             case nouveau_api:create_index(Index, index_definition(Index)) of
                 ok ->
-                    {ok, 0};
+                    {ok, #{}};
                 {error, Reason} ->
                     {error, Reason}
             end;
@@ -122,24 +133,78 @@ open_or_create_index(#index{} = Index) ->
             {error, Reason}
     end.
 
-get_db_seq(#index{} = Index) ->
+open_or_create_index(Db, #index{} = Index) ->
+    case nouveau_api:index_info(Index) of
+        {error, {not_found, _}} ->
+            ok = nouveau_util:maybe_create_local_purge_doc(Db, Index),
+            open_or_create_index(Index);
+        Else ->
+            Else
+    end.
+
+get_db_info(#index{} = Index) ->
     {ok, Db} = couch_db:open_int(Index#index.dbname, []),
     try
-        couch_db:get_update_seq(Db)
+        UpdateSeq = couch_db:get_update_seq(Db),
+        PurgeSeq = couch_db:get_purge_seq(Db),
+        {UpdateSeq, PurgeSeq}
     after
         couch_db:close(Db)
     end.
 
-get_index_seq(#index{} = Index) ->
-    case nouveau_api:index_info(Index) of
-        {ok, #{<<"update_seq">> := Seq}} ->
-            {ok, Seq};
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
 index_definition(#index{} = Index) ->
     #{
         <<"default_analyzer">> => Index#index.default_analyzer,
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
+
+purge_index(Db, Index, IndexPurgeSeq) ->
+    Proc = get_os_process(Index#index.def_lang),
+    try
+        true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
+        FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) ->
+            Acc0 =
+                case couch_db:get_full_doc_info(Db, Id) of
+                    not_found ->
+                        ok = nouveau_api:purge_doc(Index, Id, PurgeSeq),
+                        Acc;
+                    FDI ->
+                        DI = couch_doc:to_doc_info(FDI),
+                        #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
+                        case lists:member({Id, Rev}, Acc) of
+                            true ->
+                                Acc;
+                            false ->
+                                update_or_delete_index(Db, Index, DI, Proc),
+                                [{Id, Rev} | Acc]
+                        end
+                end,
+            update_task(1),
+            {ok, {Acc0, PurgeSeq}}
+        end,
+
+        {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos(
+            Db, IndexPurgeSeq, FoldFun, {[], 0}, []
+        ),
+        update_local_doc(Db, Index, NewPurgeSeq),
+        {ok, ExcludeList}
+    after
+        ret_os_process(Proc)
+    end.
+
+update_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress =
+        case Total of
+            0 ->
+                0;
+            _ ->
+                (Changes2 * 100) div Total
+        end,
+    couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).
+
+update_local_doc(Db, #index{} = Index, PurgeSeq) ->
+    DocId = nouveau_util:get_local_purge_doc_id(Index#index.sig),
+    DocContent = nouveau_util:get_local_purge_doc_body(DocId, PurgeSeq, Index),
+    couch_db:update_doc(Db, DocContent, []).
diff --git a/src/dreyfus/src/dreyfus_plugin_couch_db.erl b/src/nouveau/src/nouveau_plugin_couch_db.erl
similarity index 82%
copy from src/dreyfus/src/dreyfus_plugin_couch_db.erl
copy to src/nouveau/src/nouveau_plugin_couch_db.erl
index a55c26373..dcd3ae1f1 100644
--- a/src/dreyfus/src/dreyfus_plugin_couch_db.erl
+++ b/src/nouveau/src/nouveau_plugin_couch_db.erl
@@ -10,7 +10,7 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(dreyfus_plugin_couch_db).
+-module(nouveau_plugin_couch_db).
 
 -export([
     is_valid_purge_client/2,
@@ -18,7 +18,7 @@
 ]).
 
 is_valid_purge_client(DbName, Props) ->
-    dreyfus_util:verify_index_exists(DbName, Props).
+    nouveau_util:verify_index_exists(DbName, Props).
 
 on_compact(DbName, DDocs) ->
-    dreyfus_util:ensure_local_purge_docs(DbName, DDocs).
+    nouveau_util:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index b7da7e802..e7b4ad738 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -22,6 +22,10 @@
     index_name/1,
     design_doc_to_indexes/2,
     design_doc_to_index/3,
+    verify_index_exists/2,
+    ensure_local_purge_docs/2,
+    get_local_purge_doc_id/1,
+    get_local_purge_doc_body/3,
     nouveau_url/0
 ]).
 
@@ -93,5 +97,101 @@ design_doc_to_index(DbName, #doc{id = Id, body = {Fields}}, IndexName) ->
             {error, InvalidDDocError}
     end.
 
+verify_index_exists(DbName, Props) ->
+    try
+        Type = couch_util:get_value(<<"type">>, Props),
+        if
+            Type =/= <<"nouveau">> ->
+                false;
+            true ->
+                DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+                IndexName = couch_util:get_value(<<"indexname">>, Props),
+                Sig = couch_util:get_value(<<"signature">>, Props),
+                couch_util:with_db(DbName, fun(Db) ->
+                    case couch_db:get_design_doc(Db, DDocId) of
+                        {ok, #doc{} = DDoc} ->
+                            {ok, IdxState} = design_doc_to_index(
+                                DbName, DDoc, IndexName
+                            ),
+                            IdxState#index.sig == Sig;
+                        {not_found, _} ->
+                            false
+                    end
+                end)
+        end
+    catch
+        _:_ ->
+            false
+    end.
+
+ensure_local_purge_docs(DbName, DDocs) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        lists:foreach(
+            fun(DDoc) ->
+                #doc{body = {Props}} = DDoc,
+                case couch_util:get_value(<<"indexes">>, Props) of
+                    undefined ->
+                        false;
+                    _ ->
+                        try design_doc_to_indexes(DbName, DDoc) of
+                            SIndexes -> ensure_local_purge_doc(Db, SIndexes)
+                        catch
+                            _:_ ->
+                                ok
+                        end
+                end
+            end,
+            DDocs
+        )
+    end).
+
+ensure_local_purge_doc(Db, SIndexes) ->
+    if
+        SIndexes =/= [] ->
+            lists:map(
+                fun(SIndex) ->
+                    maybe_create_local_purge_doc(Db, SIndex)
+                end,
+                SIndexes
+            );
+        true ->
+            ok
+    end.
+
+maybe_create_local_purge_doc(Db, Index) ->
+    DocId = get_local_purge_doc_id(Index#index.sig),
+    case couch_db:open_doc(Db, DocId) of
+        {not_found, _} ->
+            DbPurgeSeq = couch_db:get_purge_seq(Db),
+            DocContent = get_local_purge_doc_body(
+                DocId, DbPurgeSeq, Index
+            ),
+            couch_db:update_doc(Db, DocContent, []);
+        _ ->
+            ok
+    end.
+
+get_local_purge_doc_id(Sig) ->
+    iolist_to_binary([?LOCAL_DOC_PREFIX, "purge-", "nouveau-", Sig]).
+
+get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
+    #index{
+        name = IdxName,
+        ddoc_id = DDocId,
+        sig = Sig
+    } = Index,
+    NowSecs = os:system_time(second),
+    JsonList =
+        {[
+            {<<"_id">>, LocalDocId},
+            {<<"purge_seq">>, PurgeSeq},
+            {<<"updated_on">>, NowSecs},
+            {<<"indexname">>, IdxName},
+            {<<"ddoc_id">>, DDocId},
+            {<<"signature">>, Sig},
+            {<<"type">>, <<"nouveau">>}
+        ]},
+    couch_doc:from_json_obj(JsonList).
+
 nouveau_url() ->
     config:get("nouveau", "url", "http://127.0.0.1:8080").

Reply via email to