Limit the duration of a continuous _changes feed

This patch enables fabric to terminate long-running continuous or
longpoll _changes feeds if the accumulated response time has exceeded a
configurable value.  When that happens fabric will execute the callback
function with {stop, LastSeq} at the next opportunity where it would
normally send a heartbeat.

[fabric]
changes_duration = 300000

BugzID: 17371


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/c6ad114e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/c6ad114e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/c6ad114e

Branch: refs/heads/import
Commit: c6ad114ec797874a7eccf741fecb7139b117948b
Parents: 0553863
Author: Adam Kocoloski <[email protected]>
Authored: Mon Feb 18 21:33:12 2013 -0500
Committer: Adam Kocoloski <[email protected]>
Committed: Mon Feb 18 22:18:57 2013 -0500

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c6ad114e/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 8843562..c28c9b6 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -44,7 +44,8 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == 
"continuous" orelse
                 Since,
                 Acc,
                 Timeout,
-                UpdateListener
+                UpdateListener,
+                os:timestamp()
             )
         after
             fabric_db_update_listener:stop(UpdateListener)
@@ -72,7 +73,7 @@ go(DbName, "normal", Options, Callback, Acc0) ->
         Callback(Error, Acc0)
     end.
 
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen) ->
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, 
T0) ->
     #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
     {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, 
Timeout),
     #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
@@ -80,8 +81,15 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, 
Timeout, UpListen) ->
     if Limit > Limit2, Feed == "longpoll" ->
         Callback({stop, LastSeq}, AccOut);
     true ->
-        case {Heartbeat, wait_db_updated(UpListen)} of
-        {undefined, timeout} ->
+        WaitForUpdate = wait_db_updated(UpListen),
+        AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
+        Max = list_to_integer(
+            config:get("fabric", "changes_duration", "300000")
+        ),
+        case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
+        {undefined, _, timeout} ->
+            Callback({stop, LastSeq}, AccOut);
+        {_, true, timeout} ->
             Callback({stop, LastSeq}, AccOut);
         _ ->
             {ok, AccTimeout} = Callback(timeout, AccOut),
@@ -92,7 +100,8 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, 
Timeout, UpListen) ->
                 LastSeq,
                 AccTimeout,
                 Timeout,
-                UpListen
+                UpListen,
+                T0
             )
         end
     end.

Reply via email to