This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch reduce-intra-cluster-conflicts
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b97768ce4e5994d78e5517d83988b34d2c0e5f25
Author: Robert Newson <[email protected]>
AuthorDate: Wed Feb 18 18:04:10 2026 +0000

    don't update followers where leader got conflicts
---
 src/fabric/src/fabric_doc_update.erl | 36 +++++++++++++++++++++++++++++-------
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 5aba9d482..01a564e54 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -24,7 +24,7 @@
     grouped_docs,
     reply,
     dbname,
-    update_options,
+    update_options = [],
     leaders = [],
     started = []
 }).
@@ -115,8 +115,15 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
-    Acc1 = start_followers([Worker#shard.range], Acc0),
-    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
+    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
+    NewGrpDocs =
+        case lists:member(Worker#shard.ref, Acc0#acc.leaders) of
+            true ->
+                remove_conflicted_docs(Docs, Replies, NewGrpDocs0);
+            false ->
+                NewGrpDocs0
+        end,
+    Acc2 = start_followers([Worker#shard.range], Acc0#acc{grouped_docs = NewGrpDocs}),
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
     case {WaitingCount, dict:size(DocReplyDict)} of
         {1, _} ->
@@ -131,7 +138,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
             % we've got at least one reply for each document, let's take a look
             case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
                 continue ->
-                    {ok, Acc1#acc{
+                    {ok, Acc2#acc{
                         waiting_count = WaitingCount - 1,
                         grouped_docs = NewGrpDocs,
                         reply = DocReplyDict
@@ -140,7 +147,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
                     {stop, {ok, FinalReplies}}
             end;
         _ ->
-            {ok, Acc1#acc{
+            {ok, Acc2#acc{
                 waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
             }}
     end;
@@ -194,8 +201,11 @@ tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
 
 untag_docs([]) ->
     [];
-untag_docs([#doc{meta = Meta} = Doc | Rest]) ->
-    [Doc#doc{meta = lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)].
+untag_docs([#doc{} = Doc | Rest]) ->
+    [untag_doc(Doc) | untag_docs(Rest)].
+
+untag_doc(#doc{} = Doc) ->
+    Doc#doc{meta = lists:keydelete(ref, 1, Doc#doc.meta)}.
 
 force_reply(Doc, [], {_, W, Acc}) ->
     {error, W, [{Doc, {error, internal_server_error}} | Acc]};
@@ -416,6 +426,18 @@ start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
     % for unit tests below.
     ok.
 
+%% Remove all remaining doc update attempts if a conflict occurred at leader
+remove_conflicted_docs(Docs, Replies, GroupedDocs) when length(Docs) == length(Replies) ->
+    ConflictDocs = [Doc || {Doc, conflict} <- lists:zip(Docs, Replies)],
+    UntaggedConflictDocs = untag_docs(ConflictDocs),
+    [
+        {W, [D || D <- Ds, not lists:member(untag_doc(D), UntaggedConflictDocs)]}
+     || {W, Ds} <- GroupedDocs
+    ];
+remove_conflicted_docs(_Docs, _Replies, GroupedDocs) ->
+    %% replies can be shorter for replicated changes as only errors show up in the result
+    GroupedDocs.
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").

Reply via email to