Repository: couchdb-couch-index Updated Branches: refs/heads/master ee21d0181 -> 53555fd90
Do not use config subscription in couch_index COUCHDB-3135 Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/commit/a4db124b Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/tree/a4db124b Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/diff/a4db124b Branch: refs/heads/master Commit: a4db124b097089b89fe26cd9a60b88b9f38dc3c0 Parents: ee21d01 Author: ILYA Khlopotov <iil...@ca.ibm.com> Authored: Tue Sep 6 13:07:28 2016 -0700 Committer: ILYA Khlopotov <iil...@ca.ibm.com> Committed: Tue Sep 6 13:19:33 2016 -0700 ---------------------------------------------------------------------- src/couch_index.erl | 47 +++++++++++++---------------------------------- 1 file changed, 13 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/blob/a4db124b/src/couch_index.erl ---------------------------------------------------------------------- diff --git a/src/couch_index.erl b/src/couch_index.erl index a5958b9..57dddee 100644 --- a/src/couch_index.erl +++ b/src/couch_index.erl @@ -13,13 +13,12 @@ -module(couch_index). -behaviour(gen_server). --vsn(2). +-vsn(3). %% API -export([start_link/1, stop/1, get_state/2, get_info/1]). -export([trigger_update/2]). -export([compact/1, compact/2, get_compactor_pid/1]). --export([config_change/3]). %% gen_server callbacks -export([init/1, terminate/2, code_change/3]). @@ -30,8 +29,6 @@ -define(CHECK_INTERVAL, 600000). % 10 minutes --define(RELISTEN_DELAY, 5000). --define(CONFIG_SUBSCRIPTION, [{"query_server_config", "commit_freq"}]). -record(st, { mod, @@ -39,7 +36,6 @@ updater, compactor, waiters=[], - commit_delay, committed=true, shutdown=false }). @@ -81,12 +77,7 @@ compact(Pid, Options) -> get_compactor_pid(Pid) -> gen_server:call(Pid, get_compactor_pid). -config_change("query_server_config", "commit_freq", NewValue) -> - ok = gen_server:cast(?MODULE, {config_update, NewValue}). - - init({Mod, IdxState}) -> - ok = config:subscribe_for_changes(?CONFIG_SUBSCRIPTION), DbName = Mod:get(db_name, IdxState), erlang:send_after(?CHECK_INTERVAL, self(), maybe_close), Resp = couch_util:with_db(DbName, fun(Db) -> @@ -102,14 +93,11 @@ init({Mod, IdxState}) -> {ok, NewIdxState} -> {ok, UPid} = couch_index_updater:start_link(self(), Mod), {ok, CPid} = couch_index_compactor:start_link(self(), Mod), - Delay = config:get("query_server_config", "commit_freq", "5"), - MsDelay = 1000 * list_to_integer(Delay), State = #st{ mod=Mod, idx_state=NewIdxState, updater=UPid, - compactor=CPid, - commit_delay=MsDelay + compactor=CPid }, Args = [ Mod:get(db_name, IdxState), @@ -206,9 +194,6 @@ handle_call({compacted, NewIdxState}, _From, State) -> {reply, ok, commit_compacted(NewIdxState, State)} end. -handle_cast({config_change, NewDelay}, State) -> - MsDelay = 1000 * list_to_integer(NewDelay), - {noreply, State#st{commit_delay=MsDelay}}; handle_cast({trigger_update, UpdateSeq}, State) -> #st{ mod=Mod, @@ -233,8 +218,7 @@ handle_cast({updated, NewIdxState}, State) -> handle_cast({new_state, NewIdxState}, State) -> #st{ mod=Mod, - idx_state=OldIdxState, - commit_delay=Delay + idx_state=OldIdxState } = State, assert_signature_match(Mod, OldIdxState, NewIdxState), CurrSeq = Mod:get(update_seq, NewIdxState), @@ -246,7 +230,7 @@ handle_cast({new_state, NewIdxState}, State) -> couch_log:debug("Updated index for db: ~s idx: ~s seq: ~B", Args), Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState), case State#st.committed of - true -> erlang:send_after(Delay, self(), commit); + true -> erlang:send_after(commit_delay(), self(), commit); false -> ok end, {noreply, State#st{ @@ -293,7 +277,7 @@ handle_cast(_Mesg, State) -> handle_info(commit, #st{committed=true}=State) -> {noreply, State}; handle_info(commit, State) -> - #st{mod=Mod, idx_state=IdxState, commit_delay=Delay} = State, + #st{mod=Mod, idx_state=IdxState} = State, DbName = Mod:get(db_name, IdxState), IdxName = Mod:get(idx_name, IdxState), GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end, @@ -311,7 +295,7 @@ handle_info(commit, State) -> % forever out of sync with the database. But a crash before we % commit these changes, no big deal, we only lose incremental % changes since last committal. - erlang:send_after(Delay, self(), commit), + erlang:send_after(commit_delay(), self(), commit), {noreply, State} end; handle_info(maybe_close, State) -> @@ -341,15 +325,7 @@ handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) -> Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], couch_log:info("Index shutdown by monitor notice for db: ~s idx: ~s", Args), catch send_all(State#st.waiters, shutdown), - {stop, normal, State#st{waiters=[]}}; -handle_info({config_change, "query_server_config", "commit_freq", NewDelay, _}, State) -> - handle_cast({config_change, NewDelay}, State); -handle_info({gen_event_EXIT, _Handler, _Reason}, State) -> - erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener), - {noreply, State}; -handle_info(restart_config_listener, State) -> - ok = config:subscribe_for_changes(?CONFIG_SUBSCRIPTION), - {noreply, State}. + {stop, normal, State#st{waiters=[]}}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -390,8 +366,7 @@ commit_compacted(NewIdxState, State) -> #st{ mod=Mod, idx_state=OldIdxState, - updater=Updater, - commit_delay=Delay + updater=Updater } = State, {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState), % Restart the indexer if it's running. @@ -400,7 +375,7 @@ commit_compacted(NewIdxState, State) -> false -> ok end, case State#st.committed of - true -> erlang:send_after(Delay, self(), commit); + true -> erlang:send_after(commit_delay(), self(), commit); false -> ok end, State#st{ @@ -436,6 +411,10 @@ get_value(Section, Key) -> undefined -> undefined end. +commit_delay() -> + config:get_integer("query_server_config", "commit_freq", 5) * 1000. + + -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl").