This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 135e6bcac30208ea31a8b14d8587829057a0ad9a Author: Paul J. Davis <paul.joseph.da...@gmail.com> AuthorDate: Tue Apr 24 12:27:32 2018 -0500 Fix read-repair for new clustered purge --- src/fabric/src/fabric_doc_open.erl | 40 +++---- src/fabric/src/fabric_doc_open_revs.erl | 146 +++++++++++++++++-------- src/fabric/src/fabric_rpc.erl | 187 ++++++++++++++++++++------------ 3 files changed, 241 insertions(+), 132 deletions(-) diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl index b974880..0f8e377 100644 --- a/src/fabric/src/fabric_doc_open.erl +++ b/src/fabric/src/fabric_doc_open.erl @@ -25,8 +25,8 @@ r, state, replies, - q_reply, - replies_by_node=[] %[{Node, Reply}] used for checking if a doc is purged + node_id_revs = [], + q_reply }). @@ -84,8 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) -> end; handle_message(Reply, Worker, Acc) -> NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies), - NewNReplies = [{Worker#shard.node, Reply}|Acc#acc.replies_by_node], - NewAcc = Acc#acc{replies = NewReplies, replies_by_node = NewNReplies}, + NewNodeIdRevs = case Reply of + {ok, #doc{id = Id, revs = {Pos, [Rev | _]}}} -> + [{Worker#shard.node, {Id, [{Pos, Rev}]}} | Acc#acc.node_id_revs]; + _ -> + Acc#acc.node_id_revs + end, + NewAcc = Acc#acc{replies = NewReplies, node_id_revs = NewNodeIdRevs}, case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of {true, QuorumReply} -> fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)), @@ -124,15 +129,14 @@ is_r_met(Workers, Replies, R) -> no_more_workers end. -read_repair(#acc{dbname=DbName, replies=Replies, replies_by_node=NReplies0}) -> +read_repair(#acc{dbname=DbName, replies=Replies, node_id_revs=NodeIdRevs}) -> Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies], - NReplies = [{Node, Doc} || {Node, {ok, #doc{}=Doc}} <- NReplies0], case Docs of % omit local docs from read repair [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] -> choose_reply(Docs); [#doc{id=Id} | _] -> - Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NReplies}], + Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}], Res = fabric:update_docs(DbName, Docs, Opts), case Res of {ok, []} -> @@ -323,7 +327,7 @@ handle_message_reply_test() -> {ok, Acc0#acc{ workers=[Worker0, Worker1], replies=[fabric_util:kv(foo,1)], - replies_by_node=[{undefined, foo}] + node_id_revs=[] }}, handle_message(foo, Worker2, Acc0) ), @@ -332,7 +336,7 @@ handle_message_reply_test() -> {ok, Acc0#acc{ workers=[Worker0, Worker1], replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)], - replies_by_node=[{undefined, bar}] + node_id_revs=[] }}, handle_message(bar, Worker2, Acc0#acc{ replies=[fabric_util:kv(foo,1)] @@ -344,8 +348,7 @@ handle_message_reply_test() -> % is returned. Bit subtle on the assertions here. 
?assertEqual( - {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)], - replies_by_node=[{undefined, foo}]}}, + {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)]}}, handle_message(foo, Worker0, Acc0#acc{workers=[Worker0]}) ), @@ -353,12 +356,12 @@ handle_message_reply_test() -> {stop, Acc0#acc{ workers=[], replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)], - replies_by_node =[{undefined, bar}, {undefined, foo}] + node_id_revs =[{undefined, foo}] }}, handle_message(bar, Worker0, Acc0#acc{ workers=[Worker0], replies=[fabric_util:kv(foo,1)], - replies_by_node=[{undefined, foo}] + node_id_revs=[{undefined, foo}] }) ), @@ -371,12 +374,12 @@ handle_message_reply_test() -> replies=[fabric_util:kv(foo,2)], state=r_met, q_reply=foo, - replies_by_node =[{undefined, foo}, {undefined, foo}] + node_id_revs =[{undefined, foo}] }}, handle_message(foo, Worker1, Acc0#acc{ workers=[Worker0, Worker1], replies=[fabric_util:kv(foo,1)], - replies_by_node =[{undefined, foo}] + node_id_revs =[{undefined, foo}] }) ), @@ -387,7 +390,7 @@ handle_message_reply_test() -> replies=[fabric_util:kv(foo,1)], state=r_met, q_reply=foo, - replies_by_node =[{undefined, foo}] + node_id_revs =[] }}, handle_message(foo, Worker0, Acc0#acc{r=1}) ), @@ -398,13 +401,12 @@ handle_message_reply_test() -> replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,2)], state=r_met, q_reply=foo, - replies_by_node =[{undefined, foo}, {undefined, foo}, - {undefined, bar}] + node_id_revs =[{undefined, foo}, {undefined, bar}] }}, handle_message(foo, Worker0, Acc0#acc{ workers=[Worker0], replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)], - replies_by_node =[{undefined, foo}, {undefined, bar}] + node_id_revs =[{undefined, foo}, {undefined, bar}] }) ), diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl index 096722f..bb93568 100644 --- a/src/fabric/src/fabric_doc_open_revs.erl +++ b/src/fabric/src/fabric_doc_open_revs.erl @@ -29,6 +29,7 @@ revs, latest, replies = [], + node_id_revs = [], repair = false }). 
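For reference, a minimal standalone sketch of the accumulator shape fabric_doc_open now builds: one {Node, {DocId, [{Pos, Rev}]}} entry per successful reply, which later travels to fabric_rpc as the {read_repair, NodeIdRevs} option. The module name, the demo/0 helper, and the stripped-down doc record (a stand-in for the real one from couch_db.hrl) are illustrative only, not part of the patch.

    -module(node_id_revs_sketch).
    -export([accumulate/3, demo/0]).

    %% Stand-in for the real #doc record from couch_db.hrl; only the fields
    %% the coordinator pattern matches on are included here.
    -record(doc, {id, revs}).

    %% Mirrors the new handle_message/3 clause: only {ok, #doc{}} replies
    %% contribute an entry, everything else leaves the accumulator alone.
    accumulate(Node, {ok, #doc{id = Id, revs = {Pos, [Rev | _]}}}, Acc) ->
        [{Node, {Id, [{Pos, Rev}]}} | Acc];
    accumulate(_Node, _OtherReply, Acc) ->
        Acc.

    demo() ->
        Doc = #doc{id = <<"mydoc">>, revs = {1, [<<"abcdef">>]}},
        Acc1 = accumulate('node1@127.0.0.1', {ok, Doc}, []),
        %% A not_found or error reply does not add an entry.
        accumulate('node2@127.0.0.1', {not_found, missing}, Acc1).
        %% -> [{'node1@127.0.0.1', {<<"mydoc">>, [{1, <<"abcdef">>}]}}]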
@@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) -> worker_count = WorkerCount, workers = Workers, replies = PrevReplies, + node_id_revs = PrevNodeIdRevs, r = R, revs = Revs, latest = Latest, @@ -92,7 +94,6 @@ handle_message({ok, RawReplies}, Worker, State) -> IsTree = Revs == all orelse Latest, % Do not count error replies when checking quorum - RealReplyCount = ReplyCount + 1 - ReplyErrorCount, QuorumReplies = RealReplyCount >= R, {NewReplies, QuorumMet, Repair} = case IsTree of @@ -102,11 +103,21 @@ handle_message({ok, RawReplies}, Worker, State) -> NumLeafs = couch_key_tree:count_leafs(PrevReplies), SameNumRevs = length(RawReplies) == NumLeafs, QMet = AllInternal andalso SameNumRevs andalso QuorumReplies, - {NewReplies0, QMet, Repair0}; + % Don't set repair=true on the first reply + {NewReplies0, QMet, (ReplyCount > 0) and Repair0}; false -> {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies), {NewReplies0, MinCount >= R, false} end, + NewNodeIdRevs = if Worker == nil -> PrevNodeIdRevs; true -> + IdRevs = lists:foldl(fun + ({ok, #doc{id = Id, revs = {Pos, [Rev | _]}}}, Acc) -> + [{Id, {Pos, Rev}} | Acc]; + (_, Acc) -> + Acc + end, [], RawReplies), + [{Worker#shard.node, IdRevs} | PrevNodeIdRevs] + end, Complete = (ReplyCount =:= (WorkerCount - 1)), @@ -117,6 +128,7 @@ handle_message({ok, RawReplies}, Worker, State) -> DbName, IsTree, NewReplies, + NewNodeIdRevs, ReplyCount + 1, InRepair orelse Repair ), @@ -124,6 +136,7 @@ handle_message({ok, RawReplies}, Worker, State) -> false -> {ok, State#state{ replies = NewReplies, + node_id_revs = NewNodeIdRevs, reply_count = ReplyCount + 1, workers = lists:delete(Worker, Workers), repair = InRepair orelse Repair @@ -180,7 +193,7 @@ dict_replies(Dict, [Reply | Rest]) -> dict_replies(NewDict, Rest). -maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) -> +maybe_read_repair(Db, IsTree, Replies, NodeIdRevs, ReplyCount, DoRepair) -> Docs = case IsTree of true -> tree_repair_docs(Replies, DoRepair); false -> dict_repair_docs(Replies, ReplyCount) @@ -189,7 +202,7 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) -> [] -> ok; _ -> - erlang:spawn(fun() -> read_repair(Db, Docs) end) + erlang:spawn(fun() -> read_repair(Db, Docs, NodeIdRevs) end) end. @@ -208,8 +221,9 @@ dict_repair_docs(Replies, ReplyCount) -> end. -read_repair(Db, Docs) -> - Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]), +read_repair(Db, Docs, NodeIdRevs) -> + Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}], + Res = fabric:update_docs(Db, Docs, Opts), case Res of {ok, []} -> couch_stats:increment_counter([fabric, read_repairs, success]); @@ -268,20 +282,24 @@ filter_reply(Replies) -> setup() -> config:start_link([]), meck:new([fabric, couch_stats, couch_log]), + meck:new(fabric_util, [passthrough]), meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end), meck:expect(couch_stats, increment_counter, fun(_) -> ok end), - meck:expect(couch_log, notice, fun(_, _) -> ok end). + meck:expect(couch_log, notice, fun(_, _) -> ok end), + meck:expect(fabric_util, cleanup, fun(_) -> ok end). + teardown(_) -> - (catch meck:unload([fabric, couch_stats, couch_log])), + (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])), config:stop(). 
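And the multi-rev variant: fabric_doc_open_revs collapses each worker's raw replies into a list of {DocId, {Pos, Rev}} pairs before tagging them with the worker's node (note the per-node payload here is a list, while the single-doc path above carries one {DocId, [{Pos, Rev}]} tuple). The sketch below is illustrative; the module name and trimmed doc record are not from the patch.

    -module(open_revs_fold_sketch).
    -export([node_entry/2]).

    -record(doc, {id, revs}).  %% trimmed stand-in for couch_db.hrl's record

    %% Same fold as the new handle_message/3 clause performs over RawReplies:
    %% keep the leaf {Pos, Rev} of every doc that was actually found and tag
    %% the resulting list with the worker's node.
    node_entry(Node, RawReplies) ->
        IdRevs = lists:foldl(fun
            ({ok, #doc{id = Id, revs = {Pos, [Rev | _]}}}, Acc) ->
                [{Id, {Pos, Rev}} | Acc];
            (_, Acc) ->
                Acc
        end, [], RawReplies),
        {Node, IdRevs}.

    %% Usage: node_entry(node(), [{ok, FooDoc}, {{not_found, missing}, Rev}])
    %% returns {Node, [{FooId, {FooPos, FooRev}}]}.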
state0(Revs, Latest) -> #state{ worker_count = 3, - workers = [w1, w2, w3], + workers = + [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}], r = 2, revs = Revs, latest = Latest @@ -334,27 +352,35 @@ open_doc_revs_test_() -> check_empty_response_not_quorum() -> % Simple smoke test that we don't think we're % done with a first empty response + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, ?_assertMatch( - {ok, #state{workers = [w2, w3]}}, - handle_message({ok, []}, w1, state0(all, false)) + {ok, #state{workers = [W2, W3]}}, + handle_message({ok, []}, W1, state0(all, false)) ). check_basic_response() -> % Check that we've handle a response + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, ?_assertMatch( - {ok, #state{reply_count = 1, workers = [w2, w3]}}, - handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false)) + {ok, #state{reply_count = 1, workers = [W2, W3]}}, + handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false)) ). check_finish_quorum() -> % Two messages with the same revisions means we're done ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), Expect = {stop, [bar1(), foo1()]}, - ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1)) + ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1)) end). @@ -363,11 +389,13 @@ check_finish_quorum_newer() -> % foo1 should count for foo2 which means we're finished. % We also validate that read_repair was triggered. ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), Expect = {stop, [bar1(), foo2()]}, ok = meck:reset(fabric), - ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)), + ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)), ok = meck:wait(fabric, update_docs, '_', 5000), ?assertMatch( [{_, {fabric, update_docs, [_, _, _]}, _}], @@ -380,11 +408,14 @@ check_no_quorum_on_second() -> % Quorum not yet met for the foo revision so we % would wait for w3 ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), ?assertMatch( - {ok, #state{workers = [w3]}}, - handle_message({ok, [bar1()]}, w2, S1) + {ok, #state{workers = [W3]}}, + handle_message({ok, [bar1()]}, W2, S1) ) end). @@ -394,11 +425,14 @@ check_done_on_third() -> % what. Every revision seen in this pattern should be % included. ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), - {ok, S2} = handle_message({ok, [bar1()]}, w2, S1), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), + {ok, S2} = handle_message({ok, [bar1()]}, W2, S1), Expect = {stop, [bar1(), foo1()]}, - ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2)) + ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2)) end). 
@@ -407,108 +441,128 @@ check_done_on_third() -> check_specific_revs_first_msg() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), false), ?assertMatch( - {ok, #state{reply_count = 1, workers = [w2, w3]}}, - handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0) + {ok, #state{reply_count = 1, workers = [W2, W3]}}, + handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0) ) end). check_revs_done_on_agreement() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), false), Msg = {ok, [foo1(), bar1(), bazNF()]}, - {ok, S1} = handle_message(Msg, w1, S0), + {ok, S1} = handle_message(Msg, W1, S0), Expect = {stop, [bar1(), foo1(), bazNF()]}, - ?assertEqual(Expect, handle_message(Msg, w2, S1)) + ?assertEqual(Expect, handle_message(Msg, W2, S1)) end). check_latest_true() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo2(), bar1(), bazNF()]}, Msg2 = {ok, [foo2(), bar1(), bazNF()]}, - {ok, S1} = handle_message(Msg1, w1, S0), + {ok, S1} = handle_message(Msg1, W1, S0), Expect = {stop, [bar1(), foo2(), bazNF()]}, - ?assertEqual(Expect, handle_message(Msg2, w2, S1)) + ?assertEqual(Expect, handle_message(Msg2, W2, S1)) end). check_ancestor_counted_in_quorum() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), bazNF()]}, Msg2 = {ok, [foo2(), bar1(), bazNF()]}, Expect = {stop, [bar1(), foo2(), bazNF()]}, % Older first - {ok, S1} = handle_message(Msg1, w1, S0), - ?assertEqual(Expect, handle_message(Msg2, w2, S1)), + {ok, S1} = handle_message(Msg1, W1, S0), + ?assertEqual(Expect, handle_message(Msg2, W2, S1)), % Newer first - {ok, S2} = handle_message(Msg2, w2, S0), - ?assertEqual(Expect, handle_message(Msg1, w1, S2)) + {ok, S2} = handle_message(Msg2, W2, S0), + ?assertEqual(Expect, handle_message(Msg1, W1, S2)) end). check_not_found_counts_for_descendant() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), bazNF()]}, Msg2 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, % not_found first - {ok, S1} = handle_message(Msg1, w1, S0), - ?assertEqual(Expect, handle_message(Msg2, w2, S1)), + {ok, S1} = handle_message(Msg1, W1, S0), + ?assertEqual(Expect, handle_message(Msg2, W2, S1)), % not_found second - {ok, S2} = handle_message(Msg2, w2, S0), - ?assertEqual(Expect, handle_message(Msg1, w1, S2)) + {ok, S2} = handle_message(Msg2, W2, S0), + ?assertEqual(Expect, handle_message(Msg1, W1, S2)) end). check_worker_error_skipped() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), baz1()]}, Msg2 = {rexi_EXIT, reason}, Msg3 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) end). 
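The switch from bare atoms (w1, w2, w3) to #shard{} workers in these tests is forced by the change above: handle_message/3 now reads Worker#shard.node to tag replies, so the worker must be a real shard record. A small sketch of that constraint, assuming the #shard record comes from mem3's include file as it does in the fabric modules; the trimmed record and module name below are illustrative.

    -module(shard_worker_sketch).
    -export([reply_node/1]).

    %% Trimmed stand-in for mem3's #shard record; only `node` matters here.
    -record(shard, {name, node}).

    %% Equivalent to the Worker#shard.node access in handle_message/3; a bare
    %% atom such as w1 fails this access at runtime, which is why the tests
    %% now build proper records.
    reply_node(Worker) ->
        Worker#shard.node.

    %% Usage: reply_node(#shard{node = 'node1'}) -> 'node1'.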
check_quorum_only_counts_valid_responses() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {rexi_EXIT, reason}, Msg2 = {rexi_EXIT, reason}, Msg3 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) end). check_empty_list_when_no_workers_reply() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {rexi_EXIT, reason}, Msg2 = {rexi_EXIT, reason}, Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil}, Expect = {stop, all_workers_died}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) end). diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index a64d546..783764e 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -244,15 +244,16 @@ update_docs(DbName, Docs0, Options) -> true -> replicated_changes; _ -> interactive_edit end, - DocsByNode = couch_util:get_value(read_repair, Options), - case {X, DocsByNode} of - {_, undefined} -> - Docs = make_att_readers(Docs0), - with_db(DbName, Options, - {couch_db, update_docs, [Docs, Options, X]}); - {replicated_changes, _} -> - update_docs_read_repair(DbName, DocsByNode, Options) - end. + NodeIdRevs = couch_util:get_value(read_repair, Options), + Docs1 = case {X, is_list(NodeIdRevs)} of + {replicated_changes, true} -> + read_repair_filter(DbName, NodeIdRevs, Docs0, Options); + _ -> + Docs0 + end, + Docs2 = make_att_readers(Docs1), + with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, X]}). + get_purge_seq(DbName, Options) -> with_db(DbName, Options, {couch_db, get_purge_seq, []}). @@ -314,72 +315,124 @@ with_db(DbName, Options, {M,F,A}) -> end. -update_docs_read_repair(DbName, DocsByNode, Options) -> +read_repair_filter(DbName, NodeIdRevs, Docs, Options) -> set_io_priority(DbName, Options), case get_or_create_db(DbName, Options) of - {ok, Db} -> - % omit Revisions that have been purged - Docs = filter_purged_revs(Db, DocsByNode), - Docs2 = make_att_readers(Docs), - {M,F,A} = {couch_db, update_docs, [Docs2, Options, replicated_changes]}, - rexi:reply(try - apply(M, F, [Db | A]) - catch Exception -> - Exception; - error:Reason -> - couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason, - clean_stack()]), - {error, Reason} - end); - Error -> - rexi:reply(Error) + {ok, Db} -> + try + filter_purged_revs(Db, NodeIdRevs, Docs) + after + couch_db:close(Db) + end; + Error -> + rexi:reply(Error) end. -% given [{Node, Doc}] diff revs of the same DocID from diff nodes -% returns [Doc] filtering out purged docs. -% This is done for read-repair from fabric_doc_open, -% so that not to recreate Docs that have been purged before -% on this node() from Nodes that are out of sync. 
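With the RPC side in place, the coordinator-side call boils down to passing the {read_repair, NodeIdRevs} hint alongside replicated_changes. A hedged usage sketch follows; the module name and values are illustrative, and the real callers also pass ?ADMIN_CTX from couch_db.hrl, omitted here to keep the sketch self-contained.

    -module(read_repair_call_sketch).
    -export([repair/3]).

    %% DbName is a clustered db name, Docs the winning #doc{}s, and NodeIdRevs
    %% the per-node id/rev list built by fabric_doc_open or
    %% fabric_doc_open_revs. fabric_rpc:update_docs/3 spots the read_repair
    %% option on the replicated_changes path and filters Docs against local
    %% purge infos before writing.
    repair(DbName, Docs, NodeIdRevs) ->
        Opts = [replicated_changes, {read_repair, NodeIdRevs}],
        fabric:update_docs(DbName, Docs, Opts).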
-filter_purged_revs(Db, DocsByNode) -> - AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100), - {ok, DbPSeq} = couch_db:get_purge_seq(Db), - PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) -> [{Id, Revs}|Acc] end, - V = "v" ++ config:get("purge", "version", "1") ++ "-", - StartKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem3-"), - EndKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem31"), - Opts = [{start_key, StartKey}, {end_key_gt, EndKey}], - % go through _local/purge-mem3-.. docs - % find Node that this LDoc corresponds to - % check if update from Node has not been recently purged on current node - LDocsFoldFun = fun(#doc{body={Props}}, Acc) -> - {VOps} = couch_util:get_value(<<"verify_options">>, Props), - Node = couch_util:get_value(<<"node">>, VOps), - Result = lists:keyfind(Node, 1, DocsByNode), - NewAcc = if not Result -> Acc; true -> - {Node, Doc} = Result, - NodePSeq = couch_util:get_value(<<"purge_seq">>, Props), - if NodePSeq == DbPSeq -> - [Doc|Acc]; - (NodePSeq+AllowedPSeqLag) < DbPSeq -> - % Node is very out of sync, ignore updates from it - Acc; - true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq - % if Doc has been purged recently, than ignore it - {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db, - NodePSeq, PurgeFoldFun, [], []), - {Start, [FirstRevId|_]} = Doc#doc.revs, - DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]}, - case lists:member(DocIdRevs, PurgedIdsRevs) of - true -> Acc; - false -> [Doc|Acc] +% A read repair operation may have been triggered by a node +% that was out of sync with the local node. Thus, any time +% we receive a read repair request we need to check if we +% may have recently purged any of the given revisions and +% ignore them if so. +% +% This is accomplished by looking at the purge infos that we +% have locally that have not been replicated to the remote +% node. The logic here is that we may have received the purge +% request before the remote shard copy. So to check that we +% need to look at the purge infos that we have locally but +% have not yet sent to the remote copy. +% +% NodeIdRevs are the list of {node(), {docid(), [rev()]}} +% tuples passed as the read_repair option to update_docs. 
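The comment above describes a three-way decision per remote node, driven by the purge sequence that node has checkpointed locally, the local purge sequence, and a configurable lag. A worked sketch of just that decision; the function name, atoms, and example numbers are illustrative, not part of the patch.

    -module(purge_lag_sketch).
    -export([classify/3]).

    %% trust  - the remote node has already seen every purge we have, so its
    %%          revs can be applied without further checks;
    %% verify - it is at most Lag purges behind, so each rev is checked
    %%          against the purge infos recorded since NodeSeq;
    %% ignore - it is further behind than Lag, so its updates are dropped
    %%          rather than scanning an unbounded number of purge infos.
    classify(NodeSeq, DbPSeq, _Lag) when NodeSeq =:= DbPSeq ->
        trust;
    classify(NodeSeq, DbPSeq, Lag) when NodeSeq >= DbPSeq - Lag ->
        verify;
    classify(_NodeSeq, _DbPSeq, _Lag) ->
        ignore.

    %% With the default lag of 100: classify(100, 100, 100) -> trust,
    %% classify(40, 100, 100) -> verify, classify(5, 500, 100) -> ignore.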
+filter_purged_revs(Db, NodeIdRevs, Docs) -> + Nodes0 = [Node || {Node, _IdRevs} <- NodeIdRevs, Node /= node()], + Nodes = lists:usort(Nodes0), + + % Gather the list of {Node, PurgeSeq} pairs for all nodes + % that are present in our read repair group + StartKey = <<?LOCAL_DOC_PREFIX, "/purge-mem3-">>, + Opts = [{start_key, StartKey}], + FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) -> + case Id of + <<?LOCAL_DOC_PREFIX, "/purge-mem3-", _/binary>> -> + TargetNodeBin = couch_util:get_value(<<"target_node">>, Props), + PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props), + NewAcc = try + TargetNode = binary_to_existing_atom(TargetNodeBin, latin1), + case lists:member(TargetNode, Nodes) of + true -> + {ok, [{TargetNode, PurgeSeq} | Acc]}; + false -> + {ok, Acc} end - end + catch error:badarg -> + % A really old doc referring to a node that's + % no longer in the cluster + {ok, Acc} + end, + {ok, NewAcc}; + _ -> + % We've processed all _local mem3 purge docs + {stop, Acc} + end + end, + {ok, NodeSeqs} = couch_db:fold_local_docs(Db, FoldFun, [], Opts), + + {ok, DbPSeq} = couch_db:get_purge_seq(Db), + Lag = config:get_integer("couchdb", "read_repair_lag", 100), + + CheckSeqFun = fun({Node, IdRevs}, {GoodToGo, MaybeGood}) -> + NodeSeq = case lists:keyfind(Node, 1, NodeSeqs) of + {Node, PS} -> PS; + false -> 0 end, - {ok, NewAcc} + case NodeSeq of + DbPSeq -> + {DocId, Revs} = IdRevs, + NewGTG = [{DocId, Rev} || Rev <- Revs] ++ GoodToGo, + {NewGTG, MaybeGood}; + _ when NodeSeq >= DbPSeq - Lag -> + {GoodToGo, [{NodeSeq, IdRevs} | MaybeGood]}; + _ -> + % The remote node `Node` is so far out of date + % we'll just ignore its read-repair updates rather + % than scan an unbounded number of purge infos + {GoodToGo, MaybeGood} + end + end, + {TotesGood, NeedChecking} = lists:foldl(CheckSeqFun, {[], []}, NodeIdRevs), + + % For any node that's not up to date with internal + % replication we have to check if any of the revisions + % have been purged before running our updates + RestGood = if NeedChecking == [] -> []; true -> + CheckFoldFun = fun({PSeq, _UUID, DocId, Revs}, Acc) -> + FilterFun = fun({NS, FiltDocId, FiltRev}) -> + % The `NS =< PSeq` portion of this translates to the + % fact that we haven't yet replicated PSeq to the + % target node, hence we would need to filter this read + % repair update or risk undoing a purge operation. + NS =< PSeq andalso FiltDocId == DocId + andalso lists:member(FiltRev, Revs) + end, + {ok, lists:filter(FilterFun, Acc)} + end, + StartSeq = lists:min([S || {S, _} <- NeedChecking]), + InitAcc = lists:flatmap(fun({NodeSeq, {DocId, Revs}}) -> + [{NodeSeq, DocId, Rev} || Rev <- Revs] + end, NeedChecking), + {ok, Result} = + couch_db:fold_purge_infos(Db, StartSeq, CheckFoldFun, InitAcc), + [{DocId, Rev} || {_NSeq, DocId, Rev} <- Result] + end, + + % Finally, only return docs that have a revision that + % has not been filtered out of the initial set + AllGood = lists:usort(TotesGood ++ RestGood), + DocFiltFun = fun(#doc{id = Id, revs = {Pos, [Rev | _]}}) -> + lists:member({Id, {Pos, Rev}}, AllGood) end, - {ok, Docs} = couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts), - Docs. + lists:filter(DocFiltFun, Docs). get_or_create_db(DbName, Options) -> -- To stop receiving notification emails like this one, please contact dav...@apache.org.
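Finally, a small standalone sketch of the last step above: a doc survives read repair only if its leaf {Id, {Pos, Rev}} is still in the merged "good" set after the purge checks. The module, function, and demo data are illustrative, and the trimmed doc record again stands in for couch_db.hrl's.

    -module(read_repair_keep_sketch).
    -export([keep/2, demo/0]).

    -record(doc, {id, revs}).

    %% Same shape as DocFiltFun: AllGood holds {DocId, {Pos, Rev}} pairs that
    %% were either trusted outright or cleared by the purge-info scan.
    keep(AllGood, Docs) ->
        lists:filter(fun(#doc{id = Id, revs = {Pos, [Rev | _]}}) ->
            lists:member({Id, {Pos, Rev}}, AllGood)
        end, Docs).

    demo() ->
        Purged = #doc{id = <<"gone">>, revs = {3, [<<"c">>, <<"b">>]}},
        Live = #doc{id = <<"kept">>, revs = {1, [<<"a">>]}},
        AllGood = [{<<"kept">>, {1, <<"a">>}}],
        %% Only Live comes back; Purged's rev was filtered out by the purge
        %% checks, so re-writing it would silently undo a purge.
        keep(AllGood, [Purged, Live]).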