This is an automated email from the ASF dual-hosted git repository. chewbranca pushed a commit to branch couch-stats-resource-tracker-v2 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit aabc801e0a2a3ec8ef2280dc2a10432f63ca1850 Author: Russell Branca <[email protected]> AuthorDate: Mon Jul 29 16:14:58 2024 -0700 Rework CSRT post experimentation --- src/chttpd/src/chttpd.erl | 1 + .../src/couch_stats_resource_tracker.erl | 886 +++++++++++++++++++++ src/rexi/src/rexi_server.erl | 2 +- 3 files changed, 888 insertions(+), 1 deletion(-) diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 14f14e53e..5911cd0af 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -418,6 +418,7 @@ handle_req_after_auth(HandlerKey, HttpReq) -> possibly_hack(HttpReq), fun chttpd_auth_request:authorize_request/1 ), + couch_stats_resource_tracker:set_context_username(AuthorizedReq), {AuthorizedReq, HandlerFun(AuthorizedReq)} catch ErrorType:Error:Stack -> diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl new file mode 100644 index 000000000..2e3915371 --- /dev/null +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -0,0 +1,886 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_stats_resource_tracker). + +-behaviour(gen_server). + +-export([ + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2 +]). + +%% PidRef API +-export([ + get_pid_ref/0, + set_pid_ref/1, + create_pid_ref/0, + close_pid_ref/0, close_pid_ref/1 +]). 
+ +%% Context API +-export([ + create_resource/1, + create_context/5, + create_coordinator_context/2, + create_worker_context/3, + destroy_context/0, destroy_context/1, + + get_resource/0, get_resource/1, get_resource_raw/1, + + set_context_dbname/1, set_context_dbname/2, + set_context_handler_fun/1, set_context_handler_fun/2, + set_context_username/1, set_context_username/2 +]). + +%% stats collection api +-export([ + is_enabled/0, + + inc/1, inc/2, + maybe_inc/2, + accumulate_delta/1, + make_delta/0, + + ioq_called/0, + + should_track/1 +]). + +%% aggregate query api +-export([ + active/0, + active_coordinators/0, + active_workers/0, + + count_by/1, + group_by/2, group_by/3, + sorted/1, + sorted_by/1, sorted_by/2, sorted_by/3, + + find_by_pid/1, + find_by_pidref/1, + find_workers_by_pidref/1 +]). + +%% Process lifetime reporting api +-export([ + log_process_lifetime_report/1, + is_logging_enabled/0, + logging_enabled/0, + should_log/1, should_log/2, + tracker/1 +]). + +-include_lib("couch/include/couch_db.hrl"). + +%% Module pdict markers +-define(DELTA_TA, csrt_delta_ta). +-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0 +-define(PID_REF, csrt_pid_ref). %% track local ID +-define(TRACKER_PID, csrt_tracker). %% tracker pid + +-define(MANGO_EVAL_MATCH, mango_eval_match). +-define(DB_OPEN_DOC, docs_read). +-define(DB_OPEN, db_open). +-define(COUCH_SERVER_OPEN, db_open). +-define(COUCH_BT_GET_KP_NODE, get_kp_node). +-define(COUCH_BT_GET_KV_NODE, get_kv_node). +-define(COUCH_JS_FILTER, js_filter). +-define(COUCH_JS_FILTER_ERROR, js_filter_error). +-define(COUCH_JS_FILTERED_DOCS, js_filtered_docs). +-define(IOQ_CALLS, ioq_calls). +-define(ROWS_READ, rows_read). + +%% TODO: overlap between this and couch btree fold invocations +%% TODO: need some way to distinguish fols on views vs find vs all_docs +-define(FRPC_CHANGES_ROW, changes_processed). +-define(FRPC_CHANGES_RETURNED, changes_returned). + +-record(st, {}). 
%% Resource context: one entry per tracked process, stored in the ?MODULE
%% ETS table. Field order is significant because counters are updated by
%% record position (see the #rctx.Field indexes used by inc/2).
-record(rctx, {
    %% Metadata
    started_at = tnow(),
    updated_at = tnow(),
    pid_ref,
    mfa,
    nonce,
    from,
    %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
    type = unknown,
    dbname,
    username,
    path,

    %% Stats counters
    db_open = 0,
    docs_read = 0,
    rows_read = 0,
    changes_processed = 0,
    changes_returned = 0,
    ioq_calls = 0,
    io_bytes_read = 0,
    io_bytes_written = 0,
    js_evals = 0,
    js_filter = 0,
    js_filter_error = 0,
    js_filtered_docs = 0,
    mango_eval_match = 0,
    %% TODO: switch record definitions to be macro based, eg:
    %% ?COUCH_BT_GET_KP_NODE = 0,
    get_kv_node = 0,
    get_kp_node = 0
}).

%%
%% Public API
%%

%%
%% PidRef operations
%%

%% Returns the {Pid, Ref} stored in the calling process's dictionary, or
%% `undefined` when none has been set.
get_pid_ref() ->
    get(?PID_REF).

%% Stores PidRef in the calling process's dictionary and returns it.
set_pid_ref(PidRef) ->
    erlang:put(?PID_REF, PidRef),
    PidRef.

%% Creates, stores and returns a fresh {self(), Ref} for the calling process.
%% Throws {epidexist, Existing} when the process already has one.
create_pid_ref() ->
    case get_pid_ref() of
        undefined ->
            set_pid_ref({self(), make_ref()});
        PidRef0 ->
            %% TODO: what to do when it already exists?
            throw({epidexist, PidRef0})
    end.

close_pid_ref() ->
    close_pid_ref(get_pid_ref()).

%% Erases the pid ref of the calling process. The argument is ignored; the
%% previously stored value (or `undefined`) is returned.
close_pid_ref(_PidRef) ->
    erase(?PID_REF).

%% Returns the #rctx{} of the calling process, or `undefined`.
get_resource() ->
    get_resource(get_pid_ref()).

%% Returns the #rctx{} stored for PidRef, or `undefined` when there is no
%% such entry or the tracking table does not exist.
get_resource(undefined) ->
    undefined;
get_resource(PidRef) ->
    try
        get_resource_raw(PidRef)
    catch
        %% ets:lookup/2 raises badarg when the table is gone; report that as
        %% "no resource" instead of leaking an {'EXIT', _} tuple to callers
        %% that only expect #rctx{} | undefined.
        error:badarg ->
            undefined
    end.

%% Same as get_resource/1 but lets ETS errors propagate to the caller.
get_resource_raw(undefined) ->
    undefined;
get_resource_raw(PidRef) ->
    case ets:lookup(?MODULE, PidRef) of
        [#rctx{} = Rctx] ->
            Rctx;
        [] ->
            undefined
    end.

%% monotonic time now in milliseconds
tnow() ->
    erlang:monotonic_time(millisecond).

%% Tracking is on unless the "enabled" key of this module's config section
%% is set to false.
is_enabled() ->
    config:get_boolean(?MODULE_STRING, "enabled", true).

%%
%% Aggregate query API
%%

active() -> active_int(all).
active_coordinators() -> active_int(coordinators).
active_workers() -> active_int(workers).
%% Dispatches the active*/0 queries; every supported type is handled by
%% select_by_type/1.
active_int(Type) ->
    select_by_type(Type).

%% Coordinators and workers are returned as raw #rctx{} records, while `all`
%% returns every table entry converted with to_json/1.
select_by_type(all) ->
    [to_json(Rctx) || Rctx <- ets:tab2list(?MODULE)];
select_by_type(coordinators) ->
    ets:match_object(?MODULE, #rctx{type = {coordinator, '_', '_'}, _ = '_'});
select_by_type(workers) ->
    ets:match_object(?MODULE, #rctx{type = {worker, '_', '_'}, _ = '_'}).

%% All contexts whose pid_ref belongs to Pid.
find_by_pid(Pid) ->
    ets:select(?MODULE, [{#rctx{pid_ref = {Pid, '_'}, _ = '_'}, [], ['$_']}]).

%% All contexts stored under exactly PidRef.
find_by_pidref(PidRef) ->
    ets:select(?MODULE, [{#rctx{pid_ref = PidRef, _ = '_'}, [], ['$_']}]).

%% All contexts whose `from` field is PidRef.
find_workers_by_pidref(PidRef) ->
    ets:select(?MODULE, [{#rctx{from = PidRef, _ = '_'}, [], ['$_']}]).

%% Reads a single named field out of an #rctx{}.
%%
%% NOTE: mfa and type are converted to binaries here rather than later. It
%% would be nicer to keep the core data structures for as long as possible,
%% but the results need to be jiffy:encode'able and the grouping keys are
%% built dynamically by the *_by aggregators below, so the conversion is
%% done at the point of extraction.
field(#rctx{pid_ref = V}, pid_ref) -> V;
field(#rctx{mfa = V}, mfa) -> convert_mfa(V);
field(#rctx{nonce = V}, nonce) -> V;
field(#rctx{from = V}, from) -> V;
field(#rctx{type = V}, type) -> convert_type(V);
field(#rctx{dbname = V}, dbname) -> V;
field(#rctx{username = V}, username) -> V;
field(#rctx{path = V}, path) -> V;
field(#rctx{db_open = V}, db_open) -> V;
field(#rctx{docs_read = V}, docs_read) -> V;
field(#rctx{rows_read = V}, rows_read) -> V;
field(#rctx{changes_processed = V}, changes_processed) -> V;
field(#rctx{changes_returned = V}, changes_returned) -> V;
field(#rctx{ioq_calls = V}, ioq_calls) -> V;
field(#rctx{io_bytes_read = V}, io_bytes_read) -> V;
field(#rctx{io_bytes_written = V}, io_bytes_written) -> V;
field(#rctx{js_evals = V}, js_evals) -> V;
field(#rctx{js_filter = V}, js_filter) -> V;
field(#rctx{js_filter_error = V}, js_filter_error) -> V;
field(#rctx{js_filtered_docs = V}, js_filtered_docs) -> V;
field(#rctx{mango_eval_match = V}, mango_eval_match) -> V;
field(#rctx{get_kv_node = V}, get_kv_node) -> V;
field(#rctx{get_kp_node = V}, get_kp_node) -> V.

%% Turns a field name into a one-argument accessor fun.
curry_field(Field) ->
    fun(Rctx) -> field(Rctx, Field) end.

%% Number of contexts per key.
count_by(KeyFun) ->
    group_by(KeyFun, fun(_Rctx) -> 1 end).

%% Sums ValFun per key. Keys and values may be given as funs, field names,
%% or (keys only) lists of field names:
%%
%% eg: group_by(mfa, docs_read).
%% eg: group_by(fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls).
%% eg: ^^ or: group_by([mfa, docs_read], ioq_calls).
%% eg: group_by([username, dbname, mfa], docs_read).
%% eg: group_by([username, dbname, mfa], ioq_calls).
%% eg: group_by([username, dbname, mfa], js_filter).
group_by(KeyFun, ValFun) ->
    group_by(KeyFun, ValFun, fun erlang:'+'/2).
%% Folds over every tracked context, aggregating ValFun(Rctx) per
%% KeyFun(Rctx) with AggFun(Current, Val). Missing keys start at 0 and
%% aggregates that are not greater than 0 are left out of the result.
%% A list of field names is turned into a tuple key; a single field name
%% (for the key or the value) is turned into an accessor fun.
group_by(Fields, ValFun, AggFun) when is_list(Fields) ->
    TupleKey = fun(Rctx) ->
        list_to_tuple([field(Rctx, Name) || Name <- Fields])
    end,
    group_by(TupleKey, ValFun, AggFun);
group_by(Field, ValFun, AggFun) when is_atom(Field) ->
    group_by(curry_field(Field), ValFun, AggFun);
group_by(KeyFun, Field, AggFun) when is_atom(Field) ->
    group_by(KeyFun, curry_field(Field), AggFun);
group_by(KeyFun, ValFun, AggFun) ->
    Aggregate = fun(Rctx, Groups) ->
        GroupKey = KeyFun(Rctx),
        Sample = ValFun(Rctx),
        Previous = maps:get(GroupKey, Groups, 0),
        Total = AggFun(Previous, Sample),
        %% TODO: should we skip here? how to make this optional?
        if
            Total > 0 -> Groups#{GroupKey => Total};
            true -> Groups
        end
    end,
    ets:foldl(Aggregate, #{}, ?MODULE).

%% Sorts largest first
sorted(Map) when is_map(Map) ->
    LargestFirst = fun({_, Left}, {_, Right}) -> Left > Right end,
    lists:sort(LargestFirst, maps:to_list(Map)).

%% Keeps at most the first ten entries.
shortened(Entries) ->
    lists:sublist(Entries, 10).

%% Top ten groups, largest aggregate first.
%% eg: sorted_by([username, dbname, mfa], ioq_calls)
%% eg: sorted_by([dbname, mfa], docs_read)
sorted_by(KeyFun) ->
    shortened(sorted(count_by(KeyFun))).

sorted_by(KeyFun, ValFun) ->
    shortened(sorted(group_by(KeyFun, ValFun))).

sorted_by(KeyFun, ValFun, AggFun) ->
    shortened(sorted(group_by(KeyFun, ValFun, AggFun))).

%%
%% Conversion API for outputting JSON
%%

%% Renders an MFA as <<"mod:fun/arity">>; strings are converted to binaries
%% and null/undefined become null.
convert_mfa(undefined) ->
    null;
convert_mfa(null) ->
    null;
convert_mfa({Mod, Fun, Arity}) ->
    ModBin = atom_to_binary(Mod),
    FunBin = atom_to_binary(Fun),
    ArityBin = integer_to_binary(Arity),
    <<ModBin/binary, ":", FunBin/binary, "/", ArityBin/binary>>;
convert_mfa(Str) when is_list(Str) ->
    list_to_binary(Str).
%% Renders a context type for JSON output.
%%
%% null and undefined are matched before the generic atom clause: both are
%% atoms, so with the generic clause first they were rendered as the
%% binaries <<"null">> / <<"undefined">> and the dedicated clauses could
%% never be reached.
convert_type(null) ->
    null;
convert_type(undefined) ->
    null;
convert_type(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom);
convert_type({coordinator, Verb0, Atom0}) when is_atom(Atom0) ->
    Verb = atom_to_binary(Verb0),
    Atom = atom_to_binary(Atom0),
    <<"coordinator:", Verb/binary, ":", Atom/binary>>;
convert_type({coordinator, Verb0, Path0}) ->
    %% Third element is a string here (e.g. a printed handler fun)
    Verb = atom_to_binary(Verb0),
    Path = list_to_binary(Path0),
    <<"coordinator:", Verb/binary, ":", Path/binary>>;
convert_type({worker, M0, F0}) ->
    M = atom_to_binary(M0),
    F = atom_to_binary(F0),
    <<"worker:", M/binary, ":", F/binary>>.

%% Renders a {Pid, Ref} pair as <<"Pid:Ref">>; null/undefined become null.
convert_pidref({Parent0, ParentRef0}) ->
    Parent = convert_pid(Parent0),
    ParentRef = convert_ref(ParentRef0),
    <<Parent/binary, ":", ParentRef/binary>>;
convert_pidref(null) ->
    null;
convert_pidref(undefined) ->
    null.

convert_pid(Pid) when is_pid(Pid) ->
    list_to_binary(pid_to_list(Pid)).

convert_ref(Ref) when is_reference(Ref) ->
    list_to_binary(ref_to_list(Ref)).

%% Converts a context into a flat map for JSON reporting. Only the fields
%% listed below are included; pid refs, the MFA and the type are rendered
%% as binaries (or null).
to_json(#rctx{} = Rctx) ->
    #{
        updated_at => Rctx#rctx.updated_at,
        started_at => Rctx#rctx.started_at,
        pid_ref => convert_pidref(Rctx#rctx.pid_ref),
        mfa => convert_mfa(Rctx#rctx.mfa),
        nonce => Rctx#rctx.nonce,
        from => convert_pidref(Rctx#rctx.from),
        dbname => Rctx#rctx.dbname,
        username => Rctx#rctx.username,
        db_open => Rctx#rctx.db_open,
        docs_read => Rctx#rctx.docs_read,
        js_filter => Rctx#rctx.js_filter,
        js_filter_error => Rctx#rctx.js_filter_error,
        js_filtered_docs => Rctx#rctx.js_filtered_docs,
        rows_read => Rctx#rctx.rows_read,
        type => convert_type(Rctx#rctx.type),
        kp_nodes => Rctx#rctx.get_kp_node,
        kv_nodes => Rctx#rctx.get_kv_node,
        changes_returned => Rctx#rctx.changes_returned,
        ioq_calls => Rctx#rctx.ioq_calls
    }.
%%
%% Context lifecycle API
%%

%% Best-effort insert of a context into the tracking table; failures (e.g.
%% the table not existing) are swallowed by the catch.
create_resource(#rctx{} = Rctx) ->
    catch ets:insert(?MODULE, Rctx).

%% Creates a worker context for the calling process. Returns the new PidRef,
%% or `false` when tracking is disabled.
create_worker_context(From, {M, F, _A} = MFA, Nonce) ->
    case is_enabled() of
        true ->
            create_context(MFA, {worker, M, F}, null, From, Nonce);
        false ->
            false
    end.

%% Creates a coordinator context for an HTTP request. Path0 is prefixed with
%% "/" and stored as a binary. Returns the new PidRef, or `false` when
%% tracking is disabled.
create_coordinator_context(#httpd{} = Req, Path0) ->
    case is_enabled() of
        true ->
            #httpd{
                method = Verb,
                nonce = Nonce
            } = Req,
            Path = list_to_binary([$/ | Path0]),
            Type = {coordinator, Verb, init},
            create_context(null, Type, Path, null, Nonce);
        false ->
            false
    end.

%% Allocates a PidRef for the calling process, records the initial context
%% both in the process dictionary (as the delta baseline) and in the
%% tracking table, and starts the lifetime tracker. Returns the PidRef.
create_context(MFA, Type, Path, From, Nonce) ->
    PidRef = create_pid_ref(),
    Rctx = #rctx{
        from = From,
        pid_ref = PidRef,
        mfa = MFA,
        nonce = Nonce,
        path = Path,
        type = Type
    },
    erlang:put(?DELTA_TZ, Rctx),
    create_resource(Rctx),
    track(Rctx),
    PidRef.

set_context_dbname(DbName) ->
    set_context_dbname(DbName, get_pid_ref()).

set_context_dbname(_, undefined) ->
    ok;
set_context_dbname(DbName, PidRef) ->
    is_enabled() andalso update_element(PidRef, [{#rctx.dbname, DbName}]).

set_context_handler_fun(Fun) when is_function(Fun) ->
    set_context_handler_fun(Fun, get_pid_ref()).

%% Records the printed handler fun in the coordinator type of PidRef's
%% context, keeping the HTTP verb that is already stored there.
set_context_handler_fun(_, undefined) ->
    ok;
set_context_handler_fun(Fun, PidRef) when is_function(Fun) ->
    case is_enabled() of
        false ->
            ok;
        true ->
            %% Look up the context of the PidRef we were given, not the one
            %% in the caller's process dictionary, so the read and the
            %% update below always refer to the same entry.
            case get_resource(PidRef) of
                #rctx{type = {coordinator, Verb, _}} ->
                    FunName = erlang:fun_to_list(Fun),
                    Update = [{#rctx.type, {coordinator, Verb, FunName}}],
                    update_element(PidRef, Update);
                _ ->
                    %% No coordinator context to annotate (entry missing or
                    %% of another type); stats bookkeeping must not crash
                    %% the caller.
                    ok
            end
    end.

set_context_username(null) ->
    ok;
set_context_username(undefined) ->
    ok;
set_context_username(User) ->
    set_context_username(User, get_pid_ref()).
%% Stores the user name on PidRef's context. Accepts an #httpd{} request, a
%% #user_ctx{} or a bare name; null names and undefined PidRefs are ignored.
set_context_username(null, _) ->
    ok;
set_context_username(_, undefined) ->
    ok;
set_context_username(#httpd{user_ctx = Ctx}, PidRef) ->
    set_context_username(Ctx, PidRef);
set_context_username(#user_ctx{name = Name}, PidRef) ->
    set_context_username(Name, PidRef);
set_context_username(UserName, PidRef) ->
    %% The debugging print that used to sit here wrote to stdout for every
    %% request, regardless of whether tracking was enabled.
    is_enabled() andalso update_element(PidRef, [{#rctx.username, UserName}]).

destroy_context() ->
    destroy_context(get_pid_ref()).

%% Asks the tracker process (if any) to report and evict the context, then
%% forgets the PidRef of the calling process.
destroy_context(undefined) ->
    ok;
destroy_context({_, _} = PidRef) ->
    stop_tracker(get_tracker()),
    close_pid_ref(PidRef),
    ok.

%% Stat collection API

inc(Key) ->
    inc(Key, 1).

%% Adds N to the counter named Key on the calling process's context.
%% Unknown keys return 0 without touching the table.
%%
%% TODO: inc(io_bytes_read, N) ->
%% TODO: inc(io_bytes_written, N) ->
%% TODO: inc(js_evals, N) ->
inc(?DB_OPEN, N) ->
    update_counter(#rctx.?DB_OPEN, N);
inc(?ROWS_READ, N) ->
    update_counter(#rctx.?ROWS_READ, N);
inc(?FRPC_CHANGES_RETURNED, N) ->
    update_counter(#rctx.?FRPC_CHANGES_RETURNED, N);
inc(?IOQ_CALLS, N) ->
    update_counter(#rctx.?IOQ_CALLS, N);
inc(?COUCH_JS_FILTER, N) ->
    update_counter(#rctx.?COUCH_JS_FILTER, N);
inc(?COUCH_JS_FILTER_ERROR, N) ->
    update_counter(#rctx.?COUCH_JS_FILTER_ERROR, N);
inc(?COUCH_JS_FILTERED_DOCS, N) ->
    update_counter(#rctx.?COUCH_JS_FILTERED_DOCS, N);
inc(?MANGO_EVAL_MATCH, N) ->
    update_counter(#rctx.?MANGO_EVAL_MATCH, N);
inc(?DB_OPEN_DOC, N) ->
    update_counter(#rctx.?DB_OPEN_DOC, N);
inc(?FRPC_CHANGES_ROW, N) ->
    %% TODO: rework double use of rows_read
    update_counter(#rctx.?ROWS_READ, N);
inc(?COUCH_BT_GET_KP_NODE, N) ->
    update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N);
inc(?COUCH_BT_GET_KV_NODE, N) ->
    update_counter(#rctx.?COUCH_BT_GET_KV_NODE, N);
inc(_, _) ->
    %% inc needs to allow unknown types to pass for accumulate_update to handle
    %% updates from nodes with newer data formats
    0.
%% Increments the context counter that corresponds to a couch_stats metric
%% name; metrics without a counterpart return 0.
maybe_inc(Metric, Val) ->
    case stat_key(Metric) of
        undefined -> 0;
        Key -> inc(Key, Val)
    end.

%% Maps a couch_stats metric name onto the context counter it feeds.
stat_key([mango, evaluate_selector]) -> ?MANGO_EVAL_MATCH;
stat_key([couchdb, database_reads]) -> ?DB_OPEN_DOC;
stat_key([fabric_rpc, changes, processed]) -> ?FRPC_CHANGES_ROW;
stat_key([fabric_rpc, changes, returned]) -> ?FRPC_CHANGES_RETURNED;
stat_key([fabric_rpc, view, rows_read]) -> ?ROWS_READ;
stat_key([couchdb, couch_server, open]) -> ?DB_OPEN;
stat_key([couchdb, btree, kp_node]) -> ?COUCH_BT_GET_KP_NODE;
stat_key([couchdb, btree, kv_node]) -> ?COUCH_BT_GET_KV_NODE;
stat_key([couchdb, query_server, js_filter_error]) -> ?COUCH_JS_FILTER_ERROR;
stat_key([couchdb, query_server, js_filter]) -> ?COUCH_JS_FILTER;
stat_key([couchdb, query_server, js_filtered_docs]) -> ?COUCH_JS_FILTERED_DOCS;
stat_key(_Metric) -> undefined.

%% True when Metric is one of the tracked metrics and tracking is enabled.
%% TODO: update stats_descriptions.cfg for relevant apps
should_track(Metric) ->
    is_tracked_metric(Metric) andalso is_enabled().

is_tracked_metric([fabric_rpc, all_docs, spawned]) -> true;
is_tracked_metric([fabric_rpc, changes, spawned]) -> true;
is_tracked_metric([fabric_rpc, changes, processed]) -> true;
is_tracked_metric([fabric_rpc, changes, returned]) -> true;
is_tracked_metric([fabric_rpc, map_view, spawned]) -> true;
is_tracked_metric([fabric_rpc, reduce_view, spawned]) -> true;
is_tracked_metric([fabric_rpc, get_all_security, spawned]) -> true;
is_tracked_metric([fabric_rpc, open_doc, spawned]) -> true;
is_tracked_metric([fabric_rpc, update_docs, spawned]) -> true;
is_tracked_metric([fabric_rpc, open_shard, spawned]) -> true;
is_tracked_metric([mango_cursor, view, all_docs]) -> true;
is_tracked_metric([mango_cursor, view, idx]) -> true;
is_tracked_metric(_Metric) -> false.
ioq_called() ->
    is_enabled() andalso inc(ioq_calls).

%% Applies every Key => Count pair of Delta to the calling process's
%% context via inc/2. Keys inc/2 does not know are ignored there.
accumulate_delta(Delta) when is_map(Delta) ->
    %% TODO: switch to creating a batch of updates to invoke a single
    %% update_counter rather than sequentially invoking it for each field
    is_enabled() andalso maps:foreach(fun inc/2, Delta);
accumulate_delta(undefined) ->
    ok.

%% Returns the counters that changed since the previous call (or since the
%% context was created) and makes the current context the new baseline.
make_delta() ->
    TA = delta_baseline(),
    TB = get_resource(),
    Delta = make_delta(TA, TB),
    %% Only advance the baseline when a context was actually read. Storing
    %% anything else (e.g. an error term from the lookup) would leave a
    %% value behind that the next call cannot use as a starting point.
    case TB of
        #rctx{} -> set_delta_a(TB);
        _ -> ok
    end,
    Delta.

%% Picks the starting point for make_delta/0: the baseline stored by the
%% previous call, else the context captured at creation time, else a
%% synthetic one.
delta_baseline() ->
    case get(?DELTA_TA) of
        #rctx{} = TA ->
            TA;
        _ ->
            %% Need to handle this better, can't just make a new T0 at T' as
            %% the timestamps will be identical causing a divide by zero
            %% error.
            %%
            %% Realistically need to ensure that all invocations of database
            %% operations set T0 appropriately. Perhaps it's possible to do
            %% this in the couch_db:open chain, and then similarly, in
            %% couch_server, and couch_file, and...
            %%
            %% We need some type of approach for establishing a T0 that
            %% doesn't result in outrageous deltas; make_delta_base/1
            %% currently rewinds the start time by a fixed amount.
            %%
            %% It's probably unavoidable that some codepaths will not
            %% properly instantiate the T0 at spawn, resulting in needing to
            %% do some type of "time warp" or ignoring the timing collection
            %% entirely. Perhaps if we hoisted the stats collection into the
            %% primary flow of the database and funnelled that through all
            %% the function clauses we could then utilize Dialyzer to
            %% statically assert that all code paths invoking database
            %% operations have properly instantiated a T0 at the appropriate
            %% start time, such that we don't have to "fudge" deltas with a
            %% missing start point. We're a long way from that happening, so
            %% the missing start time has to be addressed here.

            %% Track how often we fail to initiate T0 correctly.
            %% Perhaps somewhat naughty we're incrementing stats from within
            %% couch_stats itself? Might need to handle this differently
            %% TODO: determine appropriate course of action here
            couch_stats:increment_counter(
                [couchdb, csrt, delta_missing_t0]
            ),
            case erlang:get(?DELTA_TZ) of
                undefined ->
                    TA0 = make_delta_base(),
                    %% TODO: handle missing deltas, otherwise divide by zero
                    set_delta_a(TA0),
                    TA0;
                TZ ->
                    TZ
            end
    end.

%% Per-counter difference TB - TA as a map; only strictly positive entries
%% are returned. When either side is not an #rctx{} a one-entry error map
%% is returned instead.
make_delta(#rctx{} = TA, #rctx{} = TB) ->
    Delta = #{
        docs_read => TB#rctx.docs_read - TA#rctx.docs_read,
        js_filter => TB#rctx.js_filter - TA#rctx.js_filter,
        js_filter_error => TB#rctx.js_filter_error - TA#rctx.js_filter_error,
        js_filtered_docs => TB#rctx.js_filtered_docs - TA#rctx.js_filtered_docs,
        rows_read => TB#rctx.rows_read - TA#rctx.rows_read,
        changes_returned => TB#rctx.changes_returned - TA#rctx.changes_returned,
        get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node,
        get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node,
        db_open => TB#rctx.db_open - TA#rctx.db_open,
        ioq_calls => TB#rctx.ioq_calls - TA#rctx.ioq_calls,
        dt => TB#rctx.updated_at - TA#rctx.updated_at
    },
    %% TODO: reevaluate this decision
    %% Only return non zero (and also positive) delta fields
    maps:filter(fun(_K, V) -> V > 0 end, Delta);
make_delta(_, #rctx{}) ->
    #{error => missing_beg_rctx};
make_delta(#rctx{}, _) ->
    #{error => missing_fin_rctx}.

%% TODO: what to do when PidRef=undefined?
%% Builds a zeroed context for PidRef to serve as a delta starting point
%% when none was recorded.
make_delta_base(PidRef) ->
    %% TODO: extract user_ctx and db/shard from request
    Current = tnow(),
    %% TODO: confirm this subtraction works
    %% give us 100ms rewind time for missing T0
    Rewound = Current - 100,
    #rctx{pid_ref = PidRef, started_at = Rewound, updated_at = Current}.

make_delta_base() ->
    make_delta_base(get_pid_ref()).

%% Remembers TA as the baseline for the next make_delta/0 call.
set_delta_a(TA) ->
    erlang:put(?DELTA_TA, TA).

%% Adds Count to the counter at record position Field of the calling
%% process's context.
update_counter(Field, Count) ->
    is_enabled() andalso update_counter(get_pid_ref(), Field, Count).

update_counter(undefined, _Field, _Count) ->
    ok;
update_counter({_Pid, _Ref} = PidRef, Field, Count) ->
    %% TODO: mem3 crashes without catch, why do we lose the stats table?
    Default = #rctx{pid_ref = PidRef},
    is_enabled() andalso
        (catch ets:update_counter(?MODULE, PidRef, {Field, Count}, Default)).

%% Overwrites the given {Position, Value} pairs on PidRef's context.
update_element(undefined, _Update) ->
    ok;
update_element({_Pid, _Ref} = PidRef, Update) ->
    %% TODO: should we take any action when the update fails?
    is_enabled() andalso (catch ets:update_element(?MODULE, PidRef, Update)).

%% Process lifetime logging api

%% Ensures the calling process has a tracker process, spawning one on first
%% use and remembering it in the process dictionary. Returns the tracker pid.
track(#rctx{pid_ref = PidRef}) ->
    case get_tracker() of
        Existing when is_pid(Existing) ->
            Existing;
        undefined ->
            Tracker = spawn(?MODULE, tracker, [PidRef]),
            put_tracker(Tracker),
            Tracker
    end.

%% Body of the tracker process: waits until it is told to stop or the
%% monitored process goes down, then logs the lifetime report and removes
%% the context from the table.
tracker({Pid, _Ref} = PidRef) ->
    MonRef = erlang:monitor(process, Pid),
    receive
        stop ->
            %% TODO: do we need cleanup here?
            report_and_evict(PidRef),
            demonitor(MonRef),
            ok;
        {'DOWN', MonRef, _Type, _DownPid, _Reason} ->
            %% TODO: should we pass reason to log_process_lifetime_report?
            report_and_evict(PidRef)
    end.

%% Logs the lifetime report for PidRef, then deletes its table entry; a
%% failing delete is swallowed by the catch.
report_and_evict(PidRef) ->
    log_process_lifetime_report(PidRef),
    catch evict(PidRef).
%% Emits the "csrt-pid-usage-lifetime" report for PidRef's context when
%% tracking and logging are enabled and should_log/1 accepts the context.
log_process_lifetime_report(PidRef) ->
    %% TODO: catch error out of here, report crashes on depth>1 json
    %% TODO: clean this up
    case is_enabled() andalso is_logging_enabled() of
        true ->
            Rctx = get_resource_raw(PidRef),
            case should_log(Rctx) of
                true ->
                    couch_log:report("csrt-pid-usage-lifetime", to_json(Rctx));
                _ ->
                    ok
            end;
        false ->
            ok
    end.

is_logging_enabled() ->
    logging_enabled() =/= false.

%% Reads "log_pid_usage_report" from this module's config section:
%% "coordinator" (the default) -> coordinator, "true" -> true, anything
%% else -> false.
logging_enabled() ->
    case conf_get("log_pid_usage_report", "coordinator") of
        "coordinator" ->
            coordinator;
        "true" ->
            true;
        _ ->
            false
    end.

should_log(undefined) ->
    false;
should_log(#rctx{} = Rctx) ->
    should_log(Rctx, logging_enabled()).

%% Decides whether a context is reported for the given logging mode. In
%% `coordinator` mode coordinators are always reported; fabric_rpc workers
%% are reported when "log_fabric_rpc" is "true" or names their function.
should_log(undefined, _) ->
    false;
should_log(#rctx{}, true) ->
    true;
should_log(#rctx{}, false) ->
    false;
should_log(#rctx{type = {coordinator, _, _}}, coordinator) ->
    true;
should_log(#rctx{type = {worker, fabric_rpc, FName}}, _) ->
    case conf_get("log_fabric_rpc") of
        "true" ->
            true;
        undefined ->
            false;
        Name ->
            Name =:= atom_to_list(FName)
    end;
should_log(#rctx{}, _) ->
    false.

%%
%% gen_server callbacks
%%

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Creates the public, named tracking table keyed on #rctx.pid_ref. The
%% table is owned by this server process.
init([]) ->
    ets:new(?MODULE, [
        named_table,
        public,
        {decentralized_counters, true},
        {write_concurrency, true},
        {read_concurrency, true},
        {keypos, #rctx.pid_ref}
    ]),
    {ok, #st{}}.

handle_call(fetch, _From, #st{} = St) ->
    {reply, {ok, St}, St};
handle_call({call_search, _}, _From, St) ->
    %% TODO: provide isolated search queries here
    {reply, ok, St};
handle_call(Msg, _From, St) ->
    {stop, {unknown_call, Msg}, St}.

%% Stray casts and messages are logged and dropped. Stopping here would
%% terminate the process that owns the tracking table created in init/1.
handle_cast(Msg, St) ->
    couch_log:warning("~p ignoring unexpected cast: ~p", [?MODULE, Msg]),
    {noreply, St}.

handle_info(Msg, St) ->
    couch_log:warning("~p ignoring unexpected message: ~p", [?MODULE, Msg]),
    {noreply, St}.

terminate(_Reason, _St) ->
    ok.

code_change(_OldVsn, St, _Extra) ->
    {ok, St}.
%%
%% private functions
%%

%% Reads Key from this module's config section; `undefined` when unset.
conf_get(Key) ->
    conf_get(Key, undefined).

%% Reads Key from this module's config section, falling back to Default.
conf_get(Key, Default) ->
    Section = ?MODULE_STRING,
    config:get(Section, Key, Default).

%%
%% Process lifetime logging api
%%

%% Tracker pid remembered in the calling process's dictionary, if any.
get_tracker() ->
    get(?TRACKER_PID).

%% Remembers the tracker pid in the calling process's dictionary.
put_tracker(Tracker) when is_pid(Tracker) ->
    put(?TRACKER_PID, Tracker).

%% Removes PidRef's context from the tracking table.
evict(PidRef) ->
    ets:delete(?MODULE, PidRef).

%% Tells the tracker process to report and exit; no-op without a tracker.
stop_tracker(undefined) ->
    ok;
stop_tracker(Tracker) when is_pid(Tracker) ->
    Tracker ! stop.

diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index b25f4c0b2..ea35e1047 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -136,7 +136,7 @@ init_p(From, {M, F, A}, Nonce) -> put('$initial_call', MFA), put(nonce, Nonce), try - couch_stats_resource_tracker:create_context(From, MFA, Nonce), + couch_stats_resource_tracker:create_worker_context(From, MFA, Nonce), couch_stats:maybe_track_rexi_init_p(MFA), apply(M, F, A) catch
