Author: fdmanana
Date: Sun Jul 10 12:11:15 2011
New Revision: 1144844

URL: http://svn.apache.org/viewvc?rev=1144844&view=rev
Log:
Some view indexer optimizations

Changes:

- Decoding of the JSON received from the external OS process
  (couchjs) is now done in the indexer's process which does the
  writes to disk. This avoids several phases of EJSON term
  copying: couch_os_process -> do_maps process -> work queue ->
  do_writes process;

- The indexer's do_maps process no longer needs to find out
  which seq number is the highest from the input list via
  a list comprehension + lists:max/1 call. The highest is
  always the last one on the queue (this is enforced when
  folding the database's byseq btree);

- Don't update the task status every time we read a single doc
  from the database. Instead update it only after writing
  a view KV batch to disk;

- Avoid making multiple linear passes over the same
  lists.

This is part of COUCHDB-1186.


Modified:
    couchdb/trunk/src/couchdb/couch_os_process.erl
    couchdb/trunk/src/couchdb/couch_query_servers.erl
    couchdb/trunk/src/couchdb/couch_view_group.erl
    couchdb/trunk/src/couchdb/couch_view_updater.erl

Modified: couchdb/trunk/src/couchdb/couch_os_process.erl
URL: 
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_os_process.erl?rev=1144844&r1=1144843&r2=1144844&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_os_process.erl (original)
+++ couchdb/trunk/src/couchdb/couch_os_process.erl Sun Jul 10 12:11:15 2011
@@ -85,22 +85,51 @@ writejson(OsProc, Data) when is_record(O
     true = writeline(OsProc, JsonData).
 
 readjson(OsProc) when is_record(OsProc, os_proc) ->
-    Line = readline(OsProc),
+    Line = iolist_to_binary(readline(OsProc)),
     ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]),
