Add ability to gracefully terminate existing changes feeds

Calling fabric:end_changes() will cause all current changes
feeds to gracefully exit without preventing new changes requests from
starting (unlike maintenance_mode).

BugzID: 45762

This is a cherry-pick of:

https://github.com/cloudant/fabric/commit/1b45cede8f11e209f28e3d06b9fda4cbdcd719cc

Conflicts:
        src/fabric.erl
        src/fabric_view_changes.erl


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/383f8f4d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/383f8f4d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/383f8f4d

Branch: refs/heads/merge-diff-from-cloudant-fork
Commit: 383f8f4dc46737fac646a6d90ff03b64c0d94109
Parents: d7d6be8
Author: Robert Newson <rnew...@apache.org>
Authored: Sun Mar 29 13:38:38 2015 +0100
Committer: Mike Wallace <mikewall...@apache.org>
Committed: Wed Jun 3 15:00:56 2015 +0100

----------------------------------------------------------------------
 src/fabric.erl              |  6 +++++-
 src/fabric_view_changes.erl | 17 ++++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/383f8f4d/src/fabric.erl
----------------------------------------------------------------------
diff --git a/src/fabric.erl b/src/fabric.erl
index 25205f8..26b9f62 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -29,7 +29,7 @@
 
 % Views
 -export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
-    query_view/6, get_view_group_info/2]).
+    query_view/6, get_view_group_info/2, end_changes/0]).
 
 % miscellany
 -export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
@@ -362,6 +362,10 @@ query_view(DbName, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
 get_view_group_info(DbName, DesignId) ->
     fabric_group_info:go(dbname(DbName), design_doc(DesignId)).
 
+-spec end_changes() -> ok.
+end_changes() ->
+    fabric_view_changes:increment_changes_epoch().
+
 %% @doc retrieve all the design docs from a database
 -spec design_docs(dbname()) -> {ok, [json_obj()]}.
 design_docs(DbName) ->

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/383f8f4d/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 92d08e7..7e6666c 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -13,6 +13,7 @@
 -module(fabric_view_changes).
 
 -export([go/5, pack_seqs/1, unpack_seqs/2]).
+-export([increment_changes_epoch/0]).
 
 %% exported for upgrade purposes.
 -export([keep_sending_changes/8]).
@@ -37,6 +38,7 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
         UpdateListener = {spawn_link(fabric_db_update_listener, go,
                                      [Parent, Ref, DbName, Timeout]),
                           Ref},
+        put(changes_epoch, get_changes_epoch()),
         try
             keep_sending_changes(
                 DbName,
@@ -86,8 +88,9 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0)
     } = Collector,
     LastSeq = pack_seqs(NewSeqs),
     MaintenanceMode = config:get("couchdb", "maintenance_mode"),
+    NewEpoch = get_changes_epoch() > erlang:get(changes_epoch),
     if Limit > Limit2, Feed == "longpoll";
-      MaintenanceMode == "true"; MaintenanceMode == "nolb" ->
+      MaintenanceMode == "true"; MaintenanceMode == "nolb"; NewEpoch ->
         Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
     true ->
         WaitForUpdate = wait_db_updated(UpListen),
@@ -459,6 +462,18 @@ validate_start_seq(DbName, Seq) ->
             {error, {bad_request, Reason}}
     end.
 
+get_changes_epoch() ->
+    case application:get_env(fabric, changes_epoch) of
+        undefined ->
+            increment_changes_epoch(),
+            get_changes_epoch();
+        {ok, Epoch} ->
+            Epoch
+    end.
+
+increment_changes_epoch() ->
+    application:set_env(fabric, changes_epoch, os:timestamp()).
+
 unpack_seqs_test() ->
     meck:new(mem3),
     meck:new(fabric_view),

Reply via email to