This is an automated email from the ASF dual-hosted git repository. chewbranca pushed a commit to branch couch-stats-resource-tracker-rebase in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 7bd473067dc214fc02ada7c9491f436b24b48cfa Author: Russell Branca <chewbra...@apache.org> AuthorDate: Thu Nov 9 11:22:13 2023 -0800 WIP: core delta aggregation working --- src/chttpd/src/chttpd.erl | 14 ++- .../src/couch_stats_resource_tracker.erl | 100 ++++++++++++++++++--- src/fabric/priv/stats_descriptions.cfg | 8 ++ src/fabric/src/fabric_rpc.erl | 1 + src/rexi/src/rexi.erl | 6 +- src/rexi/src/rexi_utils.erl | 6 ++ 6 files changed, 121 insertions(+), 14 deletions(-) diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index fdd7855f0..688541ae8 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -313,6 +313,8 @@ handle_request_int(MochiReq) -> ] }, + io:format("CSRTZ PROCESSING: ~w~n", [RequestedPath]), + % put small token on heap to keep requests synced to backend calls erlang:put(nonce, Nonce), @@ -324,14 +326,22 @@ handle_request_int(MochiReq) -> chttpd_util:mochiweb_client_req_set(MochiReq), %% This is probably better in before_request, but having Path is nice + io:format("CSRTZ INITIATING CONTEXT: ~w~n", [Nonce]), couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path), + io:format("CSRTZ INITIAL CONTEXT: ~w~n", [couch_stats_resource_tracker:get_resource()]), + Coord = self(), + PidRef = couch_stats_resource_tracker:get_pid_ref(), + spawn(fun() -> monitor(process, Coord), receive M -> io:format("CSRTZ PROCESS DOWN[~w]{~w}: ~w~n", [ Coord, M, couch_stats_resource_tracker:get_resource(PidRef)]) end end), {HttpReq2, Response} = case before_request(HttpReq0) of {ok, HttpReq1} -> + io:format("CSRTZ BEFORE_REQUEST OK: ~w~n", [couch_stats_resource_tracker:get_resource()]), process_request(HttpReq1); {error, Response0} -> + io:format("CSRTZ BEFORE_REQUEST ERROR: ~w~n", [couch_stats_resource_tracker:get_resource()]), {HttpReq0, Response0} end, + io:format("CSRTZ AFTER PROCESS_REQUEST: ~w~n", [couch_stats_resource_tracker:get_resource()]), chttpd_util:mochiweb_client_req_clean(), @@ -347,9 +357,11 @@ handle_request_int(MochiReq) -> case after_request(HttpReq2, HttpResp) of #httpd_resp{status = ok, response = Resp} -> + io:format("CSRTZ AFTER REQUEST OK: ~w~n", [couch_stats_resource_tracker:get_resource()]), {ok, Resp}; #httpd_resp{status = aborted, reason = Reason} -> - couch_log:error("Response abnormally terminated: ~p", [Reason]), + io:format("CSRTZ AFTER REQUEST ERROR: ~w~n", [couch_stats_resource_tracker:get_resource()]), + couch_log:error("Response abnormally terminated: ~w", [Reason]), exit({shutdown, Reason}) end. diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl index b74eb5a10..9f4f9254e 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.erl +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -34,6 +34,8 @@ -export([ create_context/0, create_context/1, create_context/3, create_coordinator_context/1, create_coordinator_context/2, + get_resource/0, + get_resource/1, set_context_dbname/1, set_context_username/1, track/1, @@ -73,6 +75,10 @@ io_bytes_written/1 ]). +-export([ + field/2 +]). + -include_lib("couch/include/couch_db.hrl"). %% Use these for record upgrades over the wire and in ETS tables @@ -98,6 +104,7 @@ %% TODO: overlap between this and couch btree fold invocations %% TODO: need some way to distinguish fols on views vs find vs all_docs -define(FRPC_CHANGES_ROW, changes_processed). +-define(FRPC_CHANGES_RETURNED, changes_returned). %%-define(FRPC_CHANGES_ROW, ?ROWS_READ). %% Module pdict markers @@ -133,6 +140,7 @@ rows_read = 0, btree_folds = 0, changes_processed = 0, + changes_returned = 0, ioq_calls = 0, io_bytes_read = 0, io_bytes_written = 0, @@ -170,6 +178,8 @@ inc(docs_read) -> inc(docs_read, 1); inc(?ROWS_READ) -> inc(?ROWS_READ, 1); +inc(?FRPC_CHANGES_RETURNED) -> + inc(?FRPC_CHANGES_RETURNED, 1); inc(?COUCH_BT_FOLDS) -> inc(?COUCH_BT_FOLDS, 1); inc(ioq_calls) -> @@ -200,6 +210,8 @@ inc(?DB_OPEN, N) -> update_counter(#rctx.?DB_OPEN, N); inc(?ROWS_READ, N) -> update_counter(#rctx.?ROWS_READ, N); +inc(?FRPC_CHANGES_RETURNED, N) -> + update_counter(#rctx.?FRPC_CHANGES_RETURNED, N); inc(ioq_calls, N) -> update_counter(#rctx.ioq_calls, N); inc(io_bytes_read, N) -> @@ -233,6 +245,8 @@ maybe_inc([couchdb, database_reads], Val) -> inc(?DB_OPEN_DOC, Val); maybe_inc([fabric_rpc, changes, processed], Val) -> inc(?FRPC_CHANGES_ROW, Val); +maybe_inc([fabric_rpc, changes, returned], Val) -> + inc(?FRPC_CHANGES_RETURNED, Val); maybe_inc([fabric_rpc, view, rows_read], Val) -> inc(?ROWS_READ, Val); maybe_inc([couchdb, couch_server, open], Val) -> @@ -261,6 +275,8 @@ should_track([fabric_rpc, changes, spawned]) -> true; should_track([fabric_rpc, changes, processed]) -> true; +should_track([fabric_rpc, changes, returned]) -> + true; should_track([fabric_rpc, map_view, spawned]) -> true; should_track([fabric_rpc, reduce_view, spawned]) -> @@ -283,7 +299,12 @@ should_track(_Metric) -> %% TODO: update coordinator stats from worker deltas accumulate_delta(Delta) -> - io:format("Accumulating delta: ~p~n", [Delta]), + case get_resource() of + #rctx{type={coordinator,_,_}} = Rctx -> + io:format("Accumulating delta: ~w || ~w~n", [Delta, Rctx]); + _ -> + ok + end, %% TODO: switch to creating a batch of updates to invoke a single %% update_counter rather than sequentially invoking it for each field maps:foreach(fun inc/2, Delta). @@ -292,8 +313,8 @@ update_counter(Field, Count) -> update_counter(get_pid_ref(), Field, Count). -update_counter({_Pid,_Ref}=Key, Field, Count) -> - ets:update_counter(?MODULE, Key, {Field, Count}, #rctx{pid_ref=Key}). +update_counter({_Pid,_Ref}=PidRef, Field, Count) -> + ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}). active() -> active_int(all). @@ -319,6 +340,39 @@ select_by_type(all) -> lists:map(fun to_json/1, ets:tab2list(?MODULE)). +field(#rctx{pid_ref=Val}, pid_ref) -> Val; +field(#rctx{mfa=Val}, mfa) -> Val; +field(#rctx{nonce=Val}, nonce) -> Val; +field(#rctx{from=Val}, from) -> Val; +field(#rctx{type=Val}, type) -> Val; +field(#rctx{state=Val}, state) -> Val; +field(#rctx{dbname=Val}, dbname) -> Val; +field(#rctx{username=Val}, username) -> Val; +field(#rctx{db_open=Val}, db_open) -> Val; +field(#rctx{docs_read=Val}, docs_read) -> Val; +field(#rctx{rows_read=Val}, rows_read) -> Val; +field(#rctx{btree_folds=Val}, btree_folds) -> Val; +field(#rctx{changes_processed=Val}, changes_processed) -> Val; +field(#rctx{changes_returned=Val}, changes_returned) -> Val; +field(#rctx{ioq_calls=Val}, ioq_calls) -> Val; +field(#rctx{io_bytes_read=Val}, io_bytes_read) -> Val; +field(#rctx{io_bytes_written=Val}, io_bytes_written) -> Val; +field(#rctx{js_evals=Val}, js_evals) -> Val; +field(#rctx{js_filter=Val}, js_filter) -> Val; +field(#rctx{js_filter_error=Val}, js_filter_error) -> Val; +field(#rctx{js_filtered_docs=Val}, js_filtered_docs) -> Val; +field(#rctx{mango_eval_match=Val}, mango_eval_match) -> Val; +field(#rctx{get_kv_node=Val}, get_kv_node) -> Val; +field(#rctx{get_kp_node=Val}, get_kp_node) -> Val. + + +%%group_by(GBFun) when is_fun(GBFun) -> +%% group_by(GBFun, sum). + + +%%group_by(GbFun, Agg) when is_function(GBFun) -> + + to_json(#rctx{}=Rctx) -> #rctx{ updated_at = TP, @@ -331,13 +385,17 @@ to_json(#rctx{}=Rctx) -> db_open = DbOpens, docs_read = DocsRead, rows_read = RowsRead, + js_filter = JSFilters, + js_filter_error = JSFilterErrors, + js_filtered_docs = JSFilteredDocss, state = State0, type = Type, btree_folds = BtFolds, get_kp_node = KpNodes, get_kv_node = KvNodes, ioq_calls = IoqCalls, - changes_processed = ChangesProcessed + changes_processed = ChangesProcessed, + changes_returned = ChangesReturned } = Rctx, %%io:format("TO_JSON_MFA: ~p~n", [MFA0]), MFA = case MFA0 of @@ -380,6 +438,9 @@ to_json(#rctx{}=Rctx) -> username => UserName, db_open => DbOpens, docs_read => DocsRead, + js_filter => JSFilters, + js_filter_error => JSFilterErrors, + js_filtered_docs => JSFilteredDocss, rows_read => RowsRead, state => State, type => term_to_json({type, Type}), @@ -387,7 +448,8 @@ to_json(#rctx{}=Rctx) -> kp_nodes => KpNodes, kv_nodes => KvNodes, ioq_calls => IoqCalls, - changes_processed => ChangesProcessed + changes_processed => ChangesProcessed, + changes_returned => ChangesReturned }. term_to_json({Pid, Ref}) when is_pid(Pid), is_reference(Ref) -> @@ -428,14 +490,18 @@ to_flat_json(#rctx{}=Rctx) -> db_open = DbOpens, docs_read = DocsRead, rows_read = RowsRead, + js_filter = JSFilters, + js_filter_error = JSFilterErrors, + js_filtered_docs = JSFilteredDocss, state = State0, type = Type, get_kp_node = KpNodes, get_kv_node = KvNodes, btree_folds = ChangesProcessed, + changes_returned = ChangesReturned, ioq_calls = IoqCalls } = Rctx, - io:format("TO_JSON_MFA: ~p~n", [MFA0]), + %%io:format("TO_JSON_MFA: ~p~n", [MFA0]), MFA = case MFA0 of {_M, _F, _A} -> ?l2b(io_lib:format("~w", [MFA0])); @@ -475,12 +541,16 @@ to_flat_json(#rctx{}=Rctx) -> username => UserName, db_open => DbOpens, docs_read => DocsRead, + js_filter => JSFilters, + js_filter_error => JSFilterErrors, + js_filtered_docs => JSFilteredDocss, rows_read => RowsRead, state => State, type => term_to_flat_json({type, Type}), kp_nodes => KpNodes, kv_nodes => KvNodes, btree_folds => ChangesProcessed, + changes_returned => ChangesReturned, ioq_calls => IoqCalls }. @@ -507,7 +577,7 @@ create_context(Pid) -> %% add type to disnguish coordinator vs rpc_worker create_context(From, {M,F,_A} = MFA, Nonce) -> - io:format("[~p] CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [self(), From, MFA, Nonce]), + %%io:format("[~p] CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [self(), From, MFA, Nonce]), PidRef = get_pid_ref(), %% this will instantiate a new PidRef %%Rctx = make_record(self(), Ref), %% TODO: extract user_ctx and db/shard from @@ -527,7 +597,7 @@ create_coordinator_context(#httpd{path_parts=Parts} = Req) -> create_coordinator_context(Req, io_lib:format("~p", [Parts])). create_coordinator_context(#httpd{} = Req, Path) -> - io:format("CREATING COORDINATOR CONTEXT ON {~p}~n", [Path]), + %%io:format("CREATING COORDINATOR CONTEXT ON {~p}~n", [Path]), #httpd{ method = Verb, %%path_parts = Parts, @@ -561,11 +631,11 @@ set_context_dbname(DbName) -> set_context_username(null) -> ok; set_context_username(UserName) -> - io:format("CSRT SETTING USERNAME CONTEXT: ~p~n", [UserName]), + %%io:format("CSRT SETTING USERNAME CONTEXT: ~p~n", [UserName]), case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of false -> Stk = try throw(42) catch _:_:Stk0 -> Stk0 end, - io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]), + io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]), timer:sleep(1000), erlang:halt(kaboomz); true -> @@ -612,6 +682,7 @@ make_delta() -> %% Perhaps somewhat naughty we're incrementing stats from within %% couch_stats itself? Might need to handle this differently %% TODO: determine appropriate course of action here + io:format("~n**********MISSING STARTING DELTA************~n~n", []), couch_stats:increment_counter( [couchdb, csrt, delta_missing_t0]), %%[couch_stats_resource_tracker, delta_missing_t0]), @@ -630,13 +701,19 @@ make_delta() -> TA0 end, TB = get_resource(), - make_delta(TA, TB). + Delta = make_delta(TA, TB), + set_delta_a(TB), + Delta. make_delta(#rctx{}=TA, #rctx{}=TB) -> Delta = #{ docs_read => TB#rctx.docs_read - TA#rctx.docs_read, + js_filter => TB#rctx.js_filter - TA#rctx.js_filter, + js_filter_error => TB#rctx.js_filter_error - TA#rctx.js_filter_error, + js_filtered_docs => TB#rctx.js_filtered_docs - TA#rctx.js_filtered_docs, rows_read => TB#rctx.rows_read - TA#rctx.rows_read, + changes_returned => TB#rctx.changes_returned - TA#rctx.changes_returned, btree_folds => TB#rctx.btree_folds - TA#rctx.btree_folds, get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node, get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node, @@ -761,4 +838,5 @@ log_process_lifetime_report(PidRef) -> %% More safely assert this can't ever be undefined #rctx{} = Rctx = get_resource(PidRef), %% TODO: catch error out of here, report crashes on depth>1 json + io:format("CSRT RCTX: ~p~n", [to_flat_json(Rctx)]), couch_log:report("csrt-pid-usage-lifetime", to_flat_json(Rctx)). diff --git a/src/fabric/priv/stats_descriptions.cfg b/src/fabric/priv/stats_descriptions.cfg index 7a062cca0..528942be8 100644 --- a/src/fabric/priv/stats_descriptions.cfg +++ b/src/fabric/priv/stats_descriptions.cfg @@ -52,6 +52,14 @@ {type, counter}, {desc, <<"number of fabric_rpc worker changes spawns">>} ]}. +{[fabric_rpc, changes, processed], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes row invocations">>} +]}. +{[fabric_rpc, changes, returned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes rows returned">>} +]}. {[fabric_rpc, view, rows_read], [ {type, counter}, {desc, <<"number of fabric_rpc view_cb row invocations">>} diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index da0710c96..b5f41782a 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -570,6 +570,7 @@ changes_enumerator(DocInfo, Acc) -> {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} ]}; Results -> + couch_stats:increment_counter([fabric_rpc, changes, returned]), Opts = if Conflicts -> [conflicts | DocOptions]; diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl index 0db6ac5de..35e884a9f 100644 --- a/src/rexi/src/rexi.erl +++ b/src/rexi/src/rexi.erl @@ -243,7 +243,9 @@ stream2(Msg, Limit, Timeout) -> {ok, Count} -> put(rexi_unacked, Count + 1), {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref, self(), Msg}), + %% TODO: why do the numbers go whacky when we add the delta here + erlang:send(Caller, {Ref, self(), Msg, get_delta()}), + %%erlang:send(Caller, {Ref, self(), Msg}), ok catch throw:timeout -> @@ -281,7 +283,7 @@ ping() -> %% filtered queries will be silent on usage until they finally return %% a row or no results. This delay is proportional to the database size, %% so instead we make sure ping/0 keeps live stats flowing. - erlang:send(Caller, {rexi, '$rexi_ping'}). + erlang:send(Caller, {rexi, '$rexi_ping'}, get_delta()). %% internal functions %% diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl index 3125f75db..642087fab 100644 --- a/src/rexi/src/rexi_utils.erl +++ b/src/rexi/src/rexi_utils.erl @@ -107,6 +107,9 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> {Ref, Msg} -> %% TODO: add stack trace to log entry couch_log:debug("rexi_utils:process_message no delta: {Ref, Msg} => {~p, ~p}~n", [Ref, Msg]), + timer:sleep(100), + %%erlang:halt(enodelta), + erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, Msg} => {~w, ~w}~n", [Ref, Msg])))), %%io:format("GOT NON DELTA MSG: ~p~n", [Msg]), case lists:keyfind(Ref, Keypos, RefList) of false -> @@ -119,6 +122,9 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> %%io:format("GOT NON DELTA MSG: ~p~n", [Msg]), %% TODO: add stack trace to log entry couch_log:debug("rexi_utils:process_message no delta: {Ref, From, Msg} => {~p, ~p, ~p}~n", [Ref, From, Msg]), + timer:sleep(100), + %%erlang:halt(enodelta), + erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, From, Msg} => {~w, ~w, ~w}~n", [Ref, From, Msg])))), case lists:keyfind(Ref, Keypos, RefList) of false -> {ok, Acc0};