iilyak-ibm commented on code in PR #5602: URL: https://github.com/apache/couchdb/pull/5602#discussion_r2349783372
########## src/couch_srt/src/couch_srt_query.erl: ########## @@ -0,0 +1,985 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_srt_query). + +-feature(maybe_expr, enable). + +-include_lib("stdlib/include/ms_transform.hrl"). +-include_lib("couch_srt.hrl"). + +%% aggregate query api +-export([ + active/0, + active/1, + active_coordinators/0, + active_coordinators/1, + active_workers/0, + active_workers/1, + + all/0, + find_by_nonce/1, + find_by_pid/1, + find_by_pidref/1, + find_workers_by_pidref/1, + + query_matcher/1, + query_matcher/2, + query_matcher_rows/1, + query_matcher_rows/2, + + query/1, + from/1, + group_by/1, + group_by/2, + sort_by/1, + sort_by/2, + count_by/1, + options/1, + unlimited/0, + with_limit/1, + + run/1, + unsafe_run/1 +]). + +-export_type([ + query/0, + query_expression/0, + query_option/0 +]). + +-type aggregation_keys_fun() :: fun((Ele :: #rctx{}) -> aggregation_values() | aggregation_value()). +-type value_key_fun() :: fun((Ele :: #rctx{}) -> aggregation_values() | aggregation_value()). +-type count_key_fun() :: fun((A :: pos_integer(), B :: pos_integer()) -> pos_integer()). + +-record(selector, { + aggregation_keys = undefined :: + rctx_field() + | [rctx_field()] + | undefined, + value_key = undefined :: + rctx_field() + | undefined +}). + +-record(unsafe_selector, { + aggregation_keys = undefined :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()] + | undefined, + value_key = undefined :: + value_key_fun() + | rctx_field() + | undefined +}). + +-record(query_options, { + limit = undefined :: pos_integer() | unlimited | undefined, + is_safe = undefined :: boolean() | undefined +}). + +-type aggregation() :: group_by | sort_by | count_by. + +-record(query, { + matcher = undefined :: matcher_name() | all | undefined, + selector = undefined :: #selector{} | #unsafe_selector{} | undefined, + limit = undefined :: pos_integer() | unlimited | undefined, + aggregation = undefined :: aggregation() | undefined, + is_safe = true :: boolean() +}). + +-record(from, { + matcher = undefined :: matcher_name() | all | undefined, + is_safe = undefined :: boolean() | undefined +}). + +-opaque query() :: #query{}. +-opaque query_expression() :: + #from{} + | #query_options{} + | #selector{} + | #unsafe_selector{} + | query_option() + | {aggregation(), #selector{}} + | {aggregation(), #unsafe_selector{}}. +-opaque query_option() :: + {limit, pos_integer() | unlimited | undefined}. + +%% +%% Aggregate query API +%% + +active() -> + active_int(all). + +active_coordinators() -> + active_int(coordinators). + +active_workers() -> + active_int(workers). + +%% active_json() or active(json)? +active(json) -> + to_json_list(active_int(all)). + +active_coordinators(json) -> + to_json_list(active_int(coordinators)). + +active_workers(json) -> + to_json_list(active_int(workers)). + +active_int(coordinators) -> + select_by_type(coordinators); +active_int(workers) -> + select_by_type(workers); +active_int(all) -> + select_by_type(all). + +select_by_type(coordinators) -> + ets:select(?CSRT_ETS, ets:fun2ms(fun(#rctx{type = #coordinator{}} = R) -> R end)); +select_by_type(workers) -> + ets:select(?CSRT_ETS, ets:fun2ms(fun(#rctx{type = #rpc_worker{}} = R) -> R end)); +select_by_type(all) -> + ets:tab2list(?CSRT_ETS). + +find_by_nonce(Nonce) -> + couch_srt_server:match_resource(#rctx{nonce = Nonce}). + +find_by_pid(Pid) -> + couch_srt_server:match_resource(#rctx{pid_ref = {Pid, '_'}}). + +find_by_pidref(PidRef) -> + couch_srt_server:match_resource(#rctx{pid_ref = PidRef}). + +find_workers_by_pidref(PidRef) -> + couch_srt_server:match_resource(#rctx{type = #rpc_worker{from = PidRef}}). + +curry_field(Field) -> + fun(Ele) -> couch_srt_entry:value(Field, Ele) end. + +-spec group_by(Matcher, KeyFun, ValFun) -> + {ok, aggregation_result()} | {limit, aggregation_result()} +when + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(). +group_by(Matcher, KeyFun, ValFun) -> + AggFun = fun erlang:'+'/2, + group_by(Matcher, KeyFun, ValFun, AggFun). + +-spec group_by(Matcher, KeyFun, ValFun, AggFun) -> + {ok, aggregation_result()} | {limit, aggregation_result()} +when + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(), + AggFun :: + count_key_fun(). +group_by(Matcher, KeyFun, ValFun, AggFun) -> + group_by(Matcher, KeyFun, ValFun, AggFun, query_cardinality_limit()). + +-spec all() -> + matcher(). + +all() -> + Spec = ets:fun2ms(fun(#rctx{} = R) -> R end), + {Spec, ets:match_spec_compile(Spec)}. + +%% eg: group_by(all(), username, docs_read). +%% eg: ^^ or: group_by(all(), [username, docs_read], ioq_calls). +%% eg: group_by(all(), [username, dbname, js_filter], docs_read). +%% eg: group_by(all(), [username, dbname, js_filter], ioq_calls). +%% eg: group_by(all(), [username, dbname, js_filter], get_kv_node). +-spec group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> + {ok, aggregation_result()} | {limit, aggregation_result()} +when + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(), + AggFun :: + count_key_fun(), + Limit :: pos_integer(). + +group_by(Matcher, KeyL, ValFun, AggFun, Limit) when is_list(KeyL) -> + KeyFun = fun(Ele) -> list_to_tuple([couch_srt_entry:value(Key, Ele) || Key <- KeyL]) end, + group_by(Matcher, KeyFun, ValFun, AggFun, Limit); +group_by(Matcher, Key, ValFun, AggFun, Limit) when is_atom(Key) -> + group_by(Matcher, curry_field(Key), ValFun, AggFun, Limit); +group_by(Matcher, KeyFun, Val, AggFun, Limit) when is_atom(Val) -> + group_by(Matcher, KeyFun, curry_field(Val), AggFun, Limit); +group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> + %% This is a space versus speed tradeoff. Both query modes only filter + %% through the table until `Limit` rows have been returned and both will + %% utilize the compiled match_specs to do the testing, but + %% `group_by_fold/5` will sequentially copy in every row and test it + %% locally against the compiled match_spec using `ets:foldl/3`, whereas + %% `group_by_select/5` does the filtering internally in the ETS NIF, by way + %% of passing the uncompiled match_spec to `ets:select/3` to + %% `ets:select/3`. The tradeoff here is that `ets:select` will copy `Limit` + %% full `#rctx{}` records into this caller process, which we then aggregate + %% over, as opposed to `ets:foldl` only sequentially loading a singular + %% `#rctx{}` and extracting the relevant field value to aggregate on. + %% + %% The use of `query_by_fold` should only be needed if `Limit` is + %% drastically increased, and even then, the efficiencies gained here with + %% `query_by_fold` are shortlived until we can encode the values needed by + %% `ValFun` into the match_spec return fields, at which point it becomes + %% strictly worse. + %% + %% NOTE: This discrepancy of `ets:match_spec_run` taking a `match_spec()` + %% vs `ets:select` taking a `comp_match_spec()` is why our CSRT `matcher()` + %% type_spec funnels around both versions instead of just reference to the + %% compiled spec stored by ETS internally. + case config:get_boolean(?CSRT, "use_query_fold", false) of + true -> + group_by_fold(Matcher, KeyFun, ValFun, AggFun, Limit); + false -> + group_by_select(Matcher, KeyFun, ValFun, AggFun, Limit) + end. + +group_by_fold(Matcher, KeyFun, ValFun, AggFun, Limit) -> + FoldFun = fun(Ele, Acc) -> + case maps:size(Acc) =< Limit of + true -> + case ets_match(Ele, Matcher) of + true -> + Key = KeyFun(Ele), + Val = ValFun(Ele), + CurrVal = maps:get(Key, Acc, 0), + case AggFun(CurrVal, Val) of + 0 -> + Acc; + NewVal -> + maps:put(Key, NewVal, Acc) + end; + false -> + Acc + end; + false -> + throw({limit, Acc}) + end + end, + try + {ok, ets:foldl(FoldFun, #{}, ?CSRT_ETS)} + catch + throw:{limit, Acc} -> + {limit, Acc} + end. + +ets_match(Ele, {_, CMS}) -> + ets:match_spec_run([Ele], CMS) =/= []. + +group_by_select(Matcher, KeyFun, ValFun, AggFun, Limit) -> + {Status, Rctxs} = group_by_select_rows(Matcher, Limit), + %% If we hit `Status=limit` rows, still aggregate over what we found + Aggregated = lists:foldl( + fun(Rctx, Acc) -> + Key = KeyFun(Rctx), + Val = ValFun(Rctx), + CurrVal = maps:get(Key, Acc, 0), + case AggFun(CurrVal, Val) of + 0 -> + Acc; + NewVal -> + maps:put(Key, NewVal, Acc) + end + end, + #{}, + Rctxs + ), + {Status, Aggregated}. + +group_by_select_rows(Matcher, Limit) -> + try + %% Use `ets:select/3` as this does the ets fold internally in a space + %% efficient way that is still faster than the sequential traversal + %% through the table. See the `ets:select/3` documentation for more + %% info. We also use `ets:select/3` to pass the limit along, which + %% results in ets effeciently traversing rows until `Limit` rows have + %% been accumulated and returned. + %% ets:select/* takes match_spec(), not comp_match_spec() + {MSpec, _CMSpec} = Matcher, + case ets:select(?CSRT_ETS, MSpec, Limit) of + %% Technically the {Rctxs, `continuation()`} here is an `opaque()` + %% type, but we assume `'$end_of_table'` is a reasonable indication + %% of no more rows. However, we fallback to checking the quantity + %% returned in case this is ever no longer true. + {Rctxs, '$end_of_table'} -> + {ok, Rctxs}; + {Rctxs, _Continuation} -> + %% Continuation is opaque, and there's no `is_more_rows` API to + %% check to see if we actually limit the table Limit or we hit + %% the edge case where exactly `Limit` rows were found. The + %% continuation can be passed back to `ets:select/1` to see if + %% explicity returns `'$end_of_table'`, but if it did hit the + %% `Limit`, we now wastefully fetch the next chunk of rows, so + %% instead for now we assume that when the length of rows + %% equals `Limit` that we hit the cap. Note that this is only + %% relevant because the API returning `'$end_of_table'` is not + %% formally specified, but in theory this clause should not be + %% hit. + case length(Rctxs) >= Limit of + true -> + {limit, Rctxs}; + false -> + {ok, Rctxs} + end; + %% Handle '$end_of_table' + _ -> + {ok, []} + end + catch + _:_ -> + {ok, []} + end. + +%% +%% Auxiliary functions to calculate topK +%% + +-record(topK, { + % we store ordered elements in ascending order + seq = [] :: list({aggregation_key(), pos_integer()}), + % we rely on erlang sorting order where `number < atom` + min = infinite :: infinite | pos_integer(), + max = 0 :: non_neg_integer(), + size = 0 :: non_neg_integer(), + % capacity cannot be less than 1 + capacity = 1 :: pos_integer() +}). + +new_topK(K) when K >= 1 -> + #topK{capacity = K}. + +% when we are at capacity +% don't bother adding the value since it is less than what we already saw +update_topK(_Key, Value, #topK{size = S, capacity = S, min = Min} = Top) when Value < Min -> + Top#topK{min = Value}; +% when we are at capacity evict smallest value +update_topK(Key, Value, #topK{size = S, capacity = S, max = Max, seq = Seq} = Top) when + Value > Max +-> + % capacity cannot be less than 1, so we can avoid handling the case when Seq is empty + [_ | Truncated] = Seq, + Top#topK{max = Value, seq = lists:keysort(2, [{Key, Value} | Truncated])}; +% when we are at capacity and value is in between min and max evict smallest value +update_topK(Key, Value, #topK{size = S, capacity = S, seq = Seq} = Top) -> + % capacity cannot be less than 1, so we can avoid handling the case when Seq is empty + [_ | Truncated] = Seq, + Top#topK{seq = lists:keysort(2, [{Key, Value} | Truncated])}; +update_topK(Key, Value, #topK{size = S, min = Min, seq = Seq} = Top) when Value < Min -> + Top#topK{size = S + 1, min = Value, seq = lists:keysort(2, [{Key, Value} | Seq])}; +update_topK(Key, Value, #topK{size = S, max = Max, seq = Seq} = Top) when Value > Max -> + Top#topK{size = S + 1, max = Value, seq = lists:keysort(2, [{Key, Value} | Seq])}; +update_topK(Key, Value, #topK{size = S, seq = Seq} = Top) -> + Top#topK{size = S + 1, seq = lists:keysort(2, [{Key, Value} | Seq])}. Review Comment: > I would even skip the whole update_topK function and the record and just use the topk from above as is. The `update_topK` skips the update when `Value < Min`, we would have many small values so we could safe a lot of updates. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@couchdb.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org