This is an automated email from the ASF dual-hosted git repository.

nickva pushed a commit to branch fix-parallel-spawn-model-for-update-docs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 90467a88713ad5fa1ff421884c22c203864d363f
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Apr 30 00:06:07 2026 -0400

    Fix append_update_replies case clause in fabric_doc_update
    
    In `serialize_worker_startup=false` mode it was previously possible to get
    a function_clause error in `fabric_doc_update:append_update_replies/3`.
    The function_clause was caused by an empty `Docs` list paired with a
    non-empty `Replies` list. That happened when there were at least two
    conflicted replies for a doc, one arriving and the others still in flight:
    the arriving one would knock the doc out of the expected replies list, so
    that when the second reply arrived it wouldn't match up with anything.
    
    To fix this, keep track of conflicts separately and check that list of
    conflicted docs before extra workers are spawned, and also avoid altering
    the grouped docs structure for in-flight requests.
---
 src/fabric/src/fabric_doc_update.erl | 146 +++++++++++++++++++++++++++++------
 1 file changed, 122 insertions(+), 24 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl 
b/src/fabric/src/fabric_doc_update.erl
index 89fb9618d..e7968134d 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -25,7 +25,8 @@
     reply,
     dbname,
     update_options = [],
-    started = []
+    started = [],
+    conflicts = []
 }).
 
 go(_, [], _) ->
@@ -103,11 +104,13 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
         doc_count = DocCount,
         w = W,
         grouped_docs = GroupedDocs,
-        reply = DocReplyDict0
+        reply = DocReplyDict0,
+        conflicts = Conflicts0
     } = Acc0,
-    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
-    NewGrpDocs = remove_conflicted_docs(Docs, Replies, NewGrpDocs0),
+    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
+    Conflicts = collect_conflicts(Docs, Replies, Conflicts0),
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
+    Acc1 = Acc0#acc{conflicts = Conflicts},
     case {WaitingCount, dict:size(DocReplyDict)} of
         {1, _} ->
             % last message has arrived, we need to conclude things
@@ -116,25 +119,25 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
                 {ok, W, []},
                 DocReplyDict
             ),
-            start_remaining_workers(Acc0#acc{grouped_docs = NewGrpDocs}),
+            start_remaining_workers(Acc1#acc{grouped_docs = NewGrpDocs}),
             {stop, {Health, Reply}};
         {_, DocCount} ->
             % we've got at least one reply for each document, let's take a look
             case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
                 continue ->
                     {ok,
-                        start_workers(Acc0#acc{
+                        start_workers(Acc1#acc{
                             waiting_count = WaitingCount - 1,
                             grouped_docs = NewGrpDocs,
                             reply = DocReplyDict
                         })};
                 {stop, W, FinalReplies} ->
-                    start_remaining_workers(Acc0#acc{grouped_docs = 
NewGrpDocs}),
+                    start_remaining_workers(Acc1#acc{grouped_docs = 
NewGrpDocs}),
                     {stop, {ok, FinalReplies}}
             end;
         _ ->
             {ok,
-                start_workers(Acc0#acc{
+                start_workers(Acc1#acc{
                     waiting_count = WaitingCount - 1,
                     grouped_docs = NewGrpDocs,
                     reply = DocReplyDict
@@ -427,33 +430,41 @@ start_remaining_workers(#acc{} = Acc) ->
         Acc#acc.grouped_docs
     ).
 
-start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc) when 
is_reference(Ref) ->
+start_worker(#shard{ref = Ref} = Worker, Docs0, #acc{} = Acc) when 
is_reference(Ref) ->
     #shard{name = Name, node = Node} = Worker,
-    #acc{update_options = UpdateOptions} = Acc,
+    #acc{update_options = UpdateOptions, conflicts = Conflicts} = Acc,
     case lists:member(Ref, Acc#acc.started) of
         true ->
             Acc;
         false ->
+            % If a doc is a settled conflict for a range, skip it
+            Docs = filter_conflicts(Docs0, Conflicts),
             Ref = rexi:cast_ref(
                 Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), 
UpdateOptions]}
             ),
-            Acc#acc{started = [Ref | Acc#acc.started]}
+            % We need to save what we just cast so we can match up results 
exactly
+            NewGrouped = lists:keystore(Worker, 1, Acc#acc.grouped_docs, 
{Worker, Docs}),
+            Acc#acc{
+                started = [Ref | Acc#acc.started],
+                grouped_docs = NewGrouped
+            }
     end;
 start_worker(#shard{}, _Docs, #acc{} = Acc) ->
     %% for unit tests below.
     Acc.
 
-%% Remove all remaining doc update attempts if a conflict occurred
-remove_conflicted_docs(Docs, Replies, GroupedDocs) when length(Docs) == 
length(Replies) ->
-    ConflictDocs = [Doc || {Doc, conflict} <- lists:zip(Docs, Replies)],
-    UntaggedConflictDocs = untag_docs(ConflictDocs),
-    [
-        {W, [D || D <- Ds, not lists:member(untag_doc(D), 
UntaggedConflictDocs)]}
-     || {W, Ds} <- GroupedDocs
-    ];
-remove_conflicted_docs(_Docs, _Replies, GroupedDocs) ->
-    %% replies can be shorter for replicated changes as only errors show up in 
the result
-    GroupedDocs.
+% If we see a conflict we declare it final for that doc and record it in the
+% conflicts field. When we start workers we skip the conflicted docs.
+collect_conflicts(Docs, Replies, Conflicts) when length(Docs) == 
length(Replies) ->
+    [untag_doc(D) || {D, conflict} <- lists:zip(Docs, Replies)] ++ Conflicts;
+collect_conflicts(_Docs, _Replies, Conflicts) ->
+    % Replicated changes return no replies by default
+    Conflicts.
+
+filter_conflicts(Docs, []) ->
+    Docs;
+filter_conflicts(Docs, Conflicts) ->
+    [D || D <- Docs, not lists:member(untag_doc(D), Conflicts)].
 
 -ifdef(TEST).
 
@@ -462,7 +473,12 @@ remove_conflicted_docs(_Docs, _Replies, GroupedDocs) ->
 setup_all() ->
     meck:new([couch_log, couch_stats]),
     meck:expect(couch_log, warning, fun(_, _) -> ok end),
-    meck:expect(couch_stats, increment_counter, fun(_) -> ok end).
+    meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
+    %% Default rexi:cast_ref/3 is a no-op that returns the ref it was
+    %% given; tests that exercise the with-reference start_worker/3
+    %% clause override this expectation locally to capture the cast.
+    meck:new(rexi, [passthrough]),
+    meck:expect(rexi, cast_ref, fun(Ref, _Node, _Msg) -> Ref end).
 
 teardown_all(_) ->
     meck:unload().
@@ -487,7 +503,10 @@ doc_update_test_() ->
             fun one_error_two_forbid/0,
             fun one_success_two_forbid/0,
             fun worker_before_doc_update_forbidden/0,
-            fun handle_bad_request/0
+            fun handle_bad_request/0,
+            fun filter_conflicts_drops_seen_docs/0,
+            fun parallel_in_flight_after_conflict/0,
+            fun serial_filters_conflicts_at_cast/0
         ]
     }.
 
