This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch else in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git
commit 7095e0d07109417dfd7ca561207a884e53a09590 Author: ILYA Khlopotov <iil...@apache.org> AuthorDate: Fri Feb 22 16:06:32 2019 +0000 Fix compiler warnings --- src/ken_server.erl | 69 +++++++++++++++++++++++++++++++++++++----------- test/ken_server_test.erl | 2 -- 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/ken_server.erl b/src/ken_server.erl index 91f18c1..f45acfb 100644 --- a/src/ken_server.erl +++ b/src/ken_server.erl @@ -36,7 +36,7 @@ server, % Pid of either view group or search index worker_pid = nil, seq = 0, - lru = now() + lru = erlang:monotonic_time() }). -record(state, { @@ -117,7 +117,7 @@ init(_) -> ets:new(ken_resubmit, [named_table]), ets:new(ken_workers, [named_table, public, {keypos, #job.name}]), Limit = list_to_integer(config("limit", "20")), - {ok, #state{pruned_last = now(), limit = Limit}}. + {ok, #state{pruned_last = erlang:monotonic_time(), limit = Limit}}. terminate(_Reason, _State) -> ok. @@ -166,19 +166,22 @@ handle_cast({trigger_update, #job{name={_, _, hastings}, server=GPid, seq=Seq} = % hastings_index:await will trigger a hastings index update {Pid, _} = erlang:spawn_monitor(hastings_index, await, [GPid, Seq]), - ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}), + Now = erlang:monotonic_time(), + ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}), {noreply, State, 0}; % search index job names have 3 elements. See job record definition. handle_cast({trigger_update, #job{name={_,_,_}, server=GPid, seq=Seq} = Job}, State) -> % dreyfus_index:await will trigger a search index update. {Pid, _} = erlang:spawn_monitor(dreyfus_index, await, [GPid, Seq]), - ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}), + Now = erlang:monotonic_time(), + ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}), {noreply, State, 0}; handle_cast({trigger_update, #job{name={_,_}, server=SrvPid, seq=Seq} = Job}, State) -> % couch_index:get_state/2 will trigger a view group index update. 
{Pid, _} = erlang:spawn_monitor(couch_index, get_state, [SrvPid, Seq]), - ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}), + Now = erlang:monotonic_time(), + ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}), {noreply, State, 0}; handle_cast(Msg, State) -> @@ -200,8 +203,11 @@ handle_info(start_event_handler, State) -> {noreply, State, 0}; handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) -> - case timer:now_diff(now(), Last) of - X when X > (1000 * I) -> + Now = erlang:monotonic_time(), + Interval = erlang:convert_time_unit( + State#state.delay, millisecond, native), + case Now - Last > Interval of + true -> NewState = prune_worker_table(State); _ -> NewState = State @@ -269,8 +275,7 @@ get_active_count() -> % If any indexing job fails, resubmit requests for all indexes. update_db_indexes(Name, State) -> {ok, DDocs} = design_docs(Name), - random:seed(now()), - RandomSorted = lists:sort([{random:uniform(), D} || D <- DDocs]), + RandomSorted = lists:sort([{rand:uniform(), D} || D <- DDocs]), Resubmit = lists:foldl(fun({_, DDoc}, Acc) -> JsonDDoc = couch_doc:from_json_obj(DDoc), case update_ddoc_indexes(Name, JsonDDoc, State) of @@ -447,7 +452,8 @@ should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) -> false end; [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] -> - DeltaT = timer:now_diff(now(), LRU) / 1000, + Now = erlang:monotonic_time(), + DeltaT = erlang:convert_time_unit(Now - LRU, native, millisecond), if A < BatchChannels, (Seq - OldSeq) >= BS -> true; @@ -509,12 +515,14 @@ resubmit(Delay, DbName) -> end. 
prune_worker_table(State) -> - {A, B, _} = now(), - C = (1000000 * A) + B - 0.001 * State#state.delay, - MatchHead = #job{worker_pid=nil, lru={'$1','$2','_'}, _='_'}, - Guard = {'<', {'+', {'*', '$1', 1000000}, '$2'}, C}, + % remove all entries older than specified `delay` in milliseconds + Delay = erlang:convert_time_unit(State#state.delay, millisecond, native), + C = erlang:monotonic_time() - Delay, + %% fun(#job{worker_pid=nil, lru=A}) when A < C -> true end + MatchHead = #job{worker_pid=nil, lru='$1', _='_'}, + Guard = {'<', '$1', C}, ets:select_delete(ken_workers, [{MatchHead, [Guard], [true]}]), - State#state{pruned_last = now()}. + State#state{pruned_last = erlang:monotonic_time()}. allowed_languages() -> Config = config:get("query_servers") ++ config:get("native_query_servers"), @@ -522,3 +530,34 @@ allowed_languages() -> config(Key, Default) -> config:get("ken", Key, Default). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + + + +prune_old_entries_test() -> + { + setup, + fun() -> + ets:new(ken_workers, [named_table, public, {keypos, #job.name}]) + end, + fun(_) -> + catch ets:delete(ken_workers) + end, + ?_test(begin + lists:foreach(fun(Idx) -> + ets:insert(ken_workers, #job{name=Idx}), + timer:sleep(100) + end, lists:seq(1, 3)), + prune_worker_table(#state{delay=250}), + ?assertEqual( + [2, 3], + lists:usort( + [N || #job{name = N} <- ets:tab2list(ken_workers)]) + ), + ok + end) + }. + +-endif. diff --git a/test/ken_server_test.erl b/test/ken_server_test.erl index 1d2af7a..eed3484 100644 --- a/test/ken_server_test.erl +++ b/test/ken_server_test.erl @@ -12,8 +12,6 @@ -module(ken_server_test). --compile([export_all]). - -include_lib("eunit/include/eunit.hrl"). %% hardcoded defaults: limit: 20; batch: 1; delay: 5000; prune: 60000