This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch fix-serializer-worker-startup-false-mode
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3216bc7928adb70acac81fbc90e37ef573917033
Author: Robert Newson <[email protected]>
AuthorDate: Fri May 1 14:06:17 2026 +0100

    fix serialize_worker_startup=false mode
---
 src/fabric/src/fabric_doc_update.erl | 82 ++++++++++++++++++++++++------------
 1 file changed, 54 insertions(+), 28 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl 
b/src/fabric/src/fabric_doc_update.erl
index 89fb9618d..52f72b9af 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -18,6 +18,7 @@
 -include_lib("couch/include/couch_db.hrl").
 
 -record(acc, {
+    serialize_worker_startup = true,
     waiting_count,
     doc_count,
     w,
@@ -44,6 +45,7 @@ go(DbName, AllDocs0, Opts) ->
     {Workers, _} = lists:unzip(GroupedDocs),
     RexiMon = fabric_util:create_monitors(Workers),
     Acc0 = #acc{
+        serialize_worker_startup = serialize_worker_startup(),
         waiting_count = length(Workers),
         doc_count = length(AllDocs),
         w = fabric_util:w_from_opts(DbName, Options),
@@ -65,7 +67,7 @@ go(DbName, AllDocs0, Opts) ->
             fabric_util:log_timeout(DefunctWorkers, "update_docs"),
             {Health, _, Resp} = dict:fold(
                 fun force_reply/3,
-                {ok, W1, []},
+                {ok, Acc0#acc.serialize_worker_startup, W1, []},
                 DocReplDict
             ),
             ensure_all_responses(Health, AllDocs, Resp);
@@ -106,21 +108,31 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
         reply = DocReplyDict0
     } = Acc0,
     {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
-    NewGrpDocs = remove_conflicted_docs(Docs, Replies, NewGrpDocs0),
+    NewGrpDocs =
+        case Acc0#acc.serialize_worker_startup of
+            true -> remove_conflicted_docs(Docs, Replies, NewGrpDocs0);
+            false -> NewGrpDocs0
+        end,
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
     case {WaitingCount, dict:size(DocReplyDict)} of
         {1, _} ->
             % last message has arrived, we need to conclude things
-            {Health, W, Reply} = dict:fold(
+            {Health, _SWS, W, Reply} = dict:fold(
                 fun force_reply/3,
-                {ok, W, []},
+                {ok, Acc0#acc.serialize_worker_startup, W, []},
                 DocReplyDict
             ),
             start_remaining_workers(Acc0#acc{grouped_docs = NewGrpDocs}),
             {stop, {Health, Reply}};
         {_, DocCount} ->
             % we've got at least one reply for each document, let's take a look
-            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
+            case
+                dict:fold(
+                    fun maybe_reply/3,
+                    {stop, Acc0#acc.serialize_worker_startup, W, []},
+                    DocReplyDict
+                )
+            of
                 continue ->
                     {ok,
                         start_workers(Acc0#acc{
@@ -128,7 +140,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
                             grouped_docs = NewGrpDocs,
                             reply = DocReplyDict
                         })};
-                {stop, W, FinalReplies} ->
+                {stop, _SWS, W, FinalReplies} ->
                     start_remaining_workers(Acc0#acc{grouped_docs = 
NewGrpDocs}),
                     {stop, {ok, FinalReplies}}
             end;
@@ -198,17 +210,17 @@ untag_docs([#doc{} = Doc | Rest]) ->
 untag_doc(#doc{} = Doc) ->
     Doc#doc{meta = lists:keydelete(ref, 1, Doc#doc.meta)}.
 
-force_reply(Doc, [], {_, W, Acc}) ->
-    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
-force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) ->
-    case update_quorum_met(W, Replies) of
+force_reply(Doc, [], {_, SWS, W, Acc}) ->
+    {error, SWS, W, [{Doc, {error, internal_server_error}} | Acc]};
+force_reply(Doc, [FirstReply | _] = Replies, {Health, SWS, W, Acc}) ->
+    case update_quorum_met(SWS, W, Replies) of
         {true, Reply} ->
             % corner case new_edits:false and vdu: [noreply, forbidden, 
noreply]
             case check_forbidden_msg(Replies) of
                 {forbidden, ForbiddenReply} ->
-                    {Health, W, [{Doc, ForbiddenReply} | Acc]};
+                    {Health, SWS, W, [{Doc, ForbiddenReply} | Acc]};
                 false ->
-                    {Health, W, [{Doc, Reply} | Acc]}
+                    {Health, SWS, W, [{Doc, Reply} | Acc]}
             end;
         false ->
             case [Reply || {ok, Reply} <- Replies] of
@@ -218,15 +230,15 @@ force_reply(Doc, [FirstReply | _] = Replies, {Health, W, 
Acc}) ->
                         true ->
                             CounterKey = [fabric, doc_update, errors],
                             couch_stats:increment_counter(CounterKey),
-                            {Health, W, [{Doc, FirstReply} | Acc]};
+                            {Health, SWS, W, [{Doc, FirstReply} | Acc]};
                         false ->
                             CounterKey = [fabric, doc_update, 
mismatched_errors],
                             couch_stats:increment_counter(CounterKey),
                             case check_forbidden_msg(Replies) of
                                 {forbidden, ForbiddenReply} ->
-                                    {Health, W, [{Doc, ForbiddenReply} | Acc]};
+                                    {Health, SWS, W, [{Doc, ForbiddenReply} | 
Acc]};
                                 false ->
-                                    {error, W, [{Doc, FirstReply} | Acc]}
+                                    {error, SWS, W, [{Doc, FirstReply} | Acc]}
                             end
                     end;
                 [AcceptedRev | _] ->
@@ -237,17 +249,17 @@ force_reply(Doc, [FirstReply | _] = Replies, {Health, W, 
Acc}) ->
                             ok -> accepted;
                             _ -> Health
                         end,
-                    {NewHealth, W, [{Doc, {accepted, AcceptedRev}} | Acc]}
+                    {NewHealth, SWS, W, [{Doc, {accepted, AcceptedRev}} | Acc]}
             end
     end.
 
 maybe_reply(_, _, continue) ->
     % we didn't meet quorum for all docs, so we're fast-forwarding the fold
     continue;
-maybe_reply(Doc, Replies, {stop, W, Acc}) ->
-    case update_quorum_met(W, Replies) of
+maybe_reply(Doc, Replies, {stop, SWS, W, Acc}) ->
+    case update_quorum_met(SWS, W, Replies) of
         {true, Reply} ->
-            {stop, W, [{Doc, Reply} | Acc]};
+            {stop, SWS, W, [{Doc, Reply} | Acc]};
         false ->
             continue
     end.
@@ -296,17 +308,22 @@ check_forbidden_msg(Replies) ->
             end
     end.
 
-update_quorum_met(W, Replies) ->
+update_quorum_met(SWS, W, Replies) ->
     Counters = lists:foldl(
         fun(R, D) -> orddict:update_counter(R, 1, D) end,
         orddict:new(),
         Replies
     ),
-    GoodReplies = lists:filter(fun good_reply/1, Counters),
+    GoodReply =
+        if
+            SWS -> fun good_reply_with_conflict/1;
+            true -> fun good_reply/1
+        end,
+    GoodReplies = lists:filter(GoodReply, Counters),
     case
         lists:dropwhile(
             fun
-                ({conflict, _}) -> false;
+                ({conflict, _}) when SWS -> false;
                 ({_, Count}) -> Count < W
             end,
             GoodReplies
@@ -322,11 +339,18 @@ good_reply({{ok, _}, _}) ->
     true;
 good_reply({noreply, _}) ->
     true;
-good_reply({conflict, _}) ->
-    true;
 good_reply(_) ->
     false.
 
+good_reply_with_conflict({{ok, _}, _}) ->
+    true;
+good_reply_with_conflict({noreply, _}) ->
+    true;
+good_reply_with_conflict({conflict, _}) ->
+    true;
+good_reply_with_conflict(_) ->
+    false.
+
 -spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
 group_docs_by_shard(DbName, Docs) ->
     dict:to_list(
@@ -360,8 +384,8 @@ append_update_replies([Doc | Rest], [], Dict0) ->
 append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
     append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
 
-skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) ->
-    {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, 
DocReplyDict),
+skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict, 
serialize_worker_startup = SWS}) ->
+    {Health, _SWS, W, Reply} = dict:fold(fun force_reply/3, {ok, SWS, W, []}, 
DocReplyDict),
     {stop, {Health, Reply}};
 skip_message(#acc{} = Acc0) ->
     {ok, Acc0}.
@@ -387,8 +411,7 @@ validate_atomic_update(_DbName, AllDocs, true) ->
 
 %% serialize worker startup for interactive edits unless disabled in config.
 start_workers_strategy(#acc{} = Acc) ->
-    SerializeWorkerStartup = config:get_boolean("fabric", 
"serialize_worker_startup", true),
-    case {update_type(Acc), SerializeWorkerStartup} of
+    case {update_type(Acc), Acc#acc.serialize_worker_startup} of
         {?REPLICATED_CHANGES, _} ->
             start_remaining_workers(Acc);
         {_, true} ->
@@ -397,6 +420,9 @@ start_workers_strategy(#acc{} = Acc) ->
             start_remaining_workers(Acc)
     end.
 
+serialize_worker_startup() ->
+    config:get_boolean("fabric", "serialize_worker_startup", true).
+
 update_type(#acc{} = Acc) ->
     case proplists:get_value(?REPLICATED_CHANGES, Acc#acc.update_options) of
         true -> ?REPLICATED_CHANGES;

Reply via email to