@@ -990,6 +1009,85 @@ handle_bad_request() ->
         handle_message({bad_request, err, reason}, hd(Shards), Acc)
     ).
 
+filter_conflicts_drops_seen_docs() ->
+    Doc1 = #doc{revs = {1, [<<"foo">>]}},
+    Doc2 = #doc{revs = {1, [<<"bar">>]}},
+    [Tagged1, Tagged2] = tag_docs([Doc1, Doc2]),
+    ?assertEqual([Doc1, Doc2], filter_conflicts([Doc1, Doc2], [])),
+    ?assertEqual([Doc2], filter_conflicts([Doc1, Doc2], [Doc1])),
+    ?assertEqual([], filter_conflicts([Doc1, Doc2], [Doc1, Doc2])),
+    % Match against untagged
+    ?assertEqual([Tagged2], filter_conflicts([Tagged1, Tagged2], [Doc1])).
+
+% In parallel mode, a conflict reply from one worker shouldn't touch another
+% in-flight worker's grouped_docs entry, otherwise we wouldn't be able to
+% match replies exactly.
+parallel_in_flight_after_conflict() ->
+    Doc1 = #doc{revs = {1, [<<"foo">>]}},
+    Doc2 = #doc{revs = {1, [<<"bar">>]}},
+    Docs = [Doc1, Doc2],
+    [S1, S2 | _] =
+        Shards =
+        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", 
"node3"]),
+    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
+    Acc0 = #acc{
+        waiting_count = length(Shards),
+        doc_count = length(Docs),
+        w = 2,
+        grouped_docs = GroupedDocs,
+        reply = dict:from_list([{D, []} || D <- Docs])
+    },
+
+    % S1 returns Doc1=conflict / Doc2=ok. Doc1 should be added to conflicts. S2
+    % entry should be left alone
+    {ok, #acc{conflicts = [Doc1]} = Acc1} =
+        handle_message({ok, [conflict, {ok, Doc2}]}, S1, Acc0),
+    {S2, S2Docs} = lists:keyfind(S2, 1, Acc1#acc.grouped_docs),
+    ?assertEqual([Doc1, Doc2], S2Docs),
+
+    %% S2 reply comes back, and we want lengths to match exactly
+    {stop, {ok, Reply}} =
+        handle_message({ok, [conflict, {ok, Doc2}]}, S2, Acc1),
+    ?assertEqual(
+        lists:sort([{Doc1, conflict}, {Doc2, {ok, Doc2}}]),
+        lists:sort(Reply)
+    ).
+
+% In serial mode, a worker started after a peer's conflict response must be
+% cast with the conflicted doc filtered out and its grouped_docs entry updated
+% to that filtered list. We want cast and reply pairs to always stay in sync.
+serial_filters_conflicts_at_cast() ->
+    Doc1 = #doc{id = <<"a">>, revs = {1, [<<"foo">>]}},
+    Doc2 = #doc{id = <<"b">>, revs = {1, [<<"bar">>]}},
+    [Tagged1, Tagged2] = tag_docs([Doc1, Doc2]),
+    Ref = make_ref(),
+    Worker = #shard{
+        node = node1,
+        name = <<"db/shard">>,
+        ref = Ref,
+        range = [0, 100]
+    },
+    Acc0 = #acc{
+        conflicts = [Doc1],
+        grouped_docs = [{Worker, [Tagged1, Tagged2]}],
+        update_options = []
+    },
+
+    Self = self(),
+    meck:expect(rexi, cast_ref, fun(R, _Node, Msg) ->
+        Self ! {cast, R, Msg},
+        R
+    end),
+    Acc1 = start_worker(Worker, [Tagged1, Tagged2], Acc0),
+    receive
+        {cast, Ref, {fabric_rpc, update_docs, [<<"db/shard">>, CastDocs, []]}} 
->
+            ?assertEqual([Doc2], CastDocs)
+    end,
+
+    ?assert(lists:member(Ref, Acc1#acc.started)),
+    {Worker, Stored} = lists:keyfind(Worker, 1, Acc1#acc.grouped_docs),
+    ?assertEqual([Tagged2], Stored).
+
 % needed for testing to avoid having to start the mem3 application
 group_docs_by_shard_hack(_DbName, Shards, Docs) ->
     dict:to_list(

Reply via email to