Backport changes in fabric_rpc2

To finish the update dance for BugzID: 14075 we need to backport the use
of rexi:stream/1 to fabric_rpc.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/73a58646
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/73a58646
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/73a58646

Branch: refs/heads/import
Commit: 73a5864600c9c5dedb268dcd7f24067e01583ee5
Parents: ed385ea
Author: Paul J. Davis <[email protected]>
Authored: Mon Nov 26 13:42:49 2012 -0600
Committer: Paul J. Davis <[email protected]>
Committed: Mon Nov 26 13:46:09 2012 -0600

----------------------------------------------------------------------
 src/fabric_rpc.erl | 26 ++++++++++++++++++--------
 1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/73a58646/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index c4c5fa9..5c840f4 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -357,7 +357,7 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) ->
     true ->
         Doc = undefined
     end,
-    case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
+    case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
         ok ->
             {ok, Acc#view_acc{limit=Limit-1}};
         timeout ->
@@ -400,13 +400,23 @@ reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 ->
 
 
 send(Key, Value, #view_acc{limit=Limit} = Acc) ->
-    case rexi:sync_reply(#view_row{key=Key, value=Value}) of
-    ok ->
-        {ok, Acc#view_acc{limit=Limit-1}};
-    stop ->
-        exit(normal);
-    timeout ->
-        exit(timeout)
+    case put(fabric_sent_first_row, true) of
+    undefined ->
+        case rexi:sync_reply(#view_row{key=Key, value=Value}) of
+        ok ->
+            {ok, Acc#view_acc{limit=Limit-1}};
+        stop ->
+            exit(normal);
+        timeout ->
+            exit(timeout)
+        end;
+    true ->
+        case rexi:stream(#view_row{key=Key, value=Value}) of
+        ok ->
+            {ok, Acc#view_acc{limit=Limit-1}};
+        timeout ->
+            exit(timeout)
+        end
     end.
 
 changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->

Reply via email to