Author: fdmanana Date: Sun Jul 10 12:11:15 2011 New Revision: 1144844 URL: http://svn.apache.org/viewvc?rev=1144844&view=rev Log: Some view indexer optimizations
Changes: - Decoding of the JSON received from the external OS process (couchjs) is now done in the indexer's process which does the writes to disk. This avoids several phases of EJSON term copying: couch_os_process -> do_maps process -> work queue -> do_writes process; - The indexer's do_maps process no longer needs to find out which seq number is the highest from the input list via a list comprehension + lists:max/1 call. The highest is always the last one on the queue (this is enforced when folding the database's byseq btree); - Don't update the task status every time we read a single doc from the database. Instead update it only after writing a view KV batch to disk; - Avoid multiple linear passes over the same lists. This is part of COUCHDB-1186. Modified: couchdb/trunk/src/couchdb/couch_os_process.erl couchdb/trunk/src/couchdb/couch_query_servers.erl couchdb/trunk/src/couchdb/couch_view_group.erl couchdb/trunk/src/couchdb/couch_view_updater.erl Modified: couchdb/trunk/src/couchdb/couch_os_process.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_os_process.erl?rev=1144844&r1=1144843&r2=1144844&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_os_process.erl (original) +++ couchdb/trunk/src/couchdb/couch_os_process.erl Sun Jul 10 12:11:15 2011 @@ -85,22 +85,51 @@ writejson(OsProc, Data) when is_record(O true = writeline(OsProc, JsonData). readjson(OsProc) when is_record(OsProc, os_proc) -> - Line = readline(OsProc), + Line = iolist_to_binary(readline(OsProc)), ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]), - case ?JSON_DECODE(Line) of - [<<"log">>, Msg] when is_binary(Msg) -> - % we got a message to log. 
Log it and continue - ?LOG_INFO("OS Process ~p Log :: ~s", [OsProc#os_proc.port, Msg]), - readjson(OsProc); - [<<"error">>, Id, Reason] -> - throw({error, {couch_util:to_existing_atom(Id),Reason}}); - [<<"fatal">>, Id, Reason] -> - ?LOG_INFO("OS Process ~p Fatal Error :: ~s ~p",[OsProc#os_proc.port, Id, Reason]), - throw({couch_util:to_existing_atom(Id),Reason}); - Result -> - Result + try + % Don't actually parse the whole JSON. Just try to see if it's + % a command or a doc map/reduce/filter/show/list/update output. + % If it's a command then parse the whole JSON and execute the + % command, otherwise return the raw JSON line to the caller. + pick_command(Line) + catch + throw:abort -> + {json, Line}; + throw:{cmd, _Cmd} -> + case ?JSON_DECODE(Line) of + [<<"log">>, Msg] when is_binary(Msg) -> + % we got a message to log. Log it and continue + ?LOG_INFO("OS Process ~p Log :: ~s", [OsProc#os_proc.port, Msg]), + readjson(OsProc); + [<<"error">>, Id, Reason] -> + throw({error, {couch_util:to_existing_atom(Id),Reason}}); + [<<"fatal">>, Id, Reason] -> + ?LOG_INFO("OS Process ~p Fatal Error :: ~s ~p", + [OsProc#os_proc.port, Id, Reason]), + throw({couch_util:to_existing_atom(Id),Reason}); + _Result -> + {json, Line} + end end. +pick_command(Line) -> + json_stream_parse:events(Line, fun pick_command0/1). + +pick_command0(array_start) -> + fun pick_command1/1; +pick_command0(_) -> + throw(abort). + +pick_command1(<<"log">> = Cmd) -> + throw({cmd, Cmd}); +pick_command1(<<"error">> = Cmd) -> + throw({cmd, Cmd}); +pick_command1(<<"fatal">> = Cmd) -> + throw({cmd, Cmd}); +pick_command1(_) -> + throw(abort). 
+ % gen_server API init([Command, Options, PortOptions]) -> Modified: couchdb/trunk/src/couchdb/couch_query_servers.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=1144844&r1=1144843&r2=1144844&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_query_servers.erl (original) +++ couchdb/trunk/src/couchdb/couch_query_servers.erl Sun Jul 10 12:11:15 2011 @@ -16,7 +16,7 @@ -export([start_link/0, config_change/1]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). --export([start_doc_map/3, map_docs/2, stop_doc_map/1]). +-export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]). -export([reduce/3, rereduce/3,validate_doc_update/5]). -export([filter_docs/5]). -export([filter_view/3]). @@ -82,6 +82,10 @@ map_docs(Proc, Docs) -> Docs), {ok, Results}. +map_doc_raw(Proc, Doc) -> + Json = couch_doc:to_json_obj(Doc, []), + {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}. + stop_doc_map(nil) -> ok; @@ -494,9 +498,21 @@ proc_with_ddoc(DDoc, DDocKey, LangProcs) end. proc_prompt(Proc, Args) -> - {Mod, Func} = Proc#proc.prompt_fun, + case proc_prompt_raw(Proc, Args) of + {json, Json} -> + ?JSON_DECODE(Json); + EJson -> + EJson + end. + +proc_prompt_raw(#proc{prompt_fun = {Mod, Func}} = Proc, Args) -> apply(Mod, Func, [Proc#proc.pid, Args]). +raw_to_ejson({json, Json}) -> + ?JSON_DECODE(Json); +raw_to_ejson(EJson) -> + EJson. + proc_stop(Proc) -> {Mod, Func} = Proc#proc.stop_fun, apply(Mod, Func, [Proc#proc.pid]). 
Modified: couchdb/trunk/src/couchdb/couch_view_group.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=1144844&r1=1144843&r2=1144844&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_view_group.erl (original) +++ couchdb/trunk/src/couchdb/couch_view_group.erl Sun Jul 10 12:11:15 2011 @@ -236,16 +236,22 @@ handle_cast({partial_update, Pid, NewGro = State) -> #group_state{ db_name = DbName, - waiting_commit = WaitingCommit + waiting_commit = WaitingCommit, + group = Group } = State, NewSeq = NewGroup#group.current_seq, - ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq, - DbName, NewGroup#group.name]), - if not WaitingCommit -> - erlang:send_after(1000, self(), delayed_commit); - true -> ok - end, - {noreply, State#group_state{group=NewGroup, waiting_commit=true}}; + case NewSeq > Group#group.current_seq of + true -> + ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq, + DbName, NewGroup#group.name]), + if not WaitingCommit -> + erlang:send_after(1000, self(), delayed_commit); + true -> ok + end, + {noreply, State#group_state{group=NewGroup, waiting_commit=true}}; + false -> + {noreply, State} + end; handle_cast({partial_update, _, _}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}. 
Modified: couchdb/trunk/src/couchdb/couch_view_updater.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_updater.erl?rev=1144844&r1=1144843&r2=1144844&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_view_updater.erl (original) +++ couchdb/trunk/src/couchdb/couch_view_updater.erl Sun Jul 10 12:11:15 2011 @@ -24,19 +24,14 @@ update(Owner, Group, DbName) -> current_seq = Seq, purge_seq = PurgeSeq } = Group, - couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>), {ok, Db} = couch_db:open_int(DbName, []), DbPurgeSeq = couch_db:get_purge_seq(Db), - Group2 = if DbPurgeSeq == PurgeSeq -> - Group; + ok; DbPurgeSeq == PurgeSeq + 1 -> - couch_task_status:update(<<"Removing purged entries from view index.">>), - purge_index(Group, Db); + ok; true -> - couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), - couch_db:close(Db), exit(reset) end, {ok, MapQueue} = couch_work_queue:new( @@ -44,13 +39,30 @@ update(Owner, Group, DbName) -> {ok, WriteQueue} = couch_work_queue:new( [{max_size, 100000}, {max_items, 500}]), Self = self(), - ViewEmptyKVs = [{View, []} || View <- Group2#group.views], - spawn_link(fun() -> do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) end), - spawn_link(fun() -> do_writes(Self, Owner, Group2, WriteQueue, Seq == 0) end), - % compute on all docs modified since we last computed. 
+ spawn_link(fun() -> + do_maps(add_query_server(Group), MapQueue, WriteQueue) + end), TotalChanges = couch_db:count_changes_since(Db, Seq), - % update status every half second - couch_task_status:set_update_frequency(500), + spawn_link(fun() -> + couch_task_status:add_task( + <<"View Group Indexer">>, + <<DbName/binary, " ", GroupName/binary>>, + <<"Starting index update">>), + couch_task_status:set_update_frequency(500), + Group2 = + if DbPurgeSeq == PurgeSeq + 1 -> + couch_task_status:update(<<"Removing purged entries from view index.">>), + purge_index(Group, Db); + true -> + Group + end, + ViewEmptyKVs = [{View, []} || View <- Group2#group.views], + do_writes(Self, Owner, Group2, WriteQueue, + Seq == 0, ViewEmptyKVs, 0, TotalChanges), + couch_task_status:set_update_frequency(0), + couch_task_status:update("Finishing.") + end), + % compute on all docs modified since we last computed. #group{ design_options = DesignOptions } = Group, IncludeDesign = couch_util:get_value(<<"include_design">>, DesignOptions, false), @@ -64,15 +76,11 @@ update(Owner, Group, DbName) -> = couch_db:enum_docs_since( Db, Seq, - fun(DocInfo, _, ChangesProcessed) -> - couch_task_status:update("Processed ~p of ~p changes (~p%)", - [ChangesProcessed, TotalChanges, (ChangesProcessed*100) div TotalChanges]), + fun(DocInfo, _, Acc) -> load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign), - {ok, ChangesProcessed+1} + {ok, Acc} end, - 0, []), - couch_task_status:set_update_frequency(0), - couch_task_status:update("Finishing."), + ok, []), couch_work_queue:close(MapQueue), couch_db:close(Db), receive {new_group, NewGroup} -> @@ -81,6 +89,16 @@ update(Owner, Group, DbName) -> end. +add_query_server(#group{query_server = nil} = Group) -> + {ok, Qs} = couch_query_servers:start_doc_map( + Group#group.def_lang, + [View#view.def || View <- Group#group.views], + Group#group.lib), + Group#group{query_server = Qs}; +add_query_server(Group) -> + Group. 
+ + purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) -> {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], @@ -133,96 +151,83 @@ load_doc(Db, DocInfo, MapQueue, DocOpts, end end. -do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) -> +do_maps(#group{query_server = Qs} = Group, MapQueue, WriteQueue) -> case couch_work_queue:dequeue(MapQueue) of closed -> couch_work_queue:close(WriteQueue), couch_query_servers:stop_doc_map(Group#group.query_server); {ok, Queue} -> - Docs = [Doc || {_,#doc{deleted=false}=Doc} <- Queue], - DelKVs = [{Id, []} || {_, #doc{deleted=true,id=Id}} <- Queue], - LastSeq = lists:max([Seq || {Seq, _Doc} <- Queue]), - {Group1, Results} = view_compute(Group, Docs), - {ViewKVs, DocIdViewIdKeys} = view_insert_query_results(Docs, - Results, ViewEmptyKVs, DelKVs), - couch_work_queue:queue(WriteQueue, {LastSeq, ViewKVs, DocIdViewIdKeys}), - do_maps(Group1, MapQueue, WriteQueue, ViewEmptyKVs) + lists:foreach( + fun({Seq, #doc{id = Id, deleted = true}}) -> + Item = {Seq, Id, []}, + ok = couch_work_queue:queue(WriteQueue, Item); + ({Seq, #doc{id = Id, deleted = false} = Doc}) -> + {ok, Result} = couch_query_servers:map_doc_raw(Qs, Doc), + Item = {Seq, Id, Result}, + ok = couch_work_queue:queue(WriteQueue, Item) + end, + Queue), + do_maps(Group, MapQueue, WriteQueue) end. -do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) -> +do_writes(Parent, Owner, Group, WriteQueue, InitialBuild, ViewEmptyKVs, + ChangesDone, TotalChanges) -> case couch_work_queue:dequeue(WriteQueue) of closed -> Parent ! 
{new_group, Group}; {ok, Queue} -> - {NewSeq, ViewKeyValues, DocIdViewIdKeys} = lists:foldl( - fun({Seq, ViewKVs, DocIdViewIdKeys}, nil) -> - {Seq, ViewKVs, DocIdViewIdKeys}; - ({Seq, ViewKVs, DocIdViewIdKeys}, Acc) -> - {Seq2, AccViewKVs, AccDocIdViewIdKeys} = Acc, - AccViewKVs2 = lists:zipwith( - fun({View, KVsIn}, {_View, KVsAcc}) -> - {View, KVsIn ++ KVsAcc} - end, ViewKVs, AccViewKVs), - {lists:max([Seq, Seq2]), - AccViewKVs2, DocIdViewIdKeys ++ AccDocIdViewIdKeys} - end, nil, Queue), - Group2 = write_changes(Group, ViewKeyValues, DocIdViewIdKeys, NewSeq, - InitialBuild), + {ViewKVs, DocIdViewIdKeys} = lists:foldr( + fun({_Seq, Id, []}, {ViewKVsAcc, DocIdViewIdKeysAcc}) -> + {ViewKVsAcc, [{Id, []} | DocIdViewIdKeysAcc]}; + ({_Seq, Id, RawQueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc}) -> + QueryResults = [ + [list_to_tuple(FunResult) || FunResult <- FunRs] || FunRs <- + couch_query_servers:raw_to_ejson(RawQueryResults) + ], + {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results( + Id, QueryResults, ViewKVsAcc, [], []), + {NewViewKVs, [{Id, NewViewIdKeys} | DocIdViewIdKeysAcc]} + end, + {ViewEmptyKVs, []}, Queue), + {NewSeq, _, _} = lists:last(Queue), + Group2 = write_changes( + Group, ViewKVs, DocIdViewIdKeys, NewSeq, InitialBuild), case Owner of - nil -> ok; - _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) + nil -> + ok; + _ -> + ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) end, - do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild) + ChangesDone2 = ChangesDone + length(Queue), + couch_task_status:update("Processed ~p of ~p changes (~p%)", + [ChangesDone2, TotalChanges, (ChangesDone2 * 100) div TotalChanges]), + do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild, ViewEmptyKVs, + ChangesDone2, TotalChanges) end. 
-view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> - {ViewKVs, DocIdViewIdKeysAcc}; -view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> - {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []), - NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], - view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). - -view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> +view_insert_doc_query_results(_DocId, [], [], ViewKVsAcc, ViewIdKeysAcc) -> {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; -view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> +view_insert_doc_query_results(DocId, [ResultKVs | RestResults], + [{View, KVs} | RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> % Take any identical keys and combine the values - ResultKVs2 = lists:foldl( - fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) -> - case Key == PrevKey of - true -> - case PrevVal of - {dups, Dups} -> - [{PrevKey, {dups, [Value|Dups]}} | AccRest]; - _ -> - [{PrevKey, {dups, [Value,PrevVal]}} | AccRest] - end; - false -> - [{Key,Value},{PrevKey,PrevVal}|AccRest] - end; - (KV, []) -> - [KV] - end, [], lists:sort(ResultKVs)), - NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2], + {NewKVs, NewViewIdKeys} = lists:foldl( + fun({Key, Val}, {[{{Key, _DocId} = Kd, PrevVal} | AccRest], AccVid}) -> + AccKv2 = case PrevVal of + {dups, Dups} -> + [{Kd, {dups, [Val | Dups]}} | AccRest]; + _ -> + [{Kd, {dups, [Val, PrevVal]}} | AccRest] + end, + {AccKv2, [{View#view.id_num, Key} | AccVid]}; + ({Key, Val}, {AccKv, AccVid}) -> + {[{{Key, DocId}, Val} | AccKv], [{View#view.id_num, Key} | AccVid]} + end, + {[], []}, lists:sort(ResultKVs)), NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], - NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- 
ResultKVs2], NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, - view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). - -view_compute(Group, []) -> - {Group, []}; -view_compute(#group{def_lang=DefLang, lib=Lib, query_server=QueryServerIn}=Group, Docs) -> - {ok, QueryServer} = - case QueryServerIn of - nil -> % doc map not started - Definitions = [View#view.def || View <- Group#group.views], - couch_query_servers:start_doc_map(DefLang, Definitions, Lib); - _ -> - {ok, QueryServerIn} - end, - {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs), - {Group#group{query_server=QueryServer}, Results}. - + view_insert_doc_query_results( + DocId, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq, InitialBuild) ->