Author: fdmanana
Date: Tue Nov 23 15:52:31 2010
New Revision: 1038176

URL: http://svn.apache.org/viewvc?rev=1038176&view=rev
Log:
Replicator DB: stop all ongoing replications when the replicator DB is deleted 
or renamed (in the .ini config).
Also updated the gen_server to stop when it receives unexpected calls.


Modified:
    couchdb/trunk/src/couchdb/couch_rep_db_listener.erl

Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1038176&r1=1038175&r2=1038176&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Tue Nov 23 15:52:31 2010
@@ -22,9 +22,10 @@
 -define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
 
 -record(state, {
-    changes_feed_loop,
-    changes_queue,
-    changes_processor
+    changes_feed_loop = nil,
+    changes_queue = nil,
+    changes_processor = nil,
+    db_notifier = nil
 }).
 
 
@@ -40,35 +41,57 @@ init(_) ->
     Server = self(),
     ok = couch_config:register(
         fun("replicator", "db") ->
-            ok = gen_server:call(Server, rep_db_changed, infinity)
+            ok = gen_server:cast(Server, rep_db_changed)
         end
     ),
     {ok, #state{
         changes_feed_loop = Loop,
         changes_queue = Queue,
-        changes_processor = Processor}
+        changes_processor = Processor,
+        db_notifier = db_update_notifier()}
     }.
 
-handle_call(rep_db_changed, _From, State) ->
+
+handle_call(Msg, From, State) ->
+    ?LOG_ERROR("Replicator DB listener receive unexpected call ~p from ~p",
+        [Msg, From]),
+    {stop, {error, {unexpected_call, Msg}}, State}.
+
+
+handle_cast(rep_db_changed, State) ->
     #state{
         changes_feed_loop = Loop,
         changes_queue = Queue
     } = State,
     exit(Loop, rep_db_changed),
+    couch_work_queue:queue(Queue, stop_all_replications),
     {ok, NewLoop} = changes_feed_loop(Queue),
-    {reply, ok, State#state{changes_feed_loop = NewLoop}}.
+    {noreply, State#state{changes_feed_loop = NewLoop}};
 
+handle_cast(rep_db_created, #state{changes_feed_loop = nil} = State) ->
+    {ok, NewLoop} = changes_feed_loop(State#state.changes_queue),
+    {noreply, State#state{changes_feed_loop = NewLoop}};
 
-handle_cast(_Msg, State) ->
-    {noreply, State}.
+handle_cast(Msg, State) ->
+    ?LOG_ERROR("Replicator DB listener receive unexpected cast ~p", [Msg]),
+    {stop, {error, {unexpected_cast, Msg}}, State}.
 
 
+handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
+    % replicator DB deleted
+    couch_work_queue:queue(State#state.changes_queue, stop_all_replications),
+    {noreply, State#state{changes_feed_loop = nil}};
+
+handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
+    ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
+    {stop, {db_update_notifier_died, Reason}, State};
+
 handle_info({'EXIT', _OldChangesLoop, rep_db_changed}, State) ->
     {noreply, State};
 
 handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
     ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
-    {stop, rep_db_changes_processor_error, State}.
+    {stop, {rep_db_changes_processor_died, Reason}, State}.
 
 
 terminate(_Reason, State) ->
@@ -118,6 +141,23 @@ changes_feed_loop(ChangesQueue) ->
     {ok, Pid}.
 
 
+db_update_notifier() ->
+    Server = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({created, DbName}) ->
+            case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
+            DbName ->
+                ok = gen_server:cast(Server, rep_db_created);
+            _ ->
+                ok
+            end;
+        (_) ->
+            ok
+        end
+    ),
+    Notifier.
+
+
 changes_processor(ChangesQueue) ->
     Pid = spawn_link(
         fun() ->
@@ -148,6 +188,10 @@ has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _
 has_valid_rep_id(_Else) ->
     true.
 
+process_change(stop_all_replications) ->
+    ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
+        "was deleted or changed", []),
+    stop_all_replications();
 
 process_change({Change}) ->
     {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
@@ -257,3 +301,12 @@ stop_replication(DocId) ->
     [] ->
         none
     end.
+
+stop_all_replications() ->
+    ets:foldl(
+        fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
+        ok,
+        ?DOC_TO_REP_ID_MAP
+    ),
+    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP),
+    true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP).


Reply via email to