Backport changes in fabric_rpc2 To finish the update dance for BugzID: 14075 we need to backport the use of rexi:stream/1 to fabric_rpc.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/73a58646 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/73a58646 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/73a58646 Branch: refs/heads/import Commit: 73a5864600c9c5dedb268dcd7f24067e01583ee5 Parents: ed385ea Author: Paul J. Davis <[email protected]> Authored: Mon Nov 26 13:42:49 2012 -0600 Committer: Paul J. Davis <[email protected]> Committed: Mon Nov 26 13:46:09 2012 -0600 ---------------------------------------------------------------------- src/fabric_rpc.erl | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/73a58646/src/fabric_rpc.erl ---------------------------------------------------------------------- diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index c4c5fa9..5c840f4 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -357,7 +357,7 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) -> true -> Doc = undefined end, - case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of + case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of ok -> {ok, Acc#view_acc{limit=Limit-1}}; timeout -> @@ -400,13 +400,23 @@ reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 -> send(Key, Value, #view_acc{limit=Limit} = Acc) -> - case rexi:sync_reply(#view_row{key=Key, value=Value}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - stop -> - exit(normal); - timeout -> - exit(timeout) + case put(fabric_sent_first_row, true) of + undefined -> + case rexi:sync_reply(#view_row{key=Key, value=Value}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + stop -> + exit(normal); + timeout -> + exit(timeout) + end; + true -> + case rexi:stream(#view_row{key=Key, value=Value}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + timeout -> + exit(timeout) + end end. changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
