Repository: couchdb-fabric Updated Branches: refs/heads/windsor-merge 094ce20f6 -> ac8198092
Fix use of the rexi:stream2 API Bit of a mixup during the merge on this one as it included both the switch to stream2 as well as the switch to using couch_mrview. Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/ac819809 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/ac819809 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/ac819809 Branch: refs/heads/windsor-merge Commit: ac8198092a1d23402c81e74607d8a0f613131c0f Parents: 094ce20 Author: Paul J. Davis <paul.joseph.da...@gmail.com> Authored: Thu Aug 14 13:12:15 2014 -0500 Committer: Paul J. Davis <paul.joseph.da...@gmail.com> Committed: Thu Aug 14 13:12:15 2014 -0500 ---------------------------------------------------------------------- src/fabric_rpc.erl | 64 ++++++++++------------------------------- src/fabric_view_reduce.erl | 3 +- 2 files changed, 17 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ac819809/src/fabric_rpc.erl ---------------------------------------------------------------------- diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index fd5b3a7..17dc19b 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -253,14 +253,8 @@ get_or_create_db(DbName, Options) -> view_cb({meta, Meta}, Acc) -> % Map function starting - case rexi:stream2({meta, Meta}) of - ok -> - {ok, Acc}; - stop -> - exit(normal); - timeout -> - exit(timeout) - end; + ok = rexi:stream2({meta, Meta}), + {ok, Acc}; view_cb({row, Row}, Acc) -> % Adding another row ViewRow = #view_row{ @@ -269,59 +263,31 @@ view_cb({row, Row}, Acc) -> value = couch_util:get_value(value, Row), doc = couch_util:get_value(doc, Row) }, - case rexi:stream2(ViewRow) of - ok -> - {ok, Acc}; - timeout -> - exit(timeout) - end; + ok = rexi:stream2(ViewRow), + {ok, Acc}; view_cb(complete, Acc) -> % Finish view output - rexi:reply(complete), + ok = rexi:stream_last(complete), {ok, Acc}. reduce_cb({meta, Meta}, Acc) -> % Map function starting - case rexi:sync_reply({meta, Meta}) of - ok -> - {ok, Acc}; - stop -> - exit(normal); - timeout -> - exit(timeout) - end; + ok = rexi:stream2({meta, Meta}), + {ok, Acc}; reduce_cb({row, Row}, Acc) -> % Adding another row - Key = couch_util:get_value(key, Row), - Value = couch_util:get_value(value, Row), - send(Key, Value, Acc); + ok = rexi:stream2(#view_row{ + key = couch_util:get_value(key, Row), + value = couch_util:get_value(value, Row) + }), + {ok, Acc}; reduce_cb(complete, Acc) -> % Finish view output - rexi:reply(complete), + ok = rexi:stream_last(complete), {ok, Acc}. -send(Key, Value, Acc) -> - case put(fabric_sent_first_row, true) of - undefined -> - case rexi:stream2(#view_row{key=Key, value=Value}) of - ok -> - {ok, Acc}; - stop -> - exit(normal); - timeout -> - exit(timeout) - end; - true -> - case rexi:stream2(#view_row{key=Key, value=Value}) of - ok -> - {ok, Acc}; - timeout -> - exit(timeout) - end - end. - changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) -> {ok, Acc#cacc{seq = Seq, pending = Acc#cacc.pending-1}}; changes_enumerator(DocInfo, Acc) -> @@ -347,8 +313,8 @@ changes_enumerator(DocInfo, Acc) -> {deleted, Del} | if IncludeDocs -> [doc_member(Db, DocInfo, Opts)]; true -> [] end ]}, - Go = rexi:stream2(ChangesRow), - {Go, Acc#cacc{seq = Seq, pending = Pending-1}} + ok = rexi:stream2(ChangesRow), + {ok, Acc#cacc{seq = Seq, pending = Pending-1}} end. doc_member(Shard, DocInfo, Opts) -> http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ac819809/src/fabric_view_reduce.erl ---------------------------------------------------------------------- diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl index 583c8ff..8fb5b0b 100644 --- a/src/fabric_view_reduce.erl +++ b/src/fabric_view_reduce.erl @@ -107,7 +107,8 @@ handle_message({rexi_EXIT, Reason}, Worker, State) -> %% message as a clean way to indicate to couch_mrview_http:view_cb that the %% reduce response is starting. handle_message({meta, Meta}, {_Worker, From}, State) -> - gen_server:reply(From, ok), + rexi:stream_ack(From), + #collector{ callback = Callback, user_acc = AccIn