Monitor changes consumer in the tests
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/e4504205 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/e4504205 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/e4504205 Branch: refs/heads/master Commit: e45042053783b128e246382471699bff89028248 Parents: 5a009b1 Author: Eric Avdey <e...@eiri.ca> Authored: Fri Jun 24 11:57:13 2016 -0300 Committer: Eric Avdey <e...@eiri.ca> Committed: Wed Jul 6 11:27:48 2016 -0300 ---------------------------------------------------------------------- test/couch_changes_tests.erl | 48 ++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e4504205/test/couch_changes_tests.erl ---------------------------------------------------------------------- diff --git a/test/couch_changes_tests.erl b/test/couch_changes_tests.erl index d8adead..4f81a00 100644 --- a/test/couch_changes_tests.erl +++ b/test/couch_changes_tests.erl @@ -692,7 +692,7 @@ save_doc(Db, Json) -> {ok, Rev} = couch_db:update_doc(Db, Doc, []), {ok, couch_doc:rev_to_str(Rev)}. -get_rows(Consumer) -> +get_rows({Consumer, _}) -> Ref = make_ref(), Consumer ! {get_rows, Ref}, Resp = receive @@ -704,7 +704,7 @@ get_rows(Consumer) -> ?assertNotEqual(timeout, Resp), Resp. -get_heartbeats(Consumer) -> +get_heartbeats({Consumer, _}) -> Ref = make_ref(), Consumer ! {get_heartbeats, Ref}, Resp = receive @@ -716,7 +716,7 @@ get_heartbeats(Consumer) -> ?assertNotEqual(timeout, Resp), Resp. -clear_rows(Consumer) -> +clear_rows({Consumer, _}) -> Ref = make_ref(), Consumer ! {reset, Ref}, Resp = receive @@ -728,7 +728,7 @@ clear_rows(Consumer) -> ?assertNotEqual(timeout, Resp), Resp. -stop_consumer(Consumer) -> +stop_consumer({Consumer, _}) -> Ref = make_ref(), Consumer ! {stop, Ref}, Resp = receive @@ -740,7 +740,7 @@ stop_consumer(Consumer) -> ?assertNotEqual(timeout, Resp), Resp. -pause(Consumer) -> +pause({Consumer, _}) -> Ref = make_ref(), Consumer ! {pause, Ref}, Resp = receive @@ -752,7 +752,7 @@ pause(Consumer) -> ?assertNotEqual(timeout, Resp), Resp. -unpause(Consumer) -> +unpause({Consumer, _}) -> Ref = make_ref(), Consumer ! {continue, Ref}, Resp = receive @@ -764,19 +764,29 @@ unpause(Consumer) -> ?assertNotEqual(timeout, Resp), Resp. -wait_finished(_Consumer) -> - Resp = receive +wait_finished({_, ConsumerRef}) -> + receive {consumer_finished, Rows, LastSeq} -> - {Rows, LastSeq} + {Rows, LastSeq}; + {'DOWN', ConsumerRef, _, _, Msg} when Msg == normal; Msg == ok -> + ok; + {'DOWN', ConsumerRef, _, _, Msg} -> + erlang:error({consumer_died, [ + {module, ?MODULE}, + {line, ?LINE}, + {value, Msg} + ]}) after ?TIMEOUT -> - timeout - end, - ?assertNotEqual(timeout, Resp), - Resp. + erlang:error({consumer_died, [ + {module, ?MODULE}, + {line, ?LINE}, + {value, timeout} + ]}) + end. spawn_consumer(DbName, ChangesArgs0, Req) -> Parent = self(), - spawn(fun() -> + spawn_monitor(fun() -> put(heartbeat_count, 0), Callback = fun ({change, {Change}, _}, _, Acc) -> @@ -804,10 +814,12 @@ spawn_consumer(DbName, ChangesArgs0, Req) -> FeedFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db), try FeedFun({Callback, []}) - catch throw:{stop, _} -> - ok - end, - catch couch_db:close(Db) + catch + throw:{stop, _} -> ok; + _:Error -> Error + after + couch_db:close(Db) + end end). maybe_pause(Parent, Acc) ->