This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch couch-stats-resource-tracker-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 7bd473067dc214fc02ada7c9491f436b24b48cfa
Author: Russell Branca <chewbra...@apache.org>
AuthorDate: Thu Nov 9 11:22:13 2023 -0800

    WIP: core delta aggregation working
---
 src/chttpd/src/chttpd.erl                          |  14 ++-
 .../src/couch_stats_resource_tracker.erl           | 100 ++++++++++++++++++---
 src/fabric/priv/stats_descriptions.cfg             |   8 ++
 src/fabric/src/fabric_rpc.erl                      |   1 +
 src/rexi/src/rexi.erl                              |   6 +-
 src/rexi/src/rexi_utils.erl                        |   6 ++
 6 files changed, 121 insertions(+), 14 deletions(-)
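
The thread running through the hunks below: each rexi worker snapshots its
per-process resource counters, ships the difference accumulated since its
last send alongside every streamed message, and the coordinator folds that
difference into its own context. Stripped of the debug output, the intended
flow is roughly the following (a sketch only; make_delta/0 and
accumulate_delta/1 appear in the diff, but whether they are exported, and
the surrounding variable names, are assumptions):

    %% Worker side: compute the counter delta since the last send and
    %% attach it to the outgoing rexi message (see rexi:stream2/3 below).
    Delta = couch_stats_resource_tracker:make_delta(),
    erlang:send(Caller, {Ref, self(), Msg, Delta}),

    %% Coordinator side: unpack the delta before handling the message and
    %% fold it into the coordinator's own #rctx{} ets row (see
    %% accumulate_delta/1 in couch_stats_resource_tracker below).
    couch_stats_resource_tracker:accumulate_delta(Delta).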

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index fdd7855f0..688541ae8 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -313,6 +313,8 @@ handle_request_int(MochiReq) ->
         ]
     },
 
+    io:format("CSRTZ PROCESSING: ~w~n", [RequestedPath]),
+
     % put small token on heap to keep requests synced to backend calls
     erlang:put(nonce, Nonce),
 
@@ -324,14 +326,22 @@ handle_request_int(MochiReq) ->
     chttpd_util:mochiweb_client_req_set(MochiReq),
 
     %% This is probably better in before_request, but having Path is nice
+    io:format("CSRTZ INITIATING CONTEXT: ~w~n", [Nonce]),
     couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path),
+    io:format("CSRTZ INITIAL CONTEXT: ~w~n", 
[couch_stats_resource_tracker:get_resource()]),
+    Coord = self(),
+    PidRef = couch_stats_resource_tracker:get_pid_ref(),
+    spawn(fun() -> monitor(process, Coord), receive M -> io:format("CSRTZ PROCESS DOWN[~w]{~w}: ~w~n", [ Coord, M, couch_stats_resource_tracker:get_resource(PidRef)]) end end),
     {HttpReq2, Response} =
         case before_request(HttpReq0) of
             {ok, HttpReq1} ->
+                io:format("CSRTZ BEFORE_REQUEST OK: ~w~n", 
[couch_stats_resource_tracker:get_resource()]),
                 process_request(HttpReq1);
             {error, Response0} ->
+                io:format("CSRTZ BEFORE_REQUEST ERROR: ~w~n", 
[couch_stats_resource_tracker:get_resource()]),
                 {HttpReq0, Response0}
         end,
+    io:format("CSRTZ AFTER PROCESS_REQUEST: ~w~n", 
[couch_stats_resource_tracker:get_resource()]),
 
     chttpd_util:mochiweb_client_req_clean(),
 
@@ -347,9 +357,11 @@ handle_request_int(MochiReq) ->
 
     case after_request(HttpReq2, HttpResp) of
         #httpd_resp{status = ok, response = Resp} ->
+            io:format("CSRTZ AFTER REQUEST OK: ~w~n", 
[couch_stats_resource_tracker:get_resource()]),
             {ok, Resp};
         #httpd_resp{status = aborted, reason = Reason} ->
-            couch_log:error("Response abnormally terminated: ~p", [Reason]),
+            io:format("CSRTZ AFTER REQUEST ERROR: ~w~n", 
[couch_stats_resource_tracker:get_resource()]),
+            couch_log:error("Response abnormally terminated: ~w", [Reason]),
             exit({shutdown, Reason})
     end.
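
The spawn added to handle_request_int/1 above is debug scaffolding: it
monitors the coordinator process and, once the coordinator exits, prints
whatever remains in that coordinator's resource context. A slightly tighter
version of the same pattern, matching the 'DOWN' message explicitly rather
than any message (a sketch built from the calls used above):

    Coord = self(),
    PidRef = couch_stats_resource_tracker:get_pid_ref(),
    spawn(fun() ->
        MonRef = monitor(process, Coord),
        receive
            {'DOWN', MonRef, process, Coord, Reason} ->
                %% Dump the coordinator's #rctx{} once it goes away.
                io:format("coordinator ~p exited (~p): ~p~n",
                    [Coord, Reason,
                     couch_stats_resource_tracker:get_resource(PidRef)])
        end
    end),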
 
diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl
index b74eb5a10..9f4f9254e 100644
--- a/src/couch_stats/src/couch_stats_resource_tracker.erl
+++ b/src/couch_stats/src/couch_stats_resource_tracker.erl
@@ -34,6 +34,8 @@
 -export([
     create_context/0, create_context/1, create_context/3,
     create_coordinator_context/1, create_coordinator_context/2,
+    get_resource/0,
+    get_resource/1,
     set_context_dbname/1,
     set_context_username/1,
     track/1,
@@ -73,6 +75,10 @@
     io_bytes_written/1
 ]).
 
+-export([
+    field/2
+]).
+
 -include_lib("couch/include/couch_db.hrl").
 
 %% Use these for record upgrades over the wire and in ETS tables
@@ -98,6 +104,7 @@
 %% TODO: overlap between this and couch btree fold invocations
 %% TODO: need some way to distinguish fols on views vs find vs all_docs
 -define(FRPC_CHANGES_ROW, changes_processed).
+-define(FRPC_CHANGES_RETURNED, changes_returned).
 %%-define(FRPC_CHANGES_ROW, ?ROWS_READ).
 
 %% Module pdict markers
@@ -133,6 +140,7 @@
     rows_read = 0,
     btree_folds = 0,
     changes_processed = 0,
+    changes_returned = 0,
     ioq_calls = 0,
     io_bytes_read = 0,
     io_bytes_written = 0,
@@ -170,6 +178,8 @@ inc(docs_read) ->
     inc(docs_read, 1);
 inc(?ROWS_READ) ->
     inc(?ROWS_READ, 1);
+inc(?FRPC_CHANGES_RETURNED) ->
+    inc(?FRPC_CHANGES_RETURNED, 1);
 inc(?COUCH_BT_FOLDS) ->
     inc(?COUCH_BT_FOLDS, 1);
 inc(ioq_calls) ->
@@ -200,6 +210,8 @@ inc(?DB_OPEN, N) ->
     update_counter(#rctx.?DB_OPEN, N);
 inc(?ROWS_READ, N) ->
     update_counter(#rctx.?ROWS_READ, N);
+inc(?FRPC_CHANGES_RETURNED, N) ->
+    update_counter(#rctx.?FRPC_CHANGES_RETURNED, N);
 inc(ioq_calls, N) ->
     update_counter(#rctx.ioq_calls, N);
 inc(io_bytes_read, N) ->
@@ -233,6 +245,8 @@ maybe_inc([couchdb, database_reads], Val) ->
     inc(?DB_OPEN_DOC, Val);
 maybe_inc([fabric_rpc, changes, processed], Val) ->
     inc(?FRPC_CHANGES_ROW, Val);
+maybe_inc([fabric_rpc, changes, returned], Val) ->
+    inc(?FRPC_CHANGES_RETURNED, Val);
 maybe_inc([fabric_rpc, view, rows_read], Val) ->
     inc(?ROWS_READ, Val);
 maybe_inc([couchdb, couch_server, open], Val) ->
@@ -261,6 +275,8 @@ should_track([fabric_rpc, changes, spawned]) ->
     true;
 should_track([fabric_rpc, changes, processed]) ->
     true;
+should_track([fabric_rpc, changes, returned]) ->
+    true;
 should_track([fabric_rpc, map_view, spawned]) ->
     true;
 should_track([fabric_rpc, reduce_view, spawned]) ->
@@ -283,7 +299,12 @@ should_track(_Metric) ->
 
 %% TODO: update coordinator stats from worker deltas
 accumulate_delta(Delta) ->
-    io:format("Accumulating delta: ~p~n", [Delta]),
+    case get_resource() of
+        #rctx{type={coordinator,_,_}} = Rctx ->
+            io:format("Accumulating delta: ~w || ~w~n", [Delta, Rctx]);
+        _ ->
+            ok
+    end,
     %% TODO: switch to creating a batch of updates to invoke a single
     %% update_counter rather than sequentially invoking it for each field
     maps:foreach(fun inc/2, Delta).
@@ -292,8 +313,8 @@ update_counter(Field, Count) ->
     update_counter(get_pid_ref(), Field, Count).
 
 
-update_counter({_Pid,_Ref}=Key, Field, Count) ->
-    ets:update_counter(?MODULE, Key, {Field, Count}, #rctx{pid_ref=Key}).
+update_counter({_Pid,_Ref}=PidRef, Field, Count) ->
+    ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}).
 
 
 active() -> active_int(all).
@@ -319,6 +340,39 @@ select_by_type(all) ->
     lists:map(fun to_json/1, ets:tab2list(?MODULE)).
 
 
+field(#rctx{pid_ref=Val}, pid_ref) -> Val;
+field(#rctx{mfa=Val}, mfa) -> Val;
+field(#rctx{nonce=Val}, nonce) -> Val;
+field(#rctx{from=Val}, from) -> Val;
+field(#rctx{type=Val}, type) -> Val;
+field(#rctx{state=Val}, state) -> Val;
+field(#rctx{dbname=Val}, dbname) -> Val;
+field(#rctx{username=Val}, username) -> Val;
+field(#rctx{db_open=Val}, db_open) -> Val;
+field(#rctx{docs_read=Val}, docs_read) -> Val;
+field(#rctx{rows_read=Val}, rows_read) -> Val;
+field(#rctx{btree_folds=Val}, btree_folds) -> Val;
+field(#rctx{changes_processed=Val}, changes_processed) -> Val;
+field(#rctx{changes_returned=Val}, changes_returned) -> Val;
+field(#rctx{ioq_calls=Val}, ioq_calls) -> Val;
+field(#rctx{io_bytes_read=Val}, io_bytes_read) -> Val;
+field(#rctx{io_bytes_written=Val}, io_bytes_written) -> Val;
+field(#rctx{js_evals=Val}, js_evals) -> Val;
+field(#rctx{js_filter=Val}, js_filter) -> Val;
+field(#rctx{js_filter_error=Val}, js_filter_error) -> Val;
+field(#rctx{js_filtered_docs=Val}, js_filtered_docs) -> Val;
+field(#rctx{mango_eval_match=Val}, mango_eval_match) -> Val;
+field(#rctx{get_kv_node=Val}, get_kv_node) -> Val;
+field(#rctx{get_kp_node=Val}, get_kp_node) -> Val.
+
+
+%%group_by(GBFun) when is_fun(GBFun) ->
+%%    group_by(GBFun, sum).
+
+
+%%group_by(GbFun, Agg) when is_function(GBFun) ->
+
+
 to_json(#rctx{}=Rctx) ->
     #rctx{
         updated_at = TP,
@@ -331,13 +385,17 @@ to_json(#rctx{}=Rctx) ->
         db_open = DbOpens,
         docs_read = DocsRead,
         rows_read = RowsRead,
+        js_filter = JSFilters,
+        js_filter_error = JSFilterErrors,
+        js_filtered_docs = JSFilteredDocss,
         state = State0,
         type = Type,
         btree_folds = BtFolds,
         get_kp_node = KpNodes,
         get_kv_node = KvNodes,
         ioq_calls = IoqCalls,
-        changes_processed = ChangesProcessed
+        changes_processed = ChangesProcessed,
+        changes_returned = ChangesReturned
     } = Rctx,
     %%io:format("TO_JSON_MFA: ~p~n", [MFA0]),
     MFA = case MFA0 of
@@ -380,6 +438,9 @@ to_json(#rctx{}=Rctx) ->
         username => UserName,
         db_open => DbOpens,
         docs_read => DocsRead,
+        js_filter => JSFilters,
+        js_filter_error => JSFilterErrors,
+        js_filtered_docs => JSFilteredDocss,
         rows_read => RowsRead,
         state => State,
         type => term_to_json({type, Type}),
@@ -387,7 +448,8 @@ to_json(#rctx{}=Rctx) ->
         kp_nodes => KpNodes,
         kv_nodes => KvNodes,
         ioq_calls => IoqCalls,
-        changes_processed => ChangesProcessed
+        changes_processed => ChangesProcessed,
+        changes_returned => ChangesReturned
     }.
 
 term_to_json({Pid, Ref}) when is_pid(Pid), is_reference(Ref) ->
@@ -428,14 +490,18 @@ to_flat_json(#rctx{}=Rctx) ->
         db_open = DbOpens,
         docs_read = DocsRead,
         rows_read = RowsRead,
+        js_filter = JSFilters,
+        js_filter_error = JSFilterErrors,
+        js_filtered_docs = JSFilteredDocss,
         state = State0,
         type = Type,
         get_kp_node = KpNodes,
         get_kv_node = KvNodes,
         btree_folds = ChangesProcessed,
+        changes_returned = ChangesReturned,
         ioq_calls = IoqCalls
     } = Rctx,
-    io:format("TO_JSON_MFA: ~p~n", [MFA0]),
+    %%io:format("TO_JSON_MFA: ~p~n", [MFA0]),
     MFA = case MFA0 of
         {_M, _F, _A} ->
             ?l2b(io_lib:format("~w", [MFA0]));
@@ -475,12 +541,16 @@ to_flat_json(#rctx{}=Rctx) ->
         username => UserName,
         db_open => DbOpens,
         docs_read => DocsRead,
+        js_filter => JSFilters,
+        js_filter_error => JSFilterErrors,
+        js_filtered_docs => JSFilteredDocss,
         rows_read => RowsRead,
         state => State,
         type => term_to_flat_json({type, Type}),
         kp_nodes => KpNodes,
         kv_nodes => KvNodes,
         btree_folds => ChangesProcessed,
+        changes_returned => ChangesReturned,
         ioq_calls => IoqCalls
     }.
 
@@ -507,7 +577,7 @@ create_context(Pid) ->
 
 %% add type to disnguish coordinator vs rpc_worker
 create_context(From, {M,F,_A} = MFA, Nonce) ->
-    io:format("[~p] CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [self(), From, MFA, 
Nonce]),
+    %%io:format("[~p] CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [self(), From, MFA, 
Nonce]),
     PidRef = get_pid_ref(), %% this will instantiate a new PidRef
     %%Rctx = make_record(self(), Ref),
     %% TODO: extract user_ctx and db/shard from 
@@ -527,7 +597,7 @@ create_coordinator_context(#httpd{path_parts=Parts} = Req) ->
     create_coordinator_context(Req, io_lib:format("~p", [Parts])).
 
 create_coordinator_context(#httpd{} = Req, Path) ->
-    io:format("CREATING COORDINATOR CONTEXT ON {~p}~n", [Path]),
+    %%io:format("CREATING COORDINATOR CONTEXT ON {~p}~n", [Path]),
     #httpd{
         method = Verb,
         %%path_parts = Parts,
@@ -561,11 +631,11 @@ set_context_dbname(DbName) ->
 set_context_username(null) ->
     ok;
 set_context_username(UserName) ->
-    io:format("CSRT SETTING USERNAME CONTEXT: ~p~n", [UserName]),
+    %%io:format("CSRT SETTING USERNAME CONTEXT: ~p~n", [UserName]),
     case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of
         false ->
             Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
-            io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND 
STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), 
current_stacktrace)]),
+            io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p AND 
STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), 
current_stacktrace)]),
             timer:sleep(1000),
             erlang:halt(kaboomz);
         true ->
@@ -612,6 +682,7 @@ make_delta() ->
             %% Perhaps somewhat naughty we're incrementing stats from within
             %% couch_stats itself? Might need to handle this differently
             %% TODO: determine appropriate course of action here
+            io:format("~n**********MISSING STARTING DELTA************~n~n", 
[]),
             couch_stats:increment_counter(
                 [couchdb, csrt, delta_missing_t0]),
                 %%[couch_stats_resource_tracker, delta_missing_t0]),
@@ -630,13 +701,19 @@ make_delta() ->
             TA0
     end,
     TB = get_resource(),
-    make_delta(TA, TB).
+    Delta = make_delta(TA, TB),
+    set_delta_a(TB),
+    Delta.
 
 
 make_delta(#rctx{}=TA, #rctx{}=TB) ->
     Delta = #{
         docs_read => TB#rctx.docs_read - TA#rctx.docs_read,
+        js_filter => TB#rctx.js_filter - TA#rctx.js_filter,
+        js_filter_error => TB#rctx.js_filter_error - TA#rctx.js_filter_error,
+        js_filtered_docs => TB#rctx.js_filtered_docs - TA#rctx.js_filtered_docs,
         rows_read => TB#rctx.rows_read - TA#rctx.rows_read,
+        changes_returned => TB#rctx.changes_returned - TA#rctx.changes_returned,
         btree_folds => TB#rctx.btree_folds - TA#rctx.btree_folds,
         get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node,
         get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node,
@@ -761,4 +838,5 @@ log_process_lifetime_report(PidRef) ->
     %% More safely assert this can't ever be undefined
     #rctx{} = Rctx = get_resource(PidRef),
     %% TODO: catch error out of here, report crashes on depth>1 json
+    io:format("CSRT RCTX: ~p~n", [to_flat_json(Rctx)]),
     couch_log:report("csrt-pid-usage-lifetime", to_flat_json(Rctx)).
diff --git a/src/fabric/priv/stats_descriptions.cfg b/src/fabric/priv/stats_descriptions.cfg
index 7a062cca0..528942be8 100644
--- a/src/fabric/priv/stats_descriptions.cfg
+++ b/src/fabric/priv/stats_descriptions.cfg
@@ -52,6 +52,14 @@
     {type, counter},
     {desc, <<"number of fabric_rpc worker changes spawns">>}
 ]}.
+{[fabric_rpc, changes, processed], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker changes row invocations">>}
+]}.
+{[fabric_rpc, changes, returned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker changes rows returned">>}
+]}.
 {[fabric_rpc, view, rows_read], [
     {type, counter},
     {desc, <<"number of fabric_rpc view_cb row invocations">>}
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index da0710c96..b5f41782a 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -570,6 +570,7 @@ changes_enumerator(DocInfo, Acc) ->
                     {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}
                 ]};
         Results ->
+            couch_stats:increment_counter([fabric_rpc, changes, returned]),
             Opts =
                 if
                     Conflicts -> [conflicts | DocOptions];
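
The distinction being instrumented here: [fabric_rpc, changes, processed]
counts every change the enumerator visits, while the new
[fabric_rpc, changes, returned] counts only changes that produce Results
for the caller (for example, rows that survive a filter), which is why the
increment sits in the Results branch. On the tracker side the metric
presumably lands in the worker's #rctx{} through the maybe_inc/2 and
should_track/1 clauses added above; the wiring between couch_stats and the
tracker is not part of this diff, so the second call below is an assumption:

    %% In the changes enumerator (this commit):
    couch_stats:increment_counter([fabric_rpc, changes, returned]),
    %% ...which, assuming couch_stats forwards tracked metrics to the
    %% resource tracker, ends up bumping changes_returned on the calling
    %% worker's ets row:
    couch_stats_resource_tracker:maybe_inc([fabric_rpc, changes, returned], 1).
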
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 0db6ac5de..35e884a9f 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -243,7 +243,9 @@ stream2(Msg, Limit, Timeout) ->
         {ok, Count} ->
             put(rexi_unacked, Count + 1),
             {Caller, Ref} = get(rexi_from),
-            erlang:send(Caller, {Ref, self(), Msg}),
+            %% TODO: why do the numbers go whacky when we add the delta here
+            erlang:send(Caller, {Ref, self(), Msg, get_delta()}),
+            %%erlang:send(Caller, {Ref, self(), Msg}),
             ok
     catch
         throw:timeout ->
@@ -281,7 +283,7 @@ ping() ->
     %% filtered queries will be silent on usage until they finally return
     %% a row or no results. This delay is proportional to the database size,
     %% so instead we make sure ping/0 keeps live stats flowing.
-    erlang:send(Caller, {rexi, '$rexi_ping'}).
+    erlang:send(Caller, {rexi, '$rexi_ping', get_delta()}).
 
 %% internal functions %%
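
Two notes on the rexi changes above. stream2/3 now attaches get_delta() as
a fourth element of every streamed message tuple, and ping/0 is meant to
carry the same delta so that long-running filtered changes feeds keep live
stats flowing before any row is emitted. get_delta/0 itself is not part of
this diff; a plausible shape, assuming it simply tags the tracker's delta
(hypothetical, not the actual helper):

    %% Hypothetical helper, not shown in this commit: tag the tracker's
    %% delta map so receivers can tell it apart from ordinary payloads.
    get_delta() ->
        {delta, couch_stats_resource_tracker:make_delta()}.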
 
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index 3125f75db..642087fab 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -107,6 +107,9 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
         {Ref, Msg} ->
             %% TODO: add stack trace to log entry
             couch_log:debug("rexi_utils:process_message no delta: {Ref, Msg} 
=> {~p, ~p}~n", [Ref, Msg]),
+            timer:sleep(100),
+            %%erlang:halt(enodelta),
+            erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, Msg} => {~w, ~w}~n", [Ref, Msg])))),
             %%io:format("GOT NON DELTA MSG: ~p~n", [Msg]),
             case lists:keyfind(Ref, Keypos, RefList) of
                 false ->
@@ -119,6 +122,9 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
             %%io:format("GOT NON DELTA MSG: ~p~n", [Msg]),
             %% TODO: add stack trace to log entry
             couch_log:debug("rexi_utils:process_message no delta: {Ref, From, 
Msg} => {~p, ~p, ~p}~n", [Ref, From, Msg]),
+            timer:sleep(100),
+            %%erlang:halt(enodelta),
+            erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, From, Msg} => {~w, ~w, ~w}~n", [Ref, From, Msg])))),
             case lists:keyfind(Ref, Keypos, RefList) of
                 false ->
                     {ok, Acc0};
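
The two clauses modified above are the failure paths: any rexi message that
still arrives without a delta now halts the node with an {enodelta}
diagnostic, which is how this WIP flushes out senders that have not been
converted yet. The delta-carrying clauses sit outside these hunks; the shape
implied by the sender changes is roughly the following (a sketch, assuming
accumulate_delta/1 is exported and the delta is tagged as above; the real
clause is not shown here):

    %% Sketch of the delta-aware receive clause implied by the rexi side.
    {Ref, From, Msg, {delta, Delta}} ->
        couch_stats_resource_tracker:accumulate_delta(Delta),
        %% ...then continue exactly as the existing {Ref, From, Msg}
        %% clause does.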
