This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch import-nouveau
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit be41929142343c772bcac400fdaf2329e2a8d011 Author: Robert Newson <rnew...@apache.org> AuthorDate: Mon Dec 5 21:17:58 2022 +0000 Import nouveau erlang application --- rebar.config.script | 1 + rel/apps/couch_epi.config | 1 + rel/overlay/etc/default.ini | 3 + rel/reltool.config | 2 + share/server/loop.js | 5 +- share/server/nouveau.js | 142 +++++++ share/server/state.js | 4 +- share/server/util.js | 4 +- src/chttpd/src/chttpd.erl | 4 + src/chttpd/src/chttpd_misc.erl | 9 +- src/ken/src/ken_server.erl | 49 ++- src/mango/src/mango_cursor.erl | 2 + src/mango/src/mango_cursor_nouveau.erl | 294 +++++++++++++ src/mango/src/mango_idx.erl | 30 +- src/mango/src/mango_idx_nouveau.erl | 459 +++++++++++++++++++++ src/mango/src/mango_native_proc.erl | 97 ++++- src/nouveau/include/nouveau.hrl | 24 ++ src/nouveau/src/nouveau.app.src | 30 ++ .../nouveau/src/nouveau.erl | 20 +- src/nouveau/src/nouveau_api.erl | 172 ++++++++ src/nouveau/src/nouveau_app.erl | 31 ++ src/nouveau/src/nouveau_bookmark.erl | 68 +++ src/nouveau/src/nouveau_epi.erl | 50 +++ src/nouveau/src/nouveau_fabric.erl | 39 ++ src/nouveau/src/nouveau_fabric_search.erl | 173 ++++++++ src/nouveau/src/nouveau_httpd.erl | 70 ++++ src/nouveau/src/nouveau_httpd_handlers.erl | 34 ++ src/nouveau/src/nouveau_index_manager.erl | 148 +++++++ src/nouveau/src/nouveau_index_updater.erl | 136 ++++++ src/nouveau/src/nouveau_rpc.erl | 45 ++ src/nouveau/src/nouveau_sup.erl | 32 ++ src/nouveau/src/nouveau_util.erl | 103 +++++ support/build_js.escript | 2 + 33 files changed, 2245 insertions(+), 38 deletions(-) diff --git a/rebar.config.script b/rebar.config.script index 3952ad857..4347c5ad2 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -129,6 +129,7 @@ SubDirs = [ "src/custodian", "src/ddoc_cache", "src/dreyfus", + "src/nouveau", "src/fabric", "src/global_changes", "src/ioq", diff --git a/rel/apps/couch_epi.config b/rel/apps/couch_epi.config index a53721a48..882f1841e 100644 --- a/rel/apps/couch_epi.config +++ b/rel/apps/couch_epi.config @@ -18,5 +18,6 @@ global_changes_epi, mango_epi, mem3_epi, + nouveau_epi, setup_epi ]}. diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index ae691bb8d..37be53e38 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -865,3 +865,6 @@ port = {{prometheus_port}} ; `false`, the expected n value is based on the number of available copies in ; the shard map. ;use_cluster_n_as_expected_n = false + +[nouveau] +enable = true diff --git a/rel/reltool.config b/rel/reltool.config index ab26fb2ed..5e3292378 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -66,6 +66,7 @@ couch_prometheus, %% extra + nouveau, recon ]}, {rel, "start_clean", "", [kernel, stdlib]}, @@ -130,6 +131,7 @@ {app, couch_prometheus, [{incl_cond, include}]}, %% extra + {app, nouveau, [{incl_cond, include}]}, {app, recon, [{incl_cond, include}]} ]}. diff --git a/share/server/loop.js b/share/server/loop.js index 91dd1d6b0..d95e90eec 100644 --- a/share/server/loop.js +++ b/share/server/loop.js @@ -10,7 +10,7 @@ // License for the specific language governing permissions and limitations under // the License. -function create_sandbox() { +function create_sandbox(sandbox_option) { try { // if possible, use evalcx (not always available) var sandbox = evalcx(''); @@ -25,7 +25,7 @@ function create_sandbox() { sandbox.send = Render.send; sandbox.getRow = Render.getRow; sandbox.isArray = isArray; - sandbox.index = Dreyfus.index; + sandbox.index = sandbox_option === 'nouveau'? 
Nouveau.index : Dreyfus.index; } catch (e) { var sandbox = {}; } @@ -116,6 +116,7 @@ var Loop = function() { "add_lib" : State.addLib, "map_doc" : Views.mapDoc, "index_doc": Dreyfus.indexDoc, + "nouveau_index_doc": Nouveau.indexDoc, "reduce" : Views.reduce, "rereduce" : Views.rereduce }; diff --git a/share/server/nouveau.js b/share/server/nouveau.js new file mode 100644 index 000000000..e799aa134 --- /dev/null +++ b/share/server/nouveau.js @@ -0,0 +1,142 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +var Nouveau = (function() { + + var index_results = []; // holds temporary emitted values during index + + function handleIndexError(err, doc) { + if (err == "fatal_error") { + throw(["error", "map_runtime_error", "function raised 'fatal_error'"]); + } else if (err[0] == "fatal") { + throw(err); + } + var message = "function raised exception " + err.toSource(); + if (doc) message += " with doc._id " + doc._id; + log(message); + }; + + function assertType(name, expected, actual) { + if (typeof actual !== expected) { + throw({name: 'TypeError', message: 'type of ' + name + ' must be a ' + expected + ' not ' + typeof actual}); + } + }; + + return { + index: function(name, value) { + assertType('name', 'string', name); + + if (name.substring(0, 1) === '_') { + throw({name: 'ReservedName', message: 'name must not start with an underscore'}); + } + + // Dreyfus compatibility. + if (arguments.length == 2 || (arguments.length == 3 && typeof arguments[2] == 'object')) { + options = arguments[2] || {}; + if (typeof value == 'boolean') { + // coerce to string as handling is the same. + value = value ? 'true' : 'false' + } + switch (typeof value) { + case 'string': + index_results.push({'@type': 'text', 'name': name, 'value': value, 'stored': options.store || false}); + index_results.push({'@type': 'sorted_dv', 'name': name, 'value': value}); // for sorting. 
+ if (options.facet) { + index_results.push({'@type': 'sorted_set_dv', 'name': name, 'value': value}); + } + break; + case 'number': + index_results.push({'@type': 'double_point', 'name': name, 'value': value}); + if (options.store) { + index_results.push({'@type': 'stored_double', 'name': name, 'value': value}); + } + if (options.facet) { + index_results.push({'@type': 'double_dv', 'name': name, 'value': value}); + } + default: + throw({name: 'TypeError', message: 'value must be a string, a number or boolean not ' + typeof value}); + } + return; + } + + var type = arguments[2]; + assertType('type', 'string', type); + + switch (type) { + case 'binary_dv': + case 'stored_binary': + case 'sorted_set_dv': + case 'sorted_dv': + assertType('value', 'string', value); + index_results.push({'@type': type, 'name': name, 'value': value}); + break; + case 'double_point': + case 'float_dv': + case 'float_point': + case 'int_point': + case 'long_point': + case 'sorted_numeric_dv': + case 'double_dv': + assertType('value', 'number', value); + index_results.push({'@type': type, 'name': name, 'value': value}); + break; + case 'latlon_dv': + case 'latlon_point': + assertType('value', 'number', arguments[3]); + assertType('value', 'number', arguments[4]); + index_results.push({'@type': type, 'name': name, 'lat': arguments[3], 'lon': arguments[4]}); + break; + case 'xy_dv': + case 'xy_point': + assertType('value', 'number', arguments[3]); + assertType('value', 'number', arguments[4]); + index_results.push({'@type': type, 'name': name, 'x': arguments[3], 'y': arguments[4]}); + break; + case 'string': + case 'text': + assertType('value', 'string', value); + if (arguments.length === 4) { + assertType('boolean', arguments[3]); + } + index_results.push({'@type': type, 'name': name, 'value': value, 'stored': arguments[3] || false}); + break; + case 'stored_double': + assertType('value', 'number', value); + index_results.push({'@type': type, 'name': name, 'value': value}); + break; + case 'stored_string': + assertType('value', 'string', value); + index_results.push({'@type': type, 'name': name, 'value': value}); + break; + default: + throw({name: 'TypeError', message: type + ' not supported'}); + } + }, + + indexDoc: function(doc) { + Couch.recursivelySeal(doc); + var buf = []; + for (var fun in State.funs) { + index_results = []; + try { + State.funs[fun](doc); + buf.push(index_results); + } catch (err) { + handleIndexError(err, doc); + buf.push([]); + } + } + print(JSON.stringify(buf)); + } + + } +})(); diff --git a/share/server/state.js b/share/server/state.js index ff553dd57..0bbae7728 100644 --- a/share/server/state.js +++ b/share/server/state.js @@ -19,9 +19,9 @@ var State = { gc(); print("true"); // indicates success }, - addFun : function(newFun) { + addFun : function(newFun, option) { // Compile to a function and add it to funs array - State.funs.push(Couch.compileFunction(newFun, {views : {lib : State.lib}})); + State.funs.push(Couch.compileFunction(newFun, {views : {lib : State.lib}}, undefined, option)); print("true"); }, addLib : function(lib) { diff --git a/share/server/util.js b/share/server/util.js index f570acebd..527dc8f4f 100644 --- a/share/server/util.js +++ b/share/server/util.js @@ -58,11 +58,11 @@ var resolveModule = function(names, mod, root) { var Couch = { // moving this away from global so we can move to json2.js later - compileFunction : function(source, ddoc, name) { + compileFunction : function(source, ddoc, name, sandbox_option) { if (!source) throw(["error","not_found","missing 
function"]); var functionObject = null; - var sandbox = create_sandbox(); + var sandbox = create_sandbox(sandbox_option); var require = function(name, module) { module = module || {}; diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index c25c18838..94e9915a6 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -1100,6 +1100,8 @@ error_info({error, <<"endpoint has an invalid url">> = Reason}) -> {400, <<"invalid_replication">>, Reason}; error_info({error, <<"proxy has an invalid url">> = Reason}) -> {400, <<"invalid_replication">>, Reason}; +error_info({method_not_allowed, Reason}) -> + {405, <<"method_not_allowed">>, Reason}; error_info({missing_stub, Reason}) -> {412, <<"missing_stub">>, Reason}; error_info(request_entity_too_large) -> @@ -1118,6 +1120,8 @@ error_info(all_workers_died) -> "Nodes are unable to service this " "request due to overloading or maintenance mode." >>}; +error_info({internal_server_error, Reason}) -> + {500, <<"internal server error">>, Reason}; error_info(not_implemented) -> {501, <<"not_implemented">>, <<"this feature is not yet implemented">>}; error_info(timeout) -> diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index 0dedeba4d..856650f2a 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -68,12 +68,9 @@ handle_welcome_req(Req, _) -> send_method_not_allowed(Req, "GET,HEAD"). get_features() -> - case dreyfus:available() of - true -> - [search | config:features()]; - false -> - config:features() - end. + case dreyfus:available() of true -> [search]; false -> [] end ++ + case nouveau:enabled() of true -> [nouveau]; false -> [] end ++ + config:features(). handle_favicon_req(Req) -> handle_favicon_req(Req, get_docroot()). diff --git a/src/ken/src/ken_server.erl b/src/ken/src/ken_server.erl index 9f869b379..11503b935 100644 --- a/src/ken/src/ken_server.erl +++ b/src/ken/src/ken_server.erl @@ -171,6 +171,16 @@ handle_cast({trigger_update, #job{name = {_, _, hastings}, server = GPid, seq = Now = erlang:monotonic_time(), ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}), {noreply, State, 0}; +handle_cast({trigger_update, #job{name = {_, Index, nouveau}} = Job}, State) -> + % nouveau_index_manager:update_index will trigger a search index update. + {Pid, _} = erlang:spawn_monitor( + nouveau_index_manager, + update_index, + [Index] + ), + Now = erlang:monotonic_time(), + ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}), + {noreply, State, 0}; % search index job names have 3 elements. See job record definition. handle_cast({trigger_update, #job{name = {_, _, _}, server = GPid, seq = Seq} = Job}, State) -> % dreyfus_index:await will trigger a search index update. @@ -329,8 +339,9 @@ update_ddoc_indexes(Name, #doc{} = Doc, State) -> end, SearchUpdated = search_updated(Name, Doc, Seq, State), STUpdated = st_updated(Name, Doc, Seq, State), - case {ViewUpdated, SearchUpdated, STUpdated} of - {ok, ok, ok} -> ok; + NouveauUpdated = nouveau_updated(Name, Doc, Seq, State), + case {ViewUpdated, SearchUpdated, STUpdated, NouveauUpdated} of + {ok, ok, ok, ok} -> ok; _ -> resubmit end. @@ -370,6 +381,19 @@ st_updated(_Name, _Doc, _Seq, _State) -> ok. -endif. +nouveau_updated(Name, Doc, Seq, State) -> + case should_update(Doc, <<"indexes">>) of + true -> + try nouveau_util:design_doc_to_indexes(Name, Doc) of + SIndexes -> update_ddoc_nouveau_indexes(Name, SIndexes, Seq, State) + catch + _:_ -> + ok + end; + false -> + ok + end. 
+ should_update(#doc{body = {Props}}, IndexType) -> case couch_util:get_value(<<"autoupdate">>, Props) of false -> @@ -451,6 +475,24 @@ update_ddoc_st_indexes(DbName, Indexes, Seq, State) -> end. -endif. +update_ddoc_nouveau_indexes(DbName, Indexes, Seq, State) -> + if + Indexes =/= [] -> + % Spawn a job for each search index in the ddoc + lists:foldl( + fun(Index, Acc) -> + case maybe_start_job({DbName, Index, nouveau}, nil, Seq, State) of + resubmit -> resubmit; + _ -> Acc + end + end, + ok, + Indexes + ); + true -> + ok + end. + should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) -> Threshold = list_to_integer(config("max_incremental_updates", "1000")), IncrementalChannels = list_to_integer(config("incremental_channels", "80")), @@ -476,6 +518,9 @@ should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) -> {ok, MRSt} = couch_index:get_state(Pid, 0), CurrentSeq = couch_mrview_index:get(update_seq, MRSt), (Seq - CurrentSeq) < Threshold; + % Nouveau has three elements + {_, Index, nouveau} -> + nouveau_index_updated:outdated(Index); % Search name has three elements. {_, _, _} -> {ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0), diff --git a/src/mango/src/mango_cursor.erl b/src/mango/src/mango_cursor.erl index e9db4c3cf..a3805159d 100644 --- a/src/mango/src/mango_cursor.erl +++ b/src/mango/src/mango_cursor.erl @@ -31,11 +31,13 @@ -define(CURSOR_MODULES, [ mango_cursor_view, mango_cursor_text, + mango_cursor_nouveau, mango_cursor_special ]). -else. -define(CURSOR_MODULES, [ mango_cursor_view, + mango_cursor_nouveau, mango_cursor_special ]). -endif. diff --git a/src/mango/src/mango_cursor_nouveau.erl b/src/mango/src/mango_cursor_nouveau.erl new file mode 100644 index 000000000..cd7b7ecd1 --- /dev/null +++ b/src/mango/src/mango_cursor_nouveau.erl @@ -0,0 +1,294 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mango_cursor_nouveau). + +-export([ + create/4, + explain/1, + execute/3 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("nouveau/include/nouveau.hrl"). +-include("mango_cursor.hrl"). +-include("mango.hrl"). + +-record(cacc, { + selector, + dbname, + ddocid, + idx_name, + query_args, + bookmark, + limit, + skip, + user_fun, + user_acc, + fields, + execution_stats +}). + +create(Db, Indexes, Selector, Opts) -> + Index = + case Indexes of + [Index0] -> + Index0; + _ -> + ?MANGO_ERROR(multiple_text_indexes) + end, + + NouveauLimit = get_nouveau_limit(), + Limit = erlang:min(NouveauLimit, couch_util:get_value(limit, Opts, mango_opts:default_limit())), + Skip = couch_util:get_value(skip, Opts, 0), + Fields = couch_util:get_value(fields, Opts, all_fields), + + {ok, #cursor{ + db = Db, + index = Index, + ranges = null, + selector = Selector, + opts = Opts, + limit = Limit, + skip = Skip, + fields = Fields + }}. + +explain(Cursor) -> + #cursor{ + selector = Selector, + opts = Opts + } = Cursor, + [ + {'query', mango_selector_text:convert(Selector)}, + {partition, get_partition(Opts, null)}, + {sort, sort_query(Opts, Selector)} + ]. 
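A minimal sketch, not part of this commit, showing how the two exported entry points above fit together for _explain; Db, Indexes, Selector and Opts are assumed to be the values mango normally hands to a cursor module (Opts in particular carries sort and bookmark entries):

    %% Sketch only: build a nouveau-backed cursor and return its explain output
    %% without executing the query.
    explain_only(Db, Indexes, Selector, Opts) ->
        {ok, Cursor} = mango_cursor_nouveau:create(Db, Indexes, Selector, Opts),
        mango_cursor_nouveau:explain(Cursor).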
+ +execute(Cursor, UserFun, UserAcc) -> + #cursor{ + db = Db, + index = Idx, + limit = Limit, + skip = Skip, + selector = Selector, + opts = Opts, + execution_stats = Stats + } = Cursor, + Query = mango_selector_text:convert(Selector), + QueryArgs = #{ + 'query' => Query, + partition => get_partition(Opts, null), + sort => sort_query(Opts, Selector) + }, + CAcc = #cacc{ + selector = Selector, + dbname = couch_db:name(Db), + ddocid = ddocid(Idx), + idx_name = mango_idx:name(Idx), + bookmark = get_bookmark(Opts), + limit = Limit, + skip = Skip, + query_args = QueryArgs, + user_fun = UserFun, + user_acc = UserAcc, + fields = Cursor#cursor.fields, + execution_stats = mango_execution_stats:log_start(Stats) + }, + try + case Query of + <<>> -> + throw({stop, CAcc}); + _ -> + execute(CAcc) + end + catch + throw:{stop, FinalCAcc} -> + #cacc{ + bookmark = FinalBM, + user_fun = UserFun, + user_acc = LastUserAcc, + execution_stats = Stats0 + } = FinalCAcc, + JsonBM = nouveau_bookmark:pack(FinalBM), + Arg = {add_key, bookmark, JsonBM}, + {_Go, FinalUserAcc} = UserFun(Arg, LastUserAcc), + FinalUserAcc0 = mango_execution_stats:maybe_add_stats( + Opts, UserFun, Stats0, FinalUserAcc + ), + FinalUserAcc1 = mango_cursor:maybe_add_warning(UserFun, Cursor, Stats0, FinalUserAcc0), + {ok, FinalUserAcc1} + end. + +execute(CAcc) -> + case search_docs(CAcc) of + {ok, #{bookmark := Bookmark, <<"hits">> := []}} -> + % If we don't have any results from the + % query it means the request has paged through + % all possible results and the request is over. + NewCAcc = CAcc#cacc{bookmark = Bookmark}, + throw({stop, NewCAcc}); + {ok, #{bookmark := Bookmark, <<"hits">> := Hits}} -> + NewCAcc = CAcc#cacc{bookmark = nouveau_bookmark:to_ejson(Bookmark)}, + HitDocs = get_json_docs(CAcc#cacc.dbname, Hits), + {ok, FinalCAcc} = handle_hits(NewCAcc, HitDocs), + execute(FinalCAcc) + end. + +search_docs(CAcc) -> + #cacc{ + dbname = DbName, + ddocid = DDocId, + idx_name = IdxName + } = CAcc, + QueryArgs = update_query_args(CAcc), + case nouveau_fabric_search:go(DbName, DDocId, IdxName, QueryArgs) of + {ok, SearchResults} -> + {ok, SearchResults}; + {error, Reason} -> + ?MANGO_ERROR({text_search_error, {error, Reason}}) + end. + +handle_hits(CAcc, []) -> + {ok, CAcc}; +handle_hits(CAcc0, [{Hit, Doc} | Rest]) -> + CAcc1 = handle_hit(CAcc0, Hit, Doc), + handle_hits(CAcc1, Rest). + +handle_hit(CAcc0, Hit, not_found) -> + update_bookmark(CAcc0, Hit); +handle_hit(CAcc0, Hit, Doc) -> + #cacc{ + limit = Limit, + skip = Skip, + execution_stats = Stats + } = CAcc0, + CAcc1 = update_bookmark(CAcc0, Hit), + Stats1 = mango_execution_stats:incr_docs_examined(Stats), + couch_stats:increment_counter([mango, docs_examined]), + CAcc2 = CAcc1#cacc{execution_stats = Stats1}, + case mango_selector:match(CAcc2#cacc.selector, Doc) of + true when Skip > 0 -> + CAcc2#cacc{skip = Skip - 1}; + true when Limit == 0 -> + % We hit this case if the user spcified with a + % zero limit. Notice that in this case we need + % to return the bookmark from before this match + throw({stop, CAcc0}); + true when Limit == 1 -> + NewCAcc = apply_user_fun(CAcc2, Doc), + throw({stop, NewCAcc}); + true when Limit > 1 -> + NewCAcc = apply_user_fun(CAcc2, Doc), + NewCAcc#cacc{limit = Limit - 1}; + false -> + CAcc2 + end. 
+ +apply_user_fun(CAcc, Doc) -> + FinalDoc = mango_fields:extract(Doc, CAcc#cacc.fields), + #cacc{ + user_fun = UserFun, + user_acc = UserAcc, + execution_stats = Stats + } = CAcc, + Stats0 = mango_execution_stats:incr_results_returned(Stats), + case UserFun({row, FinalDoc}, UserAcc) of + {ok, NewUserAcc} -> + CAcc#cacc{user_acc = NewUserAcc, execution_stats = Stats0}; + {stop, NewUserAcc} -> + throw({stop, CAcc#cacc{user_acc = NewUserAcc, execution_stats = Stats0}}) + end. + +%% Convert Query to Nouveau sort specifications +%% Convert <<"Field">>, <<"desc">> to <<"-Field">> +%% and append to the nouveau query +sort_query(Opts, Selector) -> + {sort, {Sort}} = lists:keyfind(sort, 1, Opts), + SortList = lists:map( + fun(SortField) -> + {Dir, RawSortField} = + case SortField of + {Field, <<"asc">>} -> {asc, Field}; + {Field, <<"desc">>} -> {desc, Field}; + Field when is_binary(Field) -> {asc, Field} + end, + SField = mango_selector_text:append_sort_type(RawSortField, Selector), + case Dir of + asc -> + SField; + desc -> + <<"-", SField/binary>> + end + end, + Sort + ), + case SortList of + [] -> null; + _ -> SortList + end. + +get_partition(Opts, Default) -> + case couch_util:get_value(partition, Opts) of + <<>> -> Default; + Else -> Else + end. + +get_bookmark(Opts) -> + case lists:keyfind(bookmark, 1, Opts) of + {_, BM} when is_list(BM), BM /= [] -> + BM; + _ -> + nil + end. + +update_bookmark(CAcc, Hit) -> + BM = CAcc#cacc.bookmark, + DbName = CAcc#cacc.dbname, + NewBM = nouveau_bookmark:update(DbName, BM, #{<<"hits">> => [Hit]}), + CAcc#cacc{bookmark = NewBM}. + + +ddocid(Idx) -> + case mango_idx:ddoc(Idx) of + <<"_design/", Rest/binary>> -> + Rest; + Else -> + Else + end. + +update_query_args(CAcc) -> + #cacc{ + bookmark = Bookmark, + query_args = QueryArgs + } = CAcc, + QueryArgs#{ + bookmark => nouveau_bookmark:pack(Bookmark), + limit => get_limit(CAcc) + }. + +get_limit(CAcc) -> + erlang:min(get_nouveau_limit(), CAcc#cacc.limit + CAcc#cacc.skip). + +get_nouveau_limit() -> + config:get_integer("nouveau", "max_limit", 200). + +get_json_docs(DbName, Hits) -> + Ids = lists:map( + fun(Hit) -> + maps:get(<<"id">>, Hit) + end, + Hits + ), + % TODO: respect R query parameter (same as json indexes) + {ok, Docs} = nouveau_fabric:get_json_docs(DbName, Ids), + lists:zipwith(fun(Hit, {doc, Doc}) -> {Hit, Doc} end, Hits, Docs). 
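To make the callback contract above concrete, here is a minimal sketch, not part of this commit, of driving the cursor end to end; the collector mirrors the {row, Doc} and {add_key, bookmark, ...} messages that execute/3 emits, and the create/4 arguments are again assumed to be mango's normalized values:

    %% Sketch only: accumulate result docs and the final bookmark.
    collect({row, Doc}, Acc) ->
        {ok, [Doc | Acc]};
    collect({add_key, bookmark, Bookmark}, Acc) ->
        {ok, [{bookmark, Bookmark} | Acc]}.

    run(Db, Indexes, Selector, Opts) ->
        {ok, Cursor} = mango_cursor_nouveau:create(Db, Indexes, Selector, Opts),
        mango_cursor_nouveau:execute(Cursor, fun collect/2, []).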
diff --git a/src/mango/src/mango_idx.erl b/src/mango/src/mango_idx.erl index a20d730a2..831289af7 100644 --- a/src/mango/src/mango_idx.erl +++ b/src/mango/src/mango_idx.erl @@ -175,12 +175,9 @@ from_ddoc(Db, {Props}) -> _ -> ?MANGO_ERROR(invalid_query_ddoc_language) end, IdxMods = - case dreyfus:available() of - true -> - [mango_idx_view, mango_idx_text]; - false -> - [mango_idx_view] - end, + case dreyfus:available() of true -> [mango_idx_text]; false -> [] end ++ + case nouveau:enabled() of true -> [mango_idx_nouveau]; false -> [] end ++ + [mango_idx_view], Idxs = lists:flatmap(fun(Mod) -> Mod:from_ddoc({Props}) end, IdxMods), lists:map( fun(Idx) -> @@ -249,6 +246,13 @@ cursor_mod(#idx{type = <<"json">>}) -> mango_cursor_view; cursor_mod(#idx{def = all_docs, type = <<"special">>}) -> mango_cursor_special; +cursor_mod(#idx{type = <<"nouveau">>}) -> + case nouveau:enabled() of + true -> + mango_cursor_nouveau; + false -> + ?MANGO_ERROR({index_service_unavailable, <<"nouveau">>}) + end; cursor_mod(#idx{type = <<"text">>}) -> case dreyfus:available() of true -> @@ -261,6 +265,13 @@ idx_mod(#idx{type = <<"json">>}) -> mango_idx_view; idx_mod(#idx{type = <<"special">>}) -> mango_idx_special; +idx_mod(#idx{type = <<"nouveau">>}) -> + case nouveau:enabled() of + true -> + mango_idx_nouveau; + false -> + ?MANGO_ERROR({index_service_unavailable, <<"nouveau">>}) + end; idx_mod(#idx{type = <<"text">>}) -> case dreyfus:available() of true -> @@ -288,6 +299,13 @@ get_idx_type(Opts) -> case proplists:get_value(type, Opts) of <<"json">> -> <<"json">>; + <<"nouveau">> -> + case nouveau:enabled() of + true -> + <<"nouveau">>; + false -> + ?MANGO_ERROR({index_service_unavailable, <<"nouveau">>}) + end; <<"text">> -> case dreyfus:available() of true -> diff --git a/src/mango/src/mango_idx_nouveau.erl b/src/mango/src/mango_idx_nouveau.erl new file mode 100644 index 000000000..074a755ee --- /dev/null +++ b/src/mango/src/mango_idx_nouveau.erl @@ -0,0 +1,459 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mango_idx_nouveau). + +-export([ + validate_new/2, + validate_fields/1, + validate_index_def/1, + add/2, + remove/2, + from_ddoc/1, + to_json/1, + columns/1, + is_usable/3, + get_default_field_options/1 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include("mango.hrl"). +-include("mango_idx.hrl"). + +validate_new(#idx{} = Idx, Db) -> + {ok, Def} = do_validate(Idx#idx.def), + maybe_reject_index_all_req(Def, Db), + {ok, Idx#idx{def = Def}}. + +validate_index_def(IndexInfo) -> + do_validate(IndexInfo). + +add(#doc{body = {Props0}} = DDoc, Idx) -> + Texts1 = + case proplists:get_value(<<"nouveau">>, Props0) of + {Texts0} -> Texts0; + _ -> [] + end, + NewText = make_text(Idx), + Texts2 = lists:keystore(element(1, NewText), 1, Texts1, NewText), + Props1 = lists:keystore(<<"nouveau">>, 1, Props0, {<<"nouveau">>, {Texts2}}), + {ok, DDoc#doc{body = {Props1}}}. 
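A small sketch, not part of this commit, of what add/2 leaves in a design doc; the index name is hypothetical, Def is a validated {Props} definition, and make_text/1 and construct_analyzer/1 are defined further down in this module (the #idx{} and #doc{} records come from mango_idx.hrl and couch_db.hrl):

    %% Sketch only: add a nouveau index named <<"bar">> to an empty design doc.
    example_ddoc_body(Def) ->
        Idx = #idx{type = <<"nouveau">>, name = <<"bar">>, def = Def},
        {ok, #doc{body = Body}} =
            mango_idx_nouveau:add(#doc{id = <<"_design/foo">>, body = {[]}}, Idx),
        %% Body is {[{<<"nouveau">>, {[{<<"bar">>, {[{<<"index">>, Def},
        %%                                           {<<"analyzer">>, _}]}}]}}]}
        Body.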
+ +remove(#doc{body = {Props0}} = DDoc, Idx) -> + Texts1 = + case proplists:get_value(<<"nouveau">>, Props0) of + {Texts0} -> + Texts0; + _ -> + ?MANGO_ERROR({index_not_found, Idx#idx.name}) + end, + Texts2 = lists:keydelete(Idx#idx.name, 1, Texts1), + if + Texts2 /= Texts1 -> ok; + true -> ?MANGO_ERROR({index_not_found, Idx#idx.name}) + end, + Props1 = + case Texts2 of + [] -> + lists:keydelete(<<"nouveau">>, 1, Props0); + _ -> + lists:keystore(<<"nouveau">>, 1, Props0, {<<"nouveau">>, {Texts2}}) + end, + {ok, DDoc#doc{body = {Props1}}}. + +from_ddoc({Props}) -> + case lists:keyfind(<<"nouveau">>, 1, Props) of + {<<"nouveau">>, {Texts}} when is_list(Texts) -> + lists:flatmap( + fun({Name, {VProps}}) -> + case validate_ddoc(VProps) of + invalid_ddoc -> + []; + Def -> + I = #idx{ + type = <<"nouveau">>, + name = Name, + def = Def + }, + [I] + end + end, + Texts + ); + _ -> + [] + end. + +to_json(Idx) -> + {[ + {ddoc, Idx#idx.ddoc}, + {name, Idx#idx.name}, + {type, Idx#idx.type}, + {partitioned, Idx#idx.partitioned}, + {def, {def_to_json(Idx#idx.def)}} + ]}. + +columns(Idx) -> + {Props} = Idx#idx.def, + {<<"fields">>, Fields} = lists:keyfind(<<"fields">>, 1, Props), + case Fields of + <<"all_fields">> -> + all_fields; + _ -> + {DFProps} = couch_util:get_value(<<"default_field">>, Props, {[]}), + Enabled = couch_util:get_value(<<"enabled">>, DFProps, true), + Default = + case Enabled of + true -> [<<"$default">>]; + false -> [] + end, + Default ++ + lists:map( + fun({FProps}) -> + {_, Name} = lists:keyfind(<<"name">>, 1, FProps), + {_, Type} = lists:keyfind(<<"type">>, 1, FProps), + iolist_to_binary([Name, ":", Type]) + end, + Fields + ) + end. + +is_usable(_, Selector, _) when Selector =:= {[]} -> + false; +is_usable(Idx, Selector, _) -> + case columns(Idx) of + all_fields -> + true; + Cols -> + Fields = indexable_fields(Selector), + sets:is_subset(sets:from_list(Fields), sets:from_list(Cols)) + end. + +do_validate({Props}) -> + {ok, Opts} = mango_opts:validate(Props, opts()), + {ok, {Opts}}; +do_validate(Else) -> + ?MANGO_ERROR({invalid_index_text, Else}). + +def_to_json({Props}) -> + def_to_json(Props); +def_to_json([]) -> + []; +def_to_json([{<<"fields">>, <<"all_fields">>} | Rest]) -> + [{<<"fields">>, []} | def_to_json(Rest)]; +def_to_json([{fields, Fields} | Rest]) -> + [{<<"fields">>, fields_to_json(Fields)} | def_to_json(Rest)]; +def_to_json([{<<"fields">>, Fields} | Rest]) -> + [{<<"fields">>, fields_to_json(Fields)} | def_to_json(Rest)]; +% Don't include partial_filter_selector in the json conversion +% if its the default value +def_to_json([{<<"partial_filter_selector">>, {[]}} | Rest]) -> + def_to_json(Rest); +def_to_json([{Key, Value} | Rest]) -> + [{Key, Value} | def_to_json(Rest)]. + +fields_to_json([]) -> + []; +fields_to_json([{[{<<"name">>, Name}, {<<"type">>, Type0}]} | Rest]) -> + ok = validate_field_name(Name), + Type = validate_field_type(Type0), + [{[{Name, Type}]} | fields_to_json(Rest)]; +fields_to_json([{[{<<"type">>, Type0}, {<<"name">>, Name}]} | Rest]) -> + ok = validate_field_name(Name), + Type = validate_field_type(Type0), + [{[{Name, Type}]} | fields_to_json(Rest)]. + +%% In the future, we can possibly add more restrictive validation. +%% For now, let's make sure the field name is not blank. +validate_field_name(<<"">>) -> + throw(invalid_field_name); +validate_field_name(Else) when is_binary(Else) -> + ok; +validate_field_name(_) -> + throw(invalid_field_name). 
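A worked sketch, not part of this commit, of columns/1 and the field naming above; the field list is hypothetical and the #idx{} record comes from mango_idx.hrl. With default_field left enabled, the columns are <<"$default">> plus one <<"name:type">> entry per declared field:

    %% Sketch only: the assertion shows the expected column names.
    columns_example() ->
        Def = {[{<<"fields">>,
                 [{[{<<"name">>, <<"age">>},  {<<"type">>, <<"number">>}]},
                  {[{<<"name">>, <<"name">>}, {<<"type">>, <<"string">>}]}]}]},
        [<<"$default">>, <<"age:number">>, <<"name:string">>] =
            mango_idx_nouveau:columns(#idx{def = Def}).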
+ +validate_field_type(<<"string">>) -> + <<"string">>; +validate_field_type(<<"number">>) -> + <<"number">>; +validate_field_type(<<"boolean">>) -> + <<"boolean">>. + +validate_fields(<<"all_fields">>) -> + {ok, all_fields}; +validate_fields(Fields) -> + try fields_to_json(Fields) of + _ -> + mango_fields:new(Fields) + catch + error:function_clause -> + ?MANGO_ERROR({invalid_index_fields_definition, Fields}); + throw:invalid_field_name -> + ?MANGO_ERROR({invalid_index_fields_definition, Fields}) + end. + +validate_ddoc(VProps) -> + try + Def = proplists:get_value(<<"index">>, VProps), + validate_index_def(Def), + Def + catch + Error:Reason -> + couch_log:error( + "Invalid Index Def ~p: Error. ~p, Reason: ~p", + [VProps, Error, Reason] + ), + invalid_ddoc + end. + +opts() -> + [ + {<<"default_analyzer">>, [ + {tag, default_analyzer}, + {optional, true}, + {default, <<"keyword">>} + ]}, + {<<"default_field">>, [ + {tag, default_field}, + {optional, true}, + {default, {[]}} + ]}, + {<<"partial_filter_selector">>, [ + {tag, partial_filter_selector}, + {optional, true}, + {default, {[]}}, + {validator, fun mango_opts:validate_selector/1} + ]}, + {<<"selector">>, [ + {tag, selector}, + {optional, true}, + {default, {[]}}, + {validator, fun mango_opts:validate_selector/1} + ]}, + {<<"fields">>, [ + {tag, fields}, + {optional, true}, + {default, []}, + {validator, fun ?MODULE:validate_fields/1} + ]}, + {<<"index_array_lengths">>, [ + {tag, index_array_lengths}, + {optional, true}, + {default, true}, + {validator, fun mango_opts:is_boolean/1} + ]} + ]. + +make_text(Idx) -> + Text = + {[ + {<<"index">>, Idx#idx.def}, + {<<"analyzer">>, construct_analyzer(Idx#idx.def)} + ]}, + {Idx#idx.name, Text}. + +get_default_field_options(Props) -> + Default = couch_util:get_value(default_field, Props, {[]}), + case Default of + Bool when is_boolean(Bool) -> + {Bool, <<"standard">>}; + {[]} -> + {true, <<"standard">>}; + {Opts} -> + Enabled = couch_util:get_value(<<"enabled">>, Opts, true), + Analyzer = couch_util:get_value( + <<"analyzer">>, + Opts, + <<"standard">> + ), + {Enabled, Analyzer} + end. + +construct_analyzer({Props}) -> + DefaultAnalyzer = couch_util:get_value( + default_analyzer, + Props, + <<"keyword">> + ), + {DefaultField, DefaultFieldAnalyzer} = get_default_field_options(Props), + DefaultAnalyzerDef = + case DefaultField of + true -> + [{<<"$default">>, DefaultFieldAnalyzer}]; + _ -> + [] + end, + case DefaultAnalyzerDef of + [] -> + <<"keyword">>; + _ -> + {[ + {<<"name">>, <<"perfield">>}, + {<<"default">>, DefaultAnalyzer}, + {<<"fields">>, {DefaultAnalyzerDef}} + ]} + end. + +indexable_fields(Selector) -> + TupleTree = mango_selector_text:convert([], Selector), + indexable_fields([], TupleTree). + +indexable_fields(Fields, {op_and, Args}) when is_list(Args) -> + lists:foldl( + fun(Arg, Fields0) -> indexable_fields(Fields0, Arg) end, + Fields, + Args + ); +%% For queries that use array element access or $in operations, two +%% fields get generated by mango_selector_text:convert. At index +%% definition time, only one field gets defined. In this situation, we +%% remove the extra generated field so that the index can be used. For +%% all other situations, we include the fields as normal. 
+indexable_fields( + Fields, + {op_or, [ + {op_field, Field0}, + {op_field, {[Name | _], _}} = Field1 + ]} +) -> + case lists:member(<<"[]">>, Name) of + true -> + indexable_fields(Fields, {op_field, Field0}); + false -> + Fields1 = indexable_fields(Fields, {op_field, Field0}), + indexable_fields(Fields1, Field1) + end; +indexable_fields(Fields, {op_or, Args}) when is_list(Args) -> + lists:foldl( + fun(Arg, Fields0) -> indexable_fields(Fields0, Arg) end, + Fields, + Args + ); +indexable_fields(Fields, {op_not, {ExistsQuery, Arg}}) when is_tuple(Arg) -> + Fields0 = indexable_fields(Fields, ExistsQuery), + indexable_fields(Fields0, Arg); +% forces "$exists" : false to use _all_docs +indexable_fields(_, {op_not, {_, false}}) -> + []; +indexable_fields(Fields, {op_insert, Arg}) when is_binary(Arg) -> + Fields; +%% fieldname.[]:length is not a user defined field. +indexable_fields(Fields, {op_field, {[_, <<":length">>], _}}) -> + Fields; +indexable_fields(Fields, {op_field, {Name, _}}) -> + [iolist_to_binary(Name) | Fields]; +%% In this particular case, the lucene index is doing a field_exists query +%% so it is looking at all sorts of combinations of field:* and field.* +%% We don't add the field because we cannot pre-determine what field will exist. +%% Hence we just return Fields and make it less restrictive. +indexable_fields(Fields, {op_fieldname, {_, _}}) -> + Fields; +%% Similar idea to op_fieldname but with fieldname:null +indexable_fields(Fields, {op_null, {_, _}}) -> + Fields; +indexable_fields(Fields, {op_default, _}) -> + [<<"$default">> | Fields]. + +maybe_reject_index_all_req({Def}, Db) -> + DbName = couch_db:name(Db), + #user_ctx{name = User} = couch_db:get_user_ctx(Db), + Fields = couch_util:get_value(fields, Def), + case {Fields, forbid_index_all()} of + {all_fields, "true"} -> + ?MANGO_ERROR(index_all_disabled); + {all_fields, "warn"} -> + couch_log:warning( + "User ~p is indexing all fields in db ~p", + [User, DbName] + ); + _ -> + ok + end. + +forbid_index_all() -> + config:get("mango", "index_all_disabled", "false"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +setup_all() -> + Ctx = test_util:start_couch(), + meck:expect( + couch_log, + warning, + 2, + fun(_, _) -> + throw({test_error, logged_warning}) + end + ), + Ctx. + +teardown_all(Ctx) -> + meck:unload(), + test_util:stop_couch(Ctx). + +setup() -> + %default index all def that generates {fields, all_fields} + Index = #idx{def = {[]}}, + DbName = <<"testdb">>, + UserCtx = #user_ctx{name = <<"u1">>}, + {ok, Db} = couch_db:clustered_db(DbName, UserCtx), + {Index, Db}. + +teardown(_) -> + ok. + +index_all_test_() -> + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun forbid_index_all/1, + fun default_and_false_index_all/1, + fun warn_index_all/1 + ] + } + }. + +forbid_index_all({Idx, Db}) -> + ?_test(begin + ok = config:set("mango", "index_all_disabled", "true", false), + ?assertThrow( + {mango_error, ?MODULE, index_all_disabled}, + validate_new(Idx, Db) + ) + end). + +default_and_false_index_all({Idx, Db}) -> + ?_test(begin + config:delete("mango", "index_all_disabled", false), + {ok, #idx{def = {Def}}} = validate_new(Idx, Db), + Fields = couch_util:get_value(fields, Def), + ?assertEqual(all_fields, Fields), + ok = config:set("mango", "index_all_disabled", "false", false), + {ok, #idx{def = {Def2}}} = validate_new(Idx, Db), + Fields2 = couch_util:get_value(fields, Def2), + ?assertEqual(all_fields, Fields2) + end). 
+ +warn_index_all({Idx, Db}) -> + ?_test(begin + ok = config:set("mango", "index_all_disabled", "warn", false), + ?assertThrow({test_error, logged_warning}, validate_new(Idx, Db)) + end). + +-endif. diff --git a/src/mango/src/mango_native_proc.erl b/src/mango/src/mango_native_proc.erl index d3d200517..648ffe140 100644 --- a/src/mango/src/mango_native_proc.erl +++ b/src/mango/src/mango_native_proc.erl @@ -62,7 +62,7 @@ handle_call({prompt, [<<"reset">>]}, _From, St) -> {reply, true, St#st{indexes = []}}; handle_call({prompt, [<<"reset">>, _QueryConfig]}, _From, St) -> {reply, true, St#st{indexes = []}}; -handle_call({prompt, [<<"add_fun">>, IndexInfo]}, _From, St) -> +handle_call({prompt, [<<"add_fun">>, IndexInfo | _IgnoreRest]}, _From, St) -> Indexes = case validate_index_info(IndexInfo) of true -> @@ -88,6 +88,15 @@ handle_call({prompt, [<<"index_doc">>, Doc]}, _From, St) -> Else end, {reply, Vals, St}; +handle_call({prompt, [<<"nouveau_index_doc">>, Doc]}, _From, St) -> + Vals = + case nouveau_index_doc(St, mango_json:to_binary(Doc)) of + [] -> + [[]]; + Else -> + Else + end, + {reply, Vals, St}; handle_call(Msg, _From, St) -> {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}. @@ -111,6 +120,9 @@ map_doc(#st{indexes = Indexes}, Doc) -> index_doc(#st{indexes = Indexes}, Doc) -> lists:map(fun(Idx) -> get_text_entries(Idx, Doc) end, Indexes). +nouveau_index_doc(#st{indexes = Indexes}, Doc) -> + lists:map(fun(Idx) -> get_nouveau_entries(Idx, Doc) end, Indexes). + get_index_entries({IdxProps}, Doc) -> {Fields} = couch_util:get_value(<<"fields">>, IdxProps), Selector = get_index_partial_filter_selector(IdxProps), @@ -146,6 +158,15 @@ get_text_entries({IdxProps}, Doc) -> [] end. +get_nouveau_entries({IdxProps}, Doc) -> + Selector = get_index_partial_filter_selector(IdxProps), + case should_index(Selector, Doc) of + true -> + get_nouveau_entries0(IdxProps, Doc); + false -> + [] + end. + get_index_partial_filter_selector(IdxProps) -> case couch_util:get_value(<<"partial_filter_selector">>, IdxProps, {[]}) of {[]} -> @@ -307,14 +328,76 @@ make_text_field_name([P | Rest], Type) -> Escaped = [mango_util:lucene_escape_field(N) || N <- Parts], iolist_to_binary(mango_util:join(".", Escaped)). + +get_nouveau_entries0(IdxProps, Doc) -> + DefaultEnabled = get_default_enabled(IdxProps), + IndexArrayLengths = get_index_array_lengths(IdxProps), + FieldsList = get_text_field_list(IdxProps), + TAcc = #tacc{ + index_array_lengths = IndexArrayLengths, + fields = FieldsList + }, + Fields0 = get_text_field_values(Doc, TAcc), + Fields = + if + not DefaultEnabled -> Fields0; + true -> add_default_text_field(Fields0) + end, + FieldNames0 = get_field_names(Fields), + FieldNames1 = lists:map(fun convert_to_nouveau_string_field/1, FieldNames0), + Converted = convert_nouveau_fields(Fields), + FieldNames1 ++ Converted. + +convert_to_nouveau_string_field([Name, Value, []]) when is_binary(Name), is_binary(Value) -> + {[ + {<<"@type">>, <<"string">>}, + {<<"name">>, Name}, + {<<"value">>, Value}, + {<<"stored">>, false} + ]}. 
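For reference, a worked sketch, not part of this commit, of the shape the clause above emits for a single string field; the name and value are hypothetical, and this mirrors rather than calls the internal convert_to_nouveau_string_field/1:

    %% Sketch only: one nouveau field object as shipped to the indexer.
    example_string_field() ->
        {[
            {<<"@type">>, <<"string">>},
            {<<"name">>, <<"title">>},
            {<<"value">>, <<"hello">>},
            {<<"stored">>, false}
        ]}.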
+ +convert_nouveau_fields([]) -> + []; +convert_nouveau_fields([{Name, <<"string">>, Value} | Rest]) -> + Field = {[ + {<<"@type">>, <<"text">>}, + {<<"name">>, Name}, + {<<"value">>, Value}, + {<<"stored">>, false} + ]}, + [Field | convert_nouveau_fields(Rest)]; +convert_nouveau_fields([{Name, <<"number">>, Value} | Rest]) -> + PointField = {[ + {<<"@type">>, <<"double_point">>}, + {<<"name">>, Name}, + {<<"value">>, Value} + ]}, + DocValuesField = {[ + {<<"@type">>, <<"double_dv">>}, + {<<"name">>, Name}, + {<<"value">>, Value} + ]}, + [PointField, DocValuesField | convert_nouveau_fields(Rest)]; +convert_nouveau_fields([{Name, <<"boolean">>, true} | Rest]) -> + Field = {[ + {<<"@type">>, <<"string">>}, + {<<"name">>, Name}, + {<<"value">>, <<"true">>} + ]}, + [Field | convert_nouveau_fields(Rest)]; +convert_nouveau_fields([{Name, <<"boolean">>, false} | Rest]) -> + Field = {[ + {<<"@type">>, <<"string">>}, + {<<"name">>, Name}, + {<<"value">>, <<"false">>} + ]}, + [Field | convert_nouveau_fields(Rest)]. + validate_index_info(IndexInfo) -> IdxTypes = - case dreyfus:available() of - true -> - [mango_idx_view, mango_idx_text]; - false -> - [mango_idx_view] - end, + case dreyfus:available() of true -> [mango_idx_text]; false -> [] end ++ + case nouveau:enabled() of true -> [mango_idx_nouveau]; false -> [] end ++ + [mango_idx_view], Results = lists:foldl( fun(IdxType, Results0) -> try diff --git a/src/nouveau/include/nouveau.hrl b/src/nouveau/include/nouveau.hrl new file mode 100644 index 000000000..56cee5a8d --- /dev/null +++ b/src/nouveau/include/nouveau.hrl @@ -0,0 +1,24 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-record(index, { + dbname, + ddoc_id, + default_analyzer, + field_analyzers, + def, + def_lang, + name, + sig=nil +}). diff --git a/src/nouveau/src/nouveau.app.src b/src/nouveau/src/nouveau.app.src new file mode 100644 index 000000000..bb49d0bc2 --- /dev/null +++ b/src/nouveau/src/nouveau.app.src @@ -0,0 +1,30 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +{application, nouveau, [ + {description, "FuLL tExT SeArcH"}, + {vsn, git}, + {applications, [ + config, + ibrowse, + kernel, + stdlib, + mem3, + rexi + ]}, + {mod, {nouveau_app, []}}, + {registered, [nouveau_index_manager, nouveau_sup]} +]}. 
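The application above is switched on via the [nouveau] section added to default.ini earlier in this commit. A quick remsh sketch, not part of this commit, for checking that the feature is enabled and that the external nouveau server answers; nouveau:enabled/0 and nouveau_api:analyze/2 are introduced below, and the <<"standard">> analyzer name is assumed to be one the server supports:

    %% Sketch only: sanity-check the nouveau integration from a remsh.
    check() ->
        true = nouveau:enabled(),
        {ok, Tokens} = nouveau_api:analyze(<<"hello there">>, <<"standard">>),
        Tokens.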
diff --git a/rel/apps/couch_epi.config b/src/nouveau/src/nouveau.erl similarity index 70% copy from rel/apps/couch_epi.config copy to src/nouveau/src/nouveau.erl index a53721a48..a8f8fa8ec 100644 --- a/rel/apps/couch_epi.config +++ b/src/nouveau/src/nouveau.erl @@ -2,7 +2,7 @@ % use this file except in compliance with the License. You may obtain a copy of % the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% http://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT @@ -10,13 +10,11 @@ % License for the specific language governing permissions and limitations under % the License. -{plugins, [ - couch_db_epi, - chttpd_epi, - couch_index_epi, - dreyfus_epi, - global_changes_epi, - mango_epi, - mem3_epi, - setup_epi -]}. +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau). + +-export([enabled/0]). + +enabled() -> + config:get_boolean("nouveau", "enable", false). diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl new file mode 100644 index 000000000..b441fca8a --- /dev/null +++ b/src/nouveau/src/nouveau_api.erl @@ -0,0 +1,172 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_api). + +-include("nouveau.hrl"). + +-export([ + analyze/2, + index_info/1, + create_index/2, + delete_path/1, + delete_doc/3, + update_doc/4, + search/2 +]). + +-define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}). + +analyze(Text, Analyzer) + when is_binary(Text), is_binary(Analyzer) -> + ReqBody = {[{<<"text">>, Text}, {<<"analyzer">>, Analyzer}]}, + Resp = ibrowse:send_req(nouveau_util:nouveau_url() ++ "/analyze", [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody)), + case Resp of + {ok, "200", _, RespBody} -> + Json = jiffy:decode(RespBody, [return_maps]), + {ok, maps:get(<<"tokens">>, Json)}; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end; +analyze(_, _) -> + {error, {bad_request, <<"'text' and 'analyzer' fields must be non-empty strings">>}}. + + +index_info(IndexName) + when is_binary(IndexName) -> + Resp = ibrowse:send_req(index_url(IndexName), [], get), + case Resp of + {ok, "200", _, RespBody} -> + {ok, jiffy:decode(RespBody)}; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end. + + +create_index(IndexName, IndexDefinition) + when is_binary(IndexName) -> + Resp = ibrowse:send_req(index_url(IndexName), [?JSON_CONTENT_TYPE], put, jiffy:encode(IndexDefinition)), + case Resp of + {ok, "204", _, _} -> + ok; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end. 
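A minimal sketch, not part of this commit, of the two calls above; both the index name and the definition body are purely illustrative, since real names and definitions are produced by nouveau_util (not shown in this hunk) from the #index{} record:

    %% Sketch only: create an index on the nouveau server, then read it back.
    example() ->
        Name = <<"example-index">>,
        Def = {[{<<"default_analyzer">>, <<"standard">>}]},
        ok = nouveau_api:create_index(Name, Def),
        nouveau_api:index_info(Name).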
+ + +delete_path(Path) + when is_binary(Path) -> + Resp = ibrowse:send_req(index_url(Path), [?JSON_CONTENT_TYPE], delete, []), + case Resp of + {ok, "204", _, _} -> + ok; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end. + +delete_doc(IndexName, DocId, UpdateSeq) + when is_binary(IndexName), is_binary(DocId) -> + ReqBody = {[{<<"seq">>, UpdateSeq}]}, + Resp = ibrowse:send_req(doc_url(IndexName, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody)), + case Resp of + {ok, "204", _, _} -> + ok; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end. + +update_doc(IndexName, DocId, UpdateSeq, Fields) + when is_binary(IndexName), is_binary(DocId), is_integer(UpdateSeq), is_list(Fields) -> + ReqBody = {[{<<"seq">>, UpdateSeq}, {<<"fields">>, Fields}]}, + Resp = ibrowse:send_req(doc_url(IndexName, DocId), [?JSON_CONTENT_TYPE], put, jiffy:encode(ReqBody)), + case Resp of + {ok, "204", _, _} -> + ok; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end. + +search(IndexName, QueryArgs) + when is_binary(IndexName) -> + Resp = ibrowse:send_req(search_url(IndexName), [?JSON_CONTENT_TYPE], post, jiffy:encode(QueryArgs)), + case Resp of + {ok, "200", _, RespBody} -> + {ok, jiffy:decode(RespBody, [return_maps])}; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end. + +%% private functions + +index_url(IndexName) -> + lists:flatten(io_lib:format("~s/index/~s", + [nouveau_util:nouveau_url(), couch_util:url_encode(IndexName)])). + + +doc_url(IndexName, DocId) -> + lists:flatten(io_lib:format("~s/index/~s/doc/~s", + [nouveau_util:nouveau_url(), couch_util:url_encode(IndexName), couch_util:url_encode(DocId)])). + + +search_url(IndexName) -> + index_url(IndexName) ++ "/search". + + +jaxrs_error("400", Body) -> + {bad_request, message(Body)}; + +jaxrs_error("404", Body) -> + {not_found, message(Body)}; + +jaxrs_error("405", Body) -> + {method_not_allowed, message(Body)}; + +jaxrs_error("422", Body) -> + {bad_request, lists:join(" and ", errors(Body))}; + +jaxrs_error("500", Body) -> + {internal_server_error, message(Body)}. + + +send_error({conn_failed, _}) -> + {error, {service_unavailable, <<"Search service unavailable.">>}}; + +send_error(Reason) -> + {error, Reason}. + + +message(Body) -> + Json = jiffy:decode(Body, [return_maps]), + maps:get(<<"message">>, Json). + + +errors(Body) -> + Json = jiffy:decode(Body, [return_maps]), + maps:get(<<"errors">>, Json). diff --git a/src/nouveau/src/nouveau_app.erl b/src/nouveau/src/nouveau_app.erl new file mode 100644 index 000000000..484a16878 --- /dev/null +++ b/src/nouveau/src/nouveau_app.erl @@ -0,0 +1,31 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. 
+ +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_app). +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + nouveau_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/nouveau/src/nouveau_bookmark.erl b/src/nouveau/src/nouveau_bookmark.erl new file mode 100644 index 000000000..942865c23 --- /dev/null +++ b/src/nouveau/src/nouveau_bookmark.erl @@ -0,0 +1,68 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_bookmark). + +-include_lib("mem3/include/mem3.hrl"). + +-export([new/0, update/3, unpack/2, pack/1, to_ejson/1]). + +new() -> + #{}. + +%% Form a bookmark from the last contribution from each shard range +update(DbName, PreviousBookmark, SearchResults) when is_binary(PreviousBookmark) -> + update(DbName, unpack(DbName, PreviousBookmark), SearchResults); +update(DbName, {EJson}, SearchResults) when is_list(EJson) -> + update(DbName, from_ejson({EJson}), SearchResults); +update(DbName, PreviousBookmark, SearchResults) when is_map(PreviousBookmark) -> + Hits = maps:get(<<"hits">>, SearchResults), + NewBookmark0 = lists:foldl(fun(Hit, Acc) -> + maps:put(range_of(DbName, maps:get(<<"id">>, Hit)), maps:get(<<"order">>, Hit), Acc) + end, new(), Hits), + maps:merge(PreviousBookmark, NewBookmark0). + +range_of(DbName, DocId) when is_binary(DbName), is_binary(DocId) -> + [#shard{range = Range} | _] = mem3_shards:for_docid(DbName, DocId), + Range; + +range_of(DbName, Order) when is_binary(DbName), is_list(Order) -> + #{<<"type">> := <<"bytes">>, <<"value">> := EncodedDocId} = lists:last(Order), + range_of(DbName, base64:decode(EncodedDocId)). + +unpack(_DbName, Empty) when Empty == undefined; Empty == nil; Empty == null -> + new(); + +unpack(DbName, PackedBookmark) when is_list(PackedBookmark) -> + unpack(DbName, list_to_binary(PackedBookmark)); +unpack(DbName, PackedBookmark) when is_binary(PackedBookmark) -> + Bookmark = jiffy:decode(base64:decode(PackedBookmark), [return_maps]), + maps:from_list([{range_of(DbName, V), V} || V <- Bookmark]). + +pack(nil) -> + null; +pack({EJson}) when is_list(EJson) -> + pack(from_ejson(EJson)); +pack(UnpackedBookmark) when is_map(UnpackedBookmark) -> + base64:encode(jiffy:encode(maps:values(UnpackedBookmark))). + +%% legacy use of ejson within mango +from_ejson({Props}) -> + maps:from_list(Props). + +to_ejson(Bookmark) when is_map(Bookmark) -> + {maps:to_list(Bookmark)}. 
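Putting the exported functions above together, a minimal sketch, not part of this commit, of one bookmark round trip as the search coordinator performs it; DbName, the packed bookmark and the search result map are assumed to come from the usual request flow:

    %% Sketch only: unpack the client's bookmark, fold in the latest hits,
    %% and pack the result for the next response.
    next_bookmark(DbName, PackedOrNil, SearchResults) ->
        Unpacked = nouveau_bookmark:unpack(DbName, PackedOrNil),
        Updated = nouveau_bookmark:update(DbName, Unpacked, SearchResults),
        nouveau_bookmark:pack(Updated).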
+ diff --git a/src/nouveau/src/nouveau_epi.erl b/src/nouveau/src/nouveau_epi.erl new file mode 100644 index 000000000..51c3d7ccc --- /dev/null +++ b/src/nouveau/src/nouveau_epi.erl @@ -0,0 +1,50 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_epi). + +-behaviour(couch_epi_plugin). + +-export([ + app/0, + providers/0, + services/0, + data_subscriptions/0, + data_providers/0, + processes/0, + notify/3 +]). + +app() -> + nouveau. + +providers() -> + [{chttpd_handlers, nouveau_httpd_handlers}]. + +services() -> + []. + +data_subscriptions() -> + []. + +data_providers() -> + []. + +processes() -> + []. + +notify(_Key, _Old, _New) -> + ok. diff --git a/src/nouveau/src/nouveau_fabric.erl b/src/nouveau/src/nouveau_fabric.erl new file mode 100644 index 000000000..511cf073c --- /dev/null +++ b/src/nouveau/src/nouveau_fabric.erl @@ -0,0 +1,39 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- +%% inspired by dreyfus_fabric.erl but better + +-module(nouveau_fabric). +-export([get_json_docs/2]). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +get_json_docs(DbName, DocIds) -> + fabric:all_docs(DbName, fun callback/2, [], [{keys, DocIds}, {include_docs, true}]). + +callback({meta, _}, Acc) -> + {ok, Acc}; +callback({error, Reason}, _Acc) -> + {error, Reason}; +callback({row, Row}, Acc) -> + case lists:keyfind(value, 1, Row) of + {value, not_found} -> + {ok, [not_found | Acc]}; + {value, _} -> + {ok, [lists:keyfind(doc, 1, Row) | Acc]} + end; +callback(complete, Acc) -> + {ok, lists:reverse(Acc)}; +callback(timeout, _Acc) -> + {error, timeout}. diff --git a/src/nouveau/src/nouveau_fabric_search.erl b/src/nouveau/src/nouveau_fabric_search.erl new file mode 100644 index 000000000..769723eda --- /dev/null +++ b/src/nouveau/src/nouveau_fabric_search.erl @@ -0,0 +1,173 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_fabric_search). + +-export([go/4]). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record(state, { + limit, + sort, + counters, + search_results +}). + +go(DbName, GroupId, IndexName, QueryArgs0) when is_binary(GroupId) -> + {ok, DDoc} = fabric:open_doc( + DbName, + <<"_design/", GroupId/binary>>, + [ejson_body] + ), + go(DbName, DDoc, IndexName, QueryArgs0); +go(DbName, #doc{} = DDoc, IndexName, QueryArgs0) -> + {ok, Index} = nouveau_util:design_doc_to_index(DbName, DDoc, IndexName), + Shards = mem3:shards(DbName), + Bookmark = nouveau_bookmark:unpack(DbName, maps:get(bookmark, QueryArgs0)), + QueryArgs1 = maps:remove(bookmark, QueryArgs0), + Counters0 = lists:map( + fun(#shard{} = Shard) -> + After = maps:get(Shard#shard.range, Bookmark, null), + Ref = rexi:cast(Shard#shard.node, {nouveau_rpc, search, + [Shard#shard.name, Index, QueryArgs1#{'after' => After}]}), + Shard#shard{ref = Ref} + end, + Shards), + Counters = fabric_dict:init(Counters0, nil), + Workers = fabric_dict:fetch_keys(Counters), + RexiMon = fabric_util:create_monitors(Workers), + State = #state{ + limit = maps:get(limit, QueryArgs0), + sort = maps:get(sort, QueryArgs0), + counters = Counters, + search_results = #{}}, + try + rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, infinity, 1000 * 60 * 60) + of + {ok, SearchResults} -> + NewBookmark = nouveau_bookmark:update(DbName, Bookmark, SearchResults), + {ok, SearchResults#{bookmark => NewBookmark}}; + {error, Reason} -> + {error, Reason} + after + rexi_monitor:stop(RexiMon), + fabric_util:cleanup(Workers) + end. + + +handle_message({ok, Response}, Shard, State) -> + case fabric_dict:lookup_element(Shard, State#state.counters) of + undefined -> + %% already heard from someone else in this range + {ok, State}; + nil -> + SearchResults = merge_search_results(State#state.search_results, Response, State), + Counters1 = fabric_dict:store(Shard, ok, State#state.counters), + Counters2 = fabric_view:remove_overlapping_shards(Shard, Counters1), + State1 = State#state{counters = Counters2, search_results = SearchResults}, + case fabric_dict:any(nil, Counters2) of + true -> + {ok, State1}; + false -> + {stop, SearchResults} + end + end; + +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, State) -> + #state{counters = Counters0} = State, + case fabric_util:remove_down_workers(Counters0, NodeRef, []) of + {ok, Counters1} -> + {ok, Counters1}; + error -> + {error, {nodedown, <<"progress not possible">>}} + end; + +handle_message({error, Reason}, _Shard, _State) -> + {error, Reason}; + +handle_message(Else, _Shard, _State) -> + {error, Else}. + + +merge_search_results(A, B, #state{} = State) -> + #{ + <<"total_hits">> => maps:get(<<"total_hits">>, A, 0) + maps:get(<<"total_hits">>, B, 0), + <<"hits">> => merge_hits(maps:get(<<"hits">>, A, []), maps:get(<<"hits">>, B, []), State#state.sort, State#state.limit), + <<"counts">> => merge_facets(maps:get(<<"counts">>, A, null), maps:get(<<"counts">>, B, null), State#state.limit), + <<"ranges">> => merge_facets(maps:get(<<"ranges">>, A, null), maps:get(<<"ranges">>, B, null), State#state.limit) + }. + + +merge_hits(HitsA, HitsB, Sort, Limit) -> + MergedHits = lists:merge(merge_fun(Sort), HitsA, HitsB), + lists:sublist(MergedHits, Limit). 
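The facet handling further down in this module uses the same nested summation as this sketch, which is not part of this commit; the facet names and counts are hypothetical:

    %% Sketch only: combining per-shard facet counts by summing the leaves,
    %% as merge_facets/3 below does via maps:merge_with/3.
    facet_merge_example() ->
        A = #{<<"genre">> => #{<<"fiction">> => 3, <<"biography">> => 1}},
        B = #{<<"genre">> => #{<<"fiction">> => 2}},
        Combine = fun(_, V1, V2) ->
            maps:merge_with(fun(_, X, Y) -> X + Y end, V1, V2)
        end,
        #{<<"genre">> := #{<<"fiction">> := 5, <<"biography">> := 1}} =
            maps:merge_with(Combine, A, B).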
+ +merge_fun(Sort) -> + fun(HitA, HitB) -> + OrderA = maps:get(<<"order">>, HitA), + OrderB = maps:get(<<"order">>, HitB), + compare_order(Sort, OrderA, OrderB) + end. + + +%% no sort order specified +compare_order(null, [A | ARest], [B | BRest]) -> + case couch_ejson_compare:less(convert_item(A), convert_item(B)) of + 0 -> + compare_order(null, ARest, BRest); + Less -> + Less < 1 + end; +%% server-side adds _id on the end of sort order if not present +compare_order([], [A], [B]) -> + couch_ejson_compare:less(convert_item(A), convert_item(B)) < 1; +%% reverse order specified +compare_order([<<"-", _/binary>> | SortRest], [A | ARest], [B | BRest]) -> + case couch_ejson_compare:less(convert_item(B), convert_item(A)) of + 0 -> + compare_order(SortRest, ARest, BRest); + Less -> + Less < 1 + end; +%% forward order specified +compare_order([_ | SortRest], [A | ARest], [B | BRest]) -> + case couch_ejson_compare:less(convert_item(A), convert_item(B)) of + 0 -> + compare_order(SortRest, ARest, BRest); + Less -> + Less < 1 + end. + +convert_item(Item) -> + case maps:get(<<"type">>, Item) of + <<"bytes">> -> + base64:decode(maps:get(<<"value">>, Item)); + _ -> + maps:get(<<"value">>, Item) + end. + + +merge_facets(FacetsA, null, _Limit) -> + FacetsA; + +merge_facets(null, FacetsB, _Limit) -> + FacetsB; + +merge_facets(FacetsA, FacetsB, Limit) -> + Combiner = fun(_, V1, V2) -> maps:merge_with(fun(_, V3, V4) -> V3 + V4 end, V1, V2) end, + maps:merge_with(Combiner, FacetsA, FacetsB). diff --git a/src/nouveau/src/nouveau_httpd.erl b/src/nouveau/src/nouveau_httpd.erl new file mode 100644 index 000000000..936554c86 --- /dev/null +++ b/src/nouveau/src/nouveau_httpd.erl @@ -0,0 +1,70 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_httpd). + +-include_lib("couch/include/couch_db.hrl"). +-include("nouveau.hrl"). + +-export([handle_analyze_req/1, handle_search_req/3, handle_info_req/3]). + +-import(chttpd, [ + send_method_not_allowed/2, + send_json/2, send_json/3, + send_error/2 +]). + +handle_analyze_req(#httpd{method = 'POST'} = Req) -> + couch_httpd:validate_ctype(Req, "application/json"), + {Fields} = chttpd:json_body_obj(Req), + Analyzer = couch_util:get_value(<<"analyzer">>, Fields), + Text = couch_util:get_value(<<"text">>, Fields), + case nouveau_api:analyze(Text, Analyzer) of + {ok, Tokens} -> + send_json(Req, 200, {[{<<"tokens">>, Tokens}]}); + {error, Reason} -> + send_error(Req, Reason) + end; +handle_analyze_req(Req) -> + send_method_not_allowed(Req, "POST"). 
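+
+%% A hypothetical round trip through the analyze handler above (the endpoint
+%% name comes from nouveau_httpd_handlers:url_handler/1; the values shown are
+%% illustrative):
+%%
+%%   POST /_nouveau_analyze
+%%   Content-Type: application/json
+%%   {"analyzer": "standard", "text": "hello there"}
+%%
+%% which reduces to the call
+%%
+%%   {ok, Tokens} = nouveau_api:analyze(<<"hello there">>, <<"standard">>)
+%%
+%% and a 200 response of the form {"tokens": [...]}; any {error, Reason} is
+%% forwarded to the client via send_error/2.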
+ + +handle_search_req(#httpd{method = 'GET', path_parts = [_, _, _, _, IndexName]} = Req, Db, DDoc) -> + DbName = couch_db:name(Db), + Query = ?l2b(chttpd:qs_value(Req, "q")), + Limit = list_to_integer(chttpd:qs_value(Req, "limit", "25")), + Sort = ?JSON_DECODE(chttpd:qs_value(Req, "sort", "null")), + Ranges = ?JSON_DECODE(chttpd:qs_value(Req, "ranges", "null")), + Counts = ?JSON_DECODE(chttpd:qs_value(Req, "counts", "null")), + Update = chttpd:qs_value(Req, "update", "true"), + Bookmark = chttpd:qs_value(Req, "bookmark"), + QueryArgs = #{query => Query, limit => Limit, sort => Sort, ranges => Ranges, counts => Counts, update => Update, bookmark => Bookmark}, + case nouveau_fabric_search:go(DbName, DDoc, IndexName, QueryArgs) of + {ok, SearchResults} -> + RespBody = #{ + <<"bookmark">> => nouveau_bookmark:pack(maps:get(bookmark, SearchResults)), + <<"total_hits">> => maps:get(<<"total_hits">>, SearchResults), + <<"hits">> => maps:get(<<"hits">>, SearchResults), + <<"counts">> => maps:get(<<"counts">>, SearchResults, null), + <<"ranges">> => maps:get(<<"ranges">>, SearchResults, null) + }, + send_json(Req, 200, RespBody); + {error, Reason} -> + send_error(Req, Reason) + end. + +handle_info_req(_Req, _Db, _DDoc) -> + ok. diff --git a/src/nouveau/src/nouveau_httpd_handlers.erl b/src/nouveau/src/nouveau_httpd_handlers.erl new file mode 100644 index 000000000..9e616fbd5 --- /dev/null +++ b/src/nouveau/src/nouveau_httpd_handlers.erl @@ -0,0 +1,34 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_httpd_handlers). + +-export([url_handler/1, db_handler/1, design_handler/1]). + +url_handler(<<"_nouveau_analyze">>) -> + fun nouveau_httpd:handle_analyze_req/1; +url_handler(_) -> + no_match. + +db_handler(_) -> + no_match. + +design_handler(<<"_nouveau">>) -> + fun nouveau_httpd:handle_search_req/3; +design_handler(<<"_nouveau_info">>) -> + nomatch; +design_handler(_) -> + no_match. diff --git a/src/nouveau/src/nouveau_index_manager.erl b/src/nouveau/src/nouveau_index_manager.erl new file mode 100644 index 000000000..f3d04d733 --- /dev/null +++ b/src/nouveau/src/nouveau_index_manager.erl @@ -0,0 +1,148 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +%% index manager ensures only one process is updating a nouveau index at a time. 
+%% calling update_index will block until at least one attempt has been made to +%% make the index as current as the database at the time update_index was called. + +-module(nouveau_index_manager). +-behaviour(gen_server). +-behaviour(config_listener). +-include("nouveau.hrl"). + +%% public api +-export([ + update_index/1 +]). + +%% gen_server bits +-export([ + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +% config_listener api +-export([handle_config_change/5, handle_config_terminate/3]). + +-export([handle_db_event/3]). + +-define(BY_DBSIG, nouveau_by_dbsig). +-define(BY_REF, nouveau_by_ref). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +update_index(#index{} = Index) -> + gen_server:call(?MODULE, {update, Index}, infinity). + + +init(_) -> + couch_util:set_mqd_off_heap(?MODULE), + ets:new(?BY_DBSIG, [set, named_table]), + ets:new(?BY_REF, [set, named_table]), + couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]), + set_max_sessions(nouveau_util:nouveau_url()), + ok = config:listen_for_changes(?MODULE, nil), + {ok, nil}. + + +handle_call({update, #index{} = Index0}, From, State) -> + DbSig = {Index0#index.dbname, Index0#index.sig}, + case ets:lookup(?BY_DBSIG, DbSig) of + [] -> + {_IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, update, [Index0]), + Queue = queue:in(From, queue:new()), + true = ets:insert(?BY_DBSIG, {DbSig, Index0, Queue}), + true = ets:insert(?BY_REF, {IndexerRef, DbSig}); + [{_DbSig, Index1, Queue}] -> + ets:insert(?BY_DBSIG, {DbSig, Index1, queue:in(From, Queue)}) + end, + {noreply, State}; + +handle_call(_Msg, _From, State) -> + {reply, unexpected_msg, State}. + + +handle_cast(_Msg, State) -> + {noreply, State}. + + +handle_info({'DOWN', IndexerRef, process, _Pid, Reason}, State) -> + case ets:lookup(?BY_REF, IndexerRef) of + [] -> + {noreply, State}; % not one of ours, somehow... + [{_, DbSig}] -> + true = ets:delete(?BY_REF, IndexerRef), + [{_, Index, Queue0}] = ets:lookup(?BY_DBSIG, DbSig), + {{value, From}, Queue1} = queue:out(Queue0), + case Reason of + normal -> + gen_server:reply(From, ok); + Other -> + couch_log:error("~p: db:~s ddoc:~s index:~s failed with: ~p", + [?MODULE, mem3:dbname(Index#index.dbname), Index#index.ddoc_id, Index#index.name, Other]), + gen_server:reply(From, {error, {internal_server_error, <<"indexing failed">>}}) + end, + case queue:is_empty(Queue1) of + true -> + true = ets:delete(?BY_DBSIG, DbSig); + false -> + {_IndexerPid, NewIndexerRef} = spawn_monitor(nouveau_index_updater, update, [Index]), + true = ets:insert(?BY_DBSIG, {DbSig, Index, Queue1}), + true = ets:insert(?BY_REF, {NewIndexerRef, DbSig}) + end, + {noreply, State} + end; + +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; + +handle_info(_Msg, State) -> + {noreply, State}. + +handle_db_event(DbName, deleted, State) -> + couch_log:notice("Deleting indexes for ~s as database was deleted", [DbName]), + nouveau_api:delete_path(nouveau_util:index_path(DbName)), + {ok, State}; + +handle_db_event(_DbName, _Event, State) -> + {ok, State}. + +handle_config_change("nouveau", "url", URL, _Persist, State) -> + set_max_sessions(URL), + {ok, State}. + +handle_config_terminate(_Server, stop, _State) -> + ok; + +handle_config_terminate(_Server, _Reason, _State) -> + erlang:send_after( + 5000, + whereis(?MODULE), + restart_config_listener + ). 
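+
+%% Sketch of the ETS bookkeeping used by handle_call/handle_info above
+%% (entries are hypothetical): while an updater for DbSig = {DbName, Sig} is
+%% running,
+%%
+%%   ets:lookup(?BY_DBSIG, DbSig)     %% [{DbSig, Index, Queue}] - Queue holds blocked callers
+%%   ets:lookup(?BY_REF, IndexerRef)  %% [{IndexerRef, DbSig}]
+%%
+%% Each 'DOWN' from the monitored updater replies to exactly one queued caller;
+%% if more callers are still waiting, a fresh nouveau_index_updater is spawned
+%% for the same index, so at most one updater per index runs at a time.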
+ + +set_max_sessions(URL) -> + #{host := Host, port := Port} = uri_string:parse(URL), + ibrowse:set_max_sessions(Host, Port, nouveau_util:max_sessions()). + diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl new file mode 100644 index 000000000..00e13fdae --- /dev/null +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -0,0 +1,136 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_index_updater). +-include_lib("couch/include/couch_db.hrl"). +-include("nouveau.hrl"). + +%% public api +-export([outdated/1]). + +%% callbacks +-export([update/1]). + +-import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]). +-import(nouveau_util, [index_path/1]). + +outdated(#index{} = Index) -> + case open_or_create_index(Index) of + {ok, IndexSeq} -> + DbSeq = get_db_seq(Index), + DbSeq > IndexSeq; + {error, Reason} -> + {error, Reason} + end. + + +update(#index{} = Index) -> + {ok, Db} = couch_db:open_int(Index#index.dbname, []), + try + case open_or_create_index(Index) of + {error, Reason} -> + exit({error, Reason}); + {ok, CurSeq} -> + TotalChanges = couch_db:count_changes_since(Db, CurSeq), + couch_task_status:add_task([ + {type, search_indexer}, + {database, Index#index.dbname}, + {design_document, Index#index.ddoc_id}, + {index, Index#index.name}, + {progress, 0}, + {changes_done, 0}, + {total_changes, TotalChanges} + ]), + + %% update status every half second + couch_task_status:set_update_frequency(500), + + Proc = get_os_process(Index#index.def_lang), + try + true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), + Acc0 = {Db, Index, Proc, 0, TotalChanges}, + {ok, _} = couch_db:fold_changes(Db, CurSeq, fun load_docs/2, Acc0, []) + after + ret_os_process(Proc) + end + end + after + couch_db:close(Db) + end. + + +load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) -> + {ok, Acc}; + +load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges}) -> + couch_task_status:update([{changes_done, ChangesDone}, {progress, (ChangesDone * 100) div TotalChanges}]), + + DI = couch_doc:to_doc_info(FDI), + #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, + + case Del of + true -> + ok = nouveau_api:delete_doc(index_path(Index), Id, Seq); + false -> + {ok, Doc} = couch_db:open_doc(Db, DI, []), + Json = couch_doc:to_json_obj(Doc, []), + [Fields | _] = proc_prompt(Proc, [<<"nouveau_index_doc">>, Json]), + case Fields of + [] -> + ok = nouveau_api:delete_doc(index_path(Index), Id, Seq); + _ -> + ok = nouveau_api:update_doc(index_path(Index), Id, Seq, Fields) + end + end, + {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges}}. 
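+
+%% Worked example of the staleness check (sequence numbers hypothetical): if
+%% get_index_seq/1 below reports {ok, 42} while get_db_seq/1 reports 57, then
+%% outdated/1 evaluates 57 > 42 and returns true, and callers such as
+%% nouveau_rpc:search/3 are expected to run update/1 (usually via
+%% nouveau_index_manager:update_index/1) before serving the query.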
+ + +open_or_create_index(#index{} = Index) -> + case get_index_seq(Index) of + {ok, UpdateSeq} -> + {ok, UpdateSeq}; + {error, {not_found, _}} -> + case nouveau_api:create_index(index_path(Index), index_definition(Index)) of + ok -> + {ok, 0}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +get_db_seq(#index{} = Index) -> + {ok, Db} = couch_db:open_int(Index#index.dbname, []), + try + couch_db:get_update_seq(Db) + after + couch_db:close(Db) + end. + +get_index_seq(#index{} = Index) -> + case nouveau_api:index_info(index_path(Index)) of + {ok, {Fields}} -> + {ok, couch_util:get_value(<<"update_seq">>, Fields)}; + {error, Reason} -> + {error, Reason} + end. + +index_definition(#index{} = Index) -> + #{ + <<"default_analyzer">> => Index#index.default_analyzer, + <<"field_analyzers">> => Index#index.field_analyzers + }. diff --git a/src/nouveau/src/nouveau_rpc.erl b/src/nouveau/src/nouveau_rpc.erl new file mode 100644 index 000000000..3140e67f4 --- /dev/null +++ b/src/nouveau/src/nouveau_rpc.erl @@ -0,0 +1,45 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_rpc). + +-export([search/3]). + +-include("nouveau.hrl"). +-import(nouveau_util, [index_path/1]). + +search(DbName, #index{} = Index0, QueryArgs) -> + %% Incorporate the shard name into the record. + Index1 = Index0#index{dbname = DbName}, + Update = maps:get(update, QueryArgs, "true") == "true", + + %% check if index is up to date + case Update andalso nouveau_index_updater:outdated(Index1) of + true -> + case nouveau_index_manager:update_index(Index1) of + ok -> + ok; + {error, Reason} -> + rexi:reply({error, Reason}) + end; + false -> + ok; + {error, Reason} -> + rexi:reply({error, Reason}) + end, + + %% Run the search + rexi:reply(nouveau_api:search(index_path(Index1), QueryArgs)). diff --git a/src/nouveau/src/nouveau_sup.erl b/src/nouveau/src/nouveau_sup.erl new file mode 100644 index 000000000..5174a3358 --- /dev/null +++ b/src/nouveau/src/nouveau_sup.erl @@ -0,0 +1,32 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_sup). +-behaviour(supervisor). + +-export([start_link/0, init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). 
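+
+%% init/1 below registers this application's couch_epi service and supervises
+%% the singleton nouveau_index_manager; one_for_one with at most 10 restarts
+%% within 1 second restarts a crashed manager without affecting anything else.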
+ +init(_Args) -> + Children = [ + child(nouveau_index_manager) + ], + {ok, {{one_for_one, 10, 1}, couch_epi:register_service(nouveau_epi, Children)}}. + +child(Child) -> + {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}. diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl new file mode 100644 index 000000000..6069b4fcf --- /dev/null +++ b/src/nouveau/src/nouveau_util.erl @@ -0,0 +1,103 @@ +%% Copyright 2022 Robert Newson +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_util). + +-include("nouveau.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-export([ + index_path/1, + design_doc_to_indexes/2, + design_doc_to_index/3, + nouveau_url/0, + max_sessions/0 +]). + +index_path(Path) when is_binary(Path) -> + <<(node_prefix())/binary, "/", Path/binary>>; + +index_path(#index{} = Index) -> + <<(node_prefix())/binary, "/", (Index#index.dbname)/binary, "/", (Index#index.sig)/binary>>. + +node_prefix() -> + atom_to_binary(node(), utf8). + +%% copied from dreyfus_index.erl +design_doc_to_indexes(DbName, #doc{body = {Fields}} = Doc) -> + RawIndexes = couch_util:get_value(<<"nouveau">>, Fields, {[]}), + case RawIndexes of + {IndexList} when is_list(IndexList) -> + {IndexNames, _} = lists:unzip(IndexList), + lists:flatmap( + fun(IndexName) -> + case (catch design_doc_to_index(DbName, Doc, IndexName)) of + {ok, #index{} = Index} -> [Index]; + _ -> [] + end + end, + IndexNames + ); + _ -> + [] + end. + + +%% copied from dreyfus_index.erl +design_doc_to_index(DbName, #doc{id = Id, body = {Fields}}, IndexName) -> + Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>), + {RawIndexes} = couch_util:get_value(<<"nouveau">>, Fields, {[]}), + InvalidDDocError = + {invalid_design_doc, <<"index `", IndexName/binary, "` must have parameter `index`">>}, + case lists:keyfind(IndexName, 1, RawIndexes) of + false -> + {error, {not_found, <<IndexName/binary, " not found.">>}}; + {IndexName, {Index}} -> + DefaultAnalyzer = couch_util:get_value(<<"default_analyzer">>, Index, <<"standard">>), + FieldAnalyzers = couch_util:get_value(<<"field_analyzers">>, Index, #{}), + case couch_util:get_value(<<"index">>, Index) of + undefined -> + {error, InvalidDDocError}; + Def -> + Sig = ?l2b( + couch_util:to_hex( + couch_hash:md5_hash( + term_to_binary({DefaultAnalyzer, FieldAnalyzers, Def}) + ) + ) + ), + {ok, #index{ + dbname = DbName, + default_analyzer = DefaultAnalyzer, + field_analyzers = FieldAnalyzers, + ddoc_id = Id, + def = Def, + def_lang = Language, + name = IndexName, + sig = Sig + }} + end; + _ -> + {error, InvalidDDocError} + end. + + +nouveau_url() -> + config:get("nouveau", "url", "http://127.0.0.1:8080"). + + +max_sessions() -> + config:get_integer("nouveau", "max_sessions", 1000). 
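For orientation, a hedged sketch of the two data shapes this application deals in; all names and values below are illustrative only. The design-document fragment is the shape parsed by design_doc_to_index/3 above, and the map is what nouveau_httpd:handle_search_req/3 hands to nouveau_fabric_search:go/4:

    %% _design/foo body fragment (EJSON) defining a nouveau index named "bar"
    {[
        {<<"language">>, <<"javascript">>},
        {<<"nouveau">>, {[
            {<<"bar">>, {[
                {<<"default_analyzer">>, <<"standard">>},
                {<<"index">>, <<"function (doc) { /* add fields via index(...) */ }">>}
            ]}}
        ]}}
    ]}

    %% query arguments assembled for GET /db/_design/foo/_nouveau/bar?q=foo:quux
    #{query => <<"foo:quux">>, limit => 25, sort => null, ranges => null,
      counts => null, update => "true", bookmark => undefined}

The nouveau_url/0 and max_sessions/0 helpers above read the [nouveau] section of the server config, defaulting to http://127.0.0.1:8080 and 1000 sessions; nouveau_index_manager listens for changes to the url key and adjusts the ibrowse session limit accordingly.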
diff --git a/support/build_js.escript b/support/build_js.escript index 5f1e92015..64a483ece 100644 --- a/support/build_js.escript +++ b/support/build_js.escript @@ -36,6 +36,7 @@ main([]) -> JsFiles = [ "share/server/json2.js", "share/server/dreyfus.js", + "share/server/nouveau.js", "share/server/filter.js", "share/server/mimeparse.js", "share/server/render.js", @@ -49,6 +50,7 @@ main([]) -> CoffeeFiles = [ "share/server/json2.js", "share/server/dreyfus.js", + "share/server/nouveau.js", "share/server/filter.js", "share/server/mimeparse.js", "share/server/render.js",