This is an automated email from the ASF dual-hosted git repository.
jiahuili430 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 5f2aee142 Couch scanner plugin: conflict finder (#5466)
5f2aee142 is described below
commit 5f2aee142709bf6733e13d47f960f086e862b129
Author: Jiahui Li <[email protected]>
AuthorDate: Mon Apr 7 16:23:52 2025 -0500
Couch scanner plugin: conflict finder (#5466)
* Fix typo in couch scanner
* Couch scanner plugin: conflict finder
Add `couch_scanner_plugin_conflict_finder` to find conflicting docs
in the database.
- If `doc_report` is set to true, the report will show doc name and
revision information.
e.g.: `couch_scanner_plugin_conflict_finder s:1742238038-9e432378f2f1 db:db
doc:d1 conflicts:["2-y","2-x"] deleted_conflicts:["2-d"]`
- Otherwise, only the number of conflicting revisions is accumulated per database.
e.g.: `couch_scanner_plugin_conflict_finder s:1742238038-9e432378f2f1 db:db
conflicts:2 deleted_conflicts:1`
- If a document has more revisions of one kind than `max_revs` (for example
`max_revs = 1` and two conflicts), the doc report shows only the number of
those revisions instead of listing them. By default, `max_revs = 10`.
e.g.: `couch_scanner_plugin_conflict_finder s:1742238038-9e432378f2f1 db:db
doc:d1 conflicts:2 deleted_conflicts:["2-d"]`
---
rel/overlay/etc/default.ini | 15 ++
src/couch_scanner/README.md | 2 +-
src/couch_scanner/src/couch_scanner_plugin.erl | 4 +-
.../src/couch_scanner_plugin_conflict_finder.erl | 212 +++++++++++++++++++++
.../test/eunit/couch_scanner_test.erl | 66 +++++--
5 files changed, 285 insertions(+), 14 deletions(-)
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 98199f970..5547b1458 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -1003,6 +1003,7 @@ url = {{nouveau_url}}
[couch_scanner_plugins]
;couch_scanner_plugin_ddoc_features = false
;couch_scanner_plugin_find = false
+;couch_scanner_plugin_conflict_finder = false
;couch_quickjs_scanner_plugin = false
; The following [$plugin*] settings apply to all plugins
@@ -1070,6 +1071,20 @@ url = {{nouveau_url}}
; is to aggregate reports per database.
;ddoc_report = false
+[couch_scanner_plugin_conflict_finder]
+; Which types of conflicting docs to scan.
+;conflicts = true
+;deleted_conflicts = true
+
+; Emit reports for each conflicted doc or aggregate them per database.
+; If doc_report is set to true, the report will indicate doc name and revs info.
+; Otherwise, only the number of conflicting revisions is accumulated.
+;doc_report = true
+
+; Limit the number of revs shown in the doc report.
+; To see the full list, please try `GET /{db}/{docid}?conflicts=true&deleted_conflicts=true`
+;max_revs = 10
+
[couch_quickjs_scanner_plugin]
; Limit the number of ddocs processed per db
;max_ddocs = 100
diff --git a/src/couch_scanner/README.md b/src/couch_scanner/README.md
index 660b6899f..55d9a1685 100644
--- a/src/couch_scanner/README.md
+++ b/src/couch_scanner/README.md
@@ -88,7 +88,7 @@ checkpoint document. Plugin modules may implement the optional `checkpoint/1`
API and save some plugin specific data alongside the database traversal
checkpoint which gets saved automatically. For instance, it maybe useful for
plugins to save their start up configuration to detect when it changes so they
-could restart their scanning. Or, it they want to accumulate some statistics
+could restart their scanning. Or, if they want to accumulate some statistics
and only emit them at the end of the scan.
Reading and writing to checkpoints is done in the
diff --git a/src/couch_scanner/src/couch_scanner_plugin.erl b/src/couch_scanner/src/couch_scanner_plugin.erl
index ff521f604..04f394c12 100644
--- a/src/couch_scanner/src/couch_scanner_plugin.erl
+++ b/src/couch_scanner/src/couch_scanner_plugin.erl
@@ -35,7 +35,7 @@
% calls will be called with the same St object, and may return an updated
% version of it.
%
-% If the plugin hasn't completed runing and has resumed running after the node
+% If the plugin hasn't completed running and has resumed running after the node
% was restarted or an error happened, the resume/2 function will be called.
% That's the difference between start and resume: start/2 is called when the
% scan starts from the beginning (first db, first shard, ...), and resume/2 is
@@ -52,7 +52,7 @@
% checkpoint map value.
%
% The complete/1 callback is called when the scan has finished. The complete
-% callback should return final checkpoint map object. The last checkoint will
+% callback should return final checkpoint map object. The last checkpoint will
% be written and then it will be passed to the start/2 callback if the plugin
% runs again.
%
diff --git a/src/couch_scanner/src/couch_scanner_plugin_conflict_finder.erl b/src/couch_scanner/src/couch_scanner_plugin_conflict_finder.erl
new file mode 100644
index 000000000..5f8f17527
--- /dev/null
+++ b/src/couch_scanner/src/couch_scanner_plugin_conflict_finder.erl
@@ -0,0 +1,212 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_scanner_plugin_conflict_finder).
+-behaviour(couch_scanner_plugin).
+
+-export([
+ start/2,
+ resume/2,
+ complete/1,
+ checkpoint/1,
+ db/2,
+ doc_id/3
+]).
+
+-include_lib("couch_scanner/include/couch_scanner_plugin.hrl").
+
+-record(st, { % Plugin state threaded through every scanner callback
+ sid, % scan id handed to start/2 and resume/2; used as log metadata
+ opts = #{}, % boolean toggles built by opts/0; stored in the checkpoint
+ dbname, % db whose totals are currently being accumulated (set in report/3)
+ report = #{}, % running per-db totals, merged by merge_report/2
+ doc_report = true, % default for the "doc_report" setting: log one report per doc
+ max_revs = 10 % default for "max_revs": longest rev list a doc report will print
+}).
+
+-define(CONFLICTS, <<"conflicts">>). % used both as config key and as report key
+-define(DELETED_CONFLICTS, <<"deleted_conflicts">>). % same, for deleted revisions
+
+-define(OPTS, #{ % default value for each toggle; opts/0 maps over this
+ ?CONFLICTS => true,
+ ?DELETED_CONFLICTS => true
+}).
+
+% Behavior callbacks
+
+% Called when a scan starts from the beginning: build a fresh state from the
+% current configuration and log the start.
+start(SId, #{}) ->
+    State = init_config(#st{sid = SId}),
+    ?INFO("Starting.", [], #{sid => SId}),
+    {ok, State}.
+
+% Called when a scan continues from a checkpoint. The opts saved in the
+% checkpoint are compared with the ones currently configured: if they are
+% equal the scan resumes, otherwise `reset` is returned.
+resume(SId, #{<<"opts">> := OldOpts}) ->
+    case init_config(#st{sid = SId}) of
+        #st{opts = Opts} = St when OldOpts == Opts ->
+            ?INFO("Resuming.", [], #{sid => SId}),
+            {ok, St};
+        #st{} ->
+            ?INFO("Resetting. Config changed.", [], #{sid => SId}),
+            reset
+    end.
+
+% Called when the scan has finished: emit the totals still pending for the
+% last database, log completion and return an empty checkpoint map.
+complete(#st{} = St) ->
+    #st{sid = SId, dbname = DbName, report = Report} = St,
+    report_per_db(St, DbName, Report),
+    ?INFO("Completed", [], #{sid => SId}),
+    {ok, #{}}.
+
+% Return the data to persist in the checkpoint. If the configured opts no
+% longer equal the ones this scan was started with, return `reset` instead.
+checkpoint(#st{sid = SId, opts = Opts}) ->
+    CurrentOpts = opts(),
+    if
+        Opts == CurrentOpts ->
+            {ok, #{<<"opts">> => Opts}};
+        true ->
+            ?INFO("Resetting. Config changed.", [], #{sid => SId}),
+            reset
+    end.
+
+% Every database is accepted; the state is passed through unchanged.
+db(St = #st{}, _DbName) ->
+    {ok, St}.
+
+% Design docs are skipped. For any other doc id the doc info is loaded from
+% the shard and its revs are handed to check/4 together with the clustered
+% database name.
+doc_id(#st{} = St, <<?DESIGN_DOC_PREFIX, _/binary>>, _Db) ->
+    {skip, St};
+doc_id(#st{} = St, DocId, Db) ->
+    {ok, #doc_info{revs = Revs}} = couch_db:get_doc_info(Db, DocId),
+    ShardName = couch_db:name(Db),
+    NewSt = check(St, mem3:dbname(ShardName), DocId, Revs),
+    {ok, NewSt}.
+
+% Private
+
+% Fill the state from configuration. The values already present in the
+% record serve as defaults for "doc_report" and "max_revs".
+init_config(#st{doc_report = DocReportDefault, max_revs = MaxRevsDefault} = St) ->
+    Opts = opts(),
+    DocReport = cfg_bool("doc_report", DocReportDefault),
+    MaxRevs = cfg_int("max_revs", MaxRevsDefault),
+    St#st{opts = Opts, doc_report = DocReport, max_revs = MaxRevs}.
+
+% Read an integer setting from this plugin's own config section.
+cfg_int(Key, Default) when is_list(Key), is_integer(Default) ->
+    Section = atom_to_list(?MODULE),
+    config:get_integer(Section, Key, Default).
+
+% Read a boolean setting from this plugin's own config section.
+cfg_bool(Key, Default) when is_list(Key), is_boolean(Default) ->
+    Section = atom_to_list(?MODULE),
+    config:get_boolean(Section, Key, Default).
+
+% Current value of every toggle in ?OPTS, using the map values as defaults.
+opts() ->
+    maps:map(
+        fun(Key, Default) -> cfg_bool(binary_to_list(Key), Default) end,
+        ?OPTS
+    ).
+
+% Fold the conflict information of one document into the plugin state.
+%
+% Revs is the `revs` list taken from the doc's #doc_info{}. With zero or one
+% entry there is nothing to report and the state is returned untouched.
+% Otherwise the entries are split by their `deleted` flag and reported under
+% "conflicts" / "deleted_conflicts" (subject to the enabled opts). When
+% doc_report is true a per-doc report is logged first; in both modes the
+% numeric totals are accumulated into the per-db report through report/3.
+check(#st{} = St, _DbName, _DocId, []) ->
+    St;
+check(#st{} = St, _DbName, _DocId, [_]) ->
+    St;
+check(#st{doc_report = true} = St, DbName, DocId, Revs) ->
+    DocReport = conflict_report(doc, St, Revs),
+    report_per_doc(St, DbName, DocId, DocReport),
+    % A doc report may hold rev lists; the db report only keeps counts.
+    DbReport = maps:map(fun(_Key, V) -> rev_count(V) end, DocReport),
+    report(St, DbName, DbReport);
+check(#st{} = St, DbName, _DocId, Revs) ->
+    report(St, DbName, conflict_report(db, St, Revs)).
+
+% Split Revs into live and deleted entries and merge both gen_report/5 maps.
+conflict_report(ReportType, #st{opts = Opts, max_revs = Max}, Revs) ->
+    {Deleted, Live} = lists:partition(fun(R) -> R#rev_info.deleted end, Revs),
+    maps:merge(
+        gen_report(ReportType, ?CONFLICTS, Opts, Live, Max),
+        gen_report(ReportType, ?DELETED_CONFLICTS, Opts, Deleted, Max)
+    ).
+
+% A doc report value is either a list of rev strings or already a count.
+rev_count(Revs) when is_list(Revs) -> length(Revs);
+rev_count(Count) when is_number(Count) -> Count.
+
+% Build the single-entry report map for one kind of conflict.
+%
+% Key is ?CONFLICTS or ?DELETED_CONFLICTS. If that key is not enabled in Opts
+% the result is an empty map. For a `doc` report with at most Max revisions
+% the value is the list of revision strings; in every other enabled case it
+% is just the number of revisions.
+gen_report(ReportType, Key, #{} = Opts, Revs, Max) when
+    Key =:= ?CONFLICTS; Key =:= ?DELETED_CONFLICTS
+->
+    case Opts of
+        #{Key := true} when ReportType =:= doc, length(Revs) =< Max ->
+            #{Key => [?b2l(couch_doc:rev_to_str(R#rev_info.rev)) || R <- Revs]};
+        #{Key := true} ->
+            #{Key => length(Revs)};
+        #{} ->
+            #{}
+    end;
+gen_report(_ReportType, _Key, #{} = _Opts, _Revs, _Max) ->
+    #{}.
+
+% Accumulate Report into the per-db totals held in the state. When DbName
+% differs from the previously recorded db, the totals of the previous db are
+% emitted first and accumulation restarts from Report.
+report(#st{report = Total, dbname = PrevDbName} = St, DbName, Report) ->
+    SwitchedDb = is_binary(PrevDbName) andalso DbName =/= PrevDbName,
+    if
+        SwitchedDb ->
+            % Flush the stats of the old db, then start over for the new one
+            report_per_db(St, PrevDbName, Total),
+            St#st{report = Report, dbname = DbName};
+        true ->
+            % Same db (or first db seen): keep adding up
+            St#st{report = merge_report(Total, Report), dbname = DbName}
+    end.
+
+% Merge two report maps, adding the values of keys present in both.
+merge_report(#{} = Total, #{} = Update) ->
+    maps:merge_with(fun(_Key, Old, New) -> Old + New end, Total, Update).
+
+% Log a warning with the totals of one database. Nothing is logged when the
+% report is empty or no database name has been recorded yet.
+report_per_db(#st{sid = SId}, DbName, #{} = Report) when
+    is_binary(DbName), map_size(Report) > 0
+->
+    {Fmt, Args} = report_fmt(Report),
+    ?WARN(Fmt, Args, #{sid => SId, db => DbName});
+report_per_db(#st{}, _DbName, _Report) ->
+    ok.
+
+% Log a warning describing the conflicts of a single document. Nothing is
+% logged when the report is empty or DbName is not a binary.
+report_per_doc(#st{sid = SId}, DbName, DocId, Report) when
+    is_binary(DbName), map_size(Report) > 0
+->
+    {Fmt, Args} = report_fmt(Report),
+    ?WARN(Fmt, Args, #{sid => SId, db => DbName, doc => DocId});
+report_per_doc(#st{}, _DbName, _DocId, _Report) ->
+    ok.
+
+% Turn a report map into a {Format, Args} pair for the log macros: one
+% "~s:~p " directive per key, in sorted key order, with the matching
+% key/value arguments laid out flat.
+report_fmt(Report) ->
+    Pairs = lists:sort(maps:to_list(Report)),
+    Fmt = lists:flatten(lists:duplicate(length(Pairs), "~s:~p ")),
+    Args = flatten_one_level([[K, V] || {K, V} <- Pairs]),
+    {Fmt, Args}.
+
+% Remove exactly one level of nesting: elements that are lists are spliced
+% into the result, all other elements are kept as they are. Deeper nesting
+% inside the spliced lists is preserved. A list without nested lists comes
+% back unchanged, so no separate "already flat" check is needed.
+% Fails with function_clause when the argument is not a list.
+flatten_one_level(List) when is_list(List) ->
+    lists:flatmap(
+        fun
+            (E) when is_list(E) -> E;
+            (E) -> [E]
+        end,
+        List
+    ).
+
+-ifdef(TEST).
+-include_lib("couch/include/couch_eunit.hrl").
+% Table-driven check of flatten_one_level/1: {Expected, Input} pairs, plus
+% the failure mode for a non-list argument.
+flatten_one_level_test() ->
+    Cases = [
+        {[1, 2, 3], [1, 2, 3]},
+        {[1, 2, 3], [[1], 2, 3]},
+        {[1, 2, 3], [[1, 2], 3]},
+        {[1, 2, 3], [[1], 2, [3]]},
+        {[1, [2], 3], [[1, [2]], 3]},
+        {[1, [2], 3], [[1, [2]], [3]]},
+        {[1, [2, [3]]], [1, [[2, [3]]]]}
+    ],
+    lists:foreach(
+        fun({Expected, Input}) ->
+            ?assertEqual(Expected, flatten_one_level(Input))
+        end,
+        Cases
+    ),
+    ?assertError(function_clause, flatten_one_level(wrong)).
+-endif.
diff --git a/src/couch_scanner/test/eunit/couch_scanner_test.erl b/src/couch_scanner/test/eunit/couch_scanner_test.erl
index 3824b4fd6..6f7f3d8e2 100644
--- a/src/couch_scanner/test/eunit/couch_scanner_test.erl
+++ b/src/couch_scanner/test/eunit/couch_scanner_test.erl
@@ -26,6 +26,7 @@ couch_scanner_test_() ->
?TDEF_FE(t_run_through_all_callbacks_basic, 10),
?TDEF_FE(t_find_reporting_works, 10),
?TDEF_FE(t_ddoc_features_works, 20),
+ ?TDEF_FE(t_conflict_finder_works, 30),
?TDEF_FE(t_config_skips, 10),
?TDEF_FE(t_resume_after_error, 10),
?TDEF_FE(t_reset, 10),
@@ -38,9 +39,12 @@ couch_scanner_test_() ->
-define(DOC2, <<"_design/scanner_test_doc2">>).
-define(DOC3, <<"scanner_test_doc3">>).
-define(DOC4, <<"_design/scanner_test_doc4">>).
+-define(DOC5, <<"conflicts_doc5">>).
+-define(DOC6, <<"deleted_conflicts_doc6">>).
-define(FIND_PLUGIN, couch_scanner_plugin_find).
-define(FEATURES_PLUGIN, couch_scanner_plugin_ddoc_features).
+-define(CONFLICTS_PLUGIN, couch_scanner_plugin_conflict_finder).
setup() ->
{module, _} = code:ensure_loaded(?FIND_PLUGIN),
@@ -50,8 +54,10 @@ setup() ->
Ctx = test_util:start_couch([fabric, couch_scanner]),
DbName1 = <<"dbname1", (?tempdb())/binary>>,
DbName2 = <<"dbname2", (?tempdb())/binary>>,
+ DbName3 = <<"dbname3", (?tempdb())/binary>>,
ok = fabric:create_db(DbName1, [{q, "2"}, {n, "1"}]),
ok = fabric:create_db(DbName2, [{q, "2"}, {n, "1"}]),
+ ok = fabric:create_db(DbName3, [{q, "2"}, {n, "1"}]),
ok = add_doc(DbName1, ?DOC1, #{foo1 => bar}),
ok = add_doc(DbName1, ?DOC2, #{
foo2 => baz,
@@ -77,15 +83,20 @@ setup() ->
}),
ok = add_doc(DbName2, ?DOC3, #{foo3 => bax}),
ok = add_doc(DbName2, ?DOC4, #{foo4 => baw, <<>> =>
this_is_ok_apparently}),
+ add_docs(DbName3, [
+ {doc, ?DOC5, {2, [<<"x">>, <<"z">>]}, {[]}, [], false, []},
+ {doc, ?DOC5, {2, [<<"y">>, <<"z">>]}, {[]}, [], false, []}
+ ]),
couch_scanner:reset_checkpoints(),
- {Ctx, {DbName1, DbName2}}.
+ {Ctx, {DbName1, DbName2, DbName3}}.
-teardown({Ctx, {DbName1, DbName2}}) ->
+teardown({Ctx, {DbName1, DbName2, DbName3}}) ->
config:delete("couch_scanner", "maintenance_mode", false),
config_delete_section("couch_scanner"),
config_delete_section("couch_scanner_plugins"),
config_delete_section(atom_to_list(?FEATURES_PLUGIN)),
config_delete_section(atom_to_list(?FIND_PLUGIN)),
+ config_delete_section(atom_to_list(?CONFLICTS_PLUGIN)),
lists:foreach(
fun(Subsection) ->
config_delete_section(atom_to_list(?FIND_PLUGIN) ++ "." ++
Subsection)
@@ -96,6 +107,7 @@ teardown({Ctx, {DbName1, DbName2}}) ->
couch_scanner:resume(),
fabric:delete_db(DbName1),
fabric:delete_db(DbName2),
+ fabric:delete_db(DbName3),
test_util:stop_couch(Ctx),
meck:unload().
@@ -117,7 +129,7 @@ t_start_stop(_) ->
?assertEqual(ok, couch_scanner_server:resume()),
?assertMatch(#{stopped := false}, couch_scanner:status()).
-t_run_through_all_callbacks_basic({_, {DbName1, DbName2}}) ->
+t_run_through_all_callbacks_basic({_, {DbName1, DbName2, _}}) ->
% Run the "find" plugin without any regexes
meck:reset(couch_scanner_server),
config:set("couch_scanner_plugins", atom_to_list(?FIND_PLUGIN), "true",
false),
@@ -154,7 +166,7 @@ t_find_reporting_works(_) ->
% doc2 should have a baz and doc1 and doc4 matches foo14
?assertEqual(3, log_calls(warning)).
-t_ddoc_features_works({_, {_, DbName2}}) ->
+t_ddoc_features_works({_, {_, DbName2, _}}) ->
% Run the "ddoc_features" plugin
Plugin = atom_to_list(?FEATURES_PLUGIN),
config:set(Plugin, "filters", "true", false),
@@ -171,17 +183,36 @@ t_ddoc_features_works({_, {_, DbName2}}) ->
ok = add_doc(DbName2, <<"_design/doc42">>, #{
rewrites => <<"function(r) {return r;}">>
}),
- config:set("couch_scanner", "interval_sec", "1", false),
- couch_scanner:resume(),
+ resume_couch_scanner(Plugin),
+ ?assertEqual(2, meck:num_calls(couch_scanner_util, log, LogArgs)).
+
+t_conflict_finder_works({_, {_, _, DbName3}}) ->
+ % Run the "conflict_finder" plugin
+ Plugin = atom_to_list(?CONFLICTS_PLUGIN),
+ config:set(Plugin, "conflicts", "true", false),
+ config:set(Plugin, "deleted_conflicts", "true", false),
+ config:set(Plugin, "doc_report", "true", false),
meck:reset(couch_scanner_server),
meck:reset(couch_scanner_util),
- Now = erlang:system_time(second),
- TStamp = calendar:system_time_to_rfc3339(Now + 1, [{offset, "Z"}]),
- config:set(Plugin, "after", TStamp, false),
+ config:set("couch_scanner_plugins", Plugin, "true", false),
wait_exit(10000),
- ?assertEqual(2, meck:num_calls(couch_scanner_util, log, LogArgs)).
+ LogArgs = [warning, ?CONFLICTS_PLUGIN, '_', '_', '_'],
+ ?assertEqual(2, meck:num_calls(couch_scanner_util, log, LogArgs)),
+ % Add a deleted conflicting doc to the third database.
+ % 3 reports are expected: 2 doc reports and 1 db report.
+ add_docs(DbName3, [
+ {doc, ?DOC6, {2, [<<"x">>, <<"z">>]}, {[]}, [], false, []},
+ {doc, ?DOC6, {2, [<<"d">>, <<"z">>]}, {[]}, [], true, []}
+ ]),
+ resume_couch_scanner(Plugin),
+ ?assertEqual(3, meck:num_calls(couch_scanner_util, log, LogArgs)),
+ % Set doc_report to false to only have 1 db report.
+ config:set(Plugin, "doc_report", "false", false),
+ resume_couch_scanner(Plugin),
+ ?assertEqual(1, meck:num_calls(couch_scanner_util, log, LogArgs)),
+ ok.
-t_config_skips({_, {DbName1, DbName2}}) ->
+t_config_skips({_, {DbName1, DbName2, _}}) ->
Plugin = atom_to_list(?FIND_PLUGIN),
config:set(Plugin ++ ".skip_dbs", "x", binary_to_list(DbName2), false),
config:set(Plugin ++ ".skip_docs", "y", binary_to_list(?DOC1), false),
@@ -276,6 +307,9 @@ mkdoc(Id, #{} = Body) ->
Body1 = Body#{<<"_id">> => Id},
jiffy:decode(jiffy:encode(Body1)).
+% Write Docs to DbName with the replicated-changes and admin-context options.
+% Fails with badmatch unless the update returns {ok, []}.
+add_docs(DbName, Docs) ->
+    Options = [?REPLICATED_CHANGES, ?ADMIN_CTX],
+    {ok, []} = fabric:update_docs(DbName, Docs, Options).
+
num_calls(Fun, Args) ->
meck:num_calls(?FIND_PLUGIN, Fun, Args).
@@ -284,3 +318,13 @@ log_calls(Level) ->
wait_exit(MSec) ->
meck:wait(couch_scanner_server, handle_info, [{'EXIT', '_', '_'}, '_'],
MSec).
+
+% Trigger another run of Plugin and wait for the scanner server to see an
+% 'EXIT' message. The mocks are reset before the run so that call counts
+% asserted afterwards only cover this run.
+resume_couch_scanner(Plugin) ->
+    config:set("couch_scanner", "interval_sec", "1", false),
+    couch_scanner:resume(),
+    lists:foreach(fun meck:reset/1, [couch_scanner_server, couch_scanner_util]),
+    % Schedule the plugin one second from now, as an RFC 3339 UTC timestamp
+    NextSec = erlang:system_time(second) + 1,
+    After = calendar:system_time_to_rfc3339(NextSec, [{offset, "Z"}]),
+    config:set(Plugin, "after", After, false),
+    wait_exit(10000).