-    case ?JSON_DECODE(Line) of
-    [<<"log">>, Msg] when is_binary(Msg) ->
-        % we got a message to log. Log it and continue
-        ?LOG_INFO("OS Process ~p Log :: ~s", [OsProc#os_proc.port, Msg]),
-        readjson(OsProc);
-    [<<"error">>, Id, Reason] ->
-        throw({error, {couch_util:to_existing_atom(Id),Reason}});
-    [<<"fatal">>, Id, Reason] ->
-        ?LOG_INFO("OS Process ~p Fatal Error :: ~s ~p",[OsProc#os_proc.port, 
Id, Reason]),
-        throw({couch_util:to_existing_atom(Id),Reason});
-    Result ->
-        Result
+    try
+        % Don't actually parse the whole JSON. Just try to see if it's
+        % a command or a doc map/reduce/filter/show/list/update output.
+        % If it's a command then parse the whole JSON and execute the
+        % command, otherwise return the raw JSON line to the caller.
+        pick_command(Line)
+    catch
+    throw:abort ->
+        {json, Line};
+    throw:{cmd, _Cmd} ->
+        case ?JSON_DECODE(Line) of
+        [<<"log">>, Msg] when is_binary(Msg) ->
+            % we got a message to log. Log it and continue
+            ?LOG_INFO("OS Process ~p Log :: ~s", [OsProc#os_proc.port, Msg]),
+            readjson(OsProc);
+        [<<"error">>, Id, Reason] ->
+            throw({error, {couch_util:to_existing_atom(Id),Reason}});
+        [<<"fatal">>, Id, Reason] ->
+            ?LOG_INFO("OS Process ~p Fatal Error :: ~s ~p",
+                [OsProc#os_proc.port, Id, Reason]),
+            throw({couch_util:to_existing_atom(Id),Reason});
+        _Result ->
+            {json, Line}
+        end
     end.
 
+pick_command(Line) ->
+    json_stream_parse:events(Line, fun pick_command0/1).
+
+pick_command0(array_start) ->
+    fun pick_command1/1;
+pick_command0(_) ->
+    throw(abort).
+
+pick_command1(<<"log">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(<<"error">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(<<"fatal">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(_) ->
+    throw(abort).
+
 
 % gen_server API
 init([Command, Options, PortOptions]) ->

Modified: couchdb/trunk/src/couchdb/couch_query_servers.erl
URL: 
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=1144844&r1=1144843&r2=1144844&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_query_servers.erl (original)
+++ couchdb/trunk/src/couchdb/couch_query_servers.erl Sun Jul 10 12:11:15 2011
@@ -16,7 +16,7 @@
 -export([start_link/0, config_change/1]).
 
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, 
handle_info/2,code_change/3]).
--export([start_doc_map/3, map_docs/2, stop_doc_map/1]).
+-export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, 
raw_to_ejson/1]).
 -export([reduce/3, rereduce/3,validate_doc_update/5]).
 -export([filter_docs/5]).
 -export([filter_view/3]).
@@ -82,6 +82,10 @@ map_docs(Proc, Docs) ->
         Docs),
     {ok, Results}.
 
+map_doc_raw(Proc, Doc) ->
+    Json = couch_doc:to_json_obj(Doc, []),
+    {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
+
 
 stop_doc_map(nil) ->
     ok;
@@ -494,9 +498,21 @@ proc_with_ddoc(DDoc, DDocKey, LangProcs)
     end.
 
 proc_prompt(Proc, Args) ->
-    {Mod, Func} = Proc#proc.prompt_fun,
+     case proc_prompt_raw(Proc, Args) of
+     {json, Json} ->
+         ?JSON_DECODE(Json);
+     EJson ->
+         EJson
+     end.
+
+proc_prompt_raw(#proc{prompt_fun = {Mod, Func}} = Proc, Args) ->
     apply(Mod, Func, [Proc#proc.pid, Args]).
 
+raw_to_ejson({json, Json}) ->
+    ?JSON_DECODE(Json);
+raw_to_ejson(EJson) ->
+    EJson.
+
 proc_stop(Proc) ->
     {Mod, Func} = Proc#proc.stop_fun,
     apply(Mod, Func, [Proc#proc.pid]).

Modified: couchdb/trunk/src/couchdb/couch_view_group.erl
URL: 
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=1144844&r1=1144843&r2=1144844&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_group.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_group.erl Sun Jul 10 12:11:15 2011
@@ -236,16 +236,22 @@ handle_cast({partial_update, Pid, NewGro
         = State) ->
     #group_state{
         db_name = DbName,
-        waiting_commit = WaitingCommit
+        waiting_commit = WaitingCommit,
+        group = Group
     } = State,
     NewSeq = NewGroup#group.current_seq,
-    ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
-        DbName, NewGroup#group.name]),
-    if not WaitingCommit ->
-        erlang:send_after(1000, self(), delayed_commit);
-    true -> ok
-    end,
-    {noreply, State#group_state{group=NewGroup, waiting_commit=true}};
+    case NewSeq > Group#group.current_seq of
+    true ->
+        ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
+            DbName, NewGroup#group.name]),
+        if not WaitingCommit ->
+            erlang:send_after(1000, self(), delayed_commit);
+        true -> ok
+        end,
+        {noreply, State#group_state{group=NewGroup, waiting_commit=true}};
+    false ->
+        {noreply, State}
+    end;
 handle_cast({partial_update, _, _}, State) ->
     %% message from an old (probably pre-compaction) updater; ignore
     {noreply, State}.

Modified: couchdb/trunk/src/couchdb/couch_view_updater.erl
URL: 
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_updater.erl?rev=1144844&r1=1144843&r2=1144844&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_updater.erl Sun Jul 10 12:11:15 2011
@@ -24,19 +24,14 @@ update(Owner, Group, DbName) ->
         current_seq = Seq,
         purge_seq = PurgeSeq
     } = Group,
-    couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," 
",GroupName/binary>>, <<"Starting index update">>),
 
     {ok, Db} = couch_db:open_int(DbName, []),
     DbPurgeSeq = couch_db:get_purge_seq(Db),
-    Group2 =
     if DbPurgeSeq == PurgeSeq ->
-        Group;
+        ok;
     DbPurgeSeq == PurgeSeq + 1 ->
-        couch_task_status:update(<<"Removing purged entries from view 
index.">>),
-        purge_index(Group, Db);
+        ok;
     true ->
-        couch_task_status:update(<<"Resetting view index due to lost purge 
entries.">>),
-        couch_db:close(Db),
         exit(reset)
     end,
     {ok, MapQueue} = couch_work_queue:new(
@@ -44,13 +39,30 @@ update(Owner, Group, DbName) ->
     {ok, WriteQueue} = couch_work_queue:new(
         [{max_size, 100000}, {max_items, 500}]),
     Self = self(),
-    ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
-    spawn_link(fun() -> do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) 
end),
-    spawn_link(fun() -> do_writes(Self, Owner, Group2, WriteQueue, Seq == 0) 
end),
-    % compute on all docs modified since we last computed.
+    spawn_link(fun() ->
+        do_maps(add_query_server(Group), MapQueue, WriteQueue)
+    end),
     TotalChanges = couch_db:count_changes_since(Db, Seq),
-    % update status every half second
-    couch_task_status:set_update_frequency(500),
+    spawn_link(fun() ->
+        couch_task_status:add_task(
+            <<"View Group Indexer">>,
+            <<DbName/binary, " ", GroupName/binary>>,
+            <<"Starting index update">>),
+        couch_task_status:set_update_frequency(500),
+        Group2 =
+        if DbPurgeSeq == PurgeSeq + 1 ->
+            couch_task_status:update(<<"Removing purged entries from view 
index.">>),
+            purge_index(Group, Db);
+        true ->
+            Group
+        end,
+        ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
+        do_writes(Self, Owner, Group2, WriteQueue,
+            Seq == 0, ViewEmptyKVs, 0, TotalChanges),
+        couch_task_status:set_update_frequency(0),
+        couch_task_status:update("Finishing.")
+    end),
+    % compute on all docs modified since we last computed.
     #group{ design_options = DesignOptions } = Group,
     IncludeDesign = couch_util:get_value(<<"include_design">>,
         DesignOptions, false),
@@ -64,15 +76,11 @@ update(Owner, Group, DbName) ->
         = couch_db:enum_docs_since(
             Db,
             Seq,
-            fun(DocInfo, _, ChangesProcessed) ->
-                couch_task_status:update("Processed ~p of ~p changes (~p%)",
-                        [ChangesProcessed, TotalChanges, 
(ChangesProcessed*100) div TotalChanges]),
+            fun(DocInfo, _, Acc) ->
                 load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign),
-                {ok, ChangesProcessed+1}
+                {ok, Acc}
             end,
-            0, []),
-    couch_task_status:set_update_frequency(0),
-    couch_task_status:update("Finishing."),
+            ok, []),
     couch_work_queue:close(MapQueue),
     couch_db:close(Db),
     receive {new_group, NewGroup} ->
@@ -81,6 +89,16 @@ update(Owner, Group, DbName) ->
     end.
 
 
+add_query_server(#group{query_server = nil} = Group) ->
+    {ok, Qs} = couch_query_servers:start_doc_map(
+        Group#group.def_lang,
+        [View#view.def || View <- Group#group.views],
+        Group#group.lib),
+    Group#group{query_server = Qs};
+add_query_server(Group) ->
+    Group.
+
+
 purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) ->
     {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
     Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
@@ -133,96 +151,83 @@ load_doc(Db, DocInfo, MapQueue, DocOpts,
         end
     end.
     
-do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) ->
+do_maps(#group{query_server = Qs} = Group, MapQueue, WriteQueue) ->
     case couch_work_queue:dequeue(MapQueue) of
     closed ->
         couch_work_queue:close(WriteQueue),
         couch_query_servers:stop_doc_map(Group#group.query_server);
     {ok, Queue} ->
-        Docs = [Doc || {_,#doc{deleted=false}=Doc} <- Queue],
-        DelKVs = [{Id, []} || {_, #doc{deleted=true,id=Id}} <- Queue],
-        LastSeq = lists:max([Seq || {Seq, _Doc} <- Queue]),
-        {Group1, Results} = view_compute(Group, Docs),
-        {ViewKVs, DocIdViewIdKeys} = view_insert_query_results(Docs,
-                    Results, ViewEmptyKVs, DelKVs),
-        couch_work_queue:queue(WriteQueue, {LastSeq, ViewKVs, 
DocIdViewIdKeys}),
-        do_maps(Group1, MapQueue, WriteQueue, ViewEmptyKVs)
+        lists:foreach(
+            fun({Seq, #doc{id = Id, deleted = true}}) ->
+                Item = {Seq, Id, []},
+                ok = couch_work_queue:queue(WriteQueue, Item);
+            ({Seq, #doc{id = Id, deleted = false} = Doc}) ->
+                {ok, Result} = couch_query_servers:map_doc_raw(Qs, Doc),
+                Item = {Seq, Id, Result},
+                ok = couch_work_queue:queue(WriteQueue, Item)
+            end,
+            Queue),
+        do_maps(Group, MapQueue, WriteQueue)
     end.
 
-do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) ->
+do_writes(Parent, Owner, Group, WriteQueue, InitialBuild, ViewEmptyKVs,
+        ChangesDone, TotalChanges) ->
     case couch_work_queue:dequeue(WriteQueue) of
     closed ->
         Parent ! {new_group, Group};
     {ok, Queue} ->
-        {NewSeq, ViewKeyValues, DocIdViewIdKeys} = lists:foldl(
-            fun({Seq, ViewKVs, DocIdViewIdKeys}, nil) ->
-                {Seq, ViewKVs, DocIdViewIdKeys};
-            ({Seq, ViewKVs, DocIdViewIdKeys}, Acc) ->
-                {Seq2, AccViewKVs, AccDocIdViewIdKeys} = Acc,
-                AccViewKVs2 = lists:zipwith(
-                    fun({View, KVsIn}, {_View, KVsAcc}) ->
-                        {View, KVsIn ++ KVsAcc}
-                    end, ViewKVs, AccViewKVs),
-                {lists:max([Seq, Seq2]),
-                        AccViewKVs2, DocIdViewIdKeys ++ AccDocIdViewIdKeys}
-            end, nil, Queue),
-        Group2 = write_changes(Group, ViewKeyValues, DocIdViewIdKeys, NewSeq,
-                InitialBuild),
+        {ViewKVs, DocIdViewIdKeys} = lists:foldr(
+            fun({_Seq, Id, []}, {ViewKVsAcc, DocIdViewIdKeysAcc}) ->
+                {ViewKVsAcc, [{Id, []} | DocIdViewIdKeysAcc]};
+            ({_Seq, Id, RawQueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc}) ->
+                QueryResults = [
+                    [list_to_tuple(FunResult) || FunResult <- FunRs] || FunRs 
<-
+                        couch_query_servers:raw_to_ejson(RawQueryResults)
+                ],
+                {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(
+                        Id, QueryResults, ViewKVsAcc, [], []),
+                {NewViewKVs, [{Id, NewViewIdKeys} | DocIdViewIdKeysAcc]}
+            end,
+            {ViewEmptyKVs, []}, Queue),
+        {NewSeq, _, _} = lists:last(Queue),
+        Group2 = write_changes(
+            Group, ViewKVs, DocIdViewIdKeys, NewSeq, InitialBuild),
         case Owner of
-        nil -> ok;
-        _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2})
+        nil ->
+            ok;
+        _ ->
+            ok = gen_server:cast(Owner, {partial_update, Parent, Group2})
         end,
-        do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild)
+        ChangesDone2 = ChangesDone + length(Queue),
+        couch_task_status:update("Processed ~p of ~p changes (~p%)",
+              [ChangesDone2, TotalChanges, (ChangesDone2 * 100) div 
TotalChanges]),
+        do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild, 
ViewEmptyKVs,
+            ChangesDone2, TotalChanges)
     end.
 
-view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->
-    {ViewKVs, DocIdViewIdKeysAcc};
-view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], 
ViewKVs, DocIdViewIdKeysAcc) ->
-    {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, 
QueryResults, ViewKVs, [], []),
-    NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],
-    view_insert_query_results(RestDocs, RestResults, NewViewKVs, 
NewDocIdViewIdKeys).
 
-
-view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
+view_insert_doc_query_results(_DocId, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
     {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};
-view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], 
[{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
+view_insert_doc_query_results(DocId, [ResultKVs | RestResults],
+        [{View, KVs} | RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
     % Take any identical keys and combine the values
-    ResultKVs2 = lists:foldl(
-        fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) ->
-            case Key == PrevKey of
-            true ->
-                case PrevVal of
-                {dups, Dups} ->
-                    [{PrevKey, {dups, [Value|Dups]}} | AccRest];
-                _ ->
-                    [{PrevKey, {dups, [Value,PrevVal]}} | AccRest]
-                end;
-            false ->
-                [{Key,Value},{PrevKey,PrevVal}|AccRest]
-            end;
-        (KV, []) ->
-           [KV]
-        end, [], lists:sort(ResultKVs)),
-    NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2],
+    {NewKVs, NewViewIdKeys} = lists:foldl(
+        fun({Key, Val}, {[{{Key, _DocId} = Kd, PrevVal} | AccRest], AccVid}) ->
+            AccKv2 = case PrevVal of
+            {dups, Dups} ->
+                [{Kd, {dups, [Val | Dups]}} | AccRest];
+            _ ->
+                [{Kd, {dups, [Val, PrevVal]}} | AccRest]
+            end,
+            {AccKv2, [{View#view.id_num, Key} | AccVid]};
+        ({Key, Val}, {AccKv, AccVid}) ->
+            {[{{Key, DocId}, Val} | AccKv], [{View#view.id_num, Key} | AccVid]}
+        end,
+        {[], []}, lists:sort(ResultKVs)),
     NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],
-    NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],
     NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
-    view_insert_doc_query_results(Doc, RestResults, RestViewKVs, 
NewViewKVsAcc, NewViewIdKeysAcc).
-
-view_compute(Group, []) ->
-    {Group, []};
-view_compute(#group{def_lang=DefLang, lib=Lib, 
query_server=QueryServerIn}=Group, Docs) ->
-    {ok, QueryServer} =
-    case QueryServerIn of
-    nil -> % doc map not started
-        Definitions = [View#view.def || View <- Group#group.views],
-        couch_query_servers:start_doc_map(DefLang, Definitions, Lib);
-    _ ->
-        {ok, QueryServerIn}
-    end,
-    {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs),
-    {Group#group{query_server=QueryServer}, Results}.
-
+    view_insert_doc_query_results(
+        DocId, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).
 
 
 write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq, 
InitialBuild) ->


Reply via email to