This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch replicator-skip-explicit-4xx-errors-only
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8d8354b0bf5dc3d655ce2e2d8dd3aba2eb231a83
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Jul 13 14:02:55 2023 -0400

    Crash replication jobs on unexpected 4xx errors
    
    Documents with attachments are written as individual HTTP PUT multipart
    requests. If a document PUT request fails with a 401, 403 or 413 error, the
    expectation is that the replicator would skip over that document, increment
    the `doc_write_failures` metric and continue replicating. That behavior is by
    design. However, before this commit, documents were also skipped in case of
    any unknown 4xx error. This commit switches the default behavior such that
    unexpected 4xx errors crash the replication job instead of skipping the
    document. If the error is intermittent, the job will crash, retry, and
    eventually succeed. If the error is persistent, exponential backoff will
    keep it from retrying indefinitely in a tight loop.
    
    Documents will be skipped only for 401/403/413 HTTP errors, and for 400
    errors whose reason indicates an attachment name validation failure.
    
    The second improvement in this commit is that HTTP errors and error reasons
    bubble up through the API and are emitted as error logs. This should help
    users diagnose issues more quickly.
    
    Issue: https://github.com/apache/couchdb/issues/4676
---
 .../src/couch_replicator_api_wrap.erl              |  28 ++--
 .../src/couch_replicator_worker.erl                | 101 ++++++++----
 .../couch_replicator_error_reporting_tests.erl     | 183 +++++++++++++++++++--
 3 files changed, 254 insertions(+), 58 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl 
b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index a44a79da1..11cb22c6b 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -343,6 +343,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, 
Acc) ->
             throw(Stub);
         {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} 
->
             exit(max_backoff);
+        {'DOWN', Ref, process, Pid, {doc_write_failed, _} = Error} ->
+            exit(Error);
         {'DOWN', Ref, process, Pid, request_uri_too_long} ->
             NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2,
             case NewMaxLen < ?MIN_URL_LEN of
@@ -451,17 +453,23 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, 
Options, Type) ->
             (409, _, _) ->
                 throw(conflict);
             (Code, _, {Props}) ->
-                case {Code, get_value(<<"error">>, Props)} of
-                    {401, <<"unauthorized">>} ->
-                        throw({unauthorized, get_value(<<"reason">>, Props)});
-                    {403, <<"forbidden">>} ->
-                        throw({forbidden, get_value(<<"reason">>, Props)});
-                    {412, <<"missing_stub">>} ->
-                        throw({missing_stub, get_value(<<"reason">>, Props)});
-                    {413, _} ->
+                Error = get_value(<<"error">>, Props),
+                Reason = get_value(<<"reason">>, Props),
+                case {Code, Error, Reason} of
+                    {401, <<"unauthorized">>, _} ->
+                        throw({unauthorized, Reason});
+                    {403, <<"forbidden">>, _} ->
+                        throw({forbidden, Reason});
+                    {412, <<"missing_stub">>, _} ->
+                        throw({missing_stub, Reason});
+                    {413, _, _} ->
                         {error, request_body_too_large};
-                    {_, Error} ->
-                        {error, Error}
+                    {400, <<"bad_request">>, <<"Attachment name ", _/binary>>} 
->
+                        {error, {invalid_attachment_name, Reason}};
+                    {_, undefined, _} ->
+                        {error, {Code, Props}};
+                    {_, _, _} ->
+                        {error, {Error, Reason}}
                 end
         end
     ).
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl 
b/src/couch_replicator/src/couch_replicator_worker.erl
index 46e4a6e94..4e09bee17 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -238,6 +238,8 @@ handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, 
State) ->
     {stop, {shutdown, Err}, State};
 handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
     {stop, {shutdown, Err}, State};
