rnewson commented on code in PR #5371:
URL: https://github.com/apache/couchdb/pull/5371#discussion_r1904499092
##########
src/fabric/src/fabric_doc_update.erl:
##########
@@ -367,13 +374,43 @@ start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(
start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
ok.
-append_update_replies([], [], DocReplyDict) ->
+append_update_replies([], [], _W, DocReplyDict) ->
DocReplyDict;
-append_update_replies([Doc | Rest], [], Dict0) ->
+append_update_replies([Doc | Rest], [], W, Dict0) ->
% icky, if replicated_changes only errors show up in result
- append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
- append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+ append_update_replies(Rest, [], W, dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc | Rest1], [conflict | Rest2], W, Dict0) ->
+ %% fake conflict replies from followers as we won't ask them
+ append_update_replies(
+ Rest1, Rest2, W, dict:append_list(Doc, lists:duplicate(W, conflict), Dict0)
+ );
+append_update_replies([Doc | Rest1], [Reply | Rest2], W, Dict0) ->
Review Comment:
Hmm, good question. The second reply only happens if we started the followers, and when we do that we start them all in parallel. So the chance of a stored conflict is the same either way:
- we send `conflict` based on just the second reply, or
- we send `conflict` or `accepted` once we've heard 201/409 from both.

Only the status code of the response would be wrong (we'd send 409 instead of 202, but that can happen anyway).
I had considered doing this in tiers. That is, if we hear that the 'leader' shard is down, we could recalculate and deem one of the remaining N-1 shards the leader, then apply the same logic. "Down" here means a rexi DOWN, or the rexi EXIT for maintenance mode. I didn't do it because I didn't like the latency implications, but not doing it leads to the situation above.
I agree it would be clearer if the fake-reply logic only triggered when the reply is from a leader. It won't make things much better, but it'll be easier to understand later.
##########
src/fabric/src/fabric_doc_update.erl:
##########
@@ -292,18 +336,94 @@ group_docs_by_shard(DbName, Docs) ->
)
).
-append_update_replies([], [], DocReplyDict) ->
+%% use 'lowest' node that hosts this shard range as leader
+is_leader(Worker, Workers) ->
+ Worker == lists:min([W || W <- Workers, W#shard.range == Worker#shard.range]).
+
+start_leaders(#acc{} = Acc0) ->
+ #acc{grouped_docs = GroupedDocs} = Acc0,
+ {Workers, _} = lists:unzip(GroupedDocs),
+ Started = lists:foldl(
+ fun({Worker, Docs}, RefAcc) ->
+ case is_leader(Worker, Workers) of
+ true ->
+ start_worker(Worker, Docs, Acc0),
+ [Worker#shard.ref | RefAcc];
+ false ->
+ RefAcc
+ end
+ end,
+ [],
+ GroupedDocs
+ ),
+ Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+start_followers(#shard{} = Leader, #acc{} = Acc0) ->
+ Followers = [
+ {Worker, _Docs}
+ || {Worker, _Docs} <- Acc0#acc.grouped_docs,
+ Worker#shard.range == Leader#shard.range,
+ not lists:member(Worker#shard.ref, Acc0#acc.started)
+ ],
+ lists:foreach(
+ fun({Worker, Docs}) ->
+ start_worker(Worker, Docs, Acc0)
+ end,
+ Followers
+ ),
+ Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
+ Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(Ref) ->
+ #shard{name = Name, node = Node} = Worker,
+ #acc{update_options = UpdateOptions} = Acc0,
+ rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}),
+ ok;
+start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
+ ok.
+
+append_update_replies([], [], _W, DocReplyDict) ->
DocReplyDict;
-append_update_replies([Doc | Rest], [], Dict0) ->
+append_update_replies([Doc | Rest], [], W, Dict0) ->
% icky, if replicated_changes only errors show up in result
- append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
- append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
-
-skip_message({0, _, W, _, DocReplyDict}) ->
+ append_update_replies(Rest, [], W, dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc | Rest1], [conflict | Rest2], W, Dict0) ->
+ %% fake conflict replies from followers as we won't ask them
+ append_update_replies(
+ Rest1, Rest2, W, dict:append_list(Doc, lists:duplicate(W, conflict), Dict0)
+ );
+append_update_replies([Doc | Rest1], [Reply | Rest2], W, Dict0) ->
+ append_update_replies(Rest1, Rest2, W, dict:append(Doc, Reply, Dict0)).
Review Comment:
sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]