This is an automated email from the ASF dual-hosted git repository.

nickva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new 425d8aa9e Fix append_update_replies case clause in fabric_doc_update
425d8aa9e is described below

commit 425d8aa9eb2c95c68da0ea805767a916ebd93c51
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Apr 30 00:06:07 2026 -0400

    Fix append_update_replies case clause in fabric_doc_update
    
    In `serialize_worker_startup=false` mode it was previously possible to get a
    function_clause error in `fabric_doc_update:append_update_replies/3`. The
    error was caused by an empty `Docs` list paired with a non-empty `Replies`
    list. That happened when a doc had at least two conflicted replies, one
    arriving and the others still in flight: the arriving reply would remove the
    doc from the other workers' expected replies (their grouped docs lists), so
    when the second reply arrived, it no longer matched up with anything.
    
    To fix this, keep track of conflicts separately, check that conflicts list
    before spawning extra workers, and avoid altering the grouped docs structure
    for in-flight requests.
    
    Also fix another regression: with sws=false, keep going and evaluate the
    quorum the way we did before the sws feature was added. Previously, that mode
    only spawned the workers in parallel but still short-circuited the quorum on
    a conflict.
---
 src/fabric/src/fabric_doc_update.erl | 310 ++++++++++++++++++++++++++---------
 1 file changed, 232 insertions(+), 78 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl 
b/src/fabric/src/fabric_doc_update.erl
index 89fb9618d..96f7c1b11 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -25,7 +25,9 @@
     reply,
     dbname,
     update_options = [],
-    started = []
+    started = [],
+    conflicts = [],
+    serialize_worker_startup = true
 }).
 
 go(_, [], _) ->
@@ -50,7 +52,8 @@ go(DbName, AllDocs0, Opts) ->
         grouped_docs = GroupedDocs,
         reply = dict:new(),
         dbname = DbName,
-        update_options = Options
+        update_options = Options,
+        serialize_worker_startup = serialize_worker_startup(Options)
     },
     Timeout = fabric_util:request_timeout(),
     Acc1 = start_workers_strategy(Acc0),
@@ -60,12 +63,17 @@ go(DbName, AllDocs0, Opts) ->
         ->
             ensure_all_responses(Health, AllDocs, Results);
         {timeout, Acc} ->
-            #acc{w = W1, grouped_docs = GroupedDocs1, reply = DocReplDict} = 
Acc,
+            #acc{
+                w = W1,
+                grouped_docs = GroupedDocs1,
+                reply = DocReplDict,
+                serialize_worker_startup = SWS
+            } = Acc,
             {DefunctWorkers, _} = lists:unzip(GroupedDocs1),
             fabric_util:log_timeout(DefunctWorkers, "update_docs"),
-            {Health, _, Resp} = dict:fold(
+            {Health, _, _, Resp} = dict:fold(
                 fun force_reply/3,
-                {ok, W1, []},
+                {ok, W1, SWS, []},
                 DocReplDict
             ),
             ensure_all_responses(Health, AllDocs, Resp);
@@ -103,38 +111,41 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
         doc_count = DocCount,
         w = W,
         grouped_docs = GroupedDocs,
-        reply = DocReplyDict0
+        reply = DocReplyDict0,
+        conflicts = Conflicts0,
+        serialize_worker_startup = SWS
     } = Acc0,
-    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
-    NewGrpDocs = remove_conflicted_docs(Docs, Replies, NewGrpDocs0),
+    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
+    Conflicts = collect_conflicts(Docs, Replies, Conflicts0, SWS),
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
+    Acc1 = Acc0#acc{conflicts = Conflicts},
     case {WaitingCount, dict:size(DocReplyDict)} of
         {1, _} ->
             % last message has arrived, we need to conclude things
