On 28/07/2009 12:53 PM, Paul Davis wrote:
> Hey,
>
> I zoned out a bit and finished getting the native Erlang view server
> passing all of the rspec tests. The code can be found online at [1]
> and I'll also attach a patch.
>
> If anyone wants to play with it, that'd be pretty cool. There's
> probably a bit of weakness in error reporting right now, but I think
> it should work other than that.

Thanks Paul,
I think this is getting very close to being ready for inclusion in the trunk, and I'd really love to get it into 0.10. Paul indicated on #couchdb that his only remaining concerns were cosmetic - some error messages and comments needed updating (and even those have since been tweaked) - so I think the current state is good enough for general review and to stimulate some discussion, or even for simple agreement that it is good enough to run with.

I've put a patch of Paul's branch against the trunk in:
https://issues.apache.org/jira/browse/COUCHDB-377
but also attached a copy here for your convenience.

The patch consists of 3 main parts:

* Changes to couch_query_servers.erl to allow arbitrary erlang code to be used as a query server. The existing calls to couch_os_process have been changed to this more abstract form - IOW, couch_os_process can now be viewed as a specialized 'native query server'. Note that couch_os_process hasn't changed at all, and that Damien previously reviewed this portion of the patch via Jira and all of his comments have been addressed.

* A native (ie, erlang) view server implementation. This has undergone a number of iterations, but the current version exposes an environment similar to the Javascript query server - functions 'Emit', 'Log', 'GetRow' etc are all available to the erlang view functions. Along with normal views, 'list' and 'show' functionality is fully supported and it has comprehensive tests.

* Other misc changes: the ruby view server tests exercise this new server; futon now reads the new 'native_query_servers' config section; makefile changes for the new .erl file. The ruby test script could do with a little more work, but neither Paul nor I know Ruby well enough to do this without some help.

There are no docs for this yet, but if it is accepted, I will commit to adding a page to the Wiki with documentation and examples for this new server.

So - please let me know what else needs to be done for this to be accepted. If nothing particularly obvious stands out, then please give this patch a review.

Thanks,

