Fix config listener event handler registration

We rely on `gen_event:add_sup_handler/3` to remove handlers when the process that registered for events exits. On master this was changed so that config becomes the process that's monitored by gen_event. As such, any handler that is registered (say, for when an index is opened) adds a handler to the config_event gen_event process. Since the config process never exits, these handlers are never removed.
The end result of all of this is that on a busy cluster the config_event process will end up with millions of handlers consuming many gigabytes of RAM. This change creates a monitor process for every event handler. This monitors the process wanting to listen for config changes and exits when the requesting process exits. This means that we maintain our pure callback API improvement while correctly removing handlers. COUCHDB-3096 Project: http://git-wip-us.apache.org/repos/asf/couchdb-config/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-config/commit/b1b56054 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-config/tree/b1b56054 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-config/diff/b1b56054 Branch: refs/heads/master Commit: b1b56054752287c806d9cf5927d1a5f02e09c420 Parents: ab55181 Author: Paul J. Davis <[email protected]> Authored: Fri Aug 5 14:41:58 2016 -0500 Committer: Paul J. Davis <[email protected]> Committed: Wed Aug 10 14:10:58 2016 -0500 ---------------------------------------------------------------------- src/config.erl | 19 ++------- src/config_listener_mon.erl | 84 ++++++++++++++++++++++++++++++++++++++++ test/config_tests.erl | 63 +++++++++++++++++++++++++++++- 3 files changed, 150 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-config/blob/b1b56054/src/config.erl ---------------------------------------------------------------------- diff --git a/src/config.erl b/src/config.erl index 5c9ac6f..c0d0446 100644 --- a/src/config.erl +++ b/src/config.erl @@ -183,7 +183,8 @@ delete(Section, Key, Persist, Reason) when is_list(Section), is_list(Key) -> listen_for_changes(CallbackModule, InitialState) -> - gen_server:call(?MODULE, {listen_for_changes, CallbackModule, InitialState}). + config_listener_mon:subscribe(CallbackModule, InitialState). 
+ init(IniFiles) -> ets:new(?MODULE, [named_table, set, protected, {read_concurrency, true}]), @@ -258,26 +259,14 @@ handle_call(reload, _From, Config) -> ets:delete(?MODULE, K) end end, nil, ?MODULE), - {reply, ok, Config}; -handle_call({listen_for_changes, CallbackModule, InitialState}, - {Subscriber, _}, Config) -> - Reply = config_listener:start(CallbackModule, {Subscriber, InitialState}), - {reply, Reply, Config}. + {reply, ok, Config}. handle_cast(stop, State) -> {stop, normal, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({gen_event_EXIT, {config_listener, Module}, shutdown}, State) -> - couch_log:notice("config_listener(~p) stopped with reason: shutdown~n", [Module]), - {noreply, State}; -handle_info({gen_event_EXIT, {config_listener, Module}, normal}, State) -> - couch_log:info("config_listener(~p) stopped with reason: shutdown~n", [Module]), - {noreply, State}; -handle_info({gen_event_EXIT, {config_listener, Module}, Reason}, State) -> - couch_log:error("config_listener(~p) stopped with reason: ~p~n", [Module, Reason]), - {noreply, State}; + handle_info(Info, State) -> couch_log:error("config:handle_info Info: ~p~n", [Info]), {noreply, State}. http://git-wip-us.apache.org/repos/asf/couchdb-config/blob/b1b56054/src/config_listener_mon.erl ---------------------------------------------------------------------- diff --git a/src/config_listener_mon.erl b/src/config_listener_mon.erl new file mode 100644 index 0000000..70c2707 --- /dev/null +++ b/src/config_listener_mon.erl @@ -0,0 +1,84 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(config_listener_mon). +-behaviour(gen_server). +-vsn(1). + + +-export([ + subscribe/2 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-record(st, { + pid, + ref +}). + + +subscribe(Module, InitSt) -> + proc_lib:start(?MODULE, init, [{self(), Module, InitSt}]). + + +init({Pid, Mod, InitSt}) -> + Ref = erlang:monitor(process, Pid), + case config_listener:start(Mod, {Mod, Pid}, {Pid, InitSt}) of + ok -> + proc_lib:init_ack(ok), + gen_server:enter_loop(?MODULE, [], #st{pid = Pid, ref = Ref}); + Else -> + proc_lib:init_ack(Else) + end. + + +terminate(_Reason, _St) -> + ok. + + +handle_call(_Message, _From, St) -> + {reply, ignored, St}. + + +handle_cast(_Message, St) -> + {noreply, St}. + + +handle_info({'DOWN', Ref, _, _, _}, #st{ref = Ref} = St) -> + {stop, normal, St}; + +handle_info({gen_event_EXIT, {config_listener, Module}, Reason}, St) -> + Level = case Reason of + normal -> debug; + shutdown -> debug; + _ -> error + end, + Fmt = "config_listener(~p) for ~p stopped with reason: ~r~n", + couch_log:Level(Fmt, [Module, St#st.pid, Reason]), + {stop, shutdown, St}; + +handle_info(_, St) -> + {noreply, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. http://git-wip-us.apache.org/repos/asf/couchdb-config/blob/b1b56054/test/config_tests.erl ---------------------------------------------------------------------- diff --git a/test/config_tests.erl b/test/config_tests.erl index 2c7d13f..311a805 100644 --- a/test/config_tests.erl +++ b/test/config_tests.erl @@ -100,6 +100,9 @@ handle_config_change("update_state", Key, Value, Persist, {Pid, State}) -> Pid ! 
{config_msg, {{"update_state", Key, Value, Persist}, State}}, {ok, {Pid, Key}}; +handle_config_change("throw_error", _Key, _Value, _Persist, {_Pid, _State}) -> + throw(this_is_an_error); + handle_config_change(Section, Key, Value, Persist, {Pid, State}) -> Pid ! {config_msg, {{Section, Key, Value, Persist}, State}}, {ok, {Pid, State}}. @@ -241,7 +244,9 @@ config_listener_behaviour_test_() -> fun should_pass_correct_state_to_handle_config_terminate/1, fun should_pass_subscriber_pid_to_handle_config_terminate/1, fun should_not_call_handle_config_after_related_process_death/1, - fun should_remove_handler_when_requested/1 + fun should_remove_handler_when_requested/1, + fun should_remove_handler_when_pid_exits/1, + fun should_stop_monitor_on_error/1 ] } }. @@ -456,6 +461,62 @@ should_remove_handler_when_requested(Pid) -> end). +should_remove_handler_when_pid_exits(Pid) -> + ?_test(begin + ?assertEqual(2, n_handlers()), + + % Monitor the config_listener_mon process + {monitored_by, [Mon]} = process_info(Pid, monitored_by), + MonRef = erlang:monitor(process, Mon), + + % Kill the process synchronously + PidRef = erlang:monitor(process, Pid), + exit(Pid, kill), + receive + {'DOWN', PidRef, _, _, _} -> ok + after ?TIMEOUT -> + erlang:error({timeout, config_listener_death}) + end, + + % Wait for the config_listener_mon process to + % exit to indicate the handler has been removed. + receive + {'DOWN', MonRef, _, _, normal} -> ok + after ?TIMEOUT -> + erlang:error({timeout, config_listener_mon_death}) + end, + + ?assertEqual(1, n_handlers()) + end). 
+ + +should_stop_monitor_on_error(Pid) -> + ?_test(begin + ?assertEqual(2, n_handlers()), + + % Monitor the config_listener_mon process + {monitored_by, [Mon]} = process_info(Pid, monitored_by), + MonRef = erlang:monitor(process, Mon), + + % Have the process throw an error + ?assertEqual(ok, config:set("throw_error", "foo", "bar", false)), + + % Make sure handle_config_terminate is called + ?assertEqual({Pid, {error, this_is_an_error}, undefined}, getmsg(Pid)), + + % Wait for the config_listener_mon process to + % exit to indicate the handler has been removed + % due to an error + receive + {'DOWN', MonRef, _, _, shutdown} -> ok + after ?TIMEOUT -> + erlang:error({timeout, config_listener_mon_shutdown}) + end, + + ?assertEqual(1, n_handlers()) + end). + + spawn_config_listener() -> Self = self(), Pid = erlang:spawn(fun() ->