+handle_info({'EXIT', _Pid, {doc_write_failed, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
 handle_info({'EXIT', Pid, Reason}, State) ->
     {stop, {process_died, Pid, Reason}, State}.
 
@@ -583,44 +585,75 @@ handle_flush_docs_result({error, {bulk_docs_failed, _, _} 
= Err}, _, _) ->
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
     try couch_replicator_api_wrap:update_doc(Target, Doc, [], 
?REPLICATED_CHANGES) of
-        {ok, _} ->
-            ok;
-        Error ->
-            couch_log:error(
-                "Replicator: error writing document `~s` to `~s`: ~s",
-                [Id, couch_replicator_api_wrap:db_uri(Target), 
couch_util:to_binary(Error)]
-            ),
-            Error
+        {ok, _} -> ok;
+        Error -> handle_doc_write_error(Error, Target, Id, Pos, RevId)
     catch
-        throw:{missing_stub, _} = MissingStub ->
-            throw(MissingStub);
         throw:{Error, Reason} ->
-            couch_log:error(
-                "Replicator: couldn't write document `~s`, revision `~s`,"
-                " to target database `~s`. Error: `~s`, reason: `~s`.",
-                [
-                    Id,
-                    couch_doc:rev_to_str({Pos, RevId}),
-                    couch_replicator_api_wrap:db_uri(Target),
-                    to_binary(Error),
-                    to_binary(Reason)
-                ]
-            ),
-            {error, Error};
-        throw:Err ->
-            couch_log:error(
-                "Replicator: couldn't write document `~s`, revision `~s`,"
-                " to target database `~s`. Error: `~s`.",
-                [
-                    Id,
-                    couch_doc:rev_to_str({Pos, RevId}),
-                    couch_replicator_api_wrap:db_uri(Target),
-                    to_binary(Err)
-                ]
-            ),
-            {error, Err}
+            handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId)
     end.
 
+% In most cases we fail the replication job by re-throwing the error.
+% The only exceptions are expected validation and VDU failures
+%  401 : unauthorized
+%  403 : forbidden
+%  413 : request_body_too_large
+%  400 : bad_request where reason starts with "Attachment name "
+%
+handle_doc_write_error({missing_stub, _} = MissingStub, _, _, _, _) ->
+    throw(MissingStub);
+handle_doc_write_error({error, request_body_too_large} = Error, Target, Id, _, 
_) ->
+    couch_log:error(
+        "Replicator: skipping writing document `~s` to `~s` : 
request_body_too_large.",
+        [Id, couch_replicator_api_wrap:db_uri(Target)]
+    ),
+    Error;
+handle_doc_write_error({error, {invalid_attachment_name, Reason}} = Error, 
Target, Id, _, _) ->
+    couch_log:error(
+        "Replicator: skipping writing document `~s` to `~s` : 
invalid_attachment_name ~s",
+        [Id, couch_replicator_api_wrap:db_uri(Target), Reason]
+    ),
+    Error;
+handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId) when
+    Error == unauthorized orelse Error == forbidden
+->
+    couch_log:error(
+        "Replicator: skipping writing document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`, reason: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error),
+            to_binary(Reason)
+        ]
+    ),
+    {error, Error};
+handle_doc_write_error({error, {Error, Reason}}, Target, Id, Pos, RevId) ->
+    couch_log:error(
+        "Replicator: error writting document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`, reason: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error),
+            to_binary(Reason)
+        ]
+    ),
+    exit({doc_write_failed, {Error, Reason}});
+handle_doc_write_error(Error, Target, Id, Pos, RevId) ->
+    couch_log:error(
+        "Replicator: error writting document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error)
+        ]
+    ),
+    exit({doc_write_failed, Error}).
+
 find_missing(DocInfos, Target, Parent, #fetch_stats{} = St) ->
     {IdRevs, AllCount} = lists:foldr(
         fun
diff --git 
a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl 
b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index d9c6a1048..636a42ee6 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -24,6 +24,13 @@ error_reporting_test_() ->
         [
             ?TDEF_FE(t_fail_bulk_docs),
             ?TDEF_FE(t_fail_changes_reader),
+            ?TDEF_FE(t_fail_doc_put_4xx_well_formed_json_error),
+            ?TDEF_FE(t_fail_doc_put_4xx_unexpected_json_error),
+            ?TDEF_FE(t_fail_doc_put_4xx_invalid_json_error),
+            ?TDEF_FE(t_skip_doc_put_401_errors),
+            ?TDEF_FE(t_skip_doc_put_403_errors),
+            ?TDEF_FE(t_skip_doc_put_413_errors),
+            ?TDEF_FE(t_skip_doc_put_invalid_attachment_name),
             ?TDEF_FE(t_fail_revs_diff),
             ?TDEF_FE(t_fail_bulk_get, 15),
             ?TDEF_FE(t_fail_changes_queue),
@@ -41,7 +48,7 @@ t_fail_bulk_docs({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -55,7 +62,7 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(get, "/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -63,13 +70,139 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) ->
 
     couch_replicator_notifier:stop(Listener).
 
+t_fail_doc_put_4xx_well_formed_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    ErrBody = [<<"{\"error\":\"x\", \"reason\":\"y\"}">>],
+    mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertEqual({doc_write_failed, {<<"x">>, <<"y">>}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_fail_doc_put_4xx_unexpected_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    ErrBody = [<<"{\"a\":\"b\"}">>],
+    mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertEqual({doc_write_failed, {400, [{<<"a">>, <<"b">>}]}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_fail_doc_put_4xx_invalid_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    mock_fail_req(put, "/6", {ok, "400", [], [<<"potato">>]}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertMatch({doc_write_failed, {invalid_json, _}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_401_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"unauthorized\", \"reason\":\"vdu\"}">>],
+    mock_fail_req(put, "/6", {ok, "401", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_403_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"forbidden\", \"reason\":\"vdu\"}">>],
+    mock_fail_req(put, "/6", {ok, "403", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_413_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"too_large\", \"reason\":\"too_large\"}">>],
+    mock_fail_req(put, "/6", {ok, "413", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_invalid_attachment_name({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [
+        <<"{\"error\":\"bad_request\", \"reason\":\"Attachment name '_foo' 
starts with prohibited character '_'\"}">>
+    ],
+    mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
 t_fail_revs_diff({_Ctx, {Source, Target}}) ->
     populate_db(Source, 1, 5),
     {ok, RepId} = replicate(Source, Target),
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -87,7 +220,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     % Tolerate a 500 error
-    mock_fail_req("/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}),
+    mock_fail_req(post, "/_bulk_get", {ok, "501", [], 
[<<"not_implemented">>]}),
     meck:reset(couch_replicator_api_wrap),
     populate_db(Source, 6, 6),
     wait_target_in_sync(Source, Target),
@@ -95,7 +228,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) ->
     ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 
6)),
 
     % Tolerate a 400 error
-    mock_fail_req("/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
     meck:reset(couch_replicator_api_wrap),
     populate_db(Source, 7, 7),
     wait_target_in_sync(Source, Target),
@@ -157,7 +290,7 @@ t_dont_start_duplicate_job({_Ctx, {Source, Target}}) ->
     meck:new(couch_replicator_pg, [passthrough]),
     Pid = pid_from_another_node(),
     meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end),
-    Rep = make_rep(Source, Target),
+    Rep = make_rep(Source, Target, true),
     ExpectErr = {error, {already_started, Pid}},
     ?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)).
 
@@ -205,16 +338,19 @@ pid_from_another_node() ->
     ?assertEqual('A@1', node(Pid)),
     Pid.
 
-mock_fail_req(Path, Return) ->
+mock_fail_req(Method, Path, Return) ->
     meck:expect(
         ibrowse,
         send_req_direct,
         fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
             Args = [W, Url, Headers, Meth, Body, Opts, TOut],
             #{path := UPath} = uri_string:parse(Url),
-            case lists:suffix(Path, UPath) of
-                true -> Return;
-                false -> meck:passthrough(Args)
+            case {lists:suffix(Path, UPath), Method == Meth} of
+                {true, true} ->
+                    _ = meck:passthrough(Args),
+                    Return;
+                {_, _} ->
+                    meck:passthrough(Args)
             end
         end
     ).
@@ -237,10 +373,18 @@ wait_rep_result(RepId) ->
     end.
 
 populate_db(DbName, Start, End) ->
+    populate_db(DbName, Start, End, false).
+
+populate_db(DbName, Start, End, WithAttachments) ->
     Docs = lists:foldl(
         fun(DocIdCounter, Acc) ->
             Id = integer_to_binary(DocIdCounter),
-            Doc = #doc{id = Id, body = {[]}},
+            Atts =
+                case WithAttachments of
+                    true -> [att(<<"att1">>, 1024, <<"app/binary">>)];
+                    false -> []
+                end,
+            Doc = #doc{id = Id, body = {[]}, atts = Atts},
             [Doc | Acc]
         end,
         [],
@@ -248,6 +392,14 @@ populate_db(DbName, Start, End) ->
     ),
     {ok, [_ | _]} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
 
+att(Name, Size, Type) ->
+    couch_att:new([
+        {name, Name},
+        {type, Type},
+        {att_len, Size},
+        {data, fun(Count) -> crypto:strong_rand_bytes(Count) end}
+    ]).
+
 wait_target_in_sync(Source, Target) ->
     {ok, SourceDocCount} = fabric:get_doc_count(Source),
     wait_target_in_sync_loop(SourceDocCount, Target, 300).
@@ -271,17 +423,20 @@ wait_target_in_sync_loop(DocCount, TargetName, 
RetriesLeft) ->
     end.
 
 replicate(Source, Target) ->
-    Rep = make_rep(Source, Target),
+    replicate(Source, Target, true).
+
+replicate(Source, Target, Continuous) ->
+    Rep = make_rep(Source, Target, Continuous),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     {ok, Rep#rep.id}.
 
-make_rep(Source, Target) ->
+make_rep(Source, Target, Continuous) ->
     RepObject =
         {[
             {<<"source">>, url(Source)},
             {<<"target">>, url(Target)},
-            {<<"continuous">>, true},
+            {<<"continuous">>, Continuous},
             {<<"worker_processes">>, 1},
             {<<"retries_per_request">>, 1},
             % Low connection timeout so _changes feed gets restarted quicker

Reply via email to