This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch fix-serializer-worker-startup-false-mode in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3216bc7928adb70acac81fbc90e37ef573917033 Author: Robert Newson <[email protected]> AuthorDate: Fri May 1 14:06:17 2026 +0100 fix serialize_worker_startup=false mode --- src/fabric/src/fabric_doc_update.erl | 82 ++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 28 deletions(-) diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index 89fb9618d..52f72b9af 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -18,6 +18,7 @@ -include_lib("couch/include/couch_db.hrl"). -record(acc, { + serialize_worker_startup = true, waiting_count, doc_count, w, @@ -44,6 +45,7 @@ go(DbName, AllDocs0, Opts) -> {Workers, _} = lists:unzip(GroupedDocs), RexiMon = fabric_util:create_monitors(Workers), Acc0 = #acc{ + serialize_worker_startup = serialize_worker_startup(), waiting_count = length(Workers), doc_count = length(AllDocs), w = fabric_util:w_from_opts(DbName, Options), @@ -65,7 +67,7 @@ go(DbName, AllDocs0, Opts) -> fabric_util:log_timeout(DefunctWorkers, "update_docs"), {Health, _, Resp} = dict:fold( fun force_reply/3, - {ok, W1, []}, + {ok, Acc0#acc.serialize_worker_startup, W1, []}, DocReplDict ), ensure_all_responses(Health, AllDocs, Resp); @@ -106,21 +108,31 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) -> reply = DocReplyDict0 } = Acc0, {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs), - NewGrpDocs = remove_conflicted_docs(Docs, Replies, NewGrpDocs0), + NewGrpDocs = + case Acc0#acc.serialize_worker_startup of + true -> remove_conflicted_docs(Docs, Replies, NewGrpDocs0); + false -> NewGrpDocs0 + end, DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), case {WaitingCount, dict:size(DocReplyDict)} of {1, _} -> % last message has arrived, we need to conclude things - {Health, W, Reply} = dict:fold( + {Health, _SWS, W, Reply} = dict:fold( fun force_reply/3, - {ok, W, []}, + {ok, Acc0#acc.serialize_worker_startup, W, []}, DocReplyDict ), start_remaining_workers(Acc0#acc{grouped_docs = NewGrpDocs}), {stop, {Health, Reply}}; {_, DocCount} -> % we've got at least one reply for each document, let's take a look - case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of + case + dict:fold( + fun maybe_reply/3, + {stop, Acc0#acc.serialize_worker_startup, W, []}, + DocReplyDict + ) + of continue -> {ok, start_workers(Acc0#acc{ @@ -128,7 +140,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) -> grouped_docs = NewGrpDocs, reply = DocReplyDict })}; - {stop, W, FinalReplies} -> + {stop, _SWS, W, FinalReplies} -> start_remaining_workers(Acc0#acc{grouped_docs = NewGrpDocs}), {stop, {ok, FinalReplies}} end; @@ -198,17 +210,17 @@ untag_docs([#doc{} = Doc | Rest]) -> untag_doc(#doc{} = Doc) -> Doc#doc{meta = lists:keydelete(ref, 1, Doc#doc.meta)}. -force_reply(Doc, [], {_, W, Acc}) -> - {error, W, [{Doc, {error, internal_server_error}} | Acc]}; -force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) -> - case update_quorum_met(W, Replies) of +force_reply(Doc, [], {_, SWS, W, Acc}) -> + {error, SWS, W, [{Doc, {error, internal_server_error}} | Acc]}; +force_reply(Doc, [FirstReply | _] = Replies, {Health, SWS, W, Acc}) -> + case update_quorum_met(SWS, W, Replies) of {true, Reply} -> % corner case new_edits:false and vdu: [noreply, forbidden, noreply] case check_forbidden_msg(Replies) of {forbidden, ForbiddenReply} -> - {Health, W, [{Doc, ForbiddenReply} | Acc]}; + {Health, SWS, W, [{Doc, ForbiddenReply} | Acc]}; false -> - {Health, W, [{Doc, Reply} | Acc]} + {Health, SWS, W, [{Doc, Reply} | Acc]} end; false -> case [Reply || {ok, Reply} <- Replies] of @@ -218,15 +230,15 @@ force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) -> true -> CounterKey = [fabric, doc_update, errors], couch_stats:increment_counter(CounterKey), - {Health, W, [{Doc, FirstReply} | Acc]}; + {Health, SWS, W, [{Doc, FirstReply} | Acc]}; false -> CounterKey = [fabric, doc_update, mismatched_errors], couch_stats:increment_counter(CounterKey), case check_forbidden_msg(Replies) of {forbidden, ForbiddenReply} -> - {Health, W, [{Doc, ForbiddenReply} | Acc]}; + {Health, SWS, W, [{Doc, ForbiddenReply} | Acc]}; false -> - {error, W, [{Doc, FirstReply} | Acc]} + {error, SWS, W, [{Doc, FirstReply} | Acc]} end end; [AcceptedRev | _] -> @@ -237,17 +249,17 @@ force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) -> ok -> accepted; _ -> Health end, - {NewHealth, W, [{Doc, {accepted, AcceptedRev}} | Acc]} + {NewHealth, SWS, W, [{Doc, {accepted, AcceptedRev}} | Acc]} end end. maybe_reply(_, _, continue) -> % we didn't meet quorum for all docs, so we're fast-forwarding the fold continue; -maybe_reply(Doc, Replies, {stop, W, Acc}) -> - case update_quorum_met(W, Replies) of +maybe_reply(Doc, Replies, {stop, SWS, W, Acc}) -> + case update_quorum_met(SWS, W, Replies) of {true, Reply} -> - {stop, W, [{Doc, Reply} | Acc]}; + {stop, SWS, W, [{Doc, Reply} | Acc]}; false -> continue end. @@ -296,17 +308,22 @@ check_forbidden_msg(Replies) -> end end. -update_quorum_met(W, Replies) -> +update_quorum_met(SWS, W, Replies) -> Counters = lists:foldl( fun(R, D) -> orddict:update_counter(R, 1, D) end, orddict:new(), Replies ), - GoodReplies = lists:filter(fun good_reply/1, Counters), + GoodReply = + if + SWS -> fun good_reply_with_conflict/1; + true -> fun good_reply/1 + end, + GoodReplies = lists:filter(GoodReply, Counters), case lists:dropwhile( fun - ({conflict, _}) -> false; + ({conflict, _}) when SWS -> false; ({_, Count}) -> Count < W end, GoodReplies @@ -322,11 +339,18 @@ good_reply({{ok, _}, _}) -> true; good_reply({noreply, _}) -> true; -good_reply({conflict, _}) -> - true; good_reply(_) -> false. +good_reply_with_conflict({{ok, _}, _}) -> + true; +good_reply_with_conflict({noreply, _}) -> + true; +good_reply_with_conflict({conflict, _}) -> + true; +good_reply_with_conflict(_) -> + false. + -spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. group_docs_by_shard(DbName, Docs) -> dict:to_list( @@ -360,8 +384,8 @@ append_update_replies([Doc | Rest], [], Dict0) -> append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) -> append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). -skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) -> - {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict), +skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict, serialize_worker_startup = SWS}) -> + {Health, _SWS, W, Reply} = dict:fold(fun force_reply/3, {ok, SWS, W, []}, DocReplyDict), {stop, {Health, Reply}}; skip_message(#acc{} = Acc0) -> {ok, Acc0}. @@ -387,8 +411,7 @@ validate_atomic_update(_DbName, AllDocs, true) -> %% serialize worker startup for interactive edits unless disabled in config. start_workers_strategy(#acc{} = Acc) -> - SerializeWorkerStartup = config:get_boolean("fabric", "serialize_worker_startup", true), - case {update_type(Acc), SerializeWorkerStartup} of + case {update_type(Acc), Acc#acc.serialize_worker_startup} of {?REPLICATED_CHANGES, _} -> start_remaining_workers(Acc); {_, true} -> @@ -397,6 +420,9 @@ start_workers_strategy(#acc{} = Acc) -> start_remaining_workers(Acc) end. +serialize_worker_startup() -> + config:get_boolean("fabric", "serialize_worker_startup", true). + update_type(#acc{} = Acc) -> case proplists:get_value(?REPLICATED_CHANGES, Acc#acc.update_options) of true -> ?REPLICATED_CHANGES;
