This is an automated email from the ASF dual-hosted git repository. jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 2af5b5a2b53cc26f84b4a7fed81636aab5f9e46f Author: jiangphcn <jian...@cn.ibm.com> AuthorDate: Thu Jun 21 16:10:06 2018 +0800 Address Ilya's comments - Use couch_epi:any/5 to call index verification func - swap "stop_couch/1" and "remove files" - refactor reason for throw bad_request - directly use meck:unload/0 COUCHDB-3326 --- src/couch/src/couch_db.erl | 44 +++-------------- src/couch/src/couch_db_plugin.erl | 6 +++ src/couch/test/couch_bt_engine_upgrade_tests.erl | 4 +- .../src/couch_index_plugin_couch_db.erl | 5 ++ src/couch_mrview/src/couch_mrview_index.erl | 55 ++++++++-------------- src/couch_mrview/src/couch_mrview_util.erl | 5 +- .../test/couch_mrview_purge_docs_fabric_tests.erl | 2 +- .../test/couch_mrview_purge_docs_tests.erl | 25 +++++++--- .../src/cpse_test_purge_bad_checkpoints.erl | 49 ++----------------- src/mem3/src/mem3_epi.erl | 3 +- .../src/mem3_plugin_couch_db.erl} | 8 ++-- src/mem3/src/mem3_rep.erl | 49 ++++++++++--------- 12 files changed, 96 insertions(+), 159 deletions(-) diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 2645ea2..2a3eb85 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -81,7 +81,6 @@ get_minimum_purge_seq/1, purge_client_exists/3, - get_purge_client_fun/2, update_doc/3, update_doc/4, @@ -451,6 +450,7 @@ get_minimum_purge_seq(#db{} = Db) -> {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")} ], {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts), + FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of true -> MinIdxSeq; false -> erlang:max(0, PurgeSeq - PurgeInfosLimit) @@ -475,56 +475,26 @@ purge_client_exists(DbName, DocId, Props) -> LagThreshold = NowSecs - LagWindow, try - CheckFun = get_purge_client_fun(DocId, Props), - Exists = CheckFun(Props), + Exists = couch_db_plugin:is_valid_purge_client(Props), if not Exists -> ok; true -> Updated = couch_util:get_value(<<"updated_on">>, Props), if is_integer(Updated) and Updated > LagThreshold -> ok; true -> Diff = NowSecs - Updated, - Fmt1 = "Purge checkpoint '~s' not updated in ~p seconds", - couch_log:error(Fmt1, [DocId, Diff]) + Fmt2 = "Purge checkpoint '~s' not updated in ~p seconds", + couch_log:error(Fmt2, [DocId, Diff]) end end, Exists catch _:_ -> % If we fail to check for a client we have to assume that % it exists. - Fmt2 = "Failed to check purge checkpoint using - document '~p' on database ~p", - couch_log:error(Fmt2, [DbName, DocId]), + Fmt3 = "Failed to check purge checkpoint using + document '~p' in database ~p", + couch_log:error(Fmt3, [DbName, DocId]), true end. -get_purge_client_fun(DocId, Props) -> - M0 = couch_util:get_value(<<"verify_module">>, Props), - M = try - binary_to_existing_atom(M0, latin1) - catch error:badarg -> - Fmt1 = "Missing index module '~p' for purge checkpoint '~s'", - couch_log:error(Fmt1, [M0, DocId]), - throw(failed) - end, - - F0 = couch_util:get_value(<<"verify_function">>, Props), - try - F = binary_to_existing_atom(F0, latin1), - case erlang:function_exported(M, F, 1) of - true -> - fun M:F/1; - false -> - Fmt2 = "Missing exported function '~p' in '~p' - for purge checkpoint '~s'", - couch_log:error(Fmt2, [F0, M0, DocId]), - throw(failed) - end - catch error:badarg -> - Fmt3 = "Missing function '~p' in '~p' for purge checkpoint '~s'", - couch_log:error(Fmt3, [F0, M0, DocId]), - throw(failed) - end. - - set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 -> check_is_admin(Db), gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity); diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl index 8163256..c0bcc2f 100644 --- a/src/couch/src/couch_db_plugin.erl +++ b/src/couch/src/couch_db_plugin.erl @@ -18,6 +18,7 @@ after_doc_read/2, validate_docid/1, check_is_admin/1, + is_valid_purge_client/1, on_compact/2, on_delete/2 ]). @@ -57,6 +58,11 @@ check_is_admin(Db) -> %% callbacks return true only if it specifically allow the given Id couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []). +is_valid_purge_client(Props) -> + Handle = couch_epi:get_handle(?SERVICE_ID), + %% callbacks return true only if it specifically allow the given Id + couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [Props], []). + on_compact(DbName, DDocs) -> Handle = couch_epi:get_handle(?SERVICE_ID), couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []). diff --git a/src/couch/test/couch_bt_engine_upgrade_tests.erl b/src/couch/test/couch_bt_engine_upgrade_tests.erl index 6aef366..8c748f8 100644 --- a/src/couch/test/couch_bt_engine_upgrade_tests.erl +++ b/src/couch/test/couch_bt_engine_upgrade_tests.erl @@ -37,10 +37,10 @@ setup() -> teardown({Ctx, Paths}) -> + test_util:stop_couch(Ctx), lists:foreach(fun(Path) -> file:delete(Path) - end, Paths), - test_util:stop_couch(Ctx). + end, Paths). upgrade_test_() -> diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl index 937f7c8..5d4a6ac 100644 --- a/src/couch_index/src/couch_index_plugin_couch_db.erl +++ b/src/couch_index/src/couch_index_plugin_couch_db.erl @@ -13,9 +13,14 @@ -module(couch_index_plugin_couch_db). -export([ + is_valid_purge_client/1, on_compact/2 ]). +is_valid_purge_client(Props) -> + couch_mrview_index:verify_index_exists(Props). + + on_compact(DbName, DDocs) -> couch_mrview_index:ensure_local_purge_docs(DbName, DDocs). diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index 7756f52..8ab29b9 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -220,45 +220,30 @@ index_file_exists(State) -> verify_index_exists(Props) -> - ShardDbName = couch_mrview_util:get_value_from_options( - <<"dbname">>, - Props - ), - DDocId = couch_mrview_util:get_value_from_options( - <<"ddoc_id">>, - Props - ), - SigInLocal = couch_mrview_util:get_value_from_options( - <<"signature">>, - Props - ), - case couch_db:open_int(ShardDbName, []) of - {ok, Db} -> - try - DbName = mem3:dbname(couch_db:name(Db)), - case ddoc_cache:open(DbName, DDocId) of - {ok, DDoc} -> + try + Type = couch_util:get_value(<<"type">>, Props), + if Type =/= <<"mrview">> -> false; true -> + ShardDbName = couch_util:get_value(<<"dbname">>, Props), + DDocId = couch_util:get_value(<<"ddoc_id">>, Props), + couch_util:with_db(ShardDbName, fun(Db) -> + {ok, DesignDocs} = couch_db:get_design_docs(Db), + case lists:keyfind(DDocId, #full_doc_info.id, DesignDocs) of + #full_doc_info{} = DDocInfo -> + {ok, DDoc} = couch_db:open_doc_int( + Db, DDocInfo, [ejson_body]), {ok, IdxState} = couch_mrview_util:ddoc_to_mrst( - ShardDbName, - DDoc - ), + ShardDbName, DDoc), IdxSig = IdxState#mrst.sig, + SigInLocal = couch_util:get_value( + <<"signature">>, Props), couch_index_util:hexsig(IdxSig) == SigInLocal; - _Else -> + false -> false end - catch E:T -> - Stack = erlang:get_stacktrace(), - couch_log:error( - "Error occurs when verifying existence of ~s/~s :: ~p ~p", - [ShardDbName, DDocId, {E, T}, Stack] - ), - false - after - catch couch_db:close(Db) - end; - _ -> - false + end) + end + catch _:_ -> + false end. @@ -305,8 +290,6 @@ update_local_purge_doc(Db, State, PSeq) -> {<<"type">>, <<"mrview">>}, {<<"purge_seq">>, PSeq}, {<<"updated_on">>, NowSecs}, - {<<"verify_module">>, <<"couch_mrview_index">>}, - {<<"verify_function">>, <<"verify_index_exists">>}, {<<"dbname">>, get(db_name, State)}, {<<"ddoc_id">>, get(idx_name, State)}, {<<"signature">>, Sig} diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index b274961..a9ae661 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -44,14 +44,13 @@ get_local_purge_doc_id(Sig) -> - Version = "v" ++ config:get("purge", "version", "1") ++ "-", - ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Version ++ Sig). + ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig). get_value_from_options(Key, Options) -> case couch_util:get_value(Key, Options) of undefined -> - Reason = binary_to_list(Key) ++ " must exist in Options.", + Reason = <<"'", Key/binary, "' must exists in options.">>, throw({bad_request, Reason}); Value -> Value end. diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl index 99bfa9e..6b0d4d9 100644 --- a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl +++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl @@ -31,7 +31,7 @@ setup() -> teardown(DbName) -> - meck:unload(couch_mrview_index), + meck:unload(), ok = fabric:delete_db(DbName, [?ADMIN_CTX]). diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl index d5bf9f6..a7fb34f 100644 --- a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl +++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl @@ -361,20 +361,19 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) -> couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit), _Result = run_query(Db1, []), - % purge 150 documents - PurgedDocsNum = 150, + % first purge 30 documents + PurgedDocsNum1 = 30, IdsRevs = lists:foldl(fun(Id, CIdRevs) -> Id1 = docid(Id), FDI1 = couch_db:get_full_doc_info(Db1, Id1), Rev1 = get_rev(FDI1), UUID1 = uuid(Id), [{UUID1, Id1, [Rev1]} | CIdRevs] - end, [], lists:seq(1, PurgedDocsNum)), + end, [], lists:seq(1, PurgedDocsNum1)), {ok, _} = couch_db:purge_docs(Db1, IdsRevs), - % run query again to reflect purge requests - % to mrview {ok, Db2} = couch_db:reopen(Db1), + % run query again to reflect purge request to mrview _Result1 = run_query(Db2, []), {ok, PurgedIdRevs} = couch_db:fold_purge_infos( Db2, @@ -383,9 +382,21 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) -> [], [] ), - ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)), + ?assertEqual(PurgedDocsNum1, length(PurgedIdRevs)), + + % then purge 120 documents + PurgedDocsNum2 = 150, + IdsRevs2 = lists:foldl(fun(Id, CIdRevs) -> + Id1 = docid(Id), + FDI1 = couch_db:get_full_doc_info(Db1, Id1), + Rev1 = get_rev(FDI1), + UUID1 = uuid(Id), + [{UUID1, Id1, [Rev1]} | CIdRevs] + end, [], lists:seq(PurgedDocsNum1 + 1, PurgedDocsNum2)), + {ok, _} = couch_db:purge_docs(Db2, IdsRevs2), % run compaction to trigger pruning of purge tree + % only the first 30 purge requests are pruned {ok, Db3} = couch_db:open_int(DbName, []), {ok, _CompactPid} = couch_db:start_compact(Db3), wait_compaction(DbName, "database", ?LINE), @@ -401,7 +412,7 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) -> [], [] ), - ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2)) + ?assertEqual(PurgedDocsNum2 - PurgedDocsNum1, length(PurgedIdRevs2)) end). diff --git a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl index 3d5edb1..b511e01 100644 --- a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl +++ b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl @@ -48,7 +48,7 @@ teardown_each(Db) -> cpse_bad_purge_seq(Db1) -> - Db2 = save_local_doc(Db1, <<"foo">>, ?MODULE, valid_fun), + Db2 = save_local_doc(Db1, <<"foo">>), ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), ok = couch_db:set_purge_infos_limit(Db2, 5), @@ -56,35 +56,8 @@ cpse_bad_purge_seq(Db1) -> ?assertEqual(1, couch_db:get_minimum_purge_seq(Db3)). -cpse_bad_verify_mod(Db1) -> - Db2 = save_local_doc(Db1, 2, [invalid_module], valid_fun), - ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), - - ok = couch_db:set_purge_infos_limit(Db2, 5), - {ok, Db3} = couch_db:reopen(Db2), - ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)). - - -cpse_bad_verify_fun(Db1) -> - Db2 = save_local_doc(Db1, 2, ?MODULE, [invalid_function]), - ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), - - ok = couch_db:set_purge_infos_limit(Db2, 5), - {ok, Db3} = couch_db:reopen(Db2), - ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)). - - -cpse_verify_fun_throws(Db1) -> - Db2 = save_local_doc(Db1, 2, ?MODULE, throw_fun), - ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), - - ok = couch_db:set_purge_infos_limit(Db2, 5), - {ok, Db3} = couch_db:reopen(Db2), - ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)). - - cpse_verify_non_boolean(Db1) -> - Db2 = save_local_doc(Db1, 2, ?MODULE, non_bool_fun), + Db2 = save_local_doc(Db1, 2), ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), ok = couch_db:set_purge_infos_limit(Db2, 5), @@ -92,30 +65,16 @@ cpse_verify_non_boolean(Db1) -> ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)). -save_local_doc(Db1, PurgeSeq, Mod, Fun) -> +save_local_doc(Db1, PurgeSeq) -> {Mega, Secs, _} = os:timestamp(), NowSecs = Mega * 1000000 + Secs, Doc = couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE({[ - {<<"_id">>, <<"_local/purge-test-stuff-v1">>}, + {<<"_id">>, <<"_local/purge-test-stuff">>}, {<<"purge_seq">>, PurgeSeq}, {<<"timestamp_utc">>, NowSecs}, - {<<"verify_module">>, Mod}, - {<<"verify_function">>, Fun}, {<<"verify_options">>, {[{<<"signature">>, <<"stuff">>}]}}, {<<"type">>, <<"test">>} ]}))), {ok, _} = couch_db:update_doc(Db1, Doc, []), {ok, Db2} = couch_db:reopen(Db1), Db2. - - -valid_fun(_Options) -> - true. - - -throw_fun(_Options) -> - throw(failed). - - -not_bool(_Options) -> - ok. diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl index ebcd596..4bf2bf5 100644 --- a/src/mem3/src/mem3_epi.erl +++ b/src/mem3/src/mem3_epi.erl @@ -30,7 +30,8 @@ app() -> providers() -> [ - {chttpd_handlers, mem3_httpd_handlers} + {couch_db, mem3_plugin_couch_db}, + {chttpd_handlers, mem3_httpd_handlers} ]. diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/mem3/src/mem3_plugin_couch_db.erl similarity index 79% copy from src/couch_index/src/couch_index_plugin_couch_db.erl copy to src/mem3/src/mem3_plugin_couch_db.erl index 937f7c8..f19f5eb 100644 --- a/src/couch_index/src/couch_index_plugin_couch_db.erl +++ b/src/mem3/src/mem3_plugin_couch_db.erl @@ -10,12 +10,12 @@ % License for the specific language governing permissions and limitations under % the License. --module(couch_index_plugin_couch_db). +-module(mem3_plugin_couch_db). -export([ - on_compact/2 + is_valid_purge_client/1 ]). -on_compact(DbName, DDocs) -> - couch_mrview_index:ensure_local_purge_docs(DbName, DDocs). +is_valid_purge_client(Props) -> + mem3_rep:verify_purge_checkpoint(Props). diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 3f224cd..22a3f7a 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -122,32 +122,37 @@ make_local_id(SourceThing, TargetThing, Filter) -> make_purge_id(SourceUUID, TargetUUID) -> - Version = "v" ++ config:get("purge", "version", "1") ++ "-", - ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mem3-" ++ Version ++ - ?b2l(SourceUUID) ++ "-" ++ ?b2l(TargetUUID)). + <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>. verify_purge_checkpoint(Props) -> - DbName = couch_util:get_value(<<"dbname">>, Props), - SourceBin = couch_util:get_value(<<"source">>, Props), - TargetBin = couch_util:get_value(<<"target">>, Props), - Range = couch_util:get_value(<<"range">>, Props), - - Source = binary_to_existing_atom(SourceBin, latin1), - Target = binary_to_existing_atom(TargetBin, latin1), - try - Shards = mem3:shards(DbName), - Nodes = lists:foldl(fun(Shard, Acc) -> - case Shard#shard.range == Range of - true -> [Shard#shard.node | Acc]; - false -> Acc + Type = couch_util:get_value(<<"type">>, Props), + if Type =/= <<"internal_replication">> -> false; true -> + DbName = couch_util:get_value(<<"dbname">>, Props), + SourceBin = couch_util:get_value(<<"source">>, Props), + TargetBin = couch_util:get_value(<<"target">>, Props), + Range = couch_util:get_value(<<"range">>, Props), + + Source = binary_to_existing_atom(SourceBin, latin1), + Target = binary_to_existing_atom(TargetBin, latin1), + + try + Shards = mem3:shards(DbName), + Nodes = lists:foldl(fun(Shard, Acc) -> + case Shard#shard.range == Range of + true -> [Shard#shard.node | Acc]; + false -> Acc + end + end, [], mem3:shards(DbName)), + lists:member(Source, Nodes) andalso lists:member(Target, Nodes) + catch + error:database_does_not_exist -> + false end - end, [], mem3:shards(DbName)), - lists:member(Source, Nodes) andalso lists:member(Target, Nodes) - catch - error:database_does_not_exist -> - false + end + catch _:_ -> + false end. @@ -500,8 +505,6 @@ purge_cp_body(#acc{} = Acc, PurgeSeq) -> {<<"type">>, <<"internal_replication">>}, {<<"updated_on">>, NowSecs}, {<<"purge_seq">>, PurgeSeq}, - {<<"verify_module">>, <<"mem3_rep">>}, - {<<"verify_function">>, <<"verify_purge_checkpoint">>}, {<<"dbname">>, Source#shard.dbname}, {<<"source">>, atom_to_binary(Source#shard.node, latin1)}, {<<"target">>, atom_to_binary(Target#shard.node, latin1)},