This is an automated email from the ASF dual-hosted git repository. nickva pushed a commit to branch fix-parallel-spawn-model-for-update-docs in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 90467a88713ad5fa1ff421884c22c203864d363f Author: Nick Vatamaniuc <[email protected]> AuthorDate: Thu Apr 30 00:06:07 2026 -0400 Fix append_update_replies case clause in fabric_doc_update In `serialize_worker_startup=false` mode it was previously possible to get a function_clause error in `fabric_doc_update:append_update_replies/3`. The function_clause was caused by an empty `Docs` list paired with a non-empty `Replies` list. That happened when a doc received at least two conflicted replies: when the first one arrived while the others were still in flight, it removed the doc from the grouped docs entries of the in-flight workers, so when the second reply arrived it no longer matched up with anything. To fix this, keep track of conflicts separately, check that conflicts list before starting extra workers, and avoid altering the grouped docs structure for in-flight requests. --- src/fabric/src/fabric_doc_update.erl | 146 +++++++++++++++++++++++++++++------ 1 file changed, 122 insertions(+), 24 deletions(-) diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index 89fb9618d..e7968134d 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -25,7 +25,8 @@ reply, dbname, update_options = [], - started = [] + started = [], + conflicts = [] }).
go(_, [], _) -> @@ -103,11 +104,13 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) -> doc_count = DocCount, w = W, grouped_docs = GroupedDocs, - reply = DocReplyDict0 + reply = DocReplyDict0, + conflicts = Conflicts0 } = Acc0, - {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs), - NewGrpDocs = remove_conflicted_docs(Docs, Replies, NewGrpDocs0), + {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs), + Conflicts = collect_conflicts(Docs, Replies, Conflicts0), DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), + Acc1 = Acc0#acc{conflicts = Conflicts}, case {WaitingCount, dict:size(DocReplyDict)} of {1, _} -> % last message has arrived, we need to conclude things @@ -116,25 +119,25 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) -> {ok, W, []}, DocReplyDict ), - start_remaining_workers(Acc0#acc{grouped_docs = NewGrpDocs}), + start_remaining_workers(Acc1#acc{grouped_docs = NewGrpDocs}), {stop, {Health, Reply}}; {_, DocCount} -> % we've got at least one reply for each document, let's take a look case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of continue -> {ok, - start_workers(Acc0#acc{ + start_workers(Acc1#acc{ waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict })}; {stop, W, FinalReplies} -> - start_remaining_workers(Acc0#acc{grouped_docs = NewGrpDocs}), + start_remaining_workers(Acc1#acc{grouped_docs = NewGrpDocs}), {stop, {ok, FinalReplies}} end; _ -> {ok, - start_workers(Acc0#acc{ + start_workers(Acc1#acc{ waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict @@ -427,33 +430,41 @@ start_remaining_workers(#acc{} = Acc) -> Acc#acc.grouped_docs ). 
-start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc) when is_reference(Ref) -> +start_worker(#shard{ref = Ref} = Worker, Docs0, #acc{} = Acc) when is_reference(Ref) -> #shard{name = Name, node = Node} = Worker, - #acc{update_options = UpdateOptions} = Acc, + #acc{update_options = UpdateOptions, conflicts = Conflicts} = Acc, case lists:member(Ref, Acc#acc.started) of true -> Acc; false -> + % If a doc is a settled conflict for a range, skip it + Docs = filter_conflicts(Docs0, Conflicts), Ref = rexi:cast_ref( Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]} ), - Acc#acc{started = [Ref | Acc#acc.started]} + % We need to save what we just cast so we can match up results exactly + NewGrouped = lists:keystore(Worker, 1, Acc#acc.grouped_docs, {Worker, Docs}), + Acc#acc{ + started = [Ref | Acc#acc.started], + grouped_docs = NewGrouped + } end; start_worker(#shard{}, _Docs, #acc{} = Acc) -> %% for unit tests below. Acc. -%% Remove all remaining doc update attempts if a conflict occurred -remove_conflicted_docs(Docs, Replies, GroupedDocs) when length(Docs) == length(Replies) -> - ConflictDocs = [Doc || {Doc, conflict} <- lists:zip(Docs, Replies)], - UntaggedConflictDocs = untag_docs(ConflictDocs), - [ - {W, [D || D <- Ds, not lists:member(untag_doc(D), UntaggedConflictDocs)]} - || {W, Ds} <- GroupedDocs - ]; -remove_conflicted_docs(_Docs, _Replies, GroupedDocs) -> - %% replies can be shorter for replicated changes as only errors show up in the result - GroupedDocs. +% If we see conflict we declare is as final for that doc and record in +% conflicts field. When we start workers skip the conflicted ones. +collect_conflicts(Docs, Replies, Conflicts) when length(Docs) == length(Replies) -> + [untag_doc(D) || {D, conflict} <- lists:zip(Docs, Replies)] ++ Conflicts; +collect_conflicts(_Docs, _Replies, Conflicts) -> + % Replicated changes return no replies by default + Conflicts. 
+ +filter_conflicts(Docs, []) -> + Docs; +filter_conflicts(Docs, Conflicts) -> + [D || D <- Docs, not lists:member(untag_doc(D), Conflicts)]. -ifdef(TEST). @@ -462,7 +473,12 @@ remove_conflicted_docs(_Docs, _Replies, GroupedDocs) -> setup_all() -> meck:new([couch_log, couch_stats]), meck:expect(couch_log, warning, fun(_, _) -> ok end), - meck:expect(couch_stats, increment_counter, fun(_) -> ok end). + meck:expect(couch_stats, increment_counter, fun(_) -> ok end), + %% Default rexi:cast_ref/3 is a no-op that returns the ref it was + %% given; tests that exercise the with-reference start_worker/3 + %% clause override this expectation locally to capture the cast. + meck:new(rexi, [passthrough]), + meck:expect(rexi, cast_ref, fun(Ref, _Node, _Msg) -> Ref end). teardown_all(_) -> meck:unload(). @@ -487,7 +503,10 @@ doc_update_test_() -> fun one_error_two_forbid/0, fun one_success_two_forbid/0, fun worker_before_doc_update_forbidden/0, - fun handle_bad_request/0 + fun handle_bad_request/0, + fun filter_conflicts_drops_seen_docs/0, + fun parallel_in_flight_after_conflict/0, + fun serial_filters_conflicts_at_cast/0 ] }. @@ -990,6 +1009,85 @@ handle_bad_request() -> handle_message({bad_request, err, reason}, hd(Shards), Acc) ). +filter_conflicts_drops_seen_docs() -> + Doc1 = #doc{revs = {1, [<<"foo">>]}}, + Doc2 = #doc{revs = {1, [<<"bar">>]}}, + [Tagged1, Tagged2] = tag_docs([Doc1, Doc2]), + ?assertEqual([Doc1, Doc2], filter_conflicts([Doc1, Doc2], [])), + ?assertEqual([Doc2], filter_conflicts([Doc1, Doc2], [Doc1])), + ?assertEqual([], filter_conflicts([Doc1, Doc2], [Doc1, Doc2])), + % Match against untagged + ?assertEqual([Tagged2], filter_conflicts([Tagged1, Tagged2], [Doc1])). 
+ +% In flight parallel mode, conflict reply from one worker shouldn't touch +% another in-flight workers's grouped_docs entry, otherwise we wouldn't be able +% to match replies exactly +parallel_in_flight_after_conflict() -> + Doc1 = #doc{revs = {1, [<<"foo">>]}}, + Doc2 = #doc{revs = {1, [<<"bar">>]}}, + Docs = [Doc1, Doc2], + [S1, S2 | _] = + Shards = + mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), + GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{D, []} || D <- Docs]) + }, + + % S1 returns Doc1=conflict / Doc2=ok. Doc1 should be added to conflicts. S2 + % entry should be left alone + {ok, #acc{conflicts = [Doc1]} = Acc1} = + handle_message({ok, [conflict, {ok, Doc2}]}, S1, Acc0), + {S2, S2Docs} = lists:keyfind(S2, 1, Acc1#acc.grouped_docs), + ?assertEqual([Doc1, Doc2], S2Docs), + + %% S2 reply comes back, and we want lengths to match exactly + {stop, {ok, Reply}} = + handle_message({ok, [conflict, {ok, Doc2}]}, S2, Acc1), + ?assertEqual( + lists:sort([{Doc1, conflict}, {Doc2, {ok, Doc2}}]), + lists:sort(Reply) + ). + +% In serial mode, a worker started after a peer's conflict response must be +% cast with conflicted doc filtred out and its grouped_docs updated to that +% filtered list. We want cast and reply pairs to always stay in sync +serial_filters_conflicts_at_cast() -> + Doc1 = #doc{id = <<"a">>, revs = {1, [<<"foo">>]}}, + Doc2 = #doc{id = <<"b">>, revs = {1, [<<"bar">>]}}, + [Tagged1, Tagged2] = tag_docs([Doc1, Doc2]), + Ref = make_ref(), + Worker = #shard{ + node = node1, + name = <<"db/shard">>, + ref = Ref, + range = [0, 100] + }, + Acc0 = #acc{ + conflicts = [Doc1], + grouped_docs = [{Worker, [Tagged1, Tagged2]}], + update_options = [] + }, + + Self = self(), + meck:expect(rexi, cast_ref, fun(R, _Node, Msg) -> + Self ! 
{cast, R, Msg}, + R + end), + Acc1 = start_worker(Worker, [Tagged1, Tagged2], Acc0), + receive + {cast, Ref, {fabric_rpc, update_docs, [<<"db/shard">>, CastDocs, []]}} -> + ?assertEqual([Doc2], CastDocs) + end, + + ?assert(lists:member(Ref, Acc1#acc.started)), + {Worker, Stored} = lists:keyfind(Worker, 1, Acc1#acc.grouped_docs), + ?assertEqual([Tagged2], Stored). + % needed for testing to avoid having to start the mem3 application group_docs_by_shard_hack(_DbName, Shards, Docs) -> dict:to_list(
