This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 135e6bcac30208ea31a8b14d8587829057a0ad9a
Author: Paul J. Davis <paul.joseph.da...@gmail.com>
AuthorDate: Tue Apr 24 12:27:32 2018 -0500

    Fix read-repair for new clustered purge
---
 src/fabric/src/fabric_doc_open.erl      |  40 +++----
 src/fabric/src/fabric_doc_open_revs.erl | 146 +++++++++++++++++--------
 src/fabric/src/fabric_rpc.erl           | 187 ++++++++++++++++++++------------
 3 files changed, 241 insertions(+), 132 deletions(-)
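
The heart of the change is threading lightweight node/id/rev information
from the coordinators down to the RPC layer, replacing the old
[{Node, Reply}] lists that carried whole replies. For orientation, a
minimal sketch of the shapes involved as built by fabric_doc_open below
(the node name, doc id, and rev hash are hypothetical):

    %% One entry per worker reply: {node(), {DocId, [{Pos, RevHash}]}}
    NodeIdRevs = [{'node1@127.0.0.1', {<<"mydoc">>, [{1, <<"abc">>}]}}],

    %% Threaded down to the RPC layer as an update_docs option:
    Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}],
    fabric:update_docs(DbName, Docs, Opts).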

diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index b974880..0f8e377 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -25,8 +25,8 @@
     r,
     state,
     replies,
-    q_reply,
-    replies_by_node=[] %[{Node, Reply}] used for checking if a doc is purged
+    node_id_revs = [],
+    q_reply
 }).
 
 
@@ -84,8 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
     end;
 handle_message(Reply, Worker, Acc) ->
     NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
-    NewNReplies = [{Worker#shard.node, Reply}|Acc#acc.replies_by_node],
-    NewAcc = Acc#acc{replies = NewReplies, replies_by_node = NewNReplies},
+    NewNodeIdRevs = case Reply of
+        {ok, #doc{id = Id, revs = {Pos, [Rev | _]}}} ->
+            [{Worker#shard.node, {Id, [{Pos, Rev}]}} | Acc#acc.node_id_revs];
+        _ ->
+            Acc#acc.node_id_revs
+    end,
+    NewAcc = Acc#acc{replies = NewReplies, node_id_revs = NewNodeIdRevs},
     case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
     {true, QuorumReply} ->
         fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -124,15 +129,14 @@ is_r_met(Workers, Replies, R) ->
         no_more_workers
     end.
 
-read_repair(#acc{dbname=DbName, replies=Replies, replies_by_node=NReplies0}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, node_id_revs=NodeIdRevs}) ->
     Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
-    NReplies = [{Node, Doc} || {Node, {ok, #doc{}=Doc}} <- NReplies0],
     case Docs of
     % omit local docs from read repair
     [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
         choose_reply(Docs);
     [#doc{id=Id} | _] ->
-        Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NReplies}],
+        Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}],
         Res = fabric:update_docs(DbName, Docs, Opts),
         case Res of
             {ok, []} ->
@@ -323,7 +327,7 @@ handle_message_reply_test() ->
         {ok, Acc0#acc{
             workers=[Worker0, Worker1],
             replies=[fabric_util:kv(foo,1)],
-            replies_by_node=[{undefined, foo}]
+            node_id_revs=[]
         }},
         handle_message(foo, Worker2, Acc0)
     ),
@@ -332,7 +336,7 @@ handle_message_reply_test() ->
         {ok, Acc0#acc{
             workers=[Worker0, Worker1],
             replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
-            replies_by_node=[{undefined, bar}]
+            node_id_revs=[]
         }},
         handle_message(bar, Worker2, Acc0#acc{
             replies=[fabric_util:kv(foo,1)]
@@ -344,8 +348,7 @@ handle_message_reply_test() ->
     % is returned. Bit subtle on the assertions here.
 
     ?assertEqual(
-        {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)],
-            replies_by_node=[{undefined, foo}]}},
+        {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)]}},
         handle_message(foo, Worker0, Acc0#acc{workers=[Worker0]})
     ),
 
@@ -353,12 +356,12 @@ handle_message_reply_test() ->
         {stop, Acc0#acc{
             workers=[],
             replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
-            replies_by_node =[{undefined, bar}, {undefined, foo}]
+            node_id_revs =[{undefined, foo}]
         }},
         handle_message(bar, Worker0, Acc0#acc{
             workers=[Worker0],
             replies=[fabric_util:kv(foo,1)],
-            replies_by_node=[{undefined, foo}]
+            node_id_revs=[{undefined, foo}]
         })
     ),
 
@@ -371,12 +374,12 @@ handle_message_reply_test() ->
             replies=[fabric_util:kv(foo,2)],
             state=r_met,
             q_reply=foo,
-            replies_by_node =[{undefined, foo}, {undefined, foo}]
+            node_id_revs =[{undefined, foo}]
         }},
         handle_message(foo, Worker1, Acc0#acc{
             workers=[Worker0, Worker1],
             replies=[fabric_util:kv(foo,1)],
-            replies_by_node =[{undefined, foo}]
+            node_id_revs =[{undefined, foo}]
         })
     ),
 
@@ -387,7 +390,7 @@ handle_message_reply_test() ->
             replies=[fabric_util:kv(foo,1)],
             state=r_met,
             q_reply=foo,
-            replies_by_node =[{undefined, foo}]
+            node_id_revs =[]
         }},
         handle_message(foo, Worker0, Acc0#acc{r=1})
     ),
@@ -398,13 +401,12 @@ handle_message_reply_test() ->
             replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,2)],
             state=r_met,
             q_reply=foo,
-            replies_by_node =[{undefined, foo}, {undefined, foo},
-                {undefined, bar}]
+            node_id_revs =[{undefined, foo}, {undefined, bar}]
         }},
         handle_message(foo, Worker0, Acc0#acc{
             workers=[Worker0],
             replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
-            replies_by_node =[{undefined, foo}, {undefined, bar}]
+            node_id_revs =[{undefined, foo}, {undefined, bar}]
         })
     ),
 
diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl
index 096722f..bb93568 100644
--- a/src/fabric/src/fabric_doc_open_revs.erl
+++ b/src/fabric/src/fabric_doc_open_revs.erl
@@ -29,6 +29,7 @@
     revs,
     latest,
     replies = [],
+    node_id_revs = [],
     repair = false
 }).
 
@@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         worker_count = WorkerCount,
         workers = Workers,
         replies = PrevReplies,
+        node_id_revs = PrevNodeIdRevs,
         r = R,
         revs = Revs,
         latest = Latest,
@@ -92,7 +94,6 @@ handle_message({ok, RawReplies}, Worker, State) ->
     IsTree = Revs == all orelse Latest,
 
     % Do not count error replies when checking quorum
-
     RealReplyCount = ReplyCount + 1 - ReplyErrorCount,
     QuorumReplies = RealReplyCount >= R,
     {NewReplies, QuorumMet, Repair} = case IsTree of
@@ -102,11 +103,21 @@ handle_message({ok, RawReplies}, Worker, State) ->
             NumLeafs = couch_key_tree:count_leafs(PrevReplies),
             SameNumRevs = length(RawReplies) == NumLeafs,
             QMet = AllInternal andalso SameNumRevs andalso QuorumReplies,
-            {NewReplies0, QMet, Repair0};
+            % Don't set repair=true on the first reply
+            {NewReplies0, QMet, (ReplyCount > 0) and Repair0};
         false ->
             {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies),
             {NewReplies0, MinCount >= R, false}
     end,
+    NewNodeIdRevs = if Worker == nil -> PrevNodeIdRevs; true ->
+        IdRevs = lists:foldl(fun
+            ({ok, #doc{id = Id, revs = {Pos, [Rev | _]}}}, Acc) ->
+                [{Id, {Pos, Rev}} | Acc];
+            (_, Acc) ->
+                Acc
+        end, [], RawReplies),
+        [{Worker#shard.node, IdRevs} | PrevNodeIdRevs]
+    end,
 
     Complete = (ReplyCount =:= (WorkerCount - 1)),
 
@@ -117,6 +128,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
                     DbName,
                     IsTree,
                     NewReplies,
+                    NewNodeIdRevs,
                     ReplyCount + 1,
                     InRepair orelse Repair
                 ),
@@ -124,6 +136,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         false ->
             {ok, State#state{
                 replies = NewReplies,
+                node_id_revs = NewNodeIdRevs,
                 reply_count = ReplyCount + 1,
                 workers = lists:delete(Worker, Workers),
                 repair = InRepair orelse Repair
@@ -180,7 +193,7 @@ dict_replies(Dict, [Reply | Rest]) ->
     dict_replies(NewDict, Rest).
 
 
-maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
+maybe_read_repair(Db, IsTree, Replies, NodeIdRevs, ReplyCount, DoRepair) ->
     Docs = case IsTree of
         true -> tree_repair_docs(Replies, DoRepair);
         false -> dict_repair_docs(Replies, ReplyCount)
@@ -189,7 +202,7 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
         [] ->
             ok;
         _ ->
-            erlang:spawn(fun() -> read_repair(Db, Docs) end)
+            erlang:spawn(fun() -> read_repair(Db, Docs, NodeIdRevs) end)
     end.
 
 
@@ -208,8 +221,9 @@ dict_repair_docs(Replies, ReplyCount) ->
     end.
 
 
-read_repair(Db, Docs) ->
-    Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]),
+read_repair(Db, Docs, NodeIdRevs) ->
+    Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}],
+    Res = fabric:update_docs(Db, Docs, Opts),
     case Res of
         {ok, []} ->
             couch_stats:increment_counter([fabric, read_repairs, success]);
@@ -268,20 +282,24 @@ filter_reply(Replies) ->
 setup() ->
     config:start_link([]),
     meck:new([fabric, couch_stats, couch_log]),
+    meck:new(fabric_util, [passthrough]),
     meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
     meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
-    meck:expect(couch_log, notice, fun(_, _) -> ok end).
+    meck:expect(couch_log, notice, fun(_, _) -> ok end),
+    meck:expect(fabric_util, cleanup, fun(_) -> ok end).
+
 
 
 teardown(_) ->
-    (catch meck:unload([fabric, couch_stats, couch_log])),
+    (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])),
     config:stop().
 
 
 state0(Revs, Latest) ->
     #state{
         worker_count = 3,
-        workers = [w1, w2, w3],
+        workers =
+            [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}],
         r = 2,
         revs = Revs,
         latest = Latest
@@ -334,27 +352,35 @@ open_doc_revs_test_() ->
 check_empty_response_not_quorum() ->
     % Simple smoke test that we don't think we're
     % done with a first empty response
+    W1 = #shard{node='node1'},
+    W2 = #shard{node='node2'},
+    W3 = #shard{node='node3'},
     ?_assertMatch(
-        {ok, #state{workers = [w2, w3]}},
-        handle_message({ok, []}, w1, state0(all, false))
+        {ok, #state{workers = [W2, W3]}},
+        handle_message({ok, []}, W1, state0(all, false))
     ).
 
 
 check_basic_response() ->
     % Check that we've handled a response
+    W1 = #shard{node='node1'},
+    W2 = #shard{node='node2'},
+    W3 = #shard{node='node3'},
     ?_assertMatch(
-        {ok, #state{reply_count = 1, workers = [w2, w3]}},
-        handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false))
+        {ok, #state{reply_count = 1, workers = [W2, W3]}},
+        handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false))
     ).
 
 
 check_finish_quorum() ->
     % Two messages with the same revisions means we're done
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         Expect = {stop, [bar1(), foo1()]},
-        ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1))
+        ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1))
     end).
 
 
@@ -363,11 +389,13 @@ check_finish_quorum_newer() ->
     % foo1 should count for foo2 which means we're finished.
     % We also validate that read_repair was triggered.
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         Expect = {stop, [bar1(), foo2()]},
         ok = meck:reset(fabric),
-        ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)),
+        ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)),
         ok = meck:wait(fabric, update_docs, '_', 5000),
         ?assertMatch(
             [{_, {fabric, update_docs, [_, _, _]}, _}],
@@ -380,11 +408,14 @@ check_no_quorum_on_second() ->
     % Quorum not yet met for the foo revision so we
     % would wait for w3
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         ?assertMatch(
-            {ok, #state{workers = [w3]}},
-            handle_message({ok, [bar1()]}, w2, S1)
+            {ok, #state{workers = [W3]}},
+            handle_message({ok, [bar1()]}, W2, S1)
         )
     end).
 
@@ -394,11 +425,14 @@ check_done_on_third() ->
     % what. Every revision seen in this pattern should be
     % included.
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
-        {ok, S2} = handle_message({ok, [bar1()]}, w2, S1),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
+        {ok, S2} = handle_message({ok, [bar1()]}, W2, S1),
         Expect = {stop, [bar1(), foo1()]},
-        ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2))
+        ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2))
     end).
 
 
@@ -407,108 +441,128 @@ check_done_on_third() ->
 
 check_specific_revs_first_msg() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), false),
         ?assertMatch(
-            {ok, #state{reply_count = 1, workers = [w2, w3]}},
-            handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0)
+            {ok, #state{reply_count = 1, workers = [W2, W3]}},
+            handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0)
         )
     end).
 
 
 check_revs_done_on_agreement() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), false),
         Msg = {ok, [foo1(), bar1(), bazNF()]},
-        {ok, S1} = handle_message(Msg, w1, S0),
+        {ok, S1} = handle_message(Msg, W1, S0),
         Expect = {stop, [bar1(), foo1(), bazNF()]},
-        ?assertEqual(Expect, handle_message(Msg, w2, S1))
+        ?assertEqual(Expect, handle_message(Msg, W2, S1))
     end).
 
 
 check_latest_true() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo2(), bar1(), bazNF()]},
         Msg2 = {ok, [foo2(), bar1(), bazNF()]},
-        {ok, S1} = handle_message(Msg1, w1, S0),
+        {ok, S1} = handle_message(Msg1, W1, S0),
         Expect = {stop, [bar1(), foo2(), bazNF()]},
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1))
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1))
     end).
 
 
 check_ancestor_counted_in_quorum() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), bazNF()]},
         Msg2 = {ok, [foo2(), bar1(), bazNF()]},
         Expect = {stop, [bar1(), foo2(), bazNF()]},
 
         % Older first
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
 
         % Newer first
-        {ok, S2} = handle_message(Msg2, w2, S0),
-        ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+        {ok, S2} = handle_message(Msg2, W2, S0),
+        ?assertEqual(Expect, handle_message(Msg1, W1, S2))
     end).
 
 
 check_not_found_counts_for_descendant() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), bazNF()]},
         Msg2 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
         % not_found first
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
 
         % not_found second
-        {ok, S2} = handle_message(Msg2, w2, S0),
-        ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+        {ok, S2} = handle_message(Msg2, W2, S0),
+        ?assertEqual(Expect, handle_message(Msg1, W1, S2))
     end).
 
 
 check_worker_error_skipped() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), baz1()]},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
 check_quorum_only_counts_valid_responses() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {rexi_EXIT, reason},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
 check_empty_list_when_no_workers_reply() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {rexi_EXIT, reason},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil},
         Expect = {stop, all_workers_died},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
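The fabric_rpc.erl changes below decide which read-repair updates are
safe by consulting the _local/purge-mem3-* checkpoint docs that track
how far each remote node has caught up with local purges. A rough
sketch of reading one such checkpoint, inferred from the fold in
filter_purged_revs (the doc id suffix and values are hypothetical; only
the "target_node" and "purge_seq" fields are assumed):

    {ok, Doc} = couch_db:open_doc(Db, <<"_local/purge-mem3-abcdef">>, []),
    #doc{body = {Props}} = Doc,
    %% e.g. <<"node2@127.0.0.1">> and 42
    TargetNodeBin = couch_util:get_value(<<"target_node">>, Props),
    PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
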
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index a64d546..783764e 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -244,15 +244,16 @@ update_docs(DbName, Docs0, Options) ->
         true -> replicated_changes;
         _ -> interactive_edit
     end,
-    DocsByNode = couch_util:get_value(read_repair, Options),
-    case {X, DocsByNode} of
-        {_, undefined} ->
-            Docs = make_att_readers(Docs0),
-            with_db(DbName, Options,
-                {couch_db, update_docs, [Docs, Options, X]});
-        {replicated_changes, _} ->
-            update_docs_read_repair(DbName, DocsByNode, Options)
-    end.
+    NodeIdRevs = couch_util:get_value(read_repair, Options),
+    Docs1 = case {X, is_list(NodeIdRevs)} of
+        {replicated_changes, true} ->
+            read_repair_filter(DbName, NodeIdRevs, Docs0, Options);
+        _ ->
+            Docs0
+    end,
+    Docs2 = make_att_readers(Docs1),
+    with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, X]}).
+
 
 get_purge_seq(DbName, Options) ->
     with_db(DbName, Options, {couch_db, get_purge_seq, []}).
@@ -314,72 +315,124 @@ with_db(DbName, Options, {M,F,A}) ->
     end.
 
 
-update_docs_read_repair(DbName, DocsByNode, Options) ->
+read_repair_filter(DbName, NodeIdRevs, Docs, Options) ->
     set_io_priority(DbName, Options),
     case get_or_create_db(DbName, Options) of
-    {ok, Db} ->
-        % omit Revisions that have been purged
-        Docs = filter_purged_revs(Db, DocsByNode),
-        Docs2 = make_att_readers(Docs),
-        {M,F,A} = {couch_db, update_docs, [Docs2, Options, replicated_changes]},
-        rexi:reply(try
-            apply(M, F, [Db | A])
-        catch Exception ->
-            Exception;
-        error:Reason ->
-            couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
-                clean_stack()]),
-            {error, Reason}
-        end);
-    Error ->
-        rexi:reply(Error)
+        {ok, Db} ->
+            try
+                filter_purged_revs(Db, NodeIdRevs, Docs)
+            after
+                couch_db:close(Db)
+            end;
+        Error ->
+            rexi:reply(Error)
     end.
 
 
-% given [{Node, Doc}] diff revs of the same DocID from diff nodes
-% returns [Doc] filtering out purged docs.
-% This is done for read-repair from fabric_doc_open,
-% so that not to recreate Docs that have been purged before
-% on this node() from Nodes that are out of sync.
-filter_purged_revs(Db, DocsByNode) ->
-    AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
-    {ok, DbPSeq} = couch_db:get_purge_seq(Db),
-    PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) ->  [{Id, Revs}|Acc]  end,
-    V = "v" ++ config:get("purge", "version", "1") ++ "-",
-    StartKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++  V ++ "mem3-"),
-    EndKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem31"),
-    Opts = [{start_key, StartKey}, {end_key_gt, EndKey}],
-    % go through _local/purge-mem3-.. docs
-    % find Node that this LDoc corresponds to
-    % check if update from Node has not been recently purged on current node
-    LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
-        {VOps} = couch_util:get_value(<<"verify_options">>, Props),
-        Node = couch_util:get_value(<<"node">>, VOps),
-        Result = lists:keyfind(Node, 1, DocsByNode),
-        NewAcc = if not Result -> Acc; true ->
-            {Node, Doc} = Result,
-            NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
-            if  NodePSeq == DbPSeq ->
-                    [Doc|Acc];
-                (NodePSeq+AllowedPSeqLag) < DbPSeq ->
-                    % Node is very out of sync, ignore updates from it
-                    Acc;
-                true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
-                    % if Doc has been purged recently, than ignore it
-                    {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
-                            NodePSeq, PurgeFoldFun, [], []),
-                    {Start, [FirstRevId|_]} = Doc#doc.revs,
-                    DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
-                    case lists:member(DocIdRevs, PurgedIdsRevs) of
-                        true -> Acc;
-                        false -> [Doc|Acc]
+% A read repair operation may have been triggered by a node
+% that was out of sync with the local node. Thus, any time
+% we receive a read repair request we need to check if we
+% may have recently purged any of the given revisions and
+% ignore them if so.
+%
+% This is accomplished by looking at the purge infos that we
+% have locally that have not been replicated to the remote
+% node. The logic here is that we may have received the purge
+% request before the remote shard copy. So to check that we
+% need to look at the purge infos that we have locally but
+% have not yet sent to the remote copy.
+%
+% NodeIdRevs is the list of {node(), {docid(), [rev()]}}
+% tuples passed as the read_repair option to update_docs.
+filter_purged_revs(Db, NodeIdRevs, Docs) ->
+    Nodes0 = [Node || {Node, _IdRevs} <- NodeIdRevs, Node /= node()],
+    Nodes = lists:usort(Nodes0),
+
+    % Gather the list of {Node, PurgeSeq} pairs for all nodes
+    % that are present in our read repair group
+    StartKey = <<?LOCAL_DOC_PREFIX, "/purge-mem3-">>,
+    Opts = [{start_key, StartKey}],
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?LOCAL_DOC_PREFIX, "/purge-mem3-", _/binary>> ->
+                TargetNodeBin = couch_util:get_value(<<"target_node">>, Props),
+                PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                NewAcc = try
+                    TargetNode = binary_to_existing_atom(TargetNodeBin, latin1),
+                    case lists:member(TargetNode, Nodes) of
+                        true ->
+                            {ok, [{TargetNode, PurgeSeq} | Acc]};
+                        false ->
+                            {ok, Acc}
                     end
-            end
+                catch error:badarg ->
+                    % A really old doc referring to a node that's
+                    % no longer in the cluster
+                    {ok, Acc}
+                end,
+                {ok, NewAcc};
+            _ ->
+                % We've processed all _local mem3 purge docs
+                {stop, Acc}
+        end
+    end,
+    {ok, NodeSeqs} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),
+
+    {ok, DbPSeq} = couch_db:get_purge_seq(Db),
+    Lag = config:get_integer("couchdb", "read_repair_lag", 100),
+
+    CheckSeqFun = fun({Node, IdRevs}, {GoodToGo, MaybeGood}) ->
+        NodeSeq = case lists:keyfind(Node, 1, NodeSeqs) of
+            {Node, PS} -> PS;
+            false -> 0
         end,
-        {ok, NewAcc}
+        case NodeSeq of
+            DbPSeq ->
+                {DocId, Revs} = IdRevs,
+                NewGTG = [{DocId, Rev} || Rev <- Revs] ++ GoodToGo,
+                {NewGTG, MaybeGood};
+            _ when NodeSeq >= DbPSeq - Lag ->
+                {GoodToGo, [{NodeSeq, IdRevs} | MaybeGood]};
+            _ ->
+                % The remote node `Node` is so far out of date
+                % we'll just ignore its read-repair updates rather
+                % than scan an unbounded number of purge infos
+                {GoodToGo, MaybeGood}
+        end
+    end,
+    {TotesGood, NeedChecking} = lists:foldl(CheckSeqFun, {[], []}, NodeIdRevs),
+
+    % For any node that's not up to date with internal
+    % replication we have to check if any of the revisions
+    % have been purged before running our updates
+    RestGood = if NeedChecking == [] -> []; true ->
+        CheckFoldFun = fun({PSeq, _UUID, DocId, Revs}, Acc) ->
+            FilterFun = fun({NS, FiltDocId, FiltRev}) ->
+                % The `NS =< PSeq` portion of this translates to the
+                % fact that we haven't yet replicated PSeq to the
+                % target node, hence we would need to filter this read
+                % repair update or risk undoing a purge operation.
+                NS =< PSeq andalso FiltDocId == DocId
+                        andalso lists:member(FiltRev, Revs)
+            end,
+            {ok, lists:filter(FilterFun, Acc)}
+        end,
+        StartSeq = lists:min([S || {S, _} <- NeedChecking]),
+        InitAcc = lists:flatmap(fun({NodeSeq, {DocId, Revs}}) ->
+            [{NodeSeq, DocId, Rev} || Rev <- Revs]
+        end, NeedChecking),
+        {ok, Result} =
+                couch_db:fold_purge_infos(Db, StartSeq, CheckFoldFun, InitAcc),
+        [{DocId, Rev} || {_NSeq, DocId, Rev} <- Result]
+    end,
+
+    % Finally, only return docs that have a revision that
+    % has not been filtered out of the initial set
+    AllGood = lists:usort(TotesGood ++ RestGood),
+    DocFiltFun = fun(#doc{id = Id, revs = {Pos, [Rev | _]}}) ->
+        lists:member({Id, {Pos, Rev}}, AllGood)
     end,
-    {ok, Docs} = couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
-    Docs.
+    lists:filter(DocFiltFun, Docs).
 
 
 get_or_create_db(DbName, Options) ->

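To make the purge-seq lag handling concrete, here is a small sketch (not
part of the commit) of the three-way classification CheckSeqFun applies
to each remote node, assuming a hypothetical local purge seq of 120 and
the default read_repair_lag of 100:

    classify(NodeSeq, DbPSeq, Lag) ->
        if
            NodeSeq == DbPSeq ->
                %% Remote copy has seen every local purge; its
                %% revisions are good to go as-is.
                good_to_go;
            NodeSeq >= DbPSeq - Lag ->
                %% Remote copy may have missed recent purges; scan
                %% purge infos from NodeSeq onward before updating.
                needs_checking;
            true ->
                %% Too far out of date; ignore its updates rather
                %% than scan an unbounded number of purge infos.
                ignored
        end.

    %% classify(120, 120, 100) -> good_to_go
    %% classify(50,  120, 100) -> needs_checking
    %% classify(10,  120, 100) -> ignored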