-            {Health, W, Reply} = dict:fold(
+            {Health, W, _, Reply} = dict:fold(
                 fun force_reply/3,
-                {ok, W, []},
+                {ok, W, SWS, []},
                 DocReplyDict
             ),
-            start_remaining_workers(Acc0#acc{grouped_docs = NewGrpDocs}),
+            start_remaining_workers(Acc1#acc{grouped_docs = NewGrpDocs}),
             {stop, {Health, Reply}};
         {_, DocCount} ->
             % we've got at least one reply for each document, let's take a look
-            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
+            case dict:fold(fun maybe_reply/3, {stop, W, SWS, []}, 
DocReplyDict) of
                 continue ->
                     {ok,
-                        start_workers(Acc0#acc{
+                        start_workers(Acc1#acc{
                             waiting_count = WaitingCount - 1,
                             grouped_docs = NewGrpDocs,
                             reply = DocReplyDict
                         })};
-                {stop, W, FinalReplies} ->
-                    start_remaining_workers(Acc0#acc{grouped_docs = 
NewGrpDocs}),
+                {stop, W, _, FinalReplies} ->
+                    start_remaining_workers(Acc1#acc{grouped_docs = 
NewGrpDocs}),
                     {stop, {ok, FinalReplies}}
             end;
         _ ->
             {ok,
-                start_workers(Acc0#acc{
+                start_workers(Acc1#acc{
                     waiting_count = WaitingCount - 1,
                     grouped_docs = NewGrpDocs,
                     reply = DocReplyDict
@@ -198,17 +209,17 @@ untag_docs([#doc{} = Doc | Rest]) ->
 untag_doc(#doc{} = Doc) ->
     Doc#doc{meta = lists:keydelete(ref, 1, Doc#doc.meta)}.
 
-force_reply(Doc, [], {_, W, Acc}) ->
-    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
-force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) ->
-    case update_quorum_met(W, Replies) of
+force_reply(Doc, [], {_, W, SWS, Acc}) ->
+    {error, W, SWS, [{Doc, {error, internal_server_error}} | Acc]};
+force_reply(Doc, [FirstReply | _] = Replies, {Health, W, SWS, Acc}) ->
+    case update_quorum_met(W, Replies, SWS) of
         {true, Reply} ->
             % corner case new_edits:false and vdu: [noreply, forbidden, 
noreply]
             case check_forbidden_msg(Replies) of
                 {forbidden, ForbiddenReply} ->
-                    {Health, W, [{Doc, ForbiddenReply} | Acc]};
+                    {Health, W, SWS, [{Doc, ForbiddenReply} | Acc]};
                 false ->
-                    {Health, W, [{Doc, Reply} | Acc]}
+                    {Health, W, SWS, [{Doc, Reply} | Acc]}
             end;
         false ->
             case [Reply || {ok, Reply} <- Replies] of
@@ -218,15 +229,15 @@ force_reply(Doc, [FirstReply | _] = Replies, {Health, W, 
Acc}) ->
                         true ->
                             CounterKey = [fabric, doc_update, errors],
                             couch_stats:increment_counter(CounterKey),
-                            {Health, W, [{Doc, FirstReply} | Acc]};
+                            {Health, W, SWS, [{Doc, FirstReply} | Acc]};
                         false ->
                             CounterKey = [fabric, doc_update, 
mismatched_errors],
                             couch_stats:increment_counter(CounterKey),
                             case check_forbidden_msg(Replies) of
                                 {forbidden, ForbiddenReply} ->
-                                    {Health, W, [{Doc, ForbiddenReply} | Acc]};
+                                    {Health, W, SWS, [{Doc, ForbiddenReply} | 
Acc]};
                                 false ->
-                                    {error, W, [{Doc, FirstReply} | Acc]}
+                                    {error, W, SWS, [{Doc, FirstReply} | Acc]}
                             end
                     end;
                 [AcceptedRev | _] ->
@@ -237,17 +248,17 @@ force_reply(Doc, [FirstReply | _] = Replies, {Health, W, 
Acc}) ->
                             ok -> accepted;
                             _ -> Health
                         end,
-                    {NewHealth, W, [{Doc, {accepted, AcceptedRev}} | Acc]}
+                    {NewHealth, W, SWS, [{Doc, {accepted, AcceptedRev}} | Acc]}
             end
     end.
 
 maybe_reply(_, _, continue) ->
     % we didn't meet quorum for all docs, so we're fast-forwarding the fold
     continue;
-maybe_reply(Doc, Replies, {stop, W, Acc}) ->
-    case update_quorum_met(W, Replies) of
+maybe_reply(Doc, Replies, {stop, W, SWS, Acc}) ->
+    case update_quorum_met(W, Replies, SWS) of
         {true, Reply} ->
-            {stop, W, [{Doc, Reply} | Acc]};
+            {stop, W, SWS, [{Doc, Reply} | Acc]};
         false ->
             continue
     end.
@@ -296,35 +307,34 @@ check_forbidden_msg(Replies) ->
             end
     end.
 
-update_quorum_met(W, Replies) ->
+update_quorum_met(W, Replies, SWS) ->
     Counters = lists:foldl(
         fun(R, D) -> orddict:update_counter(R, 1, D) end,
         orddict:new(),
         Replies
     ),
-    GoodReplies = lists:filter(fun good_reply/1, Counters),
-    case
-        lists:dropwhile(
-            fun
-                ({conflict, _}) -> false;
-                ({_, Count}) -> Count < W
-            end,
-            GoodReplies
-        )
-    of
+    GoodReplies = lists:filter(fun(C) -> good_reply(C, SWS) end, Counters),
+    case lists:dropwhile(quorum_pred(W, SWS), GoodReplies) of
         [] ->
             false;
         [{FinalReply, _} | _] ->
             {true, FinalReply}
     end.
 
-good_reply({{ok, _}, _}) ->
-    true;
-good_reply({noreply, _}) ->
+% In sws mode a conflict reply settles the quorum immediately, regardless of count
+quorum_pred(W, SWS) when is_boolean(SWS) ->
+    fun
+        ({conflict, _}) when SWS -> false;
+        ({_, Count}) -> Count < W
+    end.
+
+good_reply({{ok, _}, _}, _) ->
     true;
-good_reply({conflict, _}) ->
+good_reply({noreply, _}, _) ->
     true;
-good_reply(_) ->
+good_reply({conflict, _}, SWS) ->
+    SWS;
+good_reply(_, _) ->
     false.
 
 -spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
@@ -360,8 +370,9 @@ append_update_replies([Doc | Rest], [], Dict0) ->
 append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
     append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
 
-skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) ->
-    {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, 
DocReplyDict),
+skip_message(#acc{waiting_count = 0} = Acc) ->
+    #acc{w = W, reply = DocReplyDict, serialize_worker_startup = SWS} = Acc,
+    {Health, W, _, Reply} = dict:fold(fun force_reply/3, {ok, W, SWS, []}, 
DocReplyDict),
     {stop, {Health, Reply}};
 skip_message(#acc{} = Acc0) ->
     {ok, Acc0}.
@@ -385,23 +396,17 @@ validate_atomic_update(_DbName, AllDocs, true) ->
     ),
     throw({aborted, PreCommitFailures}).
 
-%% serialize worker startup for interactive edits unless disabled in config.
-start_workers_strategy(#acc{} = Acc) ->
-    SerializeWorkerStartup = config:get_boolean("fabric", 
"serialize_worker_startup", true),
-    case {update_type(Acc), SerializeWorkerStartup} of
-        {?REPLICATED_CHANGES, _} ->
-            start_remaining_workers(Acc);
-        {_, true} ->
-            start_workers(Acc);
-        _ ->
-            start_remaining_workers(Acc)
+% Replicated changes always start their workers in parallel.
+serialize_worker_startup(Options) ->
+    case proplists:get_value(?REPLICATED_CHANGES, Options) of
+        true -> false;
+        _ -> config:get_boolean("fabric", "serialize_worker_startup", true)
     end.
 
-update_type(#acc{} = Acc) ->
-    case proplists:get_value(?REPLICATED_CHANGES, Acc#acc.update_options) of
-        true -> ?REPLICATED_CHANGES;
-        _ -> ?INTERACTIVE_EDIT
-    end.
+start_workers_strategy(#acc{serialize_worker_startup = true} = Acc) ->
+    start_workers(Acc);
+start_workers_strategy(#acc{serialize_worker_startup = false} = Acc) ->
+    start_remaining_workers(Acc).
 
 % Start one worker per range per invocation of this function
 start_workers(#acc{} = Acc) ->
@@ -427,33 +432,43 @@ start_remaining_workers(#acc{} = Acc) ->
         Acc#acc.grouped_docs
     ).
 
-start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc) when 
is_reference(Ref) ->
+start_worker(#shard{ref = Ref} = Worker, Docs0, #acc{} = Acc) when 
is_reference(Ref) ->
     #shard{name = Name, node = Node} = Worker,
-    #acc{update_options = UpdateOptions} = Acc,
+    #acc{update_options = UpdateOptions, conflicts = Conflicts} = Acc,
     case lists:member(Ref, Acc#acc.started) of
         true ->
             Acc;
         false ->
+            % If a doc is a settled conflict for a range, skip it
+            Docs = filter_conflicts(Docs0, Conflicts),
             Ref = rexi:cast_ref(
                 Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), 
UpdateOptions]}
             ),
-            Acc#acc{started = [Ref | Acc#acc.started]}
+            % We need to save what we just cast so we can match up results 
exactly
+            NewGrouped = lists:keystore(Worker, 1, Acc#acc.grouped_docs, 
{Worker, Docs}),
+            Acc#acc{
+                started = [Ref | Acc#acc.started],
+                grouped_docs = NewGrouped
+            }
     end;
 start_worker(#shard{}, _Docs, #acc{} = Acc) ->
     %% for unit tests below.
     Acc.
 
-%% Remove all remaining doc update attempts if a conflict occurred
-remove_conflicted_docs(Docs, Replies, GroupedDocs) when length(Docs) == 
length(Replies) ->
-    ConflictDocs = [Doc || {Doc, conflict} <- lists:zip(Docs, Replies)],
-    UntaggedConflictDocs = untag_docs(ConflictDocs),
-    [
-        {W, [D || D <- Ds, not lists:member(untag_doc(D), 
UntaggedConflictDocs)]}
-     || {W, Ds} <- GroupedDocs
-    ];
-remove_conflicted_docs(_Docs, _Replies, GroupedDocs) ->
-    %% replies can be shorter for replicated changes as only errors show up in 
the result
-    GroupedDocs.
+% With sws=false, conflicts are treated like any other reply and the conflicts
+% list stays empty (so filter_conflicts/2 is a no-op).
+collect_conflicts(_Docs, _Replies, Conflicts, false) ->
+    Conflicts;
+collect_conflicts(Docs, Replies, Conflicts, true) when length(Docs) == 
length(Replies) ->
+    [untag_doc(D) || {D, conflict} <- lists:zip(Docs, Replies)] ++ Conflicts;
+collect_conflicts(_Docs, _Replies, Conflicts, true) ->
+    % Replicated changes return no replies by default
+    Conflicts.
+
+filter_conflicts(Docs, []) ->
+    Docs;
+filter_conflicts(Docs, Conflicts) ->
+    [D || D <- Docs, not lists:member(untag_doc(D), Conflicts)].
 
 -ifdef(TEST).
 
@@ -462,7 +477,9 @@ remove_conflicted_docs(_Docs, _Replies, GroupedDocs) ->
 setup_all() ->
     meck:new([couch_log, couch_stats]),
     meck:expect(couch_log, warning, fun(_, _) -> ok end),
-    meck:expect(couch_stats, increment_counter, fun(_) -> ok end).
+    meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
+    meck:new(rexi, [passthrough]),
+    meck:expect(rexi, cast_ref, fun(Ref, _Node, _Msg) -> Ref end).
 
 teardown_all(_) ->
     meck:unload().
@@ -487,7 +504,12 @@ doc_update_test_() ->
             fun one_error_two_forbid/0,
             fun one_success_two_forbid/0,
             fun worker_before_doc_update_forbidden/0,
-            fun handle_bad_request/0
+            fun handle_bad_request/0,
+            fun filter_conflicts_drops_seen_docs/0,
+            fun parallel_in_flight_after_conflict/0,
+            fun serial_filters_conflicts_at_cast/0,
+            fun sws_false_mode_conflict_not_final/0,
+            fun sws_false_mode_ok_can_outvote_conflict/0
         ]
     }.
 
@@ -990,6 +1012,138 @@ handle_bad_request() ->
         handle_message({bad_request, err, reason}, hd(Shards), Acc)
     ).
 
+filter_conflicts_drops_seen_docs() ->
+    Doc1 = #doc{revs = {1, [<<"foo">>]}},
+    Doc2 = #doc{revs = {1, [<<"bar">>]}},
+    [Tagged1, Tagged2] = tag_docs([Doc1, Doc2]),
+    ?assertEqual([Doc1, Doc2], filter_conflicts([Doc1, Doc2], [])),
+    ?assertEqual([Doc2], filter_conflicts([Doc1, Doc2], [Doc1])),
+    ?assertEqual([], filter_conflicts([Doc1, Doc2], [Doc1, Doc2])),
+    % Match against untagged
+    ?assertEqual([Tagged2], filter_conflicts([Tagged1, Tagged2], [Doc1])).
+
+% In parallel mode, a conflict reply from one worker shouldn't touch another
+% in-flight worker's grouped_docs entry, otherwise we wouldn't be able to match
+% that worker's replies up exactly.
+parallel_in_flight_after_conflict() ->
+    Doc1 = #doc{revs = {1, [<<"foo">>]}},
+    Doc2 = #doc{revs = {1, [<<"bar">>]}},
+    Docs = [Doc1, Doc2],
+    [S1, S2 | _] =
+        Shards =
+        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", 
"node3"]),
+    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
+    Acc0 = #acc{
+        waiting_count = length(Shards),
+        doc_count = length(Docs),
+        w = 2,
+        grouped_docs = GroupedDocs,
+        reply = dict:from_list([{D, []} || D <- Docs])
+    },
+
+    % S1 returns Doc1=conflict / Doc2=ok. Doc1 should be added to conflicts. S2
+    % entry should be left alone
+    {ok, #acc{conflicts = [Doc1]} = Acc1} =
+        handle_message({ok, [conflict, {ok, Doc2}]}, S1, Acc0),
+    {S2, S2Docs} = lists:keyfind(S2, 1, Acc1#acc.grouped_docs),
+    ?assertEqual([Doc1, Doc2], S2Docs),
+
+    %% S2 reply comes back, and we want lengths to match exactly
+    {stop, {ok, Reply}} =
+        handle_message({ok, [conflict, {ok, Doc2}]}, S2, Acc1),
+    ?assertEqual(
+        lists:sort([{Doc1, conflict}, {Doc2, {ok, Doc2}}]),
+        lists:sort(Reply)
+    ).
+
+% In serial mode, a worker started after a peer's conflict response must be
+% cast with the conflicted doc filtered out, and its grouped_docs entry updated
+% to that filtered list, so that cast and reply pairs always stay in sync.
+serial_filters_conflicts_at_cast() ->
+    Doc1 = #doc{id = <<"a">>, revs = {1, [<<"foo">>]}},
+    Doc2 = #doc{id = <<"b">>, revs = {1, [<<"bar">>]}},
+    [Tagged1, Tagged2] = tag_docs([Doc1, Doc2]),
+    Ref = make_ref(),
+    Worker = #shard{
+        node = node1,
+        name = <<"db/shard">>,
+        ref = Ref,
+        range = [0, 100]
+    },
+    Acc0 = #acc{
+        conflicts = [Doc1],
+        grouped_docs = [{Worker, [Tagged1, Tagged2]}],
+        update_options = []
+    },
+
+    Self = self(),
+    meck:expect(rexi, cast_ref, fun(R, _Node, Msg) ->
+        Self ! {cast, R, Msg},
+        R
+    end),
+    Acc1 = start_worker(Worker, [Tagged1, Tagged2], Acc0),
+    receive
+        {cast, Ref, {fabric_rpc, update_docs, [<<"db/shard">>, CastDocs, []]}} 
->
+            ?assertEqual([Doc2], CastDocs)
+    end,
+
+    ?assert(lists:member(Ref, Acc1#acc.started)),
+    {Worker, Stored} = lists:keyfind(Worker, 1, Acc1#acc.grouped_docs),
+    ?assertEqual([Tagged2], Stored).
+
+sws_false_mode_conflict_not_final() ->
+    Doc1 = #doc{revs = {1, [<<"foo">>]}},
+    Doc2 = #doc{revs = {1, [<<"bar">>]}},
+    Docs = [Doc1, Doc2],
+    [S1, S2 | _] =
+        Shards =
+        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", 
"node3"]),
+    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
+    Acc0 = #acc{
+        waiting_count = length(Shards),
+        doc_count = length(Docs),
+        w = 2,
+        grouped_docs = GroupedDocs,
+        reply = dict:from_list([{D, []} || D <- Docs]),
+        serialize_worker_startup = false
+    },
+
+    % S1 conflict on Doc1, ok on Doc2. Conflicts list should be []
+    % Doc1 has no good replies and Doc2 has an ok (1 < W=2) and we
+    % should be continuing (not stopping).
+    {ok, #acc{conflicts = [], waiting_count = 2} = Acc1} =
+        handle_message({ok, [conflict, {ok, Doc2}]}, S1, Acc0),
+
+    % S2 reports the same. Doc1 still has no good replies, quorum
+    % not met, so we keep waiting
+    {ok, _Acc2} = handle_message({ok, [conflict, {ok, Doc2}]}, S2, Acc1).
+
+sws_false_mode_ok_can_outvote_conflict() ->
+    Doc1 = #doc{revs = {1, [<<"foo">>]}},
+    Doc2 = #doc{revs = {1, [<<"bar">>]}},
+    Docs = [Doc1, Doc2],
+    [S1, S2, S3] =
+        Shards =
+        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", 
"node3"]),
+    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
+    Acc0 = #acc{
+        waiting_count = length(Shards),
+        doc_count = length(Docs),
+        w = 2,
+        grouped_docs = GroupedDocs,
+        reply = dict:from_list([{D, []} || D <- Docs]),
+        serialize_worker_startup = false
+    },
+    % With enough peers returning ok for both docs, resolve doc1 as ok
+    % even if one reported aconflict.
+    {ok, Acc1} = handle_message({ok, [conflict, {ok, Doc2}]}, S1, Acc0),
+    {ok, Acc2} = handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, S2, Acc1),
+    {stop, {ok, Reply}} = handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, S3, 
Acc2),
+    ?assertEqual(
+        lists:sort([{Doc1, {ok, Doc1}}, {Doc2, {ok, Doc2}}]),
+        lists:sort(Reply)
+    ).
+
 % needed for testing to avoid having to start the mem3 application
 group_docs_by_shard_hack(_DbName, Shards, Docs) ->
     dict:to_list(

Reply via email to