This is an automated email from the ASF dual-hosted git repository.

nickva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new db961c080 Fix cluster index and process cleanup
db961c080 is described below

commit db961c0809cf8e201824ff0fbaa952f0a3fc72e2
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Apr 27 21:53:37 2026 -0400

    Fix cluster index and process cleanup
    
    Previously, as described in #5980, we didn't perform a thorough index cleanup
    when ddocs changed. We only cleaned up on nodes where the design docs were
    located. That was sufficient for an n=3 db on a 3 node cluster, but may not
    be true in general in a cluster.
    
    To fix the issue, run a small gen_server responsible for performing cluster
    index cleanup. To avoid spawning Q*N jobs, deduplicate the requests by
    delaying for up to 30 seconds per clustered db. For cleanup, reuse and call
    the already existing fabric index file cleanup machinery. That accomplishes
    two things:
    
     - Starts a quicker index file cleanup. Previously we only did this during
       smoosh compaction runs. The view files could linger for a while until
       compaction in smoosh would be triggered.
    
     - Cleaning search index files also stops indexes on their (Java) side, so
       index file clean-up does "double duty", so to speak, when it comes to
       index shut down.
    
    Fix https://github.com/apache/couchdb/issues/5980
---
 rel/overlay/etc/default.ini                        |   4 +
 src/couch/src/couch_secondary_sup.erl              |   3 +-
 src/couch_index/src/couch_index_cleanup.erl        |  98 ++++++++
 src/couch_index/src/couch_index_server.erl         |  68 ++----
 .../test/eunit/couch_index_ddoc_updated_tests.erl  | 178 ---------------
 src/couch_mrview/src/couch_mrview_cleanup.erl      |  40 +++-
 src/couch_mrview/src/couch_mrview_util.erl         |  12 +-
 .../test/eunit/couch_mrview_cleanup_tests.erl      | 252 +++++++++++++++++++++
 .../test/eunit/couch_mrview_util_tests.erl         |   6 +-
 9 files changed, 427 insertions(+), 234 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 17b5eee48..79026dea3 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -158,6 +158,10 @@ view_index_dir = {{view_index_dir}}
 ;
 ;time_seq_min_time = 1754006400
 
+; Clustered index cleanup deduplication hold-off. How long to wait before
+; running clean up per clustered db.
+;index_cleanup_delay_msec = 30000
+
 [bt_engine_cache]
 ; Memory used for btree engine cache. This is a cache for top levels of
 ; database btrees (id tree, seq tree) and a few terms from the db header. Value
diff --git a/src/couch/src/couch_secondary_sup.erl 
b/src/couch/src/couch_secondary_sup.erl
index 766235d5d..7ee9cab78 100644
--- a/src/couch/src/couch_secondary_sup.erl
+++ b/src/couch/src/couch_secondary_sup.erl
@@ -28,7 +28,8 @@ init([]) ->
             {query_servers, {couch_proc_manager, start_link, []}},
             {vhosts, {couch_httpd_vhost, start_link, []}},
             {uuids, {couch_uuids, start, []}},
-            {disk_manager, {couch_disk_monitor, start_link, []}}
+            {disk_manager, {couch_disk_monitor, start_link, []}},
+            {couch_index_cleanup, {couch_index_cleanup, start_link, []}}
         ] ++ couch_index_servers(),
 
     MaybeHttp =
