This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch replicator-skip-explicit-4xx-errors-only in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 4bcca6bc8c5983ae38fdd78a239eac69282e9f0e Author: Nick Vatamaniuc <[email protected]> AuthorDate: Thu Jul 13 14:02:55 2023 -0400 Improve replicator handling of 4xx error on individual doc PUTs. Documents with attachments are written as individual HTTP PUT multipart requests. If a document PUT request fails with a 401, 403 or 413 error, the expectation is that the replicator would skip over that document, increment the `doc_write_failures` metric and continue replicating and checkpointing. That behavior is by design. However, previously, before this commit, documents were also skipped in case of any unknown 4xx error (except a 408 which was treated as a timeout and retried). That behavior is a bit unexpected as it's unclear what the reason for the 4xx error might be. Issue: https://github.com/apache/couchdb/issues/4676 --- .../src/couch_replicator_api_wrap.erl | 18 ++- .../src/couch_replicator_worker.erl | 94 +++++++----- .../couch_replicator_error_reporting_tests.erl | 160 +++++++++++++++++++-- 3 files changed, 218 insertions(+), 54 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index a44a79da1..4a2057773 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -343,6 +343,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> throw(Stub); {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} -> exit(max_backoff); + {'DOWN', Ref, process, Pid, {doc_write_failed, _} = Error} -> + exit(Error); {'DOWN', Ref, process, Pid, request_uri_too_long} -> NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2, case NewMaxLen < ?MIN_URL_LEN of @@ -451,17 +453,21 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) -> (409, _, _) -> throw(conflict); (Code, _, {Props}) -> - case {Code, get_value(<<"error">>, Props)} of + Error = get_value(<<"error">>, 
Props), + Reason = get_value(<<"reason">>, Props), + case {Code, Error} of {401, <<"unauthorized">>} -> - throw({unauthorized, get_value(<<"reason">>, Props)}); + throw({unauthorized, Reason}); {403, <<"forbidden">>} -> - throw({forbidden, get_value(<<"reason">>, Props)}); + throw({forbidden, Reason}); {412, <<"missing_stub">>} -> - throw({missing_stub, get_value(<<"reason">>, Props)}); + throw({missing_stub, Reason}); {413, _} -> {error, request_body_too_large}; - {_, Error} -> - {error, Error} + {_, undefined} -> + {error, {Code, Props}}; + {_, _} -> + {error, {Error, Reason}} end end ). diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 46e4a6e94..ab56c34af 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -238,6 +238,8 @@ handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) -> {stop, {shutdown, Err}, State}; handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) -> {stop, {shutdown, Err}, State}; +handle_info({'EXIT', _Pid, {doc_write_failed, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; handle_info({'EXIT', Pid, Reason}, State) -> {stop, {process_died, Pid, Reason}, State}. 
@@ -583,44 +585,68 @@ handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err}, _, _) -> flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) -> try couch_replicator_api_wrap:update_doc(Target, Doc, [], ?REPLICATED_CHANGES) of - {ok, _} -> - ok; - Error -> - couch_log:error( - "Replicator: error writing document `~s` to `~s`: ~s", - [Id, couch_replicator_api_wrap:db_uri(Target), couch_util:to_binary(Error)] - ), - Error + {ok, _} -> ok; + Error -> handle_doc_write_error(Error, Target, Id, Pos, RevId) catch - throw:{missing_stub, _} = MissingStub -> - throw(MissingStub); throw:{Error, Reason} -> - couch_log:error( - "Replicator: couldn't write document `~s`, revision `~s`," - " to target database `~s`. Error: `~s`, reason: `~s`.", - [ - Id, - couch_doc:rev_to_str({Pos, RevId}), - couch_replicator_api_wrap:db_uri(Target), - to_binary(Error), - to_binary(Reason) - ] - ), - {error, Error}; - throw:Err -> - couch_log:error( - "Replicator: couldn't write document `~s`, revision `~s`," - " to target database `~s`. Error: `~s`.", - [ - Id, - couch_doc:rev_to_str({Pos, RevId}), - couch_replicator_api_wrap:db_uri(Target), - to_binary(Err) - ] - ), - {error, Err} + handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId) end. +% In most cases we fail the replication job by re-throwing the error. 
+% The only exceptions are +% 401 : unauthorized +% 403 : forbidden +% 413 : request_body_too_large +% +handle_doc_write_error({missing_stub, _} = MissingStub, _, _, _, _) -> + throw(MissingStub); +handle_doc_write_error({error, request_body_too_large} = Error, Target, Id, _, _) -> + couch_log:error( + "Replicator: skipping writing document `~s` to `~s` : request_body_too_large.", + [Id, couch_replicator_api_wrap:db_uri(Target)] + ), + Error; +handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId) when + Error == unauthorized orelse Error == forbidden +-> + couch_log:error( + "Replicator: skipping writing document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`, reason: `~s`.", + [ + Id, + couch_doc:rev_to_str({Pos, RevId}), + couch_replicator_api_wrap:db_uri(Target), + to_binary(Error), + to_binary(Reason) + ] + ), + {error, Error}; +handle_doc_write_error({error, {Error, Reason}}, Target, Id, Pos, RevId) -> + couch_log:error( + "Replicator: error writting document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`, reason: `~s`.", + [ + Id, + couch_doc:rev_to_str({Pos, RevId}), + couch_replicator_api_wrap:db_uri(Target), + to_binary(Error), + to_binary(Reason) + ] + ), + exit({doc_write_failed, {Error, Reason}}); +handle_doc_write_error(Error, Target, Id, Pos, RevId) -> + couch_log:error( + "Replicator: error writting document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`.", + [ + Id, + couch_doc:rev_to_str({Pos, RevId}), + couch_replicator_api_wrap:db_uri(Target), + to_binary(Error) + ] + ), + exit({doc_write_failed, Error}). 
+ find_missing(DocInfos, Target, Parent, #fetch_stats{} = St) -> {IdRevs, AllCount} = lists:foldr( fun diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl index d9c6a1048..d71cdda4d 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -24,6 +24,12 @@ error_reporting_test_() -> [ ?TDEF_FE(t_fail_bulk_docs), ?TDEF_FE(t_fail_changes_reader), + ?TDEF_FE(t_fail_doc_put_4xx_well_formed_json_error), + ?TDEF_FE(t_fail_doc_put_4xx_unexpected_json_error), + ?TDEF_FE(t_fail_doc_put_4xx_invalid_json_error), + ?TDEF_FE(t_skip_doc_put_401_errors), + ?TDEF_FE(t_skip_doc_put_403_errors), + ?TDEF_FE(t_skip_doc_put_413_errors), ?TDEF_FE(t_fail_revs_diff), ?TDEF_FE(t_fail_bulk_get, 15), ?TDEF_FE(t_fail_changes_queue), @@ -41,7 +47,7 @@ t_fail_bulk_docs({_Ctx, {Source, Target}}) -> wait_target_in_sync(Source, Target), {ok, Listener} = rep_result_listener(RepId), - mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}), + mock_fail_req(post, "/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}), populate_db(Source, 6, 6), {error, Result} = wait_rep_result(RepId), @@ -55,7 +61,7 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) -> wait_target_in_sync(Source, Target), {ok, Listener} = rep_result_listener(RepId), - mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + mock_fail_req(get, "/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), populate_db(Source, 6, 6), {error, Result} = wait_rep_result(RepId), @@ -63,13 +69,117 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) -> couch_replicator_notifier:stop(Listener). 
+t_fail_doc_put_4xx_well_formed_json_error({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + ErrBody = [<<"{\"error\":\"x\", \"reason\":\"y\"}">>], + mock_fail_req(put, "/6", {ok, "400", [], ErrBody}), + populate_db(Source, 6, 6, _WithAttachments = true), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({doc_write_failed, {<<"x">>, <<"y">>}}, Result), + + couch_replicator_notifier:stop(Listener). + +t_fail_doc_put_4xx_unexpected_json_error({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + ErrBody = [<<"{\"a\":\"b\"}">>], + mock_fail_req(put, "/6", {ok, "400", [], ErrBody}), + populate_db(Source, 6, 6, _WithAttachments = true), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({doc_write_failed, {400, [{<<"a">>, <<"b">>}]}}, Result), + + couch_replicator_notifier:stop(Listener). + +t_fail_doc_put_4xx_invalid_json_error({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req(put, "/6", {ok, "400", [], [<<"potato">>]}), + populate_db(Source, 6, 6, _WithAttachments = true), + + {error, Result} = wait_rep_result(RepId), + ?assertMatch({doc_write_failed, {invalid_json, _}}, Result), + + couch_replicator_notifier:stop(Listener). 
+ +t_skip_doc_put_401_errors({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + populate_db(Source, 6, 6, _WithAttachments = true), + ErrBody = [<<"{\"error\":\"unauthorized\", \"reason\":\"vdu\"}">>], + mock_fail_req(put, "/6", {ok, "401", [], ErrBody}), + {ok, RepId} = replicate(Source, Target, false), + {ok, Listener} = rep_result_listener(RepId), + Res = wait_rep_result(RepId), + % Replication job should succeed + ?assertMatch({ok, {[_ | _]}}, Res), + {ok, {Props}} = Res, + History = proplists:get_value(<<"history">>, Props), + ?assertMatch([{[_ | _]}], History), + [{HistProps}] = History, + DocsWritten = proplists:get_value(<<"docs_written">>, HistProps), + DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps), + ?assertEqual(5, DocsWritten), + ?assertEqual(1, DocWriteFailures), + couch_replicator_notifier:stop(Listener). + +t_skip_doc_put_403_errors({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + populate_db(Source, 6, 6, _WithAttachments = true), + ErrBody = [<<"{\"error\":\"forbidden\", \"reason\":\"vdu\"}">>], + mock_fail_req(put, "/6", {ok, "403", [], ErrBody}), + {ok, RepId} = replicate(Source, Target, false), + {ok, Listener} = rep_result_listener(RepId), + Res = wait_rep_result(RepId), + % Replication job should succeed + ?assertMatch({ok, {[_ | _]}}, Res), + {ok, {Props}} = Res, + History = proplists:get_value(<<"history">>, Props), + ?assertMatch([{[_ | _]}], History), + [{HistProps}] = History, + DocsWritten = proplists:get_value(<<"docs_written">>, HistProps), + DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps), + ?assertEqual(5, DocsWritten), + ?assertEqual(1, DocWriteFailures), + couch_replicator_notifier:stop(Listener). 
+ +t_skip_doc_put_413_errors({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + populate_db(Source, 6, 6, _WithAttachments = true), + ErrBody = [<<"{\"error\":\"too_large\", \"reason\":\"too_large\"}">>], + mock_fail_req(put, "/6", {ok, "413", [], ErrBody}), + {ok, RepId} = replicate(Source, Target, false), + {ok, Listener} = rep_result_listener(RepId), + Res = wait_rep_result(RepId), + % Replication job should succeed + ?assertMatch({ok, {[_ | _]}}, Res), + {ok, {Props}} = Res, + History = proplists:get_value(<<"history">>, Props), + ?assertMatch([{[_ | _]}], History), + [{HistProps}] = History, + DocsWritten = proplists:get_value(<<"docs_written">>, HistProps), + DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps), + ?assertEqual(5, DocsWritten), + ?assertEqual(1, DocWriteFailures), + couch_replicator_notifier:stop(Listener). + t_fail_revs_diff({_Ctx, {Source, Target}}) -> populate_db(Source, 1, 5), {ok, RepId} = replicate(Source, Target), wait_target_in_sync(Source, Target), {ok, Listener} = rep_result_listener(RepId), - mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}), + mock_fail_req(post, "/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}), populate_db(Source, 6, 6), {error, Result} = wait_rep_result(RepId), @@ -87,7 +197,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) -> wait_target_in_sync(Source, Target), % Tolerate a 500 error - mock_fail_req("/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}), + mock_fail_req(post, "/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}), meck:reset(couch_replicator_api_wrap), populate_db(Source, 6, 6), wait_target_in_sync(Source, Target), @@ -95,7 +205,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) -> ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6)), % Tolerate a 400 error - mock_fail_req("/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + mock_fail_req(post, "/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), 
meck:reset(couch_replicator_api_wrap), populate_db(Source, 7, 7), wait_target_in_sync(Source, Target), @@ -157,7 +267,7 @@ t_dont_start_duplicate_job({_Ctx, {Source, Target}}) -> meck:new(couch_replicator_pg, [passthrough]), Pid = pid_from_another_node(), meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end), - Rep = make_rep(Source, Target), + Rep = make_rep(Source, Target, true), ExpectErr = {error, {already_started, Pid}}, ?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)). @@ -205,16 +315,19 @@ pid_from_another_node() -> ?assertEqual('A@1', node(Pid)), Pid. -mock_fail_req(Path, Return) -> +mock_fail_req(Method, Path, Return) -> meck:expect( ibrowse, send_req_direct, fun(W, Url, Headers, Meth, Body, Opts, TOut) -> Args = [W, Url, Headers, Meth, Body, Opts, TOut], #{path := UPath} = uri_string:parse(Url), - case lists:suffix(Path, UPath) of - true -> Return; - false -> meck:passthrough(Args) + case {lists:suffix(Path, UPath), Method == Meth} of + {true, true} -> + _ = meck:passthrough(Args), + Return; + {_, _} -> + meck:passthrough(Args) end end ). @@ -237,10 +350,18 @@ wait_rep_result(RepId) -> end. populate_db(DbName, Start, End) -> + populate_db(DbName, Start, End, false). + +populate_db(DbName, Start, End, WithAttachments) -> Docs = lists:foldl( fun(DocIdCounter, Acc) -> Id = integer_to_binary(DocIdCounter), - Doc = #doc{id = Id, body = {[]}}, + Atts = + case WithAttachments of + true -> [att(<<"att1">>, 1024, <<"app/binary">>)]; + false -> [] + end, + Doc = #doc{id = Id, body = {[]}, atts = Atts}, [Doc | Acc] end, [], @@ -248,6 +369,14 @@ populate_db(DbName, Start, End) -> ), {ok, [_ | _]} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]). +att(Name, Size, Type) -> + couch_att:new([ + {name, Name}, + {type, Type}, + {att_len, Size}, + {data, fun(Count) -> crypto:strong_rand_bytes(Count) end} + ]). 
+ wait_target_in_sync(Source, Target) -> {ok, SourceDocCount} = fabric:get_doc_count(Source), wait_target_in_sync_loop(SourceDocCount, Target, 300). @@ -271,17 +400,20 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> end. replicate(Source, Target) -> - Rep = make_rep(Source, Target), + replicate(Source, Target, true). + +replicate(Source, Target, Continuous) -> + Rep = make_rep(Source, Target, Continuous), ok = couch_replicator_scheduler:add_job(Rep), couch_replicator_scheduler:reschedule(), {ok, Rep#rep.id}. -make_rep(Source, Target) -> +make_rep(Source, Target, Continuous) -> RepObject = {[ {<<"source">>, url(Source)}, {<<"target">>, url(Target)}, - {<<"continuous">>, true}, + {<<"continuous">>, Continuous}, {<<"worker_processes">>, 1}, {<<"retries_per_request">>, 1}, % Low connection timeout so _changes feed gets restarted quicker
