nickva commented on code in PR #5491:
URL: https://github.com/apache/couchdb/pull/5491#discussion_r2080507613
##########
rel/overlay/etc/default.ini:
##########
@@ -1119,3 +1119,33 @@ url = {{nouveau_url}}
;mem3_shards = true
;nouveau_index_manager = true
;dreyfus_index_manager = true
+
+; Couch Stats Resource Tracker (CSRT)
+[csrt]
+enabled = true
+
+; CSRT Rexi Server Init P tracking
+; Enable these to enable additional metrics for RPC worker spawn rates
+; Mod and Function are separated by double underscores
+[csrt.init_p]
+enabled = false
+fabric_rpc__all_docs = true
+fabric_rpc__changes = true
+fabric_rpc__map_view = true
+fabric_rpc__reduce_view = true
+fabric_rpc__get_all_security = true
+fabric_rpc__open_doc = true
+fabric_rpc__update_docs = true
+fabric_rpc__open_shard = true
Review Comment:
Very nice, it would be great to see detailed reports on what these workers
are doing!
##########
src/couch/src/couch_os_process.erl:
##########
@@ -258,6 +259,12 @@ bump_time_stat(Stat, USec) when is_atom(Stat),
is_integer(USec) ->
couch_stats:increment_counter([couchdb, query_server, calls, Stat]),
couch_stats:increment_counter([couchdb, query_server, time, Stat], USec).
+bump_volume_stat(ddoc_filter = Stat, Docs) when is_atom(Stat), is_list(Docs) ->
+ couch_stats:increment_counter([couchdb, query_server, volume, Stat],
length(Docs));
+bump_volume_stat(_, _) ->
+ %% TODO: handle other stats?
+ ok.
+
Review Comment:
At first it seems odd to count filtered docs in couch_os_process; it seems
this would go somewhere at a higher level. We don't count mapped docs and
such here, for instance?
In general, wouldn't the filtered docs match the docs processed by the changes
feed for a particular set of docs? Maybe we can skip the stat altogether?
##########
src/rexi/src/rexi_utils.erl:
##########
@@ -60,6 +60,16 @@ process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef,
PerMsgTO) ->
process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
receive
+ Msg ->
Review Comment:
It's worrying that we could be receiving all the messages here. Maybe there
is a low (no?) chance we'd get stray messages from other parts of the code?
##########
rel/overlay/etc/default.ini:
##########
@@ -1119,3 +1119,33 @@ url = {{nouveau_url}}
;mem3_shards = true
;nouveau_index_manager = true
;dreyfus_index_manager = true
+
+; Couch Stats Resource Tracker (CSRT)
+[csrt]
Review Comment:
Maybe add a quick comment on what it is, what it does, why a user might
want to enable it, and whether there are any downsides or trade-offs to it.
##########
src/couch_stats/CSRT.md:
##########
@@ -0,0 +1 @@
+# Couch Stats Resource Tracker (CSRT)
Review Comment:
We should either add a readme file or not have one at all; just the title is
probably not worth having.
##########
rel/overlay/etc/default.ini:
##########
@@ -1119,3 +1119,33 @@ url = {{nouveau_url}}
;mem3_shards = true
;nouveau_index_manager = true
;dreyfus_index_manager = true
+
+; Couch Stats Resource Tracker (CSRT)
+[csrt]
+enabled = true
Review Comment:
In the `default.ini`, if possible, keep the values at defaults but commented
out. Usually only the section headings are uncommented. See the rest of the
default.ini for the pattern.
##########
README.rst:
##########
@@ -1,6 +1,7 @@
Apache CouchDB README
=====================
+
Review Comment:
Tiny nit: this is not related to the commit? Let's skip it.
##########
rel/overlay/etc/default.ini:
##########
@@ -1119,3 +1119,33 @@ url = {{nouveau_url}}
;mem3_shards = true
;nouveau_index_manager = true
;dreyfus_index_manager = true
+
+; Couch Stats Resource Tracker (CSRT)
+[csrt]
+enabled = true
+
+; CSRT Rexi Server Init P tracking
+; Enable these to enable additional metrics for RPC worker spawn rates
+; Mod and Function are separated by double underscores
+[csrt.init_p]
+enabled = false
+fabric_rpc__all_docs = true
Review Comment:
Instead of a double underscore, would a more traditional erlang colon work:
`mod:fun = true|false`?
##########
src/chttpd/src/chttpd.erl:
##########
@@ -400,6 +407,7 @@ process_request(#httpd{mochi_req = MochiReq} = HttpReq) ->
RawUri = MochiReq:get(raw_path),
try
+ csrt:set_context_handler_fun(?MODULE, ?FUNCTION_NAME),
Review Comment:
I think we already set it before calling process_request in the
`handle_request_int/1` callback?
##########
src/config/src/config_listener_mon.erl:
##########
@@ -13,6 +13,8 @@
-module(config_listener_mon).
-behaviour(gen_server).
+-dialyzer({nowarn_function, init/1}).
Review Comment:
What does this do? It doesn't seem related to the PR per se.
##########
rel/overlay/etc/default.ini:
##########
@@ -1119,3 +1119,33 @@ url = {{nouveau_url}}
;mem3_shards = true
;nouveau_index_manager = true
;dreyfus_index_manager = true
+
+; Couch Stats Resource Tracker (CSRT)
+[csrt]
+enabled = true
+
+; CSRT Rexi Server Init P tracking
+; Enable these to enable additional metrics for RPC worker spawn rates
+; Mod and Function are separated by double underscores
+[csrt.init_p]
+enabled = false
+fabric_rpc__all_docs = true
+fabric_rpc__changes = true
+fabric_rpc__map_view = true
+fabric_rpc__reduce_view = true
+fabric_rpc__get_all_security = true
+fabric_rpc__open_doc = true
+fabric_rpc__update_docs = true
+fabric_rpc__open_shard = true
+
+;; CSRT dbname matchers
Review Comment:
Use a `;` for comments for consistency.
The typical format is
```ini
; Some comment with a space after semicolon
; continuation...
;value = default
```
##########
rel/overlay/etc/default.ini:
##########
@@ -1119,3 +1119,33 @@ url = {{nouveau_url}}
;mem3_shards = true
;nouveau_index_manager = true
;dreyfus_index_manager = true
+
+; Couch Stats Resource Tracker (CSRT)
+[csrt]
+enabled = true
+
+; CSRT Rexi Server Init P tracking
Review Comment:
Tiny nit: unless someone has read the rexi erlang code they won't know what
"Init P" means. Maybe "CSRT rexi server worker process tracking"?
Would users know what additional stuff they'd be able to see if this is
enabled vs just `csrt enabled = true`?
##########
src/couch_stats/src/csrt_util.erl:
##########
@@ -0,0 +1,485 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(csrt_util).
+
+-export([
+ is_enabled/0,
+ is_enabled_init_p/0,
+ get_pid_ref/0,
+ get_pid_ref/1,
+ set_pid_ref/1,
+ should_track_init_p/2,
+ tnow/0,
+ tutc/0,
+ tutc/1
+]).
+
+%% JSON Conversion API
+-export([
+ convert_type/1,
+ convert_pidref/1,
+ convert_pid/1,
+ convert_ref/1,
+ to_json/1
+]).
+
+%% Delta API
+-export([
+ add_delta/2,
+ extract_delta/1,
+ get_delta/1,
+ get_delta_a/0,
+ get_delta_zero/0,
+ maybe_add_delta/1,
+ maybe_add_delta/2,
+ make_delta/1,
+ make_dt/2,
+ make_dt/3,
+ rctx_delta/2,
+ set_delta_a/1,
+ set_delta_zero/1
+]).
+
+%% Extra niceties and testing facilities
+-export([
+ set_fabric_init_p/2,
+ set_fabric_init_p/3,
+ map_to_rctx/1,
+ field/2
+]).
+
+-include_lib("couch_stats_resource_tracker.hrl").
+
+-spec is_enabled() -> boolean().
+is_enabled() ->
+ config:get_boolean(?CSRT, "enabled", true).
+
+-spec is_enabled_init_p() -> boolean().
+is_enabled_init_p() ->
+ config:get_boolean(?CSRT_INIT_P, "enabled", true).
+
+-spec should_track_init_p(Mod :: atom(), Func :: atom()) -> boolean().
+should_track_init_p(fabric_rpc, Func) ->
+ is_enabled_init_p() andalso config:get_boolean(?CSRT_INIT_P,
fabric_conf_key(Func), false);
+should_track_init_p(_Mod, _Func) ->
+ false.
+
+%% Monotnonic time now in native format using time forward only event tracking
+-spec tnow() -> integer().
+tnow() ->
+ erlang:monotonic_time().
+
+%% Get current system time in UTC RFC 3339 format
+-spec tutc() -> calendar:rfc3339_string().
+tutc() ->
+ tutc(tnow()).
+
+%% Convert a integer system time in milliseconds into UTC RFC 3339 format
+-spec tutc(Time :: integer()) -> calendar:rfc3339_string().
+tutc(Time0) when is_integer(Time0) ->
+ Unit = millisecond,
+ Time1 = Time0 + erlang:time_offset(),
+ Time = erlang:convert_time_unit(Time1, native, Unit),
+ calendar:system_time_to_rfc3339(Time, [{unit, Unit}, {offset, "z"}]).
+
+%% Returns dt (delta time) in microseconds
+%% @equiv make_dt(A, B, microsecond)
+-spec make_dt(A, B) -> pos_integer() when
+ A :: integer(),
+ B :: integer().
+make_dt(A, B) ->
+ make_dt(A, B, microsecond).
+
+%% Returns monotonic dt (delta time) in specified time_unit()
+-spec make_dt(A, B, Unit) -> pos_integer() when
+ A :: integer(),
+ B :: integer(),
+ Unit :: erlang:time_unit().
+make_dt(A, A, _Unit) when is_integer(A) ->
+ %% Handle edge case when monotonic_time()'s are equal
+ %% Always return a non zero value so we don't divide by zero
+ %% This always returns 1, independent of unit, as that's the smallest
+ %% possible positive integer value delta.
+ 1;
+make_dt(A, B, Unit) when is_integer(A) andalso is_integer(B) andalso B > A ->
+ A1 = erlang:convert_time_unit(A, native, Unit),
+ B1 = erlang:convert_time_unit(B, native, Unit),
+ B1 - A1.
+
+%%
+%% Conversion API for outputting JSON
+%%
+
+-spec convert_type(T) -> binary() | null when
+ T :: #coordinator{} | #rpc_worker{} | undefined.
+convert_type(#coordinator{method = Verb0, path = Path, mod = M0, func = F0}) ->
+ M = atom_to_binary(M0),
+ F = atom_to_binary(F0),
+ Verb = atom_to_binary(Verb0),
+ <<"coordinator-{", M/binary, ":", F/binary, "}:", Verb/binary, ":",
Path/binary>>;
+convert_type(#rpc_worker{mod = M0, func = F0, from = From0}) ->
+ M = atom_to_binary(M0),
+ F = atom_to_binary(F0),
+ From = convert_pidref(From0),
+ <<"rpc_worker-{", From/binary, "}:", M/binary, ":", F/binary>>;
+convert_type(undefined) ->
+ null.
+
+-spec convert_pidref(PidRef) -> binary() | null when
+ PidRef :: {A :: pid(), B :: reference()} | undefined.
+convert_pidref({Parent0, ParentRef0}) ->
+ Parent = convert_pid(Parent0),
+ ParentRef = convert_ref(ParentRef0),
+ <<Parent/binary, ":", ParentRef/binary>>;
+%%convert_pidref(null) ->
+%% null;
+convert_pidref(undefined) ->
+ null.
+
+-spec convert_pid(Pid :: pid()) -> binary().
+convert_pid(Pid) when is_pid(Pid) ->
+ list_to_binary(pid_to_list(Pid)).
+
+-spec convert_ref(Ref :: reference()) -> binary().
+convert_ref(Ref) when is_reference(Ref) ->
+ list_to_binary(ref_to_list(Ref)).
+
+-spec to_json(Rctx :: rctx()) -> map().
+to_json(#rctx{} = Rctx) ->
+ #{
+ updated_at => tutc(Rctx#rctx.updated_at),
+ started_at => tutc(Rctx#rctx.started_at),
+ pid_ref => convert_pidref(Rctx#rctx.pid_ref),
+ nonce => Rctx#rctx.nonce,
+ dbname => Rctx#rctx.dbname,
+ username => Rctx#rctx.username,
+ db_open => Rctx#rctx.db_open,
+ docs_read => Rctx#rctx.docs_read,
+ docs_written => Rctx#rctx.docs_written,
+ js_filter => Rctx#rctx.js_filter,
+ js_filtered_docs => Rctx#rctx.js_filtered_docs,
+ rows_read => Rctx#rctx.rows_read,
+ type => convert_type(Rctx#rctx.type),
+ get_kp_node => Rctx#rctx.get_kp_node,
+ get_kv_node => Rctx#rctx.get_kv_node,
+ write_kp_node => Rctx#rctx.write_kp_node,
+ write_kv_node => Rctx#rctx.write_kv_node,
+ changes_returned => Rctx#rctx.changes_returned,
+ changes_processed => Rctx#rctx.changes_processed,
+ ioq_calls => Rctx#rctx.ioq_calls
+ }.
+
+%% NOTE: this does not do the inverse of to_json, should it convert types?
+-spec map_to_rctx(Map :: map()) -> rctx().
+map_to_rctx(Map) ->
+ maps:fold(fun map_to_rctx_field/3, #rctx{}, Map).
+
+-spec map_to_rctx_field(Field :: rctx_field(), Val :: any(), Rctx :: rctx())
-> rctx().
+map_to_rctx_field(updated_at, Val, Rctx) ->
+ Rctx#rctx{updated_at = Val};
+map_to_rctx_field(started_at, Val, Rctx) ->
+ Rctx#rctx{started_at = Val};
+map_to_rctx_field(pid_ref, Val, Rctx) ->
+ Rctx#rctx{pid_ref = Val};
+map_to_rctx_field(nonce, Val, Rctx) ->
+ Rctx#rctx{nonce = Val};
+map_to_rctx_field(dbname, Val, Rctx) ->
+ Rctx#rctx{dbname = Val};
+map_to_rctx_field(username, Val, Rctx) ->
+ Rctx#rctx{username = Val};
+map_to_rctx_field(db_open, Val, Rctx) ->
+ Rctx#rctx{db_open = Val};
+map_to_rctx_field(docs_read, Val, Rctx) ->
+ Rctx#rctx{docs_read = Val};
+map_to_rctx_field(docs_written, Val, Rctx) ->
+ Rctx#rctx{docs_written = Val};
+map_to_rctx_field(js_filter, Val, Rctx) ->
+ Rctx#rctx{js_filter = Val};
+map_to_rctx_field(js_filtered_docs, Val, Rctx) ->
+ Rctx#rctx{js_filtered_docs = Val};
+map_to_rctx_field(rows_read, Val, Rctx) ->
+ Rctx#rctx{rows_read = Val};
+map_to_rctx_field(type, Val, Rctx) ->
+ Rctx#rctx{type = Val};
+map_to_rctx_field(get_kp_node, Val, Rctx) ->
+ Rctx#rctx{get_kp_node = Val};
+map_to_rctx_field(get_kv_node, Val, Rctx) ->
+ Rctx#rctx{get_kv_node = Val};
+map_to_rctx_field(write_kp_node, Val, Rctx) ->
+ Rctx#rctx{write_kp_node = Val};
+map_to_rctx_field(write_kv_node, Val, Rctx) ->
+ Rctx#rctx{write_kv_node = Val};
+map_to_rctx_field(changes_returned, Val, Rctx) ->
+ Rctx#rctx{changes_returned = Val};
+map_to_rctx_field(changes_processed, Val, Rctx) ->
+ Rctx#rctx{changes_processed = Val};
+map_to_rctx_field(ioq_calls, Val, Rctx) ->
+ Rctx#rctx{ioq_calls = Val}.
+
+-spec field(Field :: rctx_field(), Rctx :: rctx()) -> any().
+field(updated_at, #rctx{updated_at = Val}) ->
+ Val;
+field(started_at, #rctx{started_at = Val}) ->
+ Val;
+field(pid_ref, #rctx{pid_ref = Val}) ->
+ Val;
+field(nonce, #rctx{nonce = Val}) ->
+ Val;
+field(dbname, #rctx{dbname = Val}) ->
+ Val;
+field(username, #rctx{username = Val}) ->
+ Val;
+field(db_open, #rctx{db_open = Val}) ->
+ Val;
+field(docs_read, #rctx{docs_read = Val}) ->
+ Val;
+field(docs_written, #rctx{docs_written = Val}) ->
+ Val;
+field(js_filter, #rctx{js_filter = Val}) ->
+ Val;
+field(js_filtered_docs, #rctx{js_filtered_docs = Val}) ->
+ Val;
+field(rows_read, #rctx{rows_read = Val}) ->
+ Val;
+field(type, #rctx{type = Val}) ->
+ Val;
+field(get_kp_node, #rctx{get_kp_node = Val}) ->
+ Val;
+field(get_kv_node, #rctx{get_kv_node = Val}) ->
+ Val;
+field(changes_returned, #rctx{changes_returned = Val}) ->
+ Val;
+field(changes_processed, #rctx{changes_processed = Val}) ->
+ Val;
+field(ioq_calls, #rctx{ioq_calls = Val}) ->
+ Val.
+
+add_delta({A}, Delta) -> {A, Delta};
+add_delta({A, B}, Delta) -> {A, B, Delta};
+add_delta({A, B, C}, Delta) -> {A, B, C, Delta};
+add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta};
+add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta};
+add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta};
+add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta};
+add_delta(T, _Delta) -> T.
+
+extract_delta({A, {delta, Delta}}) -> {{A}, Delta};
+extract_delta({A, B, {delta, Delta}}) -> {{A, B}, Delta};
+extract_delta({A, B, C, {delta, Delta}}) -> {{A, B, C}, Delta};
+extract_delta({A, B, C, D, {delta, Delta}}) -> {{A, B, C, D}, Delta};
+extract_delta({A, B, C, D, E, {delta, Delta}}) -> {{A, B, C, D, E}, Delta};
+extract_delta({A, B, C, D, E, F, {delta, Delta}}) -> {{A, B, C, D, E, F},
Delta};
+extract_delta({A, B, C, D, E, F, G, {delta, Delta}}) -> {{A, B, C, D, E, F,
G}, Delta};
+extract_delta(T) -> {T, undefined}.
Review Comment:
This looks a bit unusual, maybe we can add an extra map to the messages
which can have an optional `delta`: val KV?
##########
src/chttpd/src/chttpd.erl:
##########
@@ -369,6 +373,7 @@ handle_request_int(MochiReq) ->
before_request(HttpReq) ->
try
+ csrt:set_context_handler_fun(?MODULE, ?FUNCTION_NAME),
Review Comment:
I think we already set this before calling `before_request/1`?
##########
src/couch_stats/src/couch_stats_resource_tracker.hrl:
##########
@@ -0,0 +1,174 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(CSRT, "csrt").
+-define(CSRT_INIT_P, "csrt.init_p").
+-define(CSRT_ETS, csrt_server).
+
+%% CSRT pdict markers
+-define(DELTA_TA, csrt_delta_ta).
+-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0
+-define(PID_REF, csrt_pid_ref). %% track local ID
+-define(TRACKER_PID, csrt_tracker). %% tracker pid
+
+-define(MANGO_EVAL_MATCH, mango_eval_match).
+-define(DB_OPEN_DOC, docs_read).
+-define(DB_OPEN, db_open).
+-define(COUCH_SERVER_OPEN, db_open).
+-define(COUCH_BT_GET_KP_NODE, get_kp_node).
+-define(COUCH_BT_GET_KV_NODE, get_kv_node).
+-define(COUCH_BT_WRITE_KP_NODE, write_kp_node).
+-define(COUCH_BT_WRITE_KV_NODE, write_kv_node).
+-define(COUCH_JS_FILTER, js_filter).
+-define(COUCH_JS_FILTERED_DOCS, js_filtered_docs).
+-define(IOQ_CALLS, ioq_calls).
+-define(DOCS_WRITTEN, docs_written).
+-define(ROWS_READ, rows_read).
+%% TODO: use dedicated changes_processed or use rows_read?
+%%-define(FRPC_CHANGES_PROCESSED, rows_read).
+-define(FRPC_CHANGES_PROCESSED, changes_processed).
+-define(FRPC_CHANGES_RETURNED, changes_returned).
Review Comment:
Hmm, maybe rows_read/rows_returned, considering filtering?
Then, if we have rows_read/rows_returned, could we skip the js_filter
counter, since for any given filtered request the rows we filtered should be
about `rows_read - rows_returned`?
##########
src/couch_stats/src/couch_stats.erl:
##########
@@ -49,6 +54,11 @@ increment_counter(Name) ->
-spec increment_counter(any(), pos_integer()) -> response().
increment_counter(Name, Value) ->
+ %% Should maybe_track_local happen before or after notify?
+ %% If after, only currently tracked metrics declared in the app's
+ %% stats_description.cfg will be trackable locally. Pros/cons.
+ %io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]),
Review Comment:
Debug left-over
##########
src/couch_stats/src/couch_stats_sup.erl:
##########
@@ -29,6 +29,8 @@ init([]) ->
{
{one_for_one, 5, 10}, [
?CHILD(couch_stats_server, worker),
+ ?CHILD(csrt_server, worker),
+ ?CHILD(csrt_logger, worker),
Review Comment:
Since CSRT lives in couch_stats it should be prefixed with `couch_stats_`. I
know we don't always follow that rule, but most applications use the
`$app_...` prefix scheme.
##########
src/chttpd/src/chttpd_misc.erl:
##########
@@ -219,6 +220,112 @@ handle_task_status_req(#httpd{method = 'GET'} = Req) ->
handle_task_status_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
+handle_resource_status_req(#httpd{method = 'POST'} = Req) ->
+ ok = chttpd:verify_is_server_admin(Req),
+ chttpd:validate_ctype(Req, "application/json"),
+ {Props} = chttpd:json_body_obj(Req),
+ Action = proplists:get_value(<<"action">>, Props),
+ Key = proplists:get_value(<<"key">>, Props),
+ Val = proplists:get_value(<<"val">>, Props),
+
+ CountBy = fun csrt:count_by/1,
+ GroupBy = fun csrt:group_by/2,
+ SortedBy1 = fun csrt:sorted_by/1,
+ SortedBy2 = fun csrt:sorted_by/2,
+ ConvertEle = fun erlang:binary_to_existing_atom/1,
+ ConvertList = fun(L) -> [ConvertEle(E) || E <- L] end,
+ ToJson = fun csrt_util:to_json/1,
+ JsonKeys = fun(PL) -> [[ToJson(K), V] || {K, V} <- PL] end,
+
+ Fun =
+ case {Action, Key, Val} of
+ {<<"count_by">>, Keys, undefined} when is_list(Keys) ->
+ Keys1 = [ConvertEle(K) || K <- Keys],
+ fun() -> CountBy(Keys1) end;
+ {<<"count_by">>, Key, undefined} ->
+ Key1 = ConvertEle(Key),
+ fun() -> CountBy(Key1) end;
+ {<<"group_by">>, Keys, Vals} when is_list(Keys) andalso
is_list(Vals) ->
+ Keys1 = ConvertList(Keys),
+ Vals1 = ConvertList(Vals),
+ fun() -> GroupBy(Keys1, Vals1) end;
+ {<<"group_by">>, Key, Vals} when is_list(Vals) ->
+ Key1 = ConvertEle(Key),
+ Vals1 = ConvertList(Vals),
+ fun() -> GroupBy(Key1, Vals1) end;
+ {<<"group_by">>, Keys, Val} when is_list(Keys) ->
+ Keys1 = ConvertList(Keys),
+ Val1 = ConvertEle(Val),
+ fun() -> GroupBy(Keys1, Val1) end;
+ {<<"group_by">>, Key, Val} ->
+ Key1 = ConvertEle(Key),
+ Val1 = ConvertList(Val),
+ fun() -> GroupBy(Key1, Val1) end;
+ {<<"sorted_by">>, Key, undefined} ->
+ Key1 = ConvertEle(Key),
+ fun() -> JsonKeys(SortedBy1(Key1)) end;
+ {<<"sorted_by">>, Keys, undefined} when is_list(Keys) ->
+ Keys1 = [ConvertEle(K) || K <- Keys],
+ fun() -> JsonKeys(SortedBy1(Keys1)) end;
+ {<<"sorted_by">>, Keys, Vals} when is_list(Keys) andalso
is_list(Vals) ->
+ Keys1 = ConvertList(Keys),
+ Vals1 = ConvertList(Vals),
+ fun() -> JsonKeys(SortedBy2(Keys1, Vals1)) end;
+ {<<"sorted_by">>, Key, Vals} when is_list(Vals) ->
+ Key1 = ConvertEle(Key),
+ Vals1 = ConvertList(Vals),
+ fun() -> JsonKeys(SortedBy2(Key1, Vals1)) end;
+ {<<"sorted_by">>, Keys, Val} when is_list(Keys) ->
+ Keys1 = ConvertList(Keys),
+ Val1 = ConvertEle(Val),
+ fun() -> JsonKeys(SortedBy2(Keys1, Val1)) end;
+ {<<"sorted_by">>, Key, Val} ->
+ Key1 = ConvertEle(Key),
+ Val1 = ConvertList(Val),
+ fun() -> JsonKeys(SortedBy2(Key1, Val1)) end;
+ _ ->
+ throw({badrequest, invalid_resource_request})
+ end,
+
+ Fun1 = fun() ->
+ case Fun() of
+ Map when is_map(Map) ->
+ {maps:fold(
+ fun
+ %% TODO: Skip 0 value entries?
+ (_K, 0, A) -> A;
+ (K, V, A) -> [{ToJson(K), V} | A]
+ end,
+ [],
+ Map
+ )};
+ List when is_list(List) ->
+ List
+ end
+ end,
+
+ {Resp, _Bad} = rpc:multicall(erlang, apply, [
+ fun() ->
+ {node(), Fun1()}
+ end,
+ []
+ ]),
Review Comment:
Unless absolutely necessary, it's not a good idea to pass function closures
through the dist protocol. We've done that with attachments in the past and
have since removed it.
It might be better to create a utility function, and do an
`erpc:multicall([node()|nodes()], mod, fun, [Action, Key, Val])`
See how we get active tasks for instance:
https://github.com/apache/couchdb/blob/c3789e97cebd6a1cb4872cb7d25fd3d7d64fb956/src/chttpd/src/chttpd_util.erl#L174-L188
##########
src/couch_stats/src/couch_stats.erl:
##########
@@ -100,6 +110,22 @@ stats() ->
now_sec() ->
erlang:monotonic_time(second).
+%% Only potentially track positive increments to counters
+-spec maybe_track_local_counter(any(), any()) -> ok.
+maybe_track_local_counter(Name, Val) when is_integer(Val) andalso Val > 0 ->
+ %%io:format("maybe_track_local[~p]: ~p~n", [Val, Name]),
Review Comment:
Debug left-over
##########
src/couch_stats/src/couch_stats_resource_tracker.hrl:
##########
@@ -0,0 +1,174 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(CSRT, "csrt").
+-define(CSRT_INIT_P, "csrt.init_p").
+-define(CSRT_ETS, csrt_server).
+
+%% CSRT pdict markers
+-define(DELTA_TA, csrt_delta_ta).
+-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0
+-define(PID_REF, csrt_pid_ref). %% track local ID
+-define(TRACKER_PID, csrt_tracker). %% tracker pid
+
+-define(MANGO_EVAL_MATCH, mango_eval_match).
+-define(DB_OPEN_DOC, docs_read).
+-define(DB_OPEN, db_open).
+-define(COUCH_SERVER_OPEN, db_open).
+-define(COUCH_BT_GET_KP_NODE, get_kp_node).
+-define(COUCH_BT_GET_KV_NODE, get_kv_node).
+-define(COUCH_BT_WRITE_KP_NODE, write_kp_node).
+-define(COUCH_BT_WRITE_KV_NODE, write_kv_node).
+-define(COUCH_JS_FILTER, js_filter).
Review Comment:
Noticed we don't track selector based filters. That's because they are not
resource intensive?
##########
src/mem3/src/mem3_rpc.erl:
##########
@@ -378,20 +378,34 @@ rexi_call(Node, MFA, Timeout) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
- receive
- {Ref, {ok, Reply}} ->
- Reply;
- {Ref, Error} ->
- erlang:error(Error);
- {rexi_DOWN, Mon, _, Reason} ->
- erlang:error({rexi_DOWN, {Node, Reason}})
- after Timeout ->
- erlang:error(timeout)
- end
+ wait_message(Node, Ref, Mon, Timeout)
after
rexi_monitor:stop(Mon)
end.
+wait_message(Node, Ref, Mon, Timeout) ->
+ receive
+ Msg ->
+ process_raw_message(Msg, Node, Ref, Mon, Timeout)
+ after Timeout ->
+ erlang:error(timeout)
+ end.
+
+process_raw_message(Msg0, Node, Ref, Mon, Timeout) ->
+ {Msg, Delta} = csrt:extract_delta(Msg0),
+ csrt:accumulate_delta(Delta),
+ case Msg of
+ {Ref, {ok, Reply}} ->
+ Reply;
+ {Ref, Error} ->
+ erlang:error(Error);
+ {rexi_DOWN, Mon, _, Reason} ->
+ erlang:error({rexi_DOWN, {Node, Reason}});
+ Other ->
+ ?LOG_UNEXPECTED_MSG(Other),
+ wait_message(Node, Ref, Mon, Timeout)
Review Comment:
Good idea to log the message if we drop it.
Hmm, maybe we should see if we can use aliases for this to avoid unexpected
messages. If we're switching to dropping messages we should make it a separate
(preliminary) PR to discuss it and then rebase the work on that.
##########
src/couch_stats/src/csrt_server.erl:
##########
@@ -0,0 +1,213 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(csrt_server).
+
+-behaviour(gen_server).
+
+-export([
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2
+]).
+
+-export([
+ create_pid_ref/0,
+ create_resource/1,
+ destroy_resource/1,
+ get_resource/1,
+ get_context_type/1,
+ inc/2,
+ inc/3,
+ match_resource/1,
+ new_context/2,
+ set_context_dbname/2,
+ set_context_username/2,
+ set_context_type/2,
+ update_counter/3
+]).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("couch_stats_resource_tracker.hrl").
+
+-record(st, {}).
+
+%%
+%% Public API
+%%
+
+-spec create_pid_ref() -> pid_ref().
+create_pid_ref() ->
+ {self(), make_ref()}.
+
+%%
+%%
+%% Context lifecycle API
+%%
+
+-spec new_context(Type :: rctx_type(), Nonce :: nonce()) -> rctx().
+new_context(Type, Nonce) ->
+ #rctx{
+ nonce = Nonce,
+ pid_ref = create_pid_ref(),
+ type = Type
+ }.
+
+-spec set_context_dbname(DbName, PidRef) -> boolean() when
+ DbName :: dbname(), PidRef :: maybe_pid_ref().
+set_context_dbname(_, undefined) ->
+ false;
+set_context_dbname(DbName, PidRef) ->
+ update_element(PidRef, [{#rctx.dbname, DbName}]).
+
+%%set_context_handler_fun(_, undefined) ->
+%% ok;
+%%set_context_handler_fun(Fun, PidRef) when is_function(Fun) ->
+%% FProps = erlang:fun_info(Fun),
+%% Mod = proplists:get_value(module, FProps),
+%% Func = proplists:get_value(name, FProps),
+%% #rctx{type=#coordinator{}=Coordinator} = get_resource(PidRef),
+%% Update = [{#rctx.type, Coordinator#coordinator{mod=Mod, func=Func}}],
+%% update_element(PidRef, Update).
+
+-spec set_context_username(UserName, PidRef) -> boolean() when
+ UserName :: username(), PidRef :: maybe_pid_ref().
+set_context_username(_, undefined) ->
+ ok;
+set_context_username(UserName, PidRef) ->
+ update_element(PidRef, [{#rctx.username, UserName}]).
+
+-spec get_context_type(Rctx :: rctx()) -> rctx_type().
+get_context_type(#rctx{type = Type}) ->
+ Type.
+
+-spec set_context_type(Type, PidRef) -> boolean() when
+ Type :: rctx_type(), PidRef :: maybe_pid_ref().
+set_context_type(Type, PidRef) ->
+ update_element(PidRef, [{#rctx.type, Type}]).
+
+-spec create_resource(Rctx :: rctx()) -> boolean().
+create_resource(#rctx{} = Rctx) ->
+ (catch ets:insert(?CSRT_ETS, Rctx)) == true.
+
+-spec destroy_resource(PidRef :: maybe_pid_ref()) -> boolean().
+destroy_resource(undefined) ->
+ false;
+destroy_resource({_, _} = PidRef) ->
+ (catch ets:delete(?CSRT_ETS, PidRef)) == true.
+
+-spec get_resource(PidRef :: maybe_pid_ref()) -> maybe_rctx().
+get_resource(undefined) ->
+ undefined;
+get_resource(PidRef) ->
+ try ets:lookup(?CSRT_ETS, PidRef) of
+ [#rctx{} = Rctx] ->
+ Rctx;
+ [] ->
+ undefined
+ catch
+ _:_ ->
+ undefined
+ end.
+
+-spec match_resource(Rctx :: maybe_rctx()) -> [] | [rctx()].
+match_resource(undefined) ->
+ [];
+match_resource(#rctx{} = Rctx) ->
+ ets:match_object(?CSRT_ETS, Rctx).
+
+-spec is_rctx_field(Field :: rctx_field() | atom()) -> boolean().
+is_rctx_field(Field) ->
+ maps:is_key(Field, ?KEYS_TO_FIELDS).
+
+-spec get_rctx_field(Field :: rctx_field()) ->
+ non_neg_integer()
+ | throw({badkey, Key :: any()}).
+get_rctx_field(Field) ->
+ maps:get(Field, ?KEYS_TO_FIELDS).
+
+-spec update_counter(PidRef, Field, Count) -> non_neg_integer() when
+ PidRef :: maybe_pid_ref(),
+ Field :: rctx_field(),
+ Count :: non_neg_integer().
+update_counter(undefined, _Field, _Count) ->
+ 0;
+update_counter({_Pid, _Ref} = PidRef, Field, Count) when Count >= 0 ->
+ %% TODO: mem3 crashes without catch, why do we lose the stats table?
+ case is_rctx_field(Field) of
+ true ->
+ Update = {get_rctx_field(Field), Count},
+ try
+ ets:update_counter(?CSRT_ETS, PidRef, Update, #rctx{pid_ref =
PidRef})
+ catch
+ _:_ ->
+ 0
+ end;
+ false ->
+ 0
+ end.
+
+-spec inc(PidRef :: maybe_pid_ref(), Field :: rctx_field()) ->
non_neg_integer().
+inc(PidRef, Field) ->
+ inc(PidRef, Field, 1).
+
+-spec inc(PidRef, Field, N) -> non_neg_integer() when
+ PidRef :: maybe_pid_ref(),
+ Field :: rctx_field(),
+ N :: non_neg_integer().
+inc(undefined, _Field, _) ->
+ 0;
+inc(_PidRef, _Field, 0) ->
+ 0;
+inc({_Pid, _Ref} = PidRef, Field, N) when is_integer(N) andalso N > 0 ->
+ case is_rctx_field(Field) of
+ true ->
+ update_counter(PidRef, Field, N);
+ false ->
+ 0
+ end.
+
+%%
+%% gen_server callbacks
+%%
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ ets:new(?CSRT_ETS, [
+ named_table,
+ public,
+ {decentralized_counters, true},
+ {write_concurrency, true},
+ {read_concurrency, true},
Review Comment:
Use `{write_concurrency, auto}` instead of `decentralized_counters`
##########
src/couch_stats/src/csrt_util.erl:
##########
@@ -0,0 +1,485 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(csrt_util).
+
+-export([
+ is_enabled/0,
+ is_enabled_init_p/0,
+ get_pid_ref/0,
+ get_pid_ref/1,
+ set_pid_ref/1,
+ should_track_init_p/2,
+ tnow/0,
+ tutc/0,
+ tutc/1
+]).
+
+%% JSON Conversion API
+-export([
+ convert_type/1,
+ convert_pidref/1,
+ convert_pid/1,
+ convert_ref/1,
+ to_json/1
+]).
+
+%% Delta API
+-export([
+ add_delta/2,
+ extract_delta/1,
+ get_delta/1,
+ get_delta_a/0,
+ get_delta_zero/0,
+ maybe_add_delta/1,
+ maybe_add_delta/2,
+ make_delta/1,
+ make_dt/2,
+ make_dt/3,
+ rctx_delta/2,
+ set_delta_a/1,
+ set_delta_zero/1
+]).
+
+%% Extra niceties and testing facilities
+-export([
+ set_fabric_init_p/2,
+ set_fabric_init_p/3,
+ map_to_rctx/1,
+ field/2
+]).
+
+-include_lib("couch_stats_resource_tracker.hrl").
+
+%% Read the boolean "enabled" key from the ?CSRT config section; the value
+%% falls back to true when the key is not set.
+-spec is_enabled() -> boolean().
+is_enabled() ->
+    Default = true,
+    config:get_boolean(?CSRT, "enabled", Default).
+
+%% Read the boolean "enabled" key from the ?CSRT_INIT_P config section; the
+%% value falls back to true when the key is not set.
+%% NOTE(review): default.ini ships `[csrt.init_p] enabled = false`, while the
+%% fallback used here is true - confirm which default is intended.
+-spec is_enabled_init_p() -> boolean().
+is_enabled_init_p() ->
+    Default = true,
+    config:get_boolean(?CSRT_INIT_P, "enabled", Default).
+
+%% Decide whether init_p tracking applies to Mod:Func. Only fabric_rpc
+%% functions can be tracked: the init_p switch must be on and the
+%% per-function key (built by fabric_conf_key/1) in the ?CSRT_INIT_P section
+%% must be true; that key defaults to false. Every other module is never
+%% tracked.
+-spec should_track_init_p(Mod :: atom(), Func :: atom()) -> boolean().
+should_track_init_p(fabric_rpc, Func) ->
+    case is_enabled_init_p() of
+        true ->
+            config:get_boolean(?CSRT_INIT_P, fabric_conf_key(Func), false);
+        false ->
+            false
+    end;
+should_track_init_p(_Mod, _Func) ->
+    false.
+
+%% Current Erlang monotonic time in native time units. Monotonic time never
+%% goes backwards, which makes it suitable for measuring elapsed time.
+-spec tnow() -> integer().
+tnow() ->
+    erlang:monotonic_time(native).
+
+%% Current time as a UTC RFC 3339 string, derived from tnow/0 via tutc/1.
+-spec tutc() -> calendar:rfc3339_string().
+tutc() ->
+    Now = tnow(),
+    tutc(Now).
+
+%% Convert an integer monotonic timestamp in native time units (as produced
+%% by tnow/0) into a UTC RFC 3339 string with millisecond precision.
+%% erlang:time_offset() is added first to turn the monotonic value into
+%% Erlang system time; the sum is then converted from native units to
+%% milliseconds before formatting.
+-spec tutc(Time :: integer()) -> calendar:rfc3339_string().
+tutc(Time0) when is_integer(Time0) ->
+    SystemNative = Time0 + erlang:time_offset(),
+    Millis = erlang:convert_time_unit(SystemNative, native, millisecond),
+    calendar:system_time_to_rfc3339(Millis, [{unit, millisecond}, {offset, "z"}]).
+
+%% Delta time between the native timestamps A and B, in microseconds.
+%% @equiv make_dt(A, B, microsecond)
+-spec make_dt(A, B) -> pos_integer() when
+    A :: integer(),
+    B :: integer().
+make_dt(A, B) ->
+    Unit = microsecond,
+    make_dt(A, B, Unit).
+
+%% Returns monotonic dt (delta time) between A and B in the specified
+%% time_unit(). A and B are timestamps in native time units with B >= A;
+%% any other combination fails with function_clause.
+%%
+%% The result is always at least 1 so that callers can divide by it:
+%%   - equal timestamps return 1, independent of unit, as that is the
+%%     smallest possible positive integer delta;
+%%   - an interval shorter than one Unit would otherwise convert to 0, so the
+%%     converted value is clamped to 1 as well.
+-spec make_dt(A, B, Unit) -> pos_integer() when
+    A :: integer(),
+    B :: integer(),
+    Unit :: erlang:time_unit().
+make_dt(A, A, _Unit) when is_integer(A) ->
+    1;
+make_dt(A, B, Unit) when is_integer(A) andalso is_integer(B) andalso B > A ->
+    %% Subtract in native units and convert once: a single conversion avoids
+    %% rounding each endpoint separately before taking the difference.
+    max(1, erlang:convert_time_unit(B - A, native, Unit)).
+
+%%
+%% Conversion API for outputting JSON
+%%
+
+%% Render the type of a resource context as a binary for JSON output:
+%%   - #coordinator{} becomes <<"coordinator-{Mod:Func}:Method:Path">>
+%%   - #rpc_worker{}  becomes <<"rpc_worker-{From}:Mod:Func">>, where From
+%%     is rendered by convert_pidref/1
+%%   - undefined      becomes the atom null
+-spec convert_type(T) -> binary() | null when
+    T :: #coordinator{} | #rpc_worker{} | undefined.
+convert_type(undefined) ->
+    null;
+convert_type(#rpc_worker{mod = Mod, func = Func, from = Caller}) ->
+    ModBin = atom_to_binary(Mod),
+    FuncBin = atom_to_binary(Func),
+    CallerBin = convert_pidref(Caller),
+    <<"rpc_worker-{", CallerBin/binary, "}:", ModBin/binary, ":", FuncBin/binary>>;
+convert_type(#coordinator{method = Method, path = Path, mod = Mod, func = Func}) ->
+    ModBin = atom_to_binary(Mod),
+    FuncBin = atom_to_binary(Func),
+    MethodBin = atom_to_binary(Method),
+    <<"coordinator-{", ModBin/binary, ":", FuncBin/binary, "}:", MethodBin/binary, ":",
+        Path/binary>>.
+
+-spec convert_pidref(PidRef) -> binary() | null when
+ PidRef :: {A :: pid(), B :: reference()} | undefined.
+convert_pidref({Parent0, ParentRef0}) ->
+ Parent = convert_pid(Parent0),
+ ParentRef = convert_ref(ParentRef0),
+ <<Parent/binary, ":", ParentRef/binary>>;
+%%convert_pidref(null) ->
+%% null;
Review Comment:
Debug leftover: this commented-out `convert_pidref(null)` clause should be removed.
##########
src/couch_stats/src/csrt_util.erl:
##########
@@ -0,0 +1,485 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(csrt_util).
+
+-export([
+ is_enabled/0,
+ is_enabled_init_p/0,
+ get_pid_ref/0,
+ get_pid_ref/1,
+ set_pid_ref/1,
+ should_track_init_p/2,
+ tnow/0,
+ tutc/0,
+ tutc/1
+]).
+
+%% JSON Conversion API
+-export([
+ convert_type/1,
+ convert_pidref/1,
+ convert_pid/1,
+ convert_ref/1,
+ to_json/1
+]).
+
+%% Delta API
+-export([
+ add_delta/2,
+ extract_delta/1,
+ get_delta/1,
+ get_delta_a/0,
+ get_delta_zero/0,
+ maybe_add_delta/1,
+ maybe_add_delta/2,
+ make_delta/1,
+ make_dt/2,
+ make_dt/3,
+ rctx_delta/2,
+ set_delta_a/1,
+ set_delta_zero/1
+]).
+
+%% Extra niceties and testing facilities
+-export([
+ set_fabric_init_p/2,
+ set_fabric_init_p/3,
+ map_to_rctx/1,
+ field/2
+]).
+
+-include_lib("couch_stats_resource_tracker.hrl").
+
+%% Read the boolean "enabled" key from the ?CSRT config section; the value
+%% falls back to true when the key is not set.
+-spec is_enabled() -> boolean().
+is_enabled() ->
+    Default = true,
+    config:get_boolean(?CSRT, "enabled", Default).
+
+%% Read the boolean "enabled" key from the ?CSRT_INIT_P config section; the
+%% value falls back to true when the key is not set.
+%% NOTE(review): default.ini ships `[csrt.init_p] enabled = false`, while the
+%% fallback used here is true - confirm which default is intended.
+-spec is_enabled_init_p() -> boolean().
+is_enabled_init_p() ->
+    Default = true,
+    config:get_boolean(?CSRT_INIT_P, "enabled", Default).
+
+%% Decide whether init_p tracking applies to Mod:Func. Only fabric_rpc
+%% functions can be tracked: the init_p switch must be on and the
+%% per-function key (built by fabric_conf_key/1) in the ?CSRT_INIT_P section
+%% must be true; that key defaults to false. Every other module is never
+%% tracked.
+-spec should_track_init_p(Mod :: atom(), Func :: atom()) -> boolean().
+should_track_init_p(fabric_rpc, Func) ->
+    case is_enabled_init_p() of
+        true ->
+            config:get_boolean(?CSRT_INIT_P, fabric_conf_key(Func), false);
+        false ->
+            false
+    end;
+should_track_init_p(_Mod, _Func) ->
+    false.
+
+%% Current Erlang monotonic time in native time units. Monotonic time never
+%% goes backwards, which makes it suitable for measuring elapsed time.
+-spec tnow() -> integer().
+tnow() ->
+    erlang:monotonic_time(native).
+
+%% Current time as a UTC RFC 3339 string, derived from tnow/0 via tutc/1.
+-spec tutc() -> calendar:rfc3339_string().
+tutc() ->
+    Now = tnow(),
+    tutc(Now).
+
+%% Convert an integer monotonic timestamp in native time units (as produced
+%% by tnow/0) into a UTC RFC 3339 string with millisecond precision.
+%% erlang:time_offset() is added first to turn the monotonic value into
+%% Erlang system time; the sum is then converted from native units to
+%% milliseconds before formatting.
+-spec tutc(Time :: integer()) -> calendar:rfc3339_string().
+tutc(Time0) when is_integer(Time0) ->
+    SystemNative = Time0 + erlang:time_offset(),
+    Millis = erlang:convert_time_unit(SystemNative, native, millisecond),
+    calendar:system_time_to_rfc3339(Millis, [{unit, millisecond}, {offset, "z"}]).
+
+%% Delta time between the native timestamps A and B, in microseconds.
+%% @equiv make_dt(A, B, microsecond)
+-spec make_dt(A, B) -> pos_integer() when
+    A :: integer(),
+    B :: integer().
+make_dt(A, B) ->
+    Unit = microsecond,
+    make_dt(A, B, Unit).
+
+%% Returns monotonic dt (delta time) in specified time_unit()
+-spec make_dt(A, B, Unit) -> pos_integer() when
+ A :: integer(),
+ B :: integer(),
+ Unit :: erlang:time_unit().
+make_dt(A, A, _Unit) when is_integer(A) ->
+ %% Handle edge case when monotonic_time()'s are equal
+ %% Always return a non zero value so we don't divide by zero
+ %% This always returns 1, independent of unit, as that's the smallest
+ %% possible positive integer value delta.
+ 1;
+make_dt(A, B, Unit) when is_integer(A) andalso is_integer(B) andalso B > A ->
+ A1 = erlang:convert_time_unit(A, native, Unit),
+ B1 = erlang:convert_time_unit(B, native, Unit),
Review Comment:
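Subtract the native values first, then convert the difference once? That needs only one `convert_time_unit/3` call and avoids rounding each endpoint separately.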
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]