Context: This patch used to be in master before 1.3.x branched.
In the course of releasing 1.3.0, we found that this patch breaks on Windows, and we didn’t manage to find a fix in time for the release, so we decided to back it out of 1.3.x to unblock the release. We left it in master in the hope that a quick fix would follow. That didn’t happen, and now it is time to ship 1.4.0, so Dave has now backed the patch out. Best Jan -- On Jul 23, 2013, at 13:17 , [email protected] wrote: > Updated Branches: > refs/heads/1334-revert-feature-view-server-pipelining [created] 6ffc52796 > > > COUCHDB-1334 - revert "More efficient communication with the view server" > > This reverts commit a851c6e > - COUCHDB-1334 breaks with Windows + couchjs in unexplained ways > - reducing to 1 concurrent query server is not sufficient > - Testing with open_port options overlapped_io was not in itself sufficient > - http://erlang.org/doc/man/erlang.html find overlapped_io > - Refer history in COUCHDB-1346 > > > Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo > Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/6ffc5279 > Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/6ffc5279 > Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/6ffc5279 > > Branch: refs/heads/1334-revert-feature-view-server-pipelining > Commit: 6ffc52796182a8d3673f3648c59c7e9abddf4fa2 > Parents: 635022b > Author: Dave Cottlehuber <[email protected]> > Authored: Wed Dec 12 21:37:18 2012 +0100 > Committer: Dave Cottlehuber <[email protected]> > Committed: Tue Jul 23 13:14:33 2013 +0200 > > ---------------------------------------------------------------------- > src/couch_mrview/src/couch_mrview_updater.erl | 46 +++++++++------------- > src/couchdb/couch_native_process.erl | 11 +----- > src/couchdb/couch_os_process.erl | 38 +----------------- > src/couchdb/couch_query_servers.erl | 17 +++----- > 4 files changed, 25 insertions(+), 87 deletions(-) > ---------------------------------------------------------------------- > > > 
http://git-wip-us.apache.org/repos/asf/couchdb/blob/6ffc5279/src/couch_mrview/src/couch_mrview_updater.erl > ---------------------------------------------------------------------- > diff --git a/src/couch_mrview/src/couch_mrview_updater.erl > b/src/couch_mrview/src/couch_mrview_updater.erl > index 3014664..9604ea9 100644 > --- a/src/couch_mrview/src/couch_mrview_updater.erl > +++ b/src/couch_mrview/src/couch_mrview_updater.erl > @@ -130,42 +130,32 @@ map_docs(Parent, State0) -> > couch_query_servers:stop_doc_map(State0#mrst.qserver), > couch_work_queue:close(State0#mrst.write_queue); > {ok, Dequeued} -> > + % Run all the non deleted docs through the view engine and > + % then pass the results on to the writer process. > State1 = case State0#mrst.qserver of > nil -> start_query_server(State0); > _ -> State0 > end, > - {ok, MapResults} = compute_map_results(State1, Dequeued), > - couch_work_queue:queue(State1#mrst.write_queue, MapResults), > + QServer = State1#mrst.qserver, > + DocFun = fun > + ({nil, Seq, _}, {SeqAcc, Results}) -> > + {erlang:max(Seq, SeqAcc), Results}; > + ({Id, Seq, deleted}, {SeqAcc, Results}) -> > + {erlang:max(Seq, SeqAcc), [{Id, []} | Results]}; > + ({Id, Seq, Doc}, {SeqAcc, Results}) -> > + {ok, Res} = couch_query_servers:map_doc_raw(QServer, > Doc), > + {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]} > + end, > + FoldFun = fun(Docs, Acc) -> > + update_task(length(Docs)), > + lists:foldl(DocFun, Acc, Docs) > + end, > + Results = lists:foldl(FoldFun, {0, []}, Dequeued), > + couch_work_queue:queue(State1#mrst.write_queue, Results), > map_docs(Parent, State1) > end. > > > -compute_map_results(#mrst{qserver = Qs}, Dequeued) -> > - % Run all the non deleted docs through the view engine and > - % then pass the results on to the writer process. 
> - DocFun = fun > - ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) -> > - {erlang:max(Seq, SeqAcc), AccDel, AccNotDel}; > - ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) -> > - {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel}; > - ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) -> > - {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]} > - end, > - FoldFun = fun(Docs, Acc) -> > - lists:foldl(DocFun, Acc, Docs) > - end, > - {MaxSeq, DeletedResults, Docs} = > - lists:foldl(FoldFun, {0, [], []}, Dequeued), > - {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs), > - NotDeletedResults = lists:zipwith( > - fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end, > - Docs, > - MapResultList), > - AllMapResults = DeletedResults ++ NotDeletedResults, > - update_task(length(AllMapResults)), > - {ok, {MaxSeq, AllMapResults}}. > - > - > write_results(Parent, State) -> > case couch_work_queue:dequeue(State#mrst.write_queue) of > closed -> > > http://git-wip-us.apache.org/repos/asf/couchdb/blob/6ffc5279/src/couchdb/couch_native_process.erl > ---------------------------------------------------------------------- > diff --git a/src/couchdb/couch_native_process.erl > b/src/couchdb/couch_native_process.erl > index b1d51ed..5a32e75 100644 > --- a/src/couchdb/couch_native_process.erl > +++ b/src/couchdb/couch_native_process.erl > @@ -42,7 +42,7 @@ > > -export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3, > handle_info/2]). > --export([set_timeout/2, prompt/2, prompt_many/2]). > +-export([set_timeout/2, prompt/2]). > > -define(STATE, native_proc_state). > -record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, > timeout=5000}). > @@ -62,15 +62,6 @@ set_timeout(Pid, TimeOut) -> > prompt(Pid, Data) when is_list(Data) -> > gen_server:call(Pid, {prompt, Data}). > > -prompt_many(Pid, DataList) -> > - prompt_many(Pid, DataList, []). 
> - > -prompt_many(_Pid, [], Acc) -> > - {ok, lists:reverse(Acc)}; > -prompt_many(Pid, [Data | Rest], Acc) -> > - Result = prompt(Pid, Data), > - prompt_many(Pid, Rest, [Result | Acc]). > - > % gen_server callbacks > init([]) -> > {ok, #evstate{ddocs=dict:new()}}. > > http://git-wip-us.apache.org/repos/asf/couchdb/blob/6ffc5279/src/couchdb/couch_os_process.erl > ---------------------------------------------------------------------- > diff --git a/src/couchdb/couch_os_process.erl > b/src/couchdb/couch_os_process.erl > index 3a267be..db62d49 100644 > --- a/src/couchdb/couch_os_process.erl > +++ b/src/couchdb/couch_os_process.erl > @@ -14,7 +14,7 @@ > -behaviour(gen_server). > > -export([start_link/1, start_link/2, start_link/3, stop/1]). > --export([set_timeout/2, prompt/2, prompt_many/2]). > +-export([set_timeout/2, prompt/2]). > -export([send/2, writeline/2, readline/1, writejson/2, readjson/1]). > -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, > code_change/3]). > > @@ -57,40 +57,6 @@ prompt(Pid, Data) -> > throw(Error) > end. > > -prompt_many(Pid, DataList) -> > - OsProc = gen_server:call(Pid, get_os_proc, infinity), > - true = port_connect(OsProc#os_proc.port, self()), > - try > - send_many(OsProc, DataList), > - receive_many(length(DataList), OsProc, []) > - after > - % Can throw badarg error, when OsProc Pid is dead or port was closed > - % by the readline function on error/timeout. > - (catch port_connect(OsProc#os_proc.port, Pid)), > - unlink(OsProc#os_proc.port), > - drop_port_messages(OsProc#os_proc.port) > - end. > - > -send_many(_OsProc, []) -> > - ok; > -send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) -> > - Writer(OsProc, Data), > - send_many(OsProc, Rest). > - > -receive_many(0, _OsProc, Acc) -> > - {ok, lists:reverse(Acc)}; > -receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) -> > - Line = Reader(OsProc), > - receive_many(N - 1, OsProc, [Line | Acc]). 
> - > -drop_port_messages(Port) -> > - receive > - {Port, _} -> > - drop_port_messages(Port) > - after 0 -> > - ok > - end. > - > % Utility functions for reading and writing > % in custom functions > writeline(OsProc, Data) when is_record(OsProc, os_proc) -> > @@ -209,8 +175,6 @@ terminate(_Reason, #os_proc{port=Port}) -> > catch port_close(Port), > ok. > > -handle_call(get_os_proc, _From, OsProc) -> > - {reply, OsProc, OsProc}; > handle_call({set_timeout, TimeOut}, _From, OsProc) -> > {reply, ok, OsProc#os_proc{timeout=TimeOut}}; > handle_call({prompt, Data}, _From, OsProc) -> > > http://git-wip-us.apache.org/repos/asf/couchdb/blob/6ffc5279/src/couchdb/couch_query_servers.erl > ---------------------------------------------------------------------- > diff --git a/src/couchdb/couch_query_servers.erl > b/src/couchdb/couch_query_servers.erl > index e29f23b..3b58cbe 100644 > --- a/src/couchdb/couch_query_servers.erl > +++ b/src/couchdb/couch_query_servers.erl > @@ -16,7 +16,7 @@ > -export([start_link/0, config_change/1]). > > -export([init/1, terminate/2, handle_call/3, handle_cast/2, > handle_info/2,code_change/3]). > --export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, > raw_to_ejson/1]). > +-export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, > raw_to_ejson/1]). > -export([reduce/3, rereduce/3,validate_doc_update/5]). > -export([filter_docs/5]). > -export([filter_view/3]). > @@ -33,7 +33,6 @@ > lang, > ddoc_keys = [], > prompt_fun, > - prompt_many_fun, > set_timeout_fun, > stop_fun > }). > @@ -84,15 +83,10 @@ map_docs(Proc, Docs) -> > Docs), > {ok, Results}. > > -map_docs_raw(Proc, DocList) -> > - {Mod, Fun} = Proc#proc.prompt_many_fun, > - CommandList = lists:map( > - fun(Doc) -> > - EJson = couch_doc:to_json_obj(Doc, []), > - [<<"map_doc">>, EJson] > - end, > - DocList), > - Mod:Fun(Proc#proc.pid, CommandList). 
> +map_doc_raw(Proc, Doc) -> > + Json = couch_doc:to_json_obj(Doc, []), > + {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}. > + > > stop_doc_map(nil) -> > ok; > @@ -487,7 +481,6 @@ new_process(Langs, LangLimits, Lang) -> > pid=Pid, > % Called via proc_prompt, proc_set_timeout, and > proc_stop > prompt_fun={Mod, prompt}, > - prompt_many_fun={Mod, prompt_many}, > set_timeout_fun={Mod, set_timeout}, > stop_fun={Mod, stop}}}; > _ -> >
