This is an automated email from the ASF dual-hosted git repository. chewbranca pushed a commit to branch couch-stats-resource-tracker-rebase in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 144cbd40f8a23064a8fc2991eca0eb18f4167611 Author: Russell Branca <chewbra...@apache.org> AuthorDate: Mon Dec 18 16:13:21 2023 -0800 Pass tests and toggle csrt --- src/chttpd/src/chttpd.erl | 2 ++ .../src/couch_stats_resource_tracker.erl | 33 ++++++++++++++++------ src/rexi/src/rexi_monitor.erl | 1 + src/rexi/src/rexi_server.erl | 8 +++++- src/rexi/src/rexi_utils.erl | 2 ++ 5 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 78b41a710..16cda1a58 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -221,6 +221,7 @@ stop() -> mochiweb_http:stop(?MODULE). handle_request(MochiReq0) -> + couch_util:clear_pdict(), %% Make sure we start clean, everytime erlang:put(?REWRITE_COUNT, 0), MochiReq = couch_httpd_vhost:dispatch_host(MochiReq0), handle_request_int(MochiReq). @@ -377,6 +378,7 @@ after_request(HttpReq, HttpResp0) -> HttpResp2 = update_stats(HttpReq, HttpResp1), chttpd_stats:report(HttpReq, HttpResp2), maybe_log(HttpReq, HttpResp2), + %%couch_stats_resource_tracker:close_context(), HttpResp2. process_request(#httpd{mochi_req = MochiReq} = HttpReq) -> diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl index 59d2a9de3..2f3a34f2f 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.erl +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -60,6 +60,8 @@ sorted_by/2, sorted_by/3, + find_by_pid/1, + unsafe_foldl/3, term_to_flat_json/1 @@ -322,17 +324,23 @@ should_track(_Metric) -> %%io:format("SKIPPING METRIC: ~p~n", [Metric]), false. -accumulate_delta(Delta) -> +accumulate_delta(Delta) when is_map(Delta) -> %% TODO: switch to creating a batch of updates to invoke a single %% update_counter rather than sequentially invoking it for each field - is_enabled() andalso maps:foreach(fun inc/2, Delta). + is_enabled() andalso maps:foreach(fun inc/2, Delta); +accumulate_delta(undefined) -> + ok; +accumulate_delta(Other) -> + io:format("CSRT:ACC_DELTA UNKNOWN DELTA: ~p~n", [Other]). + update_counter(Field, Count) -> is_enabled() andalso update_counter(get_pid_ref(), Field, Count). update_counter({_Pid,_Ref}=PidRef, Field, Count) -> - is_enabled() andalso ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}). + %% TODO: mem3 crashes without catch, why do we lose the stats table? + is_enabled() andalso catch ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}). active() -> active_int(all). @@ -638,10 +646,15 @@ create_context(Pid) -> Ref = make_ref(), Rctx = make_record(Pid, Ref), track(Rctx), - ets:insert(?MODULE, Rctx), + create_resource(Rctx), Rctx end. + +create_resource(#rctx{} = Rctx) -> + %% true = ets:insert(?MODULE, Rctx). + catch ets:insert(?MODULE, Rctx). + %% add type to disnguish coordinator vs rpc_worker create_context(From, {M,F,_A} = MFA, Nonce) -> case is_enabled() of @@ -659,7 +672,7 @@ create_context(From, {M,F,_A} = MFA, Nonce) -> }, track(Rctx), erlang:put(?DELTA_TZ, Rctx), - true = ets:insert(?MODULE, Rctx), + create_resource(Rctx), Rctx end. @@ -683,7 +696,7 @@ create_coordinator_context(#httpd{} = Req, Path) -> }, track(Rctx), erlang:put(?DELTA_TZ, Rctx), - true = ets:insert(?MODULE, Rctx), + create_resource(Rctx), Rctx end. @@ -831,7 +844,7 @@ get_resource() -> get_resource(get_pid_ref()). get_resource(PidRef) -> - case ets:lookup(?MODULE, PidRef) of + catch case ets:lookup(?MODULE, PidRef) of [#rctx{}=TP] -> TP; [] -> @@ -847,6 +860,10 @@ find_unmonitored() -> [PR || #rctx{pid_ref=PR} <- ets:match_object(?MODULE, #rctx{mon_ref=undefined, _ = '_'})]. +find_by_pid(Pid) -> + [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref={Pid, '_'}, _ = '_'})]. + + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -947,7 +964,7 @@ log_process_lifetime_report(PidRef) -> #rctx{} = Rctx = get_resource(PidRef), %% TODO: catch error out of here, report crashes on depth>1 json %%io:format("CSRT RCTX: ~p~n", [to_flat_json(Rctx)]), - couch_log:report("csrt-pid-usage-lifetime", to_flat_json(Rctx)). + is_enabled() andalso couch_log:report("csrt-pid-usage-lifetime", to_flat_json(Rctx)). %% Reimplementation of: https://github.com/erlang/otp/blob/b2ee4fc9a0b81a139dad2033e9b2bfc178146886/lib/stdlib/src/ets.erl#L633-L658 diff --git a/src/rexi/src/rexi_monitor.erl b/src/rexi/src/rexi_monitor.erl index 7fe66db71..72f0985df 100644 --- a/src/rexi/src/rexi_monitor.erl +++ b/src/rexi/src/rexi_monitor.erl @@ -35,6 +35,7 @@ start(Procs) -> %% messages from our mailbox. -spec stop(pid()) -> ok. stop(MonitoringPid) -> + unlink(MonitoringPid), MonitoringPid ! {self(), shutdown}, flush_down_messages(). diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index 94a4fad30..e56046bf7 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -201,7 +201,13 @@ find_worker(Ref, Tab) -> end. notify_caller({Caller, Ref}, Reason, Delta) -> - rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}, {delta, Delta}}). + Msg = case couch_stats_resource_tracker:is_enabled() of + true -> + {Ref, {rexi_EXIT, Reason}, {delta, Delta}}; + false -> + {Ref, {rexi_EXIT, Reason}} + end, + rexi_utils:send(Caller, Msg). kill_worker(FromRef, #st{clients = Clients} = St) -> case find_worker(FromRef, Clients) of diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl index 4d81475df..56c5a69b8 100644 --- a/src/rexi/src/rexi_utils.erl +++ b/src/rexi/src/rexi_utils.erl @@ -14,6 +14,8 @@ -export([server_id/1, server_pid/1, send/2, recv/6]). +-export([extract_delta/1]). + %% @doc Return a rexi_server id for the given node. server_id(Node) -> case config:get_boolean("rexi", "server_per_node", true) of