diff --git a/src/couch_index/src/couch_index_cleanup.erl 
b/src/couch_index/src/couch_index_cleanup.erl
new file mode 100644
index 000000000..be37c5654
--- /dev/null
+++ b/src/couch_index/src/couch_index_cleanup.erl
@@ -0,0 +1,98 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_cleanup).
+-behaviour(gen_server).
+
+-export([
+    start_link/0,
+    schedule/1
+]).
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+-export([
+    handle_db_event/3
+]).
+
+-define(DEFAULT_DELAY_MSEC, 30000).
+
+-record(st, {
+    pending = #{} :: #{binary() => reference()}
+}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+schedule(DbName) when is_binary(DbName) ->
+    gen_server:cast(?MODULE, {schedule, DbName, fanout}).
+
+init([]) ->
+    {ok, _} = couch_event:link_listener(?MODULE, handle_db_event, nil, 
[all_dbs]),
+    {ok, #st{}}.
+
+handle_call(Msg, _From, #st{} = St) ->
+    {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}.
+
+handle_cast({schedule, DbName, Mode}, #st{pending = Pending} = St) ->
+    case maps:is_key(DbName, Pending) of
+        true ->
+            {noreply, St};
+        false ->
+            case Mode of
+                fanout -> fanout(DbName);
+                no_fanout -> ok
+            end,
+            TRef = erlang:send_after(delay_msec(), self(), {run_cleanup, 
DbName}),
+            {noreply, St#st{pending = Pending#{DbName => TRef}}}
+    end;
+handle_cast(Msg, St) ->
+    {stop, {invalid_cast, Msg}, St}.
+
+handle_info({run_cleanup, DbName}, #st{pending = Pending} = St) ->
+    spawn(fun() ->
+        try
+            fabric:cleanup_index_files_this_node(DbName)
+        catch
+            Class:Reason:Stack ->
+                WArgs = [?MODULE, DbName, Class, Reason, Stack],
+                couch_log:warning("~p: cleanup ~s failed ~p:~p~n~p", WArgs)
+        end
+    end),
+    {noreply, St#st{pending = maps:remove(DbName, Pending)}};
+handle_info(Msg, St) ->
+    {stop, {invalid_info, Msg}, St}.
+
+handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, _DDocId}, St) 
->
+    % Clustered dbs only
+    schedule(mem3:dbname(DbName)),
+    {ok, St};
+handle_db_event(_DbName, _Event, St) ->
+    {ok, St}.
+
+fanout(DbName) ->
+    try mem3:shards(DbName) of
+        Shards ->
+            Nodes = lists:usort([mem3:node(S) || S <- Shards]) -- [node()],
+            Args = {schedule, DbName, no_fanout},
+            lists:foreach(fun(N) -> gen_server:cast({?MODULE, N}, Args) end, 
Nodes)
+    catch
+        _:_ -> ok
+    end.
+
+delay_msec() ->
+    config:get_integer("couchdb", "index_cleanup_delay_msec", 
?DEFAULT_DELAY_MSEC).
diff --git a/src/couch_index/src/couch_index_server.erl 
b/src/couch_index/src/couch_index_server.erl
index d4593ee0d..82ceb1541 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -23,6 +23,9 @@
 -export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1, 
openers/1]).
 -export([aggregate_queue_len/0, names/0]).
 
+% Cluster cleanup helpers (used by couch_mrview_cleanup)
+-export([shard_entries/1, shard_index_pid/2, forget_ddoc_binding/3]).
+
 % Exported for callbacks
 -export([
     handle_config_change/5,
@@ -358,51 +361,9 @@ handle_db_event(DbName, created, St) ->
 handle_db_event(DbName, deleted, St) ->
     gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
     {ok, St};
-handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) 
->
-    %% this handle_db_event function must not crash (or it takes down the 
couch_index_server)
-    try
-        DDocResult = couch_util:with_db(DbName, fun(Db) ->
-            couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
-        end),
-        LocalShards = mem3:local_shards(mem3:dbname(DbName)),
-        DbShards = [mem3:name(Sh) || Sh <- LocalShards],
-        lists:foreach(
-            fun(DbShard) ->
-                lists:foreach(
-                    fun({_DbShard, {_DDocId, Sig}}) ->
-                        % check if there are other ddocs with the same Sig for 
the same db
-                        SigDDocs = ets:match_object(St#st.by_db, {DbShard, 
{'$1', Sig}}),
-                        if
-                            length(SigDDocs) > 1 ->
-                                % remove records from by_db for this DDoc
-                                Args = [DbShard, DDocId, Sig],
-                                gen_server:cast(St#st.server_name, 
{rem_from_ets, Args});
-                            true ->
-                                % single DDoc with this Sig - close 
couch_index processes
-                                case ets:lookup(St#st.by_sig, {DbShard, Sig}) 
of
-                                    [{_, IndexPid}] ->
-                                        (catch gen_server:cast(
-                                            IndexPid, {ddoc_updated, 
DDocResult}
-                                        ));
-                                    [] ->
-                                        []
-                                end
-                        end
-                    end,
-                    ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
-                )
-            end,
-            DbShards
-        ),
-        {ok, St}
-    catch
-        Class:Reason:Stack ->
-            couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p, 
stack ~p", [
-                ?MODULE, Class, DbName, Reason, Stack
-            ]),
-            gen_server:cast(St#st.server_name, {rem_from_ets, [DbName, 
Reason]}),
-            {ok, St}
-    end;
+handle_db_event(<<"shards/", _/binary>>, {ddoc_updated, _DDocId}, St) ->
+    %% Cluster dbs cleanup is handled by couch_index_cleanup
+    {ok, St};
 handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
     lists:foreach(
         fun({_DbName, {_DDocId, Sig}}) ->
@@ -437,6 +398,23 @@ by_db(Arg) ->
 openers(Arg) ->
     name("couchdb_indexes_openers", Arg).
 
+% Return {DDocId, Sig} entries for a shard. Used by cluster cleanup
+shard_entries(ShardName) when is_binary(ShardName) ->
+    Rows = ets:match_object(by_db(ShardName), {ShardName, '_'}),
+    [Entry || {_ShardName, Entry} <- Rows].
+
+% Return indexer Pid for {ShardName, Sig} or not_found
+shard_index_pid(ShardName, Sig) when is_binary(ShardName) ->
+    case ets:lookup(by_sig(ShardName), {ShardName, Sig}) of
+        [{_, Pid}] when is_pid(Pid) -> {ok, Pid};
+        _ -> not_found
+    end.
+
+% Remove {ShardName, {DDocId, Sig}} row from by_db. The indexer process is left
+% as is. This is for removing one of the ddocs pointing to the same sig
+forget_ddoc_binding(ShardName, DDocId, Sig) when is_binary(ShardName) ->
+    gen_server:cast(server_name(ShardName), {rem_from_ets, [ShardName, DDocId, 
Sig]}).
+
 name(BaseName, Arg) when is_list(Arg) ->
     name(BaseName, ?l2b(Arg));
 name(BaseName, Arg) when is_binary(Arg) ->
diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl 
b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
deleted file mode 100644
index 6b7fe5a4a..000000000
--- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
+++ /dev/null
@@ -1,178 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_index_ddoc_updated_tests).
-
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-start() ->
-    fake_index(),
-    Ctx = test_util:start_couch([mem3, fabric]),
-    DbName = ?tempdb(),
-    ok = fabric:create_db(DbName, [?ADMIN_CTX]),
-    {Ctx, DbName}.
-
-stop({Ctx, DbName}) ->
-    meck:unload(test_index),
-    ok = fabric:delete_db(DbName, [?ADMIN_CTX]),
-    DbDir = config:get("couchdb", "database_dir", "."),
-    WaitFun = fun() ->
-        filelib:fold_files(
-            DbDir,
-            <<".*", DbName/binary, "\.[0-9]+.*">>,
-            true,
-            fun(_F, _A) -> wait end,
-            ok
-        )
-    end,
-    ok = test_util:wait(WaitFun),
-    test_util:stop_couch(Ctx),
-    ok.
-
-ddoc_update_test_() ->
-    {
-        "Check ddoc update actions",
-        {
-            setup,
-            fun start/0,
-            fun stop/1,
-            fun check_all_indexers_exit_on_ddoc_change/1
-        }
-    }.
-
-check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
-    ?_test(begin
-        [DbShard1 | RestDbShards] = lists:map(
-            fun(Sh) ->
-                {ok, ShardDb} = couch_db:open(mem3:name(Sh), []),
-                ShardDb
-            end,
-            mem3:local_shards(mem3:dbname(DbName))
-        ),
-
-        % create a DDoc on Db1
-        DDocID = <<"idx_name">>,
-        DDocJson = couch_doc:from_json_obj(
-            {[
-                {<<"_id">>, DDocID},
-                {<<"value">>, 1}
-            ]}
-        ),
-        {ok, _Rev} = couch_db:update_doc(DbShard1, DDocJson, []),
-        {ok, DbShard} = couch_db:reopen(DbShard1),
-        {ok, DDoc} = couch_db:open_doc(
-            DbShard, DDocID, [ejson_body, ?ADMIN_CTX]
-        ),
-        DbShards = [DbShard | RestDbShards],
-        N = length(DbShards),
-
-        % run couch_index process for each shard database
-        ok = meck:reset(test_index),
-        lists:foreach(
-            fun(ShardDb) ->
-                couch_index_server:get_index(test_index, ShardDb, DDoc)
-            end,
-            DbShards
-        ),
-
-        IndexesBefore = get_indexes_by_ddoc(DDocID, N),
-        ?assertEqual(N, length(IndexesBefore)),
-
-        AliveBefore = lists:filter(fun is_process_alive/1, IndexesBefore),
-        ?assertEqual(N, length(AliveBefore)),
-
-        % update ddoc
-        DDocJson2 = couch_doc:from_json_obj(
-            {[
-                {<<"_id">>, DDocID},
-                {<<"value">>, 2},
-                {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)}
-            ]}
-        ),
-        {ok, _} = couch_db:update_doc(DbShard, DDocJson2, []),
-
-        % assert that all index processes exit after ddoc updated
-        ok = meck:reset(test_index),
-        lists:foreach(
-            fun(I) ->
-                couch_index_server:handle_db_event(
-                    couch_db:name(DbShard),
-                    {ddoc_updated, DDocID},
-                    {st, "", couch_index_server:server_name(I), 
couch_index_server:by_sig(I),
-                        couch_index_server:by_pid(I), 
couch_index_server:by_db(I),
-                        couch_index_server:openers(I)}
-                )
-            end,
-            seq()
-        ),
-
-        ok = meck:wait(N, test_index, init, ['_', '_'], 5000),
-        IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
-        ?assertEqual(0, length(IndexesAfter)),
-
-        %% assert that previously running indexes are gone
-        AliveAfter = lists:filter(fun is_process_alive/1, IndexesBefore),
-        ?assertEqual(0, length(AliveAfter)),
-        ok
-    end).
-
-fake_index() ->
-    ok = meck:new([test_index], [non_strict]),
-    ok = meck:expect(test_index, init, fun(Db, DDoc) ->
-        {ok, {couch_db:name(Db), DDoc}}
-    end),
-    ok = meck:expect(test_index, open, fun(_Db, State) ->
-        {ok, State}
-    end),
-    ok = meck:expect(test_index, get, fun
-        (db_name, {DbName, _DDoc}) ->
-            DbName;
-        (idx_name, {_DbName, DDoc}) ->
-            DDoc#doc.id;
-        (signature, {_DbName, DDoc}) ->
-            couch_hash:md5_hash(term_to_binary(DDoc));
-        (update_seq, Seq) ->
-            Seq
-    end),
-    ok = meck:expect(test_index, shutdown, ['_'], ok).
-
-get_indexes_by_ddoc(DDocID, N) ->
-    Indexes = test_util:wait(fun() ->
-        Indxs = lists:flatmap(
-            fun(I) ->
-                ets:match_object(
-                    couch_index_server:by_db(I), {'$1', {DDocID, '$2'}}
-                )
-            end,
-            seq()
-        ),
-        case length(Indxs) == N of
-            true ->
-                Indxs;
-            false ->
-                wait
-        end
-    end),
-    lists:foldl(
-        fun({DbName, {_DDocID, Sig}}, Acc) ->
-            case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) 
of
-                [{_, Pid}] -> [Pid | Acc];
-                _ -> Acc
-            end
-        end,
-        [],
-        Indexes
-    ).
-
-seq() ->
-    lists:seq(1, couch_index_server:num_servers()).
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl 
b/src/couch_mrview/src/couch_mrview_cleanup.erl
index e8a2833a7..fab449ee5 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -14,7 +14,8 @@
 
 -export([
     run/1,
-    cleanup/2
+    cleanup/2,
+    cleanup_processes/2
 ]).
 
 run(Db) ->