Mark
diff --git a/share/www/script/futon.browse.js b/share/www/script/futon.browse.js
index 29c0d86..7fa880f 100644
--- a/share/www/script/futon.browse.js
+++ b/share/www/script/futon.browse.js
@@ -206,32 +206,48 @@
 
       // Populate the languages dropdown, and listen to selection changes
       this.populateLanguagesMenu = function() {
+        var all_langs = {}; // merged [query_servers] + [native_query_servers] entries
+        var fill_language = function() { // local helper; avoids leaking a global
+          var select = $("#language");
+          for (var language in all_langs) {
+            var option = $(document.createElement("option"))
+              .attr("value", language).text(language)
+              .appendTo(select);
+          }
+          if (select[0].options.length == 1) {
+            select[0].disabled = true;
+          } else {
+            select[0].disabled = false;
+            select.val(page.viewLanguage);
+            select.change(function() {
+              var language = $("#language").val();
+              if (language != page.viewLanguage) {
+                var mapFun = $("#viewcode_map").val();
+                if (mapFun == "" || mapFun == templates[page.viewLanguage]) {
+                  // no edits made, so change to the new default
+                  $("#viewcode_map").val(templates[language]);
+                }
+                page.viewLanguage = language;
+                $("#viewcode_map")[0].focus();
+              }
+              return false;
+            });
+          }
+        };
         $.couch.config({
           success: function(resp) {
-            var select = $("#language");
             for (var language in resp) {
-              var option = $(document.createElement("option"))
-                .attr("value", language).text(language)
-                .appendTo(select);
+              all_langs[language] = resp[language];
             }
-            if (select[0].options.length == 1) {
-              select[0].disabled = true;
-            } else {
-              select.val(page.viewLanguage);
-              select.change(function() {
-                var language = $("#language").val();
-                if (language != page.viewLanguage) {
-                  var mapFun = $("#viewcode_map").val();
-                  if (mapFun == "" || mapFun == templates[page.viewLanguage]) {
-                    // no edits made, so change to the new default
-                    $("#viewcode_map").val(templates[language]);
-                  }
-                  page.viewLanguage = language;
-                  $("#viewcode_map")[0].focus();
+
+            $.couch.config({ // then merge in [native_query_servers] and render
+              success: function(resp) {
+                for (var language in resp) {
+                  all_langs[language] = resp[language];
                 }
-                return false;
-              });
-            }
+                fill_language();
+              }
+            }, "native_query_servers");
           }
         }, "query_servers");
       }
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index fce89f3..83a1bf9 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -68,6 +68,7 @@ source_files = \
     couch_httpd_stats_handlers.erl \
     couch_key_tree.erl \
     couch_log.erl \
+    couch_native_process.erl \
     couch_os_process.erl \
     couch_query_servers.erl \
     couch_ref_counter.erl \
@@ -113,6 +114,7 @@ compiled_files = \
     couch_httpd_stats_handlers.beam \
     couch_key_tree.beam \
     couch_log.beam \
+    couch_native_process.beam \
     couch_os_process.beam \
     couch_query_servers.beam \
     couch_ref_counter.beam \
diff --git a/src/couchdb/couch_native_process.erl 
b/src/couchdb/couch_native_process.erl
new file mode 100644
index 0000000..f766349
--- /dev/null
+++ b/src/couchdb/couch_native_process.erl
@@ -0,0 +1,380 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); 
+% you may not use this file except in compliance with the License. 
+%
+% You may obtain a copy of the License at
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, 
+% software distributed under the License is distributed on an 
+% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 
+% either express or implied. 
+%
+% See the License for the specific language governing permissions
+% and limitations under the License. 
+%
+% This file drew much inspiration from erlview, which was written by and
+% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
+%
+%
+% This module provides the smallest possible native view-server.
+% With this module in-place, you can add the following to your couch INI files:
+%  [native_query_servers]
+%  erlang={couch_native_process, start_link, []}
+%
+% This then allows the following example map function to be used:
+%
+%  fun({Doc}) ->
+%    % Below, we emit a single record - the _id as key, null as value
+%    DocId = proplists:get_value(<<"_id">>, Doc, null),
+%    Emit(DocId, null)
+%  end.
+%
+% which should be roughly the same as the javascript:
+%    emit(doc._id, null);
+%
+% This module exposes enough functions such that a native erlang server can 
+% act as a fully-fledged view server, but no 'helper' functions specifically
+% for simplifying your erlang view code.  It is expected other third-party
+% extensions will evolve which offer useful layers on top of this view server
+% to help simplify your view code.
+%
+% All state lives in the process dictionary of the process calling prompt/2
+% (start_link/0 just returns self()); the Pid argument of the exported
+% functions is only checked with is_pid/1 by prompt/2.
+-module(couch_native_process).
+
+-export([start_link/0]).
+-export([set_timeout/2, prompt/2, stop/1]).
+
+-define(STATE, native_proc_state).
+-record(evstate, {funs=[], query_config=[], list_pid=nil, timeout=5000}).
+
+-include("couch_db.hrl").
+
+% Returns {ok, self()}: the "process" is the calling process itself.
+start_link() ->
+    {ok, self()}.
+
+% Nothing to stop; the state is held in the caller's process dictionary.
+stop(_Pid) ->
+    ok.
+
+% Stores TimeOut (milliseconds, used by every receive ... after below).
+set_timeout(_Pid, TimeOut) ->
+    put(?STATE, (get_state())#evstate{timeout=TimeOut}),
+    ok.
+
+% Runs one query-server command (a list whose head is the command name) and
+% returns the JSON-ready response.  An {error, Reason} result from run/2 is
+% turned into an {"error": ...} object.  While a list is being rendered only
+% list_row/list_end commands are accepted.
+prompt(Pid, Data) when is_pid(Pid), is_list(Data) ->
+    State = get_state(),
+    case is_pid(State#evstate.list_pid) of
+        true ->
+            case hd(Data) of
+                <<"list_row">> -> ok;
+                <<"list_end">> -> ok;
+                _ -> throw({error, query_server_error})
+            end;
+        _ ->
+            ok % Not listing
+    end,
+    {NewState, Resp} = run(State, Data),
+    put(?STATE, NewState),
+    case Resp of
+        {error, Reason} ->
+            Msg = io_lib:format("couch native server error: ~p", [Reason]),
+            {[{<<"error">>, list_to_binary(Msg)}]};
+        _ ->
+            Resp
+    end.
+
+% Returns the stored #evstate{}, or a fresh default one if none exists yet.
+get_state() ->
+    case get(?STATE) of
+        undefined -> #evstate{};
+        State -> State
+    end.
+
+% Command dispatch.  Each clause takes the current #evstate{} and returns
+% {NewState, Response}.
+run(_, [<<"reset">>]) ->
+    {#evstate{}, true};
+run(_, [<<"reset">>, QueryConfig]) ->
+    {#evstate{query_config=QueryConfig}, true};
+run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
+    FunInfo = makefun(State, BinFunc),
+    {State#evstate{funs=Funs ++ [FunInfo]}, true};
+run(State, [<<"map_doc">> , Doc]) ->
+    % Each fun collects its Emit/2 output under its own signature key in the
+    % process dictionary; erase/1 returns the rows and drops the key.
+    Resp = lists:map(fun({Sig, Fun}) ->
+        erlang:put(Sig, []),
+        Fun(Doc),
+        lists:reverse(erlang:erase(Sig))
+    end, State#evstate.funs),
+    {State, Resp};
+run(State, [<<"reduce">>, Funs, KVs]) ->
+    {Keys, Vals} = lists:unzip(lists:map(fun([K, V]) -> {K, V} end, KVs)),
+    {State, catch reduce(State, Funs, Keys, Vals, false)};
+run(State, [<<"rereduce">>, Funs, Vals]) ->
+    {State, catch reduce(State, Funs, null, Vals, true)};
+run(State, [<<"validate">>, BFun, NDoc, ODoc, Ctx]) ->
+    {_Sig, Fun} = makefun(State, BFun),
+    {State, catch Fun(NDoc, ODoc, Ctx)};
+run(State, [<<"filter">>, Docs, Req, Ctx]) ->
+    % A doc passes only if the filter fun returns exactly true; any other
+    % value, or an exception, counts as false.
+    {_Sig, Fun} = hd(State#evstate.funs),
+    Resp = lists:map(fun(Doc) ->
+        case (catch Fun(Doc, Req, Ctx)) of
+            true -> true;
+            _ -> false
+        end
+    end, Docs),
+    {State, [true, Resp]};
+run(State, [<<"show">>, BFun, Doc, Req]) ->
+    {_Sig, Fun} = makefun(State, BFun),
+    % A 1-tuple ({Props}, i.e. a JSON object) is wrapped as a "resp" reply;
+    % anything else (lists, caught exceptions, ...) is returned unchanged.
+    Resp = case (catch Fun(Doc, Req)) of
+        FunResp when is_tuple(FunResp), tuple_size(FunResp) == 1 ->
+            [<<"resp">>, FunResp];
+        FunResp ->
+            FunResp
+    end,
+    {State, Resp};
+run(State, [<<"list">>, Head, Req]) ->
+    {Sig, Fun} = hd(State#evstate.funs),
+    % List funs must take (Head, Req); reject anything else up front.
+    case is_function(Fun, 2) of
+        false -> throw({error, render_error});
+        true -> ok
+    end,
+    Self = self(),
+    % The list fun runs in a linked helper process so that GetRow() can
+    % block waiting for rows that the caller feeds in via later
+    % list_row/list_end prompts.
+    SpawnFun = fun() ->
+        LastChunk = (catch Fun(Head, Req)),
+        case start_list_resp(Self, Sig) of
+            started ->
+                % GetRow() was never called: consume the caller's next
+                % list_row/list_end message before replying with list_end.
+                receive
+                    {Self, list_row, _Row} -> ignore;
+                    {Self, list_end} -> ignore
+                after State#evstate.timeout ->
+                    throw({timeout, list_cleanup_pid})
+                end;
+            _ ->
+                ok
+        end,
+        LastChunks =
+        case erlang:get(Sig) of
+            undefined -> [LastChunk];
+            OtherChunks -> [LastChunk | OtherChunks]
+        end,
+        Self ! {self(), list_end, lists:reverse(LastChunks)}
+    end,
+    % Trap exits so the helper's normal exit arrives as a message; the
+    % previous flag is saved here and restored once the list ends.
+    erlang:put(do_trap, process_flag(trap_exit, true)),
+    Pid = spawn_link(SpawnFun),
+    Resp =
+    receive
+        {Pid, start, Chunks, JsonResp} ->
+            [<<"start">>, Chunks, JsonResp]
+    after State#evstate.timeout ->
+        throw({timeout, list_start})
+    end,
+    {State#evstate{list_pid=Pid}, Resp};
+run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
+    Pid ! {self(), list_row, Row},
+    receive
+        {Pid, chunks, Data} ->
+            {State, [<<"chunks">>, Data]};
+        {Pid, list_end, Data} ->
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            process_flag(trap_exit, erlang:get(do_trap)),
+            {State#evstate{list_pid=nil}, [<<"end">>, Data]}
+    after State#evstate.timeout ->
+        throw({timeout, list_row})
+    end;
+run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
+    Pid ! {self(), list_end},
+    Resp =
+    receive
+        {Pid, list_end, Data} ->
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            [<<"end">>, Data]
+    after State#evstate.timeout ->
+        throw({timeout, list_end})
+    end,
+    process_flag(trap_exit, erlang:get(do_trap)),
+    {State#evstate{list_pid=nil}, Resp};
+run(_, Unknown) ->
+    ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]),
+    throw({error, query_server_error}).
+
+% Builds the variables (Log, Emit, Start, Send, GetRow, FoldRows) bound when
+% a user fun is evaluated.  Output is accumulated, newest first, under Sig in
+% the process dictionary of whichever process runs the fun.
+bindings(State, Sig) ->
+    Self = self(),
+
+    Log = fun(Msg) ->
+        % Pass Msg as an argument, not as the format string, so messages
+        % containing '~' cannot break (or crash) the log call.
+        ?LOG_INFO("~s", [Msg])
+    end,
+
+    Emit = fun(Id, Value) ->
+        Curr = erlang:get(Sig),
+        erlang:put(Sig, [[Id, Value] | Curr])
+    end,
+
+    Start = fun(Headers) ->
+        erlang:put(list_headers, Headers)
+    end,
+
+    Send = fun(Chunk) ->
+        Curr =
+        case erlang:get(Sig) of
+            undefined -> [];
+            Else -> Else
+        end,
+        erlang:put(Sig, [Chunk | Curr])
+    end,
+
+    GetRow = fun() ->
+        % The first call sends the "start" reply; later calls flush the
+        % chunks sent since the previous row as a "chunks" reply.
+        case start_list_resp(Self, Sig) of
+            started ->
+                ok;
+            _ ->
+                Chunks =
+                case erlang:get(Sig) of
+                    undefined -> [];
+                    CurrChunks -> CurrChunks
+                end,
+                Self ! {self(), chunks, lists:reverse(Chunks)}
+        end,
+        erlang:put(Sig, []),
+        receive
+            {Self, list_row, Row} -> Row;
+            {Self, list_end} -> nil
+        after State#evstate.timeout ->
+            throw({timeout, list_pid_getrow})
+        end
+    end,
+
+    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
+
+    [
+        {'Log', Log},
+        {'Emit', Emit},
+        {'Start', Start},
+        {'Send', Send},
+        {'GetRow', GetRow},
+        {'FoldRows', FoldRows}
+    ].
+
+% Compiles Source (a binary holding one Erlang fun expression) with the
+% bindings above in scope.  Returns {Sig, Fun}; Sig is the MD5 of Source.
+% thanks to erlview, via:
+% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
+makefun(State, Source) ->
+    Sig = erlang:md5(Source),
+    BindFuns = bindings(State, Sig),
+    {Sig, makefun(State, Source, BindFuns)}.
+
+% Scans, parses and evaluates Source; logs and throws {error, ErrorInfo}
+% if the source does not scan or parse.
+makefun(_State, Source, BindFuns) ->
+    FunStr = binary_to_list(Source),
+    Tokens = case erl_scan:string(FunStr) of
+        {ok, Toks, _} ->
+            Toks;
+        {error, ScanError, _EndLoc} ->
+            report_syntax_error(ScanError)
+    end,
+    Form = case erl_parse:parse_exprs(Tokens) of
+        {ok, [ParsedForm]} ->
+            ParsedForm;
+        {error, ParseError} ->
+            report_syntax_error(ParseError)
+    end,
+    Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
+        erl_eval:add_binding(Name, Fun, Acc)
+    end, erl_eval:new_bindings(), BindFuns),
+    {value, Fun, _} = erl_eval:expr(Form, Bindings),
+    Fun.
+
+% Logs an erl_scan/erl_parse ErrorInfo and throws it as {error, ErrorInfo}.
+report_syntax_error({LineNum, Mod, Reason}=ErrorInfo) ->
+    ?LOG_ERROR("Syntax error on line: ~p~n~s~n",
+               [LineNum, Mod:format_error(Reason)]),
+    throw({error, ErrorInfo}).
+
+% Compiles the reduce fun(s) in BinFuns (one binary or a list of them) and
+% applies each to (Keys, Vals, ReReduce); returns [true, Reductions].
+reduce(State, BinFuns, Keys, Vals, ReReduce) ->
+    Funs = case is_list(BinFuns) of
+        true ->
+            lists:map(fun(BF) -> makefun(State, BF) end, BinFuns);
+        _ ->
+            [makefun(State, BinFuns)]
+    end,
+    Reds = lists:map(fun({_Sig, Fun}) ->
+        Fun(Keys, Vals, ReReduce)
+    end, Funs),
+    [true, Reds].
+
+% Feeds rows from GetRow() to ProcRow(Row, Acc) until the rows run out (nil)
+% or ProcRow returns {stop, Acc}; returns {ok, FinalAcc}.
+foldrows(GetRow, ProcRow, Acc) ->
+    case GetRow() of
+        nil ->
+            {ok, Acc};
+        Row ->
+            case (catch ProcRow(Row, Acc)) of
+                {ok, Acc2} ->
+                    foldrows(GetRow, ProcRow, Acc2);
+                {stop, Acc2} ->
+                    {ok, Acc2}
+            end
+    end.
+
+% Sends {self(), start, Chunks, Headers} to Self the first time it is called
+% in the current process and returns started; later calls return ok.
+start_list_resp(Self, Sig) ->
+    case erlang:get(list_started) of
+        undefined ->
+            Headers =
+            case erlang:get(list_headers) of
+                undefined -> {[{<<"headers">>, {[]}}]};
+                CurrHdrs -> CurrHdrs
+            end,
+            Chunks =
+            case erlang:get(Sig) of
+                undefined -> [];
+                CurrChunks -> CurrChunks
+            end,
+            Self ! {self(), start, lists:reverse(Chunks), Headers},
+            erlang:put(list_started, true),
+            erlang:put(Sig, []),
+            started;
+        _ ->
+            ok
+    end.
diff --git a/src/couchdb/couch_query_servers.erl 
b/src/couchdb/couch_query_servers.erl
index bb0cc85..f450992 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -25,6 +25,14 @@
 
 -include("couch_db.hrl").
 
+-record(proc, {   % a query-server process handle, as returned by get_os_process/1
+    pid,              % passed as first argument to the three *_fun callbacks
+    lang,             % language name binary; key used in the pool tables
+    prompt_fun,       % {Mod, prompt}, called as Mod:prompt(Pid, Args)
+    set_timeout_fun,  % {Mod, set_timeout}, called as Mod:set_timeout(Pid, Timeout)
+    stop_fun          % {Mod, stop}, called as Mod:stop(Pid)
+}).
+
 start_link() ->
     gen_server:start_link({local, couch_query_servers}, couch_query_servers, 
[], []).
 
@@ -32,19 +40,19 @@ stop() ->
     exit(whereis(couch_query_servers), close).
 
 start_doc_map(Lang, Functions) ->
-    Pid = get_os_process(Lang),
+    Proc = get_os_process(Lang),  % a #proc record, no longer a bare pid
     lists:foreach(fun(FunctionSource) ->
-        true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource])
+        true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource])
     end, Functions),
-    {ok, {Lang, Pid}}.
+    {ok, Proc}.  % this #proc is the handle later given to map_docs/stop_doc_map
 
-map_docs({_Lang, Pid}, Docs) ->
+map_docs(Proc, Docs) ->
     % send the documents
     Results = lists:map(
         fun(Doc) ->
             Json = couch_doc:to_json_obj(Doc, []),
 
-            FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]),
+            FunsResults = proc_prompt(Proc, [<<"map_doc">>, Json]),
             % the results are a json array of function map yields like this:
             % [FunResults1, FunResults2 ...]
             % where funresults is are json arrays of key value pairs:
@@ -63,8 +71,8 @@ map_docs({_Lang, Pid}, Docs) ->
 
 stop_doc_map(nil) ->
     ok;
-stop_doc_map({Lang, Pid}) ->
-    ok = ret_os_process(Lang, Pid).
+stop_doc_map(Proc) ->
+    ok = ret_os_process(Proc).  % returns the #proc to the couch_query_servers pool
 
 group_reductions_results([]) ->
     [];
@@ -83,7 +91,7 @@ group_reductions_results(List) ->
 rereduce(_Lang, [], _ReducedValues) ->
     {ok, []};
 rereduce(Lang, RedSrcs, ReducedValues) ->
-    Pid = get_os_process(Lang),
+    Proc = get_os_process(Lang),
     Grouped = group_reductions_results(ReducedValues),
     Results = try lists:zipwith(
         fun
@@ -92,11 +100,11 @@ rereduce(Lang, RedSrcs, ReducedValues) ->
             Result;
         (FunSrc, Values) ->
             [true, [Result]] =
-                couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], 
Values]),
+                proc_prompt(Proc, [<<"rereduce">>, [FunSrc], Values]),
             Result
         end, RedSrcs, Grouped)
     after
