Limit the duration of a continuous _changes feed
This patch enables fabric to terminate long-running continuous or
longpoll _changes feeds if the accumulated response time has exceeded a
configurable value. When that happens fabric will execute the callback
function with {stop, LastSeq} at the next opportunity where it would
normally send a heartbeat.
[fabric]
changes_duration = 300000
BugzID: 17371
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/c6ad114e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/c6ad114e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/c6ad114e
Branch: refs/heads/import
Commit: c6ad114ec797874a7eccf741fecb7139b117948b
Parents: 0553863
Author: Adam Kocoloski <[email protected]>
Authored: Mon Feb 18 21:33:12 2013 -0500
Committer: Adam Kocoloski <[email protected]>
Committed: Mon Feb 18 22:18:57 2013 -0500
----------------------------------------------------------------------
src/fabric_view_changes.erl | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c6ad114e/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 8843562..c28c9b6 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -44,7 +44,8 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed ==
"continuous" orelse
Since,
Acc,
Timeout,
- UpdateListener
+ UpdateListener,
+ os:timestamp()
)
after
fabric_db_update_listener:stop(UpdateListener)
@@ -72,7 +73,7 @@ go(DbName, "normal", Options, Callback, Acc0) ->
Callback(Error, Acc0)
end.
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen) ->
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen,
T0) ->
#changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
{ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn,
Timeout),
#collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
@@ -80,8 +81,15 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn,
Timeout, UpListen) ->
if Limit > Limit2, Feed == "longpoll" ->
Callback({stop, LastSeq}, AccOut);
true ->
- case {Heartbeat, wait_db_updated(UpListen)} of
- {undefined, timeout} ->
+ WaitForUpdate = wait_db_updated(UpListen),
+ AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
+ Max = list_to_integer(
+ config:get("fabric", "changes_duration", "300000")
+ ),
+ case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
+ {undefined, _, timeout} ->
+ Callback({stop, LastSeq}, AccOut);
+ {_, true, timeout} ->
Callback({stop, LastSeq}, AccOut);
_ ->
{ok, AccTimeout} = Callback(timeout, AccOut),
@@ -92,7 +100,8 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn,
Timeout, UpListen) ->
LastSeq,
AccTimeout,
Timeout,
- UpListen
+ UpListen,
+ T0
)
end
end.