@@ -23,7 +24,8 @@ run(Db) ->
     {ok, Db1} = couch_db:reopen(Db),
     Sigs = couch_mrview_util:get_signatures(Db1),
     ok = cleanup_purges(Db1, Sigs, Checkpoints),
-    ok = cleanup_indices(Sigs, Indices).
+    ok = cleanup_indices(Sigs, Indices),
+    ok = cleanup_processes(Db1, Sigs).
 
 % erpc endpoint for fabric_index_cleanup:cleanup_indexes/2
 %
@@ -34,7 +36,8 @@ cleanup(Dbs, #{} = Sigs) ->
                 Indices = couch_mrview_util:get_index_files(Db),
                 Checkpoints = couch_mrview_util:get_purge_checkpoints(Db),
                 ok = cleanup_purges(Db, Sigs, Checkpoints),
-                ok = cleanup_indices(Sigs, Indices)
+                ok = cleanup_indices(Sigs, Indices),
+                ok = cleanup_processes(Db, Sigs)
             end,
             Dbs
         )
@@ -43,6 +46,37 @@ cleanup(Dbs, #{} = Sigs) ->
             ok
     end.
 
+% Clean up indexer processes whose signature is no longer in the valid set.
+%
+cleanup_processes(ShardName, #{} = Sigs) when is_binary(ShardName) ->
+    Entries = couch_index_server:shard_entries(ShardName),
+    lists:foreach(
+        fun({DDocId, Sig}) ->
+            HexSig = couch_util:to_hex_bin(Sig),
+            case maps:find(HexSig, Sigs) of
+                {ok, ValidDDocs} ->
+                    % Sig in use. If DDocId doesn't reference it any
+                    % longer drop the stale by_db row
+                    case maps:is_key(DDocId, ValidDDocs) of
+                        true ->
+                            ok;
+                        false ->
+                            couch_index_server:forget_ddoc_binding(ShardName, 
DDocId, Sig)
+                    end;
+                error ->
+                    case couch_index_server:shard_index_pid(ShardName, Sig) of
+                        {ok, IndexPid} ->
+                            (catch gen_server:cast(IndexPid, {ddoc_updated, 
{not_found, deleted}}));
+                        not_found ->
+                            ok
+                    end
+            end
+        end,
+        Entries
+    );
+cleanup_processes(Db, #{} = Sigs) ->
+    cleanup_processes(couch_db:name(Db), Sigs).
+
 cleanup_purges(Db, Sigs, Checkpoints) ->
     couch_index_util:cleanup_purges(Db, Sigs, Checkpoints).
 
diff --git a/src/couch_mrview/src/couch_mrview_util.erl 
b/src/couch_mrview/src/couch_mrview_util.erl
index 48138bf76..17f6db34d 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -112,17 +112,19 @@ get_signatures(Db) ->
     DDocs1 = lists:foldl(FoldFun, [], DDocs),
     get_signatures_from_ddocs(DbName, DDocs1).
 
-% From a list of design #doc{} records returns signatures map: #{Sig => true}
-% This will be valid signatures of views we expect to run and build on this
-% node.
+% From a list of design #doc{} records returns the map
+% #{Sig => #{DDocId => true}}. The keys are the valid sigs of views
+% and inner maps are ddocs referencing those sigs (we can have multiple
+% ddocs referencing the same sig).
 get_signatures_from_ddocs(DbName, DDocs) when is_list(DDocs) ->
-    FoldFun = fun(#doc{} = Doc, Acc) ->
+    FoldFun = fun(#doc{id = DDocId} = Doc, Acc) ->
         try ddoc_to_mrst(DbName, Doc) of
             {ok, Mrst} ->
                 case couch_mrview_util:mrst_has_valid_views(Mrst) of
                     true ->
                         Sig = couch_util:to_hex_bin(Mrst#mrst.sig),
-                        Acc#{Sig => true};
+                        Inner = maps:get(Sig, Acc, #{}),
+                        Acc#{Sig => Inner#{DDocId => true}};
                     false ->
                         Acc
                 end
diff --git a/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl 
b/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl
new file mode 100644
index 000000000..52aee1332
--- /dev/null
+++ b/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl
@@ -0,0 +1,252 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_cleanup_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(TEST_INDEX, test_index).
+-define(DDOC_ID, <<"idx_name">>).
+
+start() ->
+    fake_index(),
+    Ctx = test_util:start_couch([mem3, fabric]),
+    config:set("couchdb", "index_cleanup_delay_msec", "60000", false),
+    DbName = ?tempdb(),
+    ok = fabric:create_db(DbName, [?ADMIN_CTX]),
+    {Ctx, DbName}.
+
+stop({Ctx, DbName}) ->
+    meck:unload(?TEST_INDEX),
+    ok = fabric:delete_db(DbName, [?ADMIN_CTX]),
+    DbDir = config:get("couchdb", "database_dir", "."),
+    WaitFun = fun() ->
+        filelib:fold_files(
+            DbDir,
+            <<".*", DbName/binary, "\.[0-9]+.*">>,
+            true,
+            fun(_F, _A) -> wait end,
+            ok
+        )
+    end,
+    ok = test_util:wait(WaitFun),
+    config:delete("couchdb", "index_cleanup_delay_msec", false),
+    test_util:stop_couch(Ctx),
+    ok.
+
+cleanup_test_() ->
+    {
+        "couch_mrview_cleanup",
+        {
+            foreach,
+            fun start/0,
+            fun stop/1,
+            [
+                ?TDEF_FE(t_orphan_sigs_are_reaped),
+                ?TDEF_FE(t_valid_sigs_survive),
+                ?TDEF_FE(t_shared_sig_drops_stale_ddoc_row),
+                ?TDEF_FE(t_schedule_dedupes_within_window)
+            ]
+        }
+    }.
+
+% Sig is now invalid, reap it!
+t_orphan_sigs_are_reaped({_Ctx, DbName}) ->
+    [DbShard1 | RestDbShards] = open_shards(DbName),
+    {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID),
+    DbShards = [DbShard | RestDbShards],
+    N = length(DbShards),
+    spawn_indexers(DbShards, DDoc),
+    IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N),
+    ?assertEqual(N, length(IndexesBefore)),
+
+    ok = meck:reset(?TEST_INDEX),
+    lists:foreach(
+        fun(DbShardName) ->
+            couch_mrview_cleanup:cleanup_processes(DbShardName, #{})
+        end,
+        [couch_db:name(S) || S <- DbShards]
+    ),
+
+    wait_until_dead(IndexesBefore),
+    ?assertEqual(0, length(get_indexes_by_ddoc(?DDOC_ID, 0))),
+    ?assertEqual(0, length(lists:filter(fun is_process_alive/1, 
IndexesBefore))).
+
+% Sig is valid, leave it alone
+t_valid_sigs_survive({_Ctx, DbName}) ->
+    [DbShard1 | RestDbShards] = open_shards(DbName),
+    {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID),
+    DbShards = [DbShard | RestDbShards],
+    N = length(DbShards),
+    spawn_indexers(DbShards, DDoc),
+    IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N),
+    ?assertEqual(N, length(IndexesBefore)),
+
+    % All test indexes share the same sig (as mocked)
+    [{_, {_, RawSig}} | _] = ets:match_object(
+        couch_index_server:by_db(couch_db:name(DbShard)),
+        {couch_db:name(DbShard), {?DDOC_ID, '$1'}}
+    ),
+    ValidSigs = #{couch_util:to_hex_bin(RawSig) => #{?DDOC_ID => true}},
+
+    lists:foreach(
+        fun(DbShardName) ->
+            couch_mrview_cleanup:cleanup_processes(DbShardName, ValidSigs)
+        end,
+        [couch_db:name(S) || S <- DbShards]
+    ),
+
+    timer:sleep(100),
+    ?assertEqual(N, length(get_indexes_by_ddoc(?DDOC_ID, N))),
+    ?assertEqual(N, length(lists:filter(fun is_process_alive/1, 
IndexesBefore))).
+
+% Two ddocs share the same sig. Indexer must stay alive when old ddoc is removed
+t_shared_sig_drops_stale_ddoc_row({_Ctx, DbName}) ->
+    [DbShard1 | RestDbShards] = open_shards(DbName),
+    {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID),
+    DbShards = [DbShard | RestDbShards],
+    N = length(DbShards),
+    spawn_indexers(DbShards, DDoc),
+    IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N),
+    ?assertEqual(N, length(IndexesBefore)),
+
+    [{_, {_, RawSig}} | _] = ets:match_object(
+        couch_index_server:by_db(couch_db:name(DbShard)),
+        {couch_db:name(DbShard), {?DDOC_ID, '$1'}}
+    ),
+    OtherDDocId = <<"some_other_ddoc">>,
+    ValidSigs = #{couch_util:to_hex_bin(RawSig) => #{OtherDDocId => true}},
+
+    lists:foreach(
+        fun(DbShardName) ->
+            couch_mrview_cleanup:cleanup_processes(DbShardName, ValidSigs)
+        end,
+        [couch_db:name(S) || S <- DbShards]
+    ),
+
+    test_util:wait(fun() ->
+        Stale = lists:flatmap(
+            fun(I) ->
+                ets:match_object(
+                    couch_index_server:by_db(I), {'$1', {?DDOC_ID, '$2'}}
+                )
+            end,
+            seq()
+        ),
+        case Stale of
+            [] -> ok;
+            _ -> wait
+        end
+    end),
+    ?assertEqual(N, length(lists:filter(fun is_process_alive/1, 
IndexesBefore))).
+
+% Three schedule calls should dedup
+t_schedule_dedupes_within_window({_Ctx, DbName}) ->
+    ClusteredDbName = mem3:dbname(DbName),
+    ok = couch_index_cleanup:schedule(ClusteredDbName),
+    ok = couch_index_cleanup:schedule(ClusteredDbName),
+    ok = couch_index_cleanup:schedule(ClusteredDbName),
+    ?assertEqual([ClusteredDbName], pending_dbnames()).
+
+% Helpers. Some copied from other tests
+
+open_shards(DbName) ->
+    lists:map(
+        fun(Sh) ->
+            {ok, ShardDb} = couch_db:open(mem3:name(Sh), []),
+            ShardDb
+        end,
+        mem3:local_shards(mem3:dbname(DbName))
+    ).
+
+create_ddoc(Db, DDocID) ->
+    DDocJson = couch_doc:from_json_obj(
+        {[
+            {<<"_id">>, DDocID},
+            {<<"value">>, 1}
+        ]}
+    ),
+    {ok, _Rev} = couch_db:update_doc(Db, DDocJson, []),
+    {ok, Db1} = couch_db:reopen(Db),
+    {ok, DDoc} = couch_db:open_doc(Db1, DDocID, [ejson_body, ?ADMIN_CTX]),
+    {DDoc, Db1}.
+
+spawn_indexers(DbShards, DDoc) ->
+    ok = meck:reset(?TEST_INDEX),
+    lists:foreach(
+        fun(ShardDb) ->
+            couch_index_server:get_index(?TEST_INDEX, ShardDb, DDoc)
+        end,
+        DbShards
+    ).
+
+fake_index() ->
+    ok = meck:new([?TEST_INDEX], [non_strict]),
+    ok = meck:expect(?TEST_INDEX, init, fun(Db, DDoc) ->
+        {ok, {couch_db:name(Db), DDoc}}
+    end),
+    ok = meck:expect(?TEST_INDEX, open, fun(_Db, State) ->
+        {ok, State}
+    end),
+    ok = meck:expect(?TEST_INDEX, get, fun
+        (db_name, {DbName, _DDoc}) ->
+            DbName;
+        (idx_name, {_DbName, DDoc}) ->
+            DDoc#doc.id;
+        (signature, {_DbName, DDoc}) ->
+            couch_hash:md5_hash(term_to_binary(DDoc));
+        (update_seq, Seq) ->
+            Seq
+    end),
+    ok = meck:expect(?TEST_INDEX, shutdown, ['_'], ok).
+
+get_indexes_by_ddoc(DDocID, N) ->
+    Indexes = test_util:wait(fun() ->
+        Indxs = lists:flatmap(
+            fun(I) ->
+                ets:match_object(
+                    couch_index_server:by_db(I), {'$1', {DDocID, '$2'}}
+                )
+            end,
+            seq()
+        ),
+        case length(Indxs) == N of
+            true -> Indxs;
+            false -> wait
+        end
+    end),
+    lists:foldl(
+        fun({DbName, {_DDocID, Sig}}, Acc) ->
+            case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) 
of
+                [{_, Pid}] -> [Pid | Acc];
+                _ -> Acc
+            end
+        end,
+        [],
+        Indexes
+    ).
+
+wait_until_dead(Pids) ->
+    test_util:wait(fun() ->
+        case lists:filter(fun is_process_alive/1, Pids) of
+            [] -> ok;
+            _ -> wait
+        end
+    end).
+
+pending_dbnames() ->
+    {st, Pending} = sys:get_state(couch_index_cleanup),
+    lists:sort(maps:keys(Pending)).
+
+seq() ->
+    lists:seq(1, couch_index_server:num_servers()).
diff --git a/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl 
b/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl
index c304dcdad..75614b728 100644
--- a/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl
+++ b/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl
@@ -98,7 +98,8 @@ t_get_signatures_local({_, Db}) ->
     Sigs = couch_mrview_util:get_signatures(DbName),
     ?assert(is_map(Sigs)),
     ?assertEqual(1, map_size(Sigs)),
-    [{Sig, true}] = maps:to_list(Sigs),
+    [{Sig, DDocs}] = maps:to_list(Sigs),
+    ?assertEqual(#{?DDOC_ID => true}, DDocs),
     {ok, Info} = couch_mrview:get_info(Db, ?DDOC_ID),
     ?assertEqual(proplists:get_value(signature, Info), Sig),
 
@@ -115,7 +116,8 @@ t_get_signatures_clustered({DbName, _Db}) ->
     ?assertEqual(Sigs, couch_mrview_util:get_signatures(ShardName2)),
     ?assert(is_map(Sigs)),
     ?assertEqual(1, map_size(Sigs)),
-    [{Sig, true}] = maps:to_list(Sigs),
+    [{Sig, DDocs}] = maps:to_list(Sigs),
+    ?assertEqual(#{?DDOC_ID => true}, DDocs),
     {ok, Info} = couch_mrview:get_info(ShardName1, ?DDOC_ID),
     ?assertEqual(proplists:get_value(signature, Info), Sig),
 

Reply via email to