-        ok = ret_os_process(Lang, Pid)
+        ok = ret_os_process(Proc)
     end,
     {ok, Results}.
 
@@ -121,12 +129,11 @@ recombine_reduce_results([_OsFun|RedSrcs], 
[OsR|OsResults], BuiltinResults, Acc)
 os_reduce(_Lang, [], _KVs) ->
     {ok, []};
 os_reduce(Lang, OsRedSrcs, KVs) ->
-    Pid = get_os_process(Lang),
-    OsResults = try couch_os_process:prompt(Pid,
-            [<<"reduce">>, OsRedSrcs, KVs]) of
+    Proc = get_os_process(Lang),
+    OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of  % all reduce funs in one call
         [true, Reductions] -> Reductions
     after
-        ok = ret_os_process(Lang, Pid)
+        ok = ret_os_process(Proc)  % always return the process, even on error
     end,
     {ok, OsResults}.
 
@@ -151,7 +158,7 @@ builtin_sum_rows(KVs) ->
     end, 0, KVs).
 
 validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
-    Pid = get_os_process(Lang),
+    Proc = get_os_process(Lang),
     JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
     JsonDiskDoc =
     if DiskDoc == nil ->
@@ -159,7 +166,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
     true ->
         couch_doc:to_json_obj(DiskDoc, [revs])
     end,
