This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch reduce-intra-cluster-conflicts in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b97768ce4e5994d78e5517d83988b34d2c0e5f25
Author: Robert Newson <[email protected]>
AuthorDate: Wed Feb 18 18:04:10 2026 +0000

    don't update followers where leader got conflicts
---
 src/fabric/src/fabric_doc_update.erl | 36 +++++++++++++++++++++++++++++-------
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 5aba9d482..01a564e54 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -24,7 +24,7 @@
     grouped_docs,
     reply,
     dbname,
-    update_options,
+    update_options = [],
     leaders = [],
     started = []
 }).
@@ -115,8 +115,15 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
-    Acc1 = start_followers([Worker#shard.range], Acc0),
-    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
+    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
+    NewGrpDocs =
+        case lists:member(Worker#shard.ref, Acc0#acc.leaders) of
+            true ->
+                remove_conflicted_docs(Docs, Replies, NewGrpDocs0);
+            false ->
+                NewGrpDocs0
+        end,
+    Acc2 = start_followers([Worker#shard.range], Acc0#acc{grouped_docs = NewGrpDocs}),
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
     case {WaitingCount, dict:size(DocReplyDict)} of
         {1, _} ->
@@ -131,7 +138,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
             % we've got at least one reply for each document, let's take a look
             case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
                 continue ->
-                    {ok, Acc1#acc{
+                    {ok, Acc2#acc{
                         waiting_count = WaitingCount - 1,
                         grouped_docs = NewGrpDocs,
                         reply = DocReplyDict
@@ -140,7 +147,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
                     {stop, {ok, FinalReplies}}
             end;
         _ ->
-            {ok, Acc1#acc{
+            {ok, Acc2#acc{
                 waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
             }}
     end;
@@ -194,8 +201,11 @@ tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
 
 untag_docs([]) ->
     [];
-untag_docs([#doc{meta = Meta} = Doc | Rest]) ->
-    [Doc#doc{meta = lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)].
+untag_docs([#doc{} = Doc | Rest]) ->
+    [untag_doc(Doc) | untag_docs(Rest)].
+
+untag_doc(#doc{} = Doc) ->
+    Doc#doc{meta = lists:keydelete(ref, 1, Doc#doc.meta)}.
 
 force_reply(Doc, [], {_, W, Acc}) ->
     {error, W, [{Doc, {error, internal_server_error}} | Acc]};
@@ -416,6 +426,18 @@ start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
     % for unit tests below.
     ok.
 
+%% Remove all remaining doc update attempts if a conflict occurred at leader
+remove_conflicted_docs(Docs, Replies, GroupedDocs) when length(Docs) == length(Replies) ->
+    ConflictDocs = [Doc || {Doc, conflict} <- lists:zip(Docs, Replies)],
+    UntaggedConflictDocs = untag_docs(ConflictDocs),
+    [
+        {W, [D || D <- Ds, not lists:member(untag_doc(D), UntaggedConflictDocs)]}
+     || {W, Ds} <- GroupedDocs
+    ];
+remove_conflicted_docs(_Docs, _Replies, GroupedDocs) ->
+    %% replies can be shorter for replicated changes as only errors show up in the result
+    GroupedDocs.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
