Ensure Live node set is consistent with up/down messages

BugzID: 46617
This is a cherry-pick of:

https://github.com/cloudant/couch_replicator/commit/2418c26b0fa7cffb97c2d8348654c42d6a0f1a06

Conflicts:
    src/couch_replicator_manager.erl

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/8d69019b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/8d69019b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/8d69019b

Branch: refs/heads/master
Commit: 8d69019b1e4fbaaf425fec8a30542dfee41a5cf1
Parents: 5b630ff
Author: Robert Newson <rnew...@apache.org>
Authored: Wed May 13 19:40:55 2015 +0100
Committer: Mike Wallace <mikewall...@apache.org>
Committed: Fri Jun 5 17:20:20 2015 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/8d69019b/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index eefa0cd..5369ad5 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -68,7 +68,8 @@
     event_listener = nil,
     scan_pid = nil,
     rep_start_pids = [],
-    max_retries
+    max_retries,
+    live = []
 }).

 start_link() ->
@@ -138,6 +139,7 @@ handle_config_terminate(Self, _, _) ->
 init(_) ->
     process_flag(trap_exit, true),
     net_kernel:monitor_nodes(true),
+    Live = [node() | nodes()],
     ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
     ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
     ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
@@ -153,7 +155,8 @@ init(_) ->
         scan_pid = ScanPid,
         max_retries = retries_value(
             config:get("replicator", "max_replication_retry_count", "10")),
-        rep_start_pids = [Pid]
+        rep_start_pids = [Pid],
+        live = Live
     }}.


@@ -221,11 +224,13 @@ handle_cast(Msg, State) ->
     couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
     {stop, {error, {unexpected_cast, Msg}}, State}.

-handle_info({nodeup, _Node}, State) ->
-    {noreply, rescan(State)};
+handle_info({nodeup, Node}, State) ->
+    Live = lists:usort([Node | State#state.live]),
+    {noreply, rescan(State#state{live=Live})};

-handle_info({nodedown, _Node}, State) ->
-    {noreply, rescan(State)};
+handle_info({nodedown, Node}, State) ->
+    Live = State#state.live -- [Node],
+    {noreply, rescan(State#state{live=Live})};

 handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
     couch_log:debug("Background scan has completed.", []),
@@ -286,6 +291,8 @@ terminate(_Reason, State) ->
     couch_event:stop_listener(Listener).


+code_change(1, State, _Extra) ->
+    {ok, erlang:append_element(State, [node() | nodes()])};
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

@@ -381,7 +388,7 @@ rescan(#state{scan_pid = ScanPid} = State) ->
 process_update(State, DbName, {Change}) ->
     {RepProps} = JsonRepDoc = get_json_value(doc, Change),
     DocId = get_json_value(<<"_id">>, RepProps),
-    case {owner(DbName, DocId), get_json_value(deleted, Change, false)} of
+    case {owner(DbName, DocId, State#state.live), get_json_value(deleted, Change, false)} of
     {_, true} ->
         rep_doc_deleted(DbName, DocId),
         State;
@@ -406,12 +413,11 @@ process_update(State, DbName, {Change}) ->
         end
     end.

-owner(<<"shards/", _/binary>> = DbName, DocId) ->
-    Live = [node()|nodes()],
+owner(<<"shards/", _/binary>> = DbName, DocId, Live) ->
     Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),
             lists:member(N, Live)]),
     node() =:= hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
-owner(_DbName, _DocId) ->
+owner(_DbName, _DocId, _Live) ->
     true.

 rep_db_update_error(Error, DbName, DocId) ->