-    try couch_os_process:prompt(Pid,
+    try proc_prompt(Proc,
             [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of
     1 ->
         ok;
@@ -168,13 +175,13 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) 
->
     {[{<<"unauthorized">>, Message}]} ->
         throw({unauthorized, Message})
     after
-        ok = ret_os_process(Lang, Pid)
+        ok = ret_os_process(Proc)
     end.
 append_docid(DocId, JsonReqIn) ->
     [{<<"docId">>, DocId} | JsonReqIn].
 
 render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) ->
-    Pid = get_os_process(Lang),
+    Proc = get_os_process(Lang),
     {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),
 
     {JsonReq, JsonDoc} = case {DocId, Doc} of
@@ -182,34 +189,34 @@ render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) ->
         {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null};
         _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, 
[revs])}
     end,
-    try couch_os_process:prompt(Pid,
+    try proc_prompt(Proc,
         [<<"show">>, ShowSrc, JsonDoc, JsonReq]) of
     FormResp ->
         FormResp
     after
-        ok = ret_os_process(Lang, Pid)
+        ok = ret_os_process(Proc)
     end.
 
 start_view_list(Lang, ListSrc) ->
-    Pid = get_os_process(Lang),
-    true = couch_os_process:prompt(Pid, [<<"add_fun">>, ListSrc]),
-    {ok, {Lang, Pid}}.
+    Proc = get_os_process(Lang),
+    true = proc_prompt(Proc, [<<"add_fun">>, ListSrc]),
+    {ok, Proc}.  % handle passed to render_list_head/row/tail
 
-render_list_head({_Lang, Pid}, Req, Db, Head) ->
+render_list_head(Proc, Req, Db, Head) ->  % sends the "list" command that starts rendering
     JsonReq = couch_httpd_external:json_req_obj(Req, Db),
-    couch_os_process:prompt(Pid, [<<"list">>, Head, JsonReq]).
+    proc_prompt(Proc, [<<"list">>, Head, JsonReq]).
 
-render_list_row({_Lang, Pid}, Db, {{Key, DocId}, Value}, IncludeDoc) ->
+render_list_row(Proc, Db, {{Key, DocId}, Value}, IncludeDoc) ->  % send one view row to the list fun
     JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, 
IncludeDoc),
-    couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]);
+    proc_prompt(Proc, [<<"list_row">>, JsonRow]);  % dispatch via the #proc, not couch_os_process
 
-render_list_row({_Lang, Pid}, _, {Key, Value}, _IncludeDoc) ->
+render_list_row(Proc, _, {Key, Value}, _IncludeDoc) ->
     JsonRow = {[{key, Key}, {value, Value}]},
-    couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]).
+    proc_prompt(Proc, [<<"list_row">>, JsonRow]).
 
-render_list_tail({Lang, Pid}) ->
-    JsonResp = couch_os_process:prompt(Pid, [<<"list_end">>]),
-    ok = ret_os_process(Lang, Pid),
+render_list_tail(Proc) ->  % finishes the list and returns Proc to the pool
+    JsonResp = proc_prompt(Proc, [<<"list_end">>]),
+    ok = ret_os_process(Proc),
     JsonResp.
 
 start_filter(Lang, FilterSrc) ->
@@ -225,8 +232,8 @@ filter_doc({_Lang, Pid}, Doc, Req, Db) ->
         [<<"filter">>, [JsonDoc], JsonReq, JsonCtx]),
     {ok, Pass}.
 
-end_filter({Lang, Pid}) ->
-    ok = ret_os_process(Lang, Pid).
+end_filter(Proc) ->  % NOTE(review): filter_doc/start_filter still use {_Lang, Pid} + couch_os_process:prompt; convert them too
+    ok = ret_os_process(Proc).
     
 
 init([]) ->
@@ -240,58 +247,74 @@ init([]) ->
         fun("query_servers" ++ _, _) ->
             ?MODULE:stop()
         end),
+    ok = couch_config:register(
+        fun("native_query_servers" ++ _, _) ->
+            ?MODULE:stop()
+        end),
 
     Langs = ets:new(couch_query_server_langs, [set, private]),
-    PidLangs = ets:new(couch_query_server_pid_langs, [set, private]),
-    Pids = ets:new(couch_query_server_procs, [set, private]),
+    PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
+    LangProcs = ets:new(couch_query_server_procs, [set, private]),
     InUse = ets:new(couch_query_server_used, [set, private]),
+    % 'query_servers' specifies an OS command-line to execute.
     lists:foreach(fun({Lang, Command}) ->
-        true = ets:insert(Langs, {?l2b(Lang), Command})
+        true = ets:insert(Langs, {?l2b(Lang),
+                          couch_os_process, start_link, [Command]})
     end, couch_config:get("query_servers")),
+    % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
+    lists:foreach(fun({Lang, SpecStr}) ->
+        {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
+        true = ets:insert(Langs, {?l2b(Lang),
+                          Mod, Fun, SpecArg})
+    end, couch_config:get("native_query_servers")),
     process_flag(trap_exit, true),
-    {ok, {Langs, PidLangs, Pids, InUse}}.
+    {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg}
+          PidProcs, % Keyed by PID, valus is a #proc record.
+          LangProcs, % Keyed by language name, value is a #proc record
+          InUse % Keyed by PID, value is #proc record.
+          }}.
 
 terminate(_Reason, _Server) ->
     ok.
 
 
-handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) ->
+handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, 
InUse}=Server) ->
     % Note to future self. Add max process limit.
-    case ets:lookup(Pids, Lang) of
-    [{Lang, [Pid|_]}] ->
-        add_value(PidLangs, Pid, Lang),
-        rem_from_list(Pids, Lang, Pid),
-        add_to_list(InUse, Lang, Pid),
-        {reply, {recycled, Pid, get_query_server_config()}, Server};
+    case ets:lookup(LangProcs, Lang) of  % idle #proc records for Lang, if any
+    [{Lang, [Proc|_]}] ->
+        add_value(PidProcs, Proc#proc.pid, Proc),  % NOTE(review): only recycled procs are recorded here; new ones below are not
+        rem_from_list(LangProcs, Lang, Proc),
+        add_to_list(InUse, Lang, Proc),
+        {reply, {recycled, Proc, get_query_server_config()}, Server};
     _ ->
         case (catch new_process(Langs, Lang)) of
-        {ok, Pid} ->
-            add_to_list(InUse, Lang, Pid),
-            {reply, {new, Pid}, Server};
+        {ok, Proc} ->
+            add_to_list(InUse, Lang, Proc),
+            {reply, {new, Proc}, Server};
         Error ->
             {reply, Error, Server}
         end
     end;
-handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
+handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) ->  % language now comes from Proc#proc.lang
     % Along with max process limit, here we should check
     % if we're over the limit and discard when we are.
-    add_to_list(Pids, Lang, Pid),
-    rem_from_list(InUse, Lang, Pid),
+    add_to_list(LangProcs, Proc#proc.lang, Proc),
+    rem_from_list(InUse, Proc#proc.lang, Proc),
     {reply, true, Server}.
 
 handle_cast(_Whatever, Server) ->
     {noreply, Server}.
 
-handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) ->
-    case ets:lookup(PidLangs, Pid) of
-    [{Pid, Lang}] ->
+handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) ->
+    case ets:lookup(PidProcs, Pid) of
+    [{Pid, Proc}] ->
         case Status of
         normal -> ok;
         _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", 
[Pid, Status])
         end,
-        rem_value(PidLangs, Pid),
-        catch rem_from_list(Pids, Lang, Pid),
-        catch rem_from_list(InUse, Lang, Pid),
+        rem_value(PidProcs, Pid),
+        catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
+        catch rem_from_list(InUse, Proc#proc.lang, Proc),
         {noreply, Server};
     [] ->
         ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, 
Status]),
@@ -310,37 +333,55 @@ get_query_server_config() ->
 
 new_process(Langs, Lang) ->
     case ets:lookup(Langs, Lang) of
-    [{Lang, Command}] ->
-        couch_os_process:start_link(Command);
+    [{Lang, Mod, Func, Arg}] ->  % from [query_servers] or [native_query_servers]
+        {ok, Pid} = apply(Mod, Func, Arg),  % NOTE(review): runs in this gen_server, so couch_native_process:start_link/0 returns this server's pid
+        {ok, #proc{lang=Lang,
+                   pid=Pid,
+                   % Called via proc_prompt, proc_set_timeout, and proc_stop
+                   prompt_fun={Mod, prompt},
+                   set_timeout_fun={Mod, set_timeout},
+                   stop_fun={Mod, stop}}};
     _ ->
         {unknown_query_language, Lang}
     end.
 
+proc_prompt(#proc{pid=Pid, prompt_fun={Mod, Func}}, Args) ->
+    % Delegate to the backing module's prompt/2 (e.g. couch_os_process).
+    Mod:Func(Pid, Args).
+
+proc_stop(#proc{pid=Pid, stop_fun={Mod, Func}}) ->
+    % Delegate to the backing module's stop/1.
+    Mod:Func(Pid).
+
+proc_set_timeout(#proc{pid=Pid, set_timeout_fun={Mod, Func}}, Timeout) ->
+    % Delegate to the backing module's set_timeout/2.
+    Mod:Func(Pid, Timeout).
+
 get_os_process(Lang) ->
     case gen_server:call(couch_query_servers, {get_proc, Lang}) of
-    {new, Pid} ->
-        couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get(
-                "couchdb", "os_process_timeout", "5000"))),
-        link(Pid),
-        Pid;
-    {recycled, Pid, QueryConfig} ->
-        case (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])) 
of
+    {new, Proc} ->  % freshly started: apply the configured timeout and link
+        proc_set_timeout(Proc, list_to_integer(couch_config:get(
+                            "couchdb", "os_process_timeout", "5000"))),
+        link(Proc#proc.pid),
+        Proc;
+    {recycled, Proc, QueryConfig} ->  % pooled: reset first; on failure stop it and try again
+        case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
         true ->
-            couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get(
-                "couchdb", "os_process_timeout", "5000"))),
-            link(Pid),
-            Pid;
+            proc_set_timeout(Proc, list_to_integer(couch_config:get(
+                                "couchdb", "os_process_timeout", "5000"))),
+            link(Proc#proc.pid),
+            Proc;
         _ ->
-            catch couch_os_process:stop(Pid),
+            catch proc_stop(Proc),
             get_os_process(Lang)
         end;
     Error ->
         throw(Error)
     end.
 
-ret_os_process(Lang, Pid) ->
-    true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}),
-    catch unlink(Pid),
+ret_os_process(Proc) ->  % hands Proc back to the pool and drops the caller's link to its pid
+    true = gen_server:call(couch_query_servers, {ret_proc, Proc}),
+    catch unlink(Proc#proc.pid),
     ok.
 
 add_value(Tid, Key, Value) ->
diff --git a/test/query_server_spec.rb b/test/query_server_spec.rb
index dfc57a5..eaa7815 100644
--- a/test/query_server_spec.rb
+++ b/test/query_server_spec.rb
@@ -14,9 +14,8 @@
 # spec test/query_server_spec.rb -f specdoc --color
 
 COUCH_ROOT = "#{File.dirname(__FILE__)}/.." unless defined?(COUCH_ROOT)
-LANGUAGE = "js"
+LANGUAGE = "erlang"
 
-require 'open3'
 require 'spec'
 require 'json'
 
@@ -25,25 +24,21 @@ class OSProcessRunner
     trace = false
     puts "launching #{run_command}" if trace
     if block_given?
-      Open3.popen3(run_command) do |jsin, jsout, jserr|
-        js = QueryServerRunner.new(jsin, jsout, jserr, trace)
-        yield js
+      IO.popen(run_command, "r+") do |io|
+        qs = QueryServerRunner.new(io, trace)
+        yield qs
       end
     else
-      jsin, jsout, jserr = Open3.popen3(run_command)
-      QueryServerRunner.new(jsin, jsout, jserr, trace)
+      io = IO.popen(run_command, "r+")
+      QueryServerRunner.new(io, trace)
     end
   end
-  def initialize jsin, jsout, jserr, trace = false
-    @qsin = jsin
-    @qsout = jsout
-    @qserr = jserr
+  def initialize io, trace = false
+    @qsio = io
     @trace = trace
   end
   def close
-    @qsin.close
-    @qsout.close
-    @qserr.close
+    @qsio.close
   end
   def reset!
     run(["reset"])
@@ -63,10 +58,10 @@ class OSProcessRunner
   def rrun json
     line = json.to_json
     puts "run: #{line}" if @trace
-    @qsin.puts line
+    @qsio.puts line
   end
   def rgets
-    resp = @qsout.gets
+    resp = @qsio.gets
     puts "got: #{resp}"  if @trace
     resp
   end
@@ -92,7 +87,10 @@ end
 
 class QueryServerRunner < OSProcessRunner
 
-  COMMANDS = {"js" => "#{COUCH_ROOT}/src/couchdb/couchjs 
#{COUCH_ROOT}/share/server/main.js" }
+  COMMANDS = {
+    "js" => "#{COUCH_ROOT}/src/couchdb/couchjs 
#{COUCH_ROOT}/share/server/main.js",
+    "erlang" => "#{COUCH_ROOT}/test/run_native_process.es"
+  }
 
   def self.run_command
     COMMANDS[LANGUAGE]
@@ -107,39 +105,87 @@ end
 
 functions = {
   "emit-twice" => {
-    "js" => %{function(doc){emit("foo",doc.a); emit("bar",doc.a)}}
+    "js" => %{function(doc){emit("foo",doc.a); emit("bar",doc.a)}},
+    "erlang" => <<-ERLANG
+      fun({Doc}) ->
+        A = proplists:get_value(<<"a">>, Doc, null),
+        Emit(<<"foo">>, A),
+        Emit(<<"bar">>, A)
+      end.
+    ERLANG
   },
   "emit-once" => {
-    "js" => %{function(doc){emit("baz",doc.a)}}
+    "js" => %{function(doc){emit("baz",doc.a)}},
+    "erlang" => <<-ERLANG
+        fun({Doc}) ->
+            A = proplists:get_value(<<"a">>, Doc, null),
+            Emit(<<"baz">>, A)
+        end.
+    ERLANG
   },
   "reduce-values-length" => {
-    "js" => %{function(keys, values, rereduce) { return values.length; }}
+    "js" => %{function(keys, values, rereduce) { return values.length; }},
+    "erlang" => %{fun(Keys, Values, ReReduce) -> length(Values) end.}
   },
   "reduce-values-sum" => {
-    "js" => %{function(keys, values, rereduce) { return sum(values); }}
+    "js" => %{function(keys, values, rereduce) { return sum(values); }},
+    "erlang" => %{fun(Keys, Values, ReReduce) -> lists:sum(Values) end.}
   },
   "validate-forbidden" => {
-    "js" => %{function(newDoc, oldDoc, userCtx) { if (newDoc.bad) 
throw({forbidden:"bad doc"}); "foo bar";}}
+    "js" => <<-JS,
+      function(newDoc, oldDoc, userCtx) {
+        if(newDoc.bad)
+          throw({forbidden:"bad doc"}); "foo bar";
+      }
+    JS
+    "erlang" => <<-ERLANG
+      fun({NewDoc}, _OldDoc, _UserCtx) ->
+        case proplists:get_value(<<"bad">>, NewDoc) of
+            undefined -> 1;
+            _ -> {[{forbidden, <<"bad doc">>}]}
+        end
+      end.
+    ERLANG
   },
   "show-simple" => {
-    "js" => <<-JS
+    "js" => <<-JS,
         function(doc, req) {
-          log("ok");
-          return [doc.title, doc.body].join(' - ');
+            log("ok");
+            return [doc.title, doc.body].join(' - ');
         }
     JS
+    "erlang" => <<-ERLANG
+      fun({Doc}, Req) ->
+            Title = proplists:get_value(<<"title">>, Doc),
+            Body = proplists:get_value(<<"body">>, Doc),
+            Resp = <<Title/binary, " - ", Body/binary>>,
+        {[{<<"body">>, Resp}]}
+      end.
+    ERLANG
   },
   "show-headers" => {
-    "js" => <<-JS
+    "js" => <<-JS,
         function(doc, req) {
           var resp = {"code":200, "headers":{"X-Plankton":"Rusty"}};
           resp.body = [doc.title, doc.body].join(' - ');
           return resp;
         }
      JS
+    "erlang" => <<-ERLANG
+  fun({Doc}, Req) ->
+        Title = proplists:get_value(<<"title">>, Doc),
+        Body = proplists:get_value(<<"body">>, Doc),
+        Resp = <<Title/binary, " - ", Body/binary>>,
+        {[
+        {<<"code">>, 200},
+        {<<"headers">>, {[{<<"X-Plankton">>, <<"Rusty">>}]}},
+        {<<"body">>, Resp}
+      ]}
+  end.
+    ERLANG
   },
   "show-sends" => {
-    "js" =>  <<-JS
+    "js" =>  <<-JS,
         function(head, req) {
           start({headers:{"Content-Type" : "text/plain"}});
           send("first chunk");
@@ -147,9 +193,20 @@ functions = {
           return "tail";
         };
     JS
+    "erlang" => <<-ERLANG
+      fun(Head, Req) ->
+        Resp = {[
+          {<<"headers">>, {[{<<"Content-Type">>, <<"text/plain">>}]}}
+        ]},
+        Start(Resp),
+        Send(<<"first chunk">>),
+        Send(<<"second \\\"chunk\\\"">>),
+        <<"tail">>
+      end.
+    ERLANG
   },
   "show-while-get-rows" => {
-    "js" =>  <<-JS
+    "js" =>  <<-JS,
         function(head, req) {
           send("first chunk");
           send(req.q);
@@ -161,9 +218,21 @@ functions = {
           return "tail";
         };
     JS
+    "erlang" => <<-ERLANG,
+        fun(Head, {Req}) ->
+            Send(<<"first chunk">>),
+            Send(proplists:get_value(<<"q">>, Req)),
+            Fun = fun({Row}, _) ->
+                Send(proplists:get_value(<<"key">>, Row)),
+                {ok, nil}
+            end,
+            {ok, _} = FoldRows(Fun, nil),
+            <<"tail">>
+        end.
+    ERLANG
   },
   "show-while-get-rows-multi-send" => {
-    "js" => <<-JS
+    "js" => <<-JS,
         function(head, req) {
           send("bacon");
           var row;
@@ -175,9 +244,21 @@ functions = {
           return "tail";
         };
     JS
+    "erlang" => <<-ERLANG,
+        fun(Head, Req) ->
+            Send(<<"bacon">>),
+            Fun = fun({Row}, _) ->
+                Send(proplists:get_value(<<"key">>, Row)),
+                Send(<<"eggs">>),
+                {ok, nil}
+            end,
+            FoldRows(Fun, nil),
+            <<"tail">>
+        end.
+    ERLANG
   },
   "list-simple" => {
-    "js" => <<-JS
+    "js" => <<-JS,
         function(head, req) {
           send("first chunk");
           send(req.q);
@@ -188,9 +269,21 @@ functions = {
           return "early";
         };
     JS
+    "erlang" => <<-ERLANG,
+        fun(Head, {Req}) ->
+            Send(<<"first chunk">>),
+            Send(proplists:get_value(<<"q">>, Req)),
+            Fun = fun({Row}, _) ->
+                Send(proplists:get_value(<<"key">>, Row)),
+                {ok, nil}
+            end,
+            FoldRows(Fun, nil),
+            <<"early">>
+        end.
+    ERLANG
   },
   "list-chunky" => {
-    "js" => <<-JS
+    "js" => <<-JS,
         function(head, req) {
           send("first chunk");
           send(req.q);
@@ -204,16 +297,37 @@ functions = {
           };
         };
     JS
+    "erlang" => <<-ERLANG,
+        fun(Head, {Req}) ->
+            Send(<<"first chunk">>),
+            Send(proplists:get_value(<<"q">>, Req)),
+            Fun = fun
+                ({Row}, Count) when Count < 2 ->
+                    Send(proplists:get_value(<<"key">>, Row)),
+                    {ok, Count+1};
+                ({Row}, Count) when Count == 2 ->
+                    Send(proplists:get_value(<<"key">>, Row)),
+                    {stop, <<"early tail">>}
+            end,
+            {ok, Tail} = FoldRows(Fun, 0),
+            Tail
+        end.
+    ERLANG
   },
   "list-old-style" => {
-    "js" => <<-JS
+    "js" => <<-JS,
         function(head, req, foo, bar) {
           return "stuff";
         }
     JS
+    "erlang" => <<-ERLANG,
+        fun(Head, Req, Foo, Bar) ->
+            <<"stuff">>
+        end.
+    ERLANG
   },
   "list-capped" => {
-    "js" => <<-JS
+    "js" => <<-JS,
         function(head, req) {
           send("bacon")
           var row, i = 0;
@@ -226,9 +340,24 @@ functions = {
           };
         }
     JS
+    "erlang" => <<-ERLANG,
+        fun(Head, Req) ->
+            Send(<<"bacon">>),
+            Fun = fun
+                ({Row}, Count) when Count < 2 ->
+                    Send(proplists:get_value(<<"key">>, Row)),
+                    {ok, Count+1};
+                ({Row}, Count) when Count == 2 ->
+                    Send(proplists:get_value(<<"key">>, Row)),
+                    {stop, <<"early">>}
+            end,
+            {ok, Tail} = FoldRows(Fun, 0),
+            Tail
+        end.
+    ERLANG
   },
   "list-raw" => {
-    "js" => <<-JS
+    "js" => <<-JS,
         function(head, req) {
           send("first chunk");
           send(req.q);
@@ -239,15 +368,32 @@ functions = {
           return "tail";
         };
     JS
+    "erlang" => <<-ERLANG,
+        fun(Head, {Req}) ->
+            Send(<<"first chunk">>),
+            Send(proplists:get_value(<<"q">>, Req)),
+            Fun = fun({Row}, _) ->
+                Send(proplists:get_value(<<"key">>, Row)),
+                {ok, nil}
+            end,
+            FoldRows(Fun, nil),
+            <<"tail">>
+        end.
+    ERLANG
   },
   "filter-basic" => {
-    "js" => <<-JS
+    "js" => <<-JS,
       function(doc, req, userCtx) {
         if (doc.good) {
           return true;
         }
       }
     JS
+    "erlang" => <<-ERLANG,
+        fun({Doc}, Req, Ctx) ->
+            proplists:get_value(<<"good">>, Doc)
+        end.
+    ERLANG
   }
 }
 
@@ -313,7 +459,7 @@ describe "query server normal case" do
     end
     it "should show" do
       @qs.rrun(["show", @fun,
-        {:title => "Best ever", :body => "Doc body"}])
+        {:title => "Best ever", :body => "Doc body"}, {}]) # {} is the req passed to the show function
       @qs.jsgets.should == ["resp", {"body" => "Best ever - Doc body"}]
     end
   end
@@ -325,7 +471,7 @@ describe "query server normal case" do
     end
     it "should show headers" do
       @qs.rrun(["show", @fun,
-        {:title => "Best ever", :body => "Doc body"}])
+        {:title => "Best ever", :body => "Doc body"}, {}])
       @qs.jsgets.should == ["resp", {"code"=>200,"headers" => 
{"X-Plankton"=>"Rusty"}, "body" => "Best ever - Doc body"}]
     end
   end
@@ -437,7 +583,7 @@ describe "query server normal case" do
       @qs.add_fun(@fun).should == true
     end
     it "should only return true for good docs" do
-      @qs.run(["filter", [{"key"=>"bam", "good" => true}, {"foo" => "bar"}, {"good" => true}]]).
+      @qs.run(["filter", [{"key"=>"bam", "good" => true}, {"foo" => "bar"}, {"good" => true}], {}, {}]).
         should ==  [true, [true, false, true]]
     end
   end
@@ -471,7 +617,7 @@ describe "query server that exits" do
     it "should get a warning" do
       resp = @qs.run(["list", {"foo"=>"bar"}, {"q" => "ok"}])
       resp["error"].should == "render_error"
-      resp["reason"].should include("the list API has changed")
+      resp["reason"].should include("the list API has changed") if LANGUAGE == "js" # couchjs wording only
     end
   end
 
diff --git a/test/run_native_process.es b/test/run_native_process.es
new file mode 100755
index 0000000..275d2bb
--- /dev/null
+++ b/test/run_native_process.es
@@ -0,0 +1,68 @@
+#! /usr/bin/env escript
+
+%% Test harness: serves couch_native_process over stdin/stdout, one JSON
+%% request per line in and one JSON response per line out, so that
+%% test/query_server_spec.rb can drive the native (Erlang) query server.
+
+%% Read one line from stdin and decode it as JSON; `stop` at end of input.
+read() ->
+    case io:get_line('') of
+        eof -> stop;
+        Data -> mochijson2:decode(Data)
+    end.
+
+%% Write Data (a binary or iolist) to stdout followed by a newline. Data is
+%% emitted verbatim with io:put_chars/1 instead of being used as an
+%% io:format/2 format string, so a `~` in the JSON is not a control sequence.
+send(Data) when is_binary(Data); is_list(Data) ->
+    io:put_chars([Data, $\n]).
+
+%% Encode Data as JSON and send it. If mochijson2 rejects a term (reason
+%% {json_encode, Error}), report a JSON error object instead; Error is
+%% pre-formatted to a binary so the retry cannot be rejected the same way.
+write(Data) ->
+    try mochijson2:encode(Data) of
+        Json -> send(Json)
+    catch
+        _:{json_encode, Error} ->
+            write({[{<<"error">>, format_term(Error)}]})
+    end.
+
+%% Render an arbitrary term as a binary string for use in a JSON response.
+format_term(Term) ->
+    list_to_binary(io_lib:format("~p", [Term])).
+
+%% Debug helpers, kept commented out. Logging must go to stderr because
+%% stdout carries the JSON protocol.
+%log(Mesg) ->
+%    log(Mesg, []).
+%log(Mesg, Params) ->
+%    io:format(standard_error, Mesg, Params).
+
+%% Serve requests until stdin is closed. An {error, Reason} result, or a
+%% crash inside prompt/2 (caught as {'EXIT', Reason}), is written as a JSON
+%% error object and ends the loop; any other result is written and the
+%% loop continues.
+loop(Pid) ->
+    case read() of
+        stop -> ok;
+        Json ->
+            case (catch couch_native_process:prompt(Pid, Json)) of
+                {error, Reason} ->
+                    ok = write({[{error, Reason}]});
+                {'EXIT', Reason} ->
+                    ok = write({[{error, format_term(Reason)}]});
+                Resp ->
+                    ok = write(Resp),
+                    loop(Pid)
+            end
+    end.
+
+%% Entry point. The code paths are relative to the current directory, so
+%% run from the CouchDB source root (e.g. `spec test/query_server_spec.rb`).
+main([]) ->
+    code:add_pathz("src/couchdb"),
+    code:add_pathz("src/mochiweb"),
+    {ok, Pid} = couch_native_process:start_link(),
+    loop(Pid).
+

Reply via email to