Cleanly stop replication at checkpoint time if no longer owner. This is a cherry-pick of:
https://github.com/cloudant/couch_replicator/commit/e5ef7c8a0ee2566b9cd4c02397ee94883d015fa0 Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/ce1934f0 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/ce1934f0 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/ce1934f0 Branch: refs/heads/master Commit: ce1934f080387bfba287c1c2863b611e1d030f00 Parents: fb4da8d Author: Robert Newson <rnew...@apache.org> Authored: Fri May 15 16:31:24 2015 +0100 Committer: Mike Wallace <mikewall...@apache.org> Committed: Fri Jun 5 17:20:26 2015 +0100 ---------------------------------------------------------------------- src/couch_replicator.erl | 21 ++++++++++++++------- src/couch_replicator_manager.erl | 13 ++++++++++++- 2 files changed, 26 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ce1934f0/src/couch_replicator.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl index 6e86a8f..3a744cd 100644 --- a/src/couch_replicator.erl +++ b/src/couch_replicator.erl @@ -475,13 +475,20 @@ handle_cast({db_compacted, DbName}, {noreply, State#rep_state{target = NewTarget}}; handle_cast(checkpoint, State) -> - case do_checkpoint(State) of - {ok, NewState} -> - couch_stats:increment_counter([couch_replicator, checkpoints, success]), - {noreply, NewState#rep_state{timer = start_timer(State)}}; - Error -> - couch_stats:increment_counter([couch_replicator, checkpoints, failure]), - {stop, Error, State} + #rep_state{rep_details = #rep{id = RepId} = Rep} = State, + case couch_replicator_manager:owner(RepId) of + Owner when Owner == node() -> + case do_checkpoint(State) of + {ok, NewState} -> + couch_stats:increment_counter([couch_replicator, 
checkpoints, success]), + {noreply, NewState#rep_state{timer = start_timer(State)}}; + Error -> + couch_stats:increment_counter([couch_replicator, checkpoints, failure]), + {stop, Error, State} + end; + Owner -> + couch_replicator_manager:replication_usurped(Rep, Owner), + {stop, shutdown, State} end; handle_cast({report_seq, Seq}, http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ce1934f0/src/couch_replicator_manager.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl index 6e83879..7bb5078 100644 --- a/src/couch_replicator_manager.erl +++ b/src/couch_replicator_manager.erl @@ -17,7 +17,7 @@ % public API -export([replication_started/1, replication_completed/2, replication_error/2]). --export([owner/1]). +-export([owner/1, replication_usurped/2]). -export([before_doc_update/2, after_doc_read/2]). @@ -108,6 +108,17 @@ replication_completed(#rep{id = RepId}, Stats) -> end. +replication_usurped(#rep{id = RepId}, By) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{rep = #rep{doc_id = DocId}} -> + ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity), + couch_log:notice("Replication `~s` usurped by ~s (triggered by document `~s`)", + [pp_rep_id(RepId), By, DocId]) + end. + + replication_error(#rep{id = {BaseId, _} = RepId}, Error) -> case rep_state(RepId) of nil ->