rnewson commented on code in PR #5371:
URL: https://github.com/apache/couchdb/pull/5371#discussion_r1904499092


##########
src/fabric/src/fabric_doc_update.erl:
##########
@@ -367,13 +374,43 @@ start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = 
Acc0) when is_reference(
 start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
     ok.
 
-append_update_replies([], [], DocReplyDict) ->
+append_update_replies([], [], _W, DocReplyDict) ->
     DocReplyDict;
-append_update_replies([Doc | Rest], [], Dict0) ->
+append_update_replies([Doc | Rest], [], W, Dict0) ->
     % icky, if replicated_changes only errors show up in result
-    append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
-    append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+    append_update_replies(Rest, [], W, dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc | Rest1], [conflict | Rest2], W, Dict0) ->
+    %% fake conflict replies from followers as we won't ask them
+    append_update_replies(
+        Rest1, Rest2, W, dict:append_list(Doc, lists:duplicate(W, conflict), 
Dict0)
+    );
+append_update_replies([Doc | Rest1], [Reply | Rest2], W, Dict0) ->

Review Comment:
   hmmm good question. the 2nd reply is only happening if we started the 
followers, and if we do that we're doing them all in parallel, so whether we 
send conflict based on just the 2nd reply or conflict|accepted if we got 
201/409 from both, there's still the same chance of a stored conflict, it's 
only the status code of the response that would be wrong (we'd send 409 instead 
of 202, but that can happen anyway).
   
   I had considered doing this in tiers. that is, if we hear that the 'leader' 
shard is down (rexi DOWN or the rexi EXIT for maintenance mode) we could 
recalculate and deem one of the N-1 remaining shards as the leader, and do the 
same logic. I didn't do it because I didn't like the latency implications, but 
it leads to the situation above.
   
   I agree it would be clearer if the fake reply thing only triggers if the 
reply is from a leader. It won't make things much better but it'll be easier to 
understand later. 



##########
src/fabric/src/fabric_doc_update.erl:
##########
@@ -292,18 +336,94 @@ group_docs_by_shard(DbName, Docs) ->
         )
     ).
 
-append_update_replies([], [], DocReplyDict) ->
+%% use 'lowest' node that hosts this shard range as leader
+is_leader(Worker, Workers) ->
+    Worker == lists:min([W || W <- Workers, W#shard.range == 
Worker#shard.range]).
+
+start_leaders(#acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs} = Acc0,
+    {Workers, _} = lists:unzip(GroupedDocs),
+    Started = lists:foldl(
+        fun({Worker, Docs}, RefAcc) ->
+            case is_leader(Worker, Workers) of
+                true ->
+                    start_worker(Worker, Docs, Acc0),
+                    [Worker#shard.ref | RefAcc];
+                false ->
+                    RefAcc
+            end
+        end,
+        [],
+        GroupedDocs
+    ),
+    Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+start_followers(#shard{} = Leader, #acc{} = Acc0) ->
+    Followers = [
+        {Worker, _Docs}
+     || {Worker, _Docs} <- Acc0#acc.grouped_docs,
+        Worker#shard.range == Leader#shard.range,
+        not lists:member(Worker#shard.ref, Acc0#acc.started)
+    ],
+    lists:foreach(
+        fun({Worker, Docs}) ->
+            start_worker(Worker, Docs, Acc0)
+        end,
+        Followers
+    ),
+    Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
+    Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when 
is_reference(Ref) ->
+    #shard{name = Name, node = Node} = Worker,
+    #acc{update_options = UpdateOptions} = Acc0,
+    rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, 
untag_docs(Docs), UpdateOptions]}),
+    ok;
+start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
+    ok.
+
+append_update_replies([], [], _W, DocReplyDict) ->
     DocReplyDict;
-append_update_replies([Doc | Rest], [], Dict0) ->
+append_update_replies([Doc | Rest], [], W, Dict0) ->
     % icky, if replicated_changes only errors show up in result
-    append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
-    append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
-
-skip_message({0, _, W, _, DocReplyDict}) ->
+    append_update_replies(Rest, [], W, dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc | Rest1], [conflict | Rest2], W, Dict0) ->
+    %% fake conflict replies from followers as we won't ask them
+    append_update_replies(
+        Rest1, Rest2, W, dict:append_list(Doc, lists:duplicate(W, conflict), 
Dict0)
+    );
+append_update_replies([Doc | Rest1], [Reply | Rest2], W, Dict0) ->
+    append_update_replies(Rest1, Rest2, W, dict:append(Doc, Reply, Dict0)).

Review Comment:
   sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to