This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-replicator-doc-parsing
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit c75ced0f8baf85ec3caad3bf9c92092d4a592c26
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Nov 22 11:31:54 2022 -0500

    [wip] Replace the auto-inserted replicator VDU with a BDU

    Switch out the VDU with a BDU (before_doc_update) check. Couch
    replicator already had a BDU to update the `"owner"` field, so we can
    plug right into it and validate everything we need there. This way
    we'll have only a single validation and parsing code path.
---
 src/couch_replicator/src/couch_replicator.erl      |   6 +-
 .../src/couch_replicator_doc_processor.erl         |   9 +-
 .../src/couch_replicator_doc_processor_worker.erl  |  14 +-
 src/couch_replicator/src/couch_replicator_docs.erl | 564 +++------------------
 src/couch_replicator/src/couch_replicator_ids.erl  |   8 +-
 .../src/couch_replicator_js_functions.hrl          | 183 -------
 ...licator_docs.erl => couch_replicator_parse.erl} | 357 +------------
 .../src/couch_replicator_scheduler_job.erl         |   4 +-
 .../src/couch_replicator_utils.erl                 |   8 +-
 .../test/eunit/couch_replicator_compact_tests.erl  |   2 +-
 .../couch_replicator_error_reporting_tests.erl     |   2 +-
 .../test/eunit/couch_replicator_proxy_tests.erl    |  10 +-
 ...ch_replicator_retain_stats_between_job_runs.erl |   2 +-
 .../test/eunit/couch_replicator_test_helper.erl    |   2 +-
 14 files changed, 110 insertions(+), 1061 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39b3903ea..3bb177968 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,10 +25,8 @@
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
--define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
 -define(REPLICATION_STATES, [
     % Just added to scheduler
     initializing,
@@ -58,7 +56,7 @@
     | {error, any()}
     | no_return().
 replicate(PostBody, Ctx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    {ok, Rep0} = couch_replicator_parse:parse_rep_doc(PostBody, Ctx),
     Rep = Rep0#rep{start_time = os:timestamp()},
     #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
     case get_value(cancel, Options, false) of
@@ -138,7 +136,7 @@ replication_states() ->
 
 -spec strip_url_creds(binary() | {[_]}) -> binary().
 strip_url_creds(Endpoint) ->
-    try couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+    try couch_replicator_parse:parse_rep_db(Endpoint, [], []) of
         #httpdb{url = Url} ->
             iolist_to_binary(couch_util:url_strip_password(Url))
     catch
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 436d7c44d..eb4c02b49 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -44,9 +44,7 @@
     notify_cluster_event/2
 ]).
 
--include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
--include_lib("mem3/include/mem3.hrl").
 
 -import(couch_replicator_utils, [
     get_json_value/2,
@@ -77,9 +75,8 @@
 
 % couch_multidb_changes API callbacks
 
-db_created(DbName, Server) ->
+db_created(_DbName, Server) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
     Server.
 
 db_deleted(DbName, Server) ->
@@ -89,7 +86,7 @@ db_deleted(DbName, Server) ->
 
 db_found(DbName, Server) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    couch_replicator_docs:delete_old_rep_ddoc(DbName),
     Server.
 
 db_change(DbName, {ChangeProps} = Change, Server) ->
@@ -169,7 +166,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     % should propagate to db_change function and will be recorded as permanent
     % failure in the document. User will have to update the document to fix the
     % problem.
-    Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
+    Rep0 = couch_replicator_parse:parse_rep_doc_without_id(JsonRepDoc),
     Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
     Filter =
         case couch_replicator_filters:parse(Rep#rep.options) of
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
index 22c5f8584..b1014ffa5 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
@@ -162,7 +162,7 @@ doc_processor_worker_test_() ->
 t_should_add_job() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
         ?assert(added_job())
     end).
@@ -172,7 +172,7 @@ t_already_running_same_docid() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(?DB, ?DOC1),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
         ?assert(did_not_add_job())
     end).
@@ -182,7 +182,7 @@ t_already_running_transient() -> ?_test(begin Id = {?DB, ?DOC1}, mock_already_running(null, null), - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), + Rep = couch_replicator_parse:parse_rep_doc_without_id(change()), ?assertMatch( {temporary_error, _}, maybe_start_replication( @@ -200,7 +200,7 @@ t_already_running_other_db_other_doc() -> ?_test(begin Id = {?DB, ?DOC1}, mock_already_running(<<"otherdb">>, <<"otherdoc">>), - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), + Rep = couch_replicator_parse:parse_rep_doc_without_id(change()), ?assertMatch( {permanent_failure, _}, maybe_start_replication( @@ -217,7 +217,7 @@ t_already_running_other_db_other_doc() -> t_spawn_worker() -> ?_test(begin Id = {?DB, ?DOC1}, - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), + Rep = couch_replicator_parse:parse_rep_doc_without_id(change()), WRef = make_ref(), meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef), Pid = spawn_worker(Id, Rep, 0, WRef), @@ -236,7 +236,7 @@ t_spawn_worker() -> t_ignore_if_doc_deleted() -> ?_test(begin Id = {?DB, ?DOC1}, - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), + Rep = couch_replicator_parse:parse_rep_doc_without_id(change()), meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil), ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())), ?assertNot(added_job()) @@ -247,7 +247,7 @@ t_ignore_if_doc_deleted() -> t_ignore_if_worker_ref_does_not_match() -> ?_test(begin Id = {?DB, ?DOC1}, - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), + Rep = couch_replicator_parse:parse_rep_doc_without_id(change()), meck:expect( couch_replicator_doc_processor, get_worker_ref, diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index a60f1a1e1..20dbc0d67 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -13,15 +13,14 @@ -module(couch_replicator_docs). -export([ - parse_rep_doc/1, - parse_rep_doc/2, - parse_rep_db/3, - parse_rep_doc_without_id/1, - parse_rep_doc_without_id/2, + %% parse_rep_doc/1, + %% parse_rep_doc/2, + %% parse_rep_db/3, + %% parse_rep_doc_without_id/1, + %% parse_rep_doc_without_id/2, before_doc_update/3, after_doc_read/2, - ensure_rep_ddoc_exists/1, - ensure_cluster_rep_ddoc_exists/1, + delete_old_rep_ddoc/1, remove_state_fields/2, update_doc_completed/3, update_failed/3, @@ -31,24 +30,10 @@ ]). -include_lib("couch/include/couch_db.hrl"). --include_lib("ibrowse/include/ibrowse.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). --include("couch_replicator_js_functions.hrl"). --import(couch_util, [ - get_value/2, - get_value/3, - to_binary/1 -]). - --import(couch_replicator_utils, [ - get_json_value/2, - get_json_value/3 -]). - --define(REP_DB_NAME, <<"_replicator">>). +% The ID of now deleted design doc. On every *_replicator db discovery we try +% to delete it. At some point in the future, remove this logic altogether. -define(REP_DESIGN_DOC, <<"_design/_replicator">>). -define(OWNER, <<"owner">>). -define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}). @@ -126,176 +111,32 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> ]), ok. --spec ensure_rep_ddoc_exists(binary()) -> ok. 
-ensure_rep_ddoc_exists(RepDb) -> +-spec delete_old_rep_ddoc(binary()) -> ok. +delete_old_rep_ddoc(RepDb) -> case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of - true -> - ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC); - false -> - ok + true -> delete_old_rep_ddoc(RepDb, ?REP_DESIGN_DOC); + false -> ok end. --spec ensure_rep_ddoc_exists(binary(), binary()) -> ok. -ensure_rep_ddoc_exists(RepDb, DDocId) -> +-spec delete_old_rep_ddoc(binary(), binary()) -> ok. +delete_old_rep_ddoc(RepDb, DDocId) -> case open_rep_doc(RepDb, DDocId) of {not_found, no_db_file} -> - %% database was deleted. ok; {not_found, _Reason} -> - DocProps = replication_design_doc_props(DDocId), - DDoc = couch_doc:from_json_obj({DocProps}), - couch_log:notice("creating replicator ddoc ~p", [RepDb]), - {ok, _Rev} = save_rep_doc(RepDb, DDoc); + ok; {ok, Doc} -> - Latest = replication_design_doc_props(DDocId), - {Props0} = couch_doc:to_json_obj(Doc, []), - {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0), - case compare_ejson({Props}, {Latest}) of - true -> - ok; - false -> - LatestWithRev = [{<<"_rev">>, Rev} | Latest], - DDoc = couch_doc:from_json_obj({LatestWithRev}), - couch_log:notice("updating replicator ddoc ~p", [RepDb]), - try - {ok, _} = save_rep_doc(RepDb, DDoc) - catch - throw:conflict -> - %% ignore, we'll retry next time - ok - end + DeletedDoc = Doc#doc{deleted = true, body = {[]}}, + try + save_rep_doc(RepDb, DeletedDoc) + catch + throw:conflict -> + % ignore, we'll retry next time + ok end end, ok. --spec ensure_cluster_rep_ddoc_exists(binary()) -> ok. -ensure_cluster_rep_ddoc_exists(RepDb) -> - DDocId = ?REP_DESIGN_DOC, - [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId), - ensure_rep_ddoc_exists(DbShard, DDocId). - --spec compare_ejson({[_]}, {[_]}) -> boolean(). -compare_ejson(EJson1, EJson2) -> - EjsonSorted1 = couch_replicator_filters:ejsort(EJson1), - EjsonSorted2 = couch_replicator_filters:ejsort(EJson2), - EjsonSorted1 == EjsonSorted2. - --spec replication_design_doc_props(binary()) -> [_]. -replication_design_doc_props(DDocId) -> - [ - {<<"_id">>, DDocId}, - {<<"language">>, <<"javascript">>}, - {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} - ]. - -% Note: parse_rep_doc can handle filtered replications. During parsing of the -% replication doc it will make possibly remote http requests to the source -% database. If failure or parsing of filter docs fails, parse_doc throws a -% {filter_fetch_error, Error} excation. This exception should be considered -% transient in respect to the contents of the document itself, since it depends -% on netowrk availability of the source db and other factors. --spec parse_rep_doc({[_]}) -> #rep{}. -parse_rep_doc(RepDoc) -> - {ok, Rep} = - try - parse_rep_doc(RepDoc, rep_user_ctx(RepDoc)) - catch - throw:{error, Reason} -> - throw({bad_rep_doc, Reason}); - throw:{filter_fetch_error, Reason} -> - throw({filter_fetch_error, Reason}); - Tag:Err -> - throw({bad_rep_doc, to_binary({Tag, Err})}) - end, - Rep. - --spec parse_rep_doc_without_id({[_]}) -> #rep{}. -parse_rep_doc_without_id(RepDoc) -> - {ok, Rep} = - try - parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc)) - catch - throw:{error, Reason} -> - throw({bad_rep_doc, Reason}); - Tag:Err -> - throw({bad_rep_doc, to_binary({Tag, Err})}) - end, - Rep. - --spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}. 
-parse_rep_doc(Doc, UserCtx) ->
-    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    Cancel = get_value(cancel, Rep#rep.options, false),
-    Id = get_value(id, Rep#rep.options, nil),
-    case {Cancel, Id} of
-        {true, nil} ->
-            % Cancel request with no id, must parse id out of body contents
-            {ok, update_rep_id(Rep)};
-        {true, Id} ->
-            % Cancel request with an id specified, so do not parse id from body
-            {ok, Rep};
-        {false, _Id} ->
-            % Not a cancel request, regular replication doc
-            {ok, update_rep_id(Rep)}
-    end.
-
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
-    {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
-    Opts = make_options(Props),
-    case
-        get_value(cancel, Opts, false) andalso
-            (get_value(id, Opts, nil) =/= nil)
-    of
-        true ->
-            {ok, #rep{options = Opts, user_ctx = UserCtx}};
-        false ->
-            Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
-            Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
-            {Type, View} =
-                case couch_replicator_filters:view_type(Props, Opts) of
-                    {error, Error} ->
-                        throw({bad_request, Error});
-                    Result ->
-                        Result
-                end,
-            Rep = #rep{
-                source = Source,
-                target = Target,
-                options = Opts,
-                user_ctx = UserCtx,
-                type = Type,
-                view = View,
-                doc_id = get_value(<<"_id">>, Props, null)
-            },
-            % Check if can parse filter code, if not throw exception
-            case couch_replicator_filters:parse(Opts) of
-                {error, FilterError} ->
-                    throw({error, FilterError});
-                {ok, _Filter} ->
-                    ok
-            end,
-            {ok, Rep}
-    end.
-
-parse_proxy_settings(Props) when is_list(Props) ->
-    Proxy = get_value(<<"proxy">>, Props, <<>>),
-    SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
-    TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
-
-    case Proxy =/= <<>> of
-        true when SrcProxy =/= <<>> ->
-            Error = "`proxy` is mutually exclusive with `source_proxy`",
-            throw({bad_request, Error});
-        true when TgtProxy =/= <<>> ->
-            Error = "`proxy` is mutually exclusive with `target_proxy`",
-            throw({bad_request, Error});
-        true ->
-            {Proxy, Proxy};
-        false ->
-            {SrcProxy, TgtProxy}
-    end.
 
 % Update a #rep{} record with a replication_id. Calculating the id might involve
 % fetching a filter from the source db, and so it could fail intermittently.
 % In case of a failure to fetch the filter this function will throw a
@@ -386,316 +227,46 @@ save_rep_doc(DbName, Doc) ->
         couch_db:close(Db)
     end.
 
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-        undefined ->
-            #user_ctx{};
-        {UserCtx} ->
-            #user_ctx{
-                name = get_json_value(<<"name">>, UserCtx, null),
-                roles = get_json_value(<<"roles">>, UserCtx, [])
-            }
-    end.
-
--spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
-parse_rep_db({Props}, Proxy, Options) -> - ProxyParams = parse_proxy_params(Proxy), - ProxyURL = - case ProxyParams of - [] -> undefined; - _ -> binary_to_list(Proxy) - end, - Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)), - {AuthProps} = get_value(<<"auth">>, Props, {[]}), - {BinHeaders} = get_value(<<"headers">>, Props, {[]}), - Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]), - DefaultHeaders = (#httpdb{})#httpdb.headers, - HttpDb = #httpdb{ - url = Url, - auth_props = AuthProps, - headers = lists:ukeymerge(1, Headers, DefaultHeaders), - ibrowse_options = lists:keysort( - 1, - [ - {socket_options, get_value(socket_options, Options)} - | ProxyParams ++ ssl_params(Url) - ] - ), - timeout = get_value(connection_timeout, Options), - http_connections = get_value(http_connections, Options), - retries = get_value(retries, Options), - proxy_url = ProxyURL - }, - couch_replicator_utils:normalize_basic_auth(HttpDb); -parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) -> - parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options); -parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) -> - parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options); -parse_rep_db(<<_/binary>>, _Proxy, _Options) -> - throw({error, local_endpoints_not_supported}); -parse_rep_db(undefined, _Proxy, _Options) -> - throw({error, <<"Missing replicator database">>}). - --spec maybe_add_trailing_slash(binary() | list()) -> list(). -maybe_add_trailing_slash(Url) when is_binary(Url) -> - maybe_add_trailing_slash(?b2l(Url)); -maybe_add_trailing_slash(Url) -> - case lists:member($?, Url) of - true -> - % skip if there are query params - Url; - false -> - case lists:last(Url) of - $/ -> - Url; - _ -> - Url ++ "/" - end - end. - --spec make_options([_]) -> [_]. -make_options(Props) -> - Options0 = lists:ukeysort(1, convert_options(Props)), - Options = check_options(Options0), - DefWorkers = config:get_integer("replicator", "worker_processes", 4), - DefBatchSize = config:get_integer("replicator", "worker_batch_size", 500), - DefConns = config:get_integer("replicator", "http_connections", 20), - DefTimeout = config:get_integer("replicator", "connection_timeout", 30000), - DefRetries = config:get_integer("replicator", "retries_per_request", 5), - UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true), - UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true), - DefCheckpointInterval = config:get_integer( - "replicator", - "checkpoint_interval", - 30000 - ), - {ok, DefSocketOptions} = couch_util:parse_term( - config:get( - "replicator", - "socket_options", - "[{keepalive, true}, {nodelay, false}]" - ) - ), - lists:ukeymerge( - 1, - Options, - lists:keysort(1, [ - {connection_timeout, DefTimeout}, - {retries, DefRetries}, - {http_connections, DefConns}, - {socket_options, DefSocketOptions}, - {worker_batch_size, DefBatchSize}, - {worker_processes, DefWorkers}, - {use_checkpoints, UseCheckpoints}, - {use_bulk_get, UseBulkGet}, - {checkpoint_interval, DefCheckpointInterval} - ]) - ). - --spec convert_options([_]) -> [_]. 
-convert_options([]) -> - []; -convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V) -> - throw({bad_request, <<"parameter `cancel` must be a boolean">>}); -convert_options([{<<"cancel">>, V} | R]) -> - [{cancel, V} | convert_options(R)]; -convert_options([{IdOpt, V} | R]) when - IdOpt =:= <<"_local_id">>; - IdOpt =:= <<"replication_id">>; - IdOpt =:= <<"id">> --> - [{id, couch_replicator_ids:convert(V)} | convert_options(R)]; -convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V) -> - throw({bad_request, <<"parameter `create_target` must be a boolean">>}); -convert_options([{<<"create_target">>, V} | R]) -> - [{create_target, V} | convert_options(R)]; -convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) -> - throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>}); -convert_options([{<<"create_target_params">>, V} | R]) -> - [{create_target_params, V} | convert_options(R)]; -convert_options([{<<"winning_revs_only">>, V} | _R]) when not is_boolean(V) -> - throw({bad_request, <<"parameter `winning_revs_only` must be a boolean">>}); -convert_options([{<<"winning_revs_only">>, V} | R]) -> - [{winning_revs_only, V} | convert_options(R)]; -convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V) -> - throw({bad_request, <<"parameter `continuous` must be a boolean">>}); -convert_options([{<<"continuous">>, V} | R]) -> - [{continuous, V} | convert_options(R)]; -convert_options([{<<"filter">>, V} | R]) -> - [{filter, V} | convert_options(R)]; -convert_options([{<<"query_params">>, V} | R]) -> - [{query_params, V} | convert_options(R)]; -convert_options([{<<"doc_ids">>, null} | R]) -> - convert_options(R); -convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) -> - throw({bad_request, <<"parameter `doc_ids` must be an array">>}); -convert_options([{<<"doc_ids">>, V} | R]) -> - % Ensure same behaviour as old replicator: accept a list of percent - % encoded doc IDs. 
- DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]), - [{doc_ids, DocIds} | convert_options(R)]; -convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) -> - throw({bad_request, <<"parameter `selector` must be a JSON object">>}); -convert_options([{<<"selector">>, V} | R]) -> - [{selector, V} | convert_options(R)]; -convert_options([{<<"worker_processes">>, V} | R]) -> - [{worker_processes, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"worker_batch_size">>, V} | R]) -> - [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"http_connections">>, V} | R]) -> - [{http_connections, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"connection_timeout">>, V} | R]) -> - [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"retries_per_request">>, V} | R]) -> - [{retries, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"socket_options">>, V} | R]) -> - {ok, SocketOptions} = couch_util:parse_term(V), - [{socket_options, SocketOptions} | convert_options(R)]; -convert_options([{<<"since_seq">>, V} | R]) -> - [{since_seq, V} | convert_options(R)]; -convert_options([{<<"use_checkpoints">>, V} | R]) -> - [{use_checkpoints, V} | convert_options(R)]; -convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) -> - throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>}); -convert_options([{<<"use_bulk_get">>, V} | R]) -> - [{use_bulk_get, V} | convert_options(R)]; -convert_options([{<<"checkpoint_interval">>, V} | R]) -> - [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)]; -% skip unknown option -convert_options([_ | R]) -> - convert_options(R). - --spec check_options([_]) -> [_]. -check_options(Options) -> - DocIds = lists:keyfind(doc_ids, 1, Options), - Filter = lists:keyfind(filter, 1, Options), - Selector = lists:keyfind(selector, 1, Options), - case {DocIds, Filter, Selector} of - {false, false, false} -> Options; - {false, false, _} -> Options; - {false, _, false} -> Options; - {_, false, false} -> Options; - _ -> throw({bad_request, "`doc_ids`,`filter`,`selector` are mutually exclusive"}) - end. - --spec parse_proxy_params(binary() | [_]) -> [_]. -parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> - parse_proxy_params(?b2l(ProxyUrl)); -parse_proxy_params([]) -> - []; -parse_proxy_params(ProxyUrl) -> - #url{ - host = Host, - port = Port, - username = User, - password = Passwd, - protocol = Protocol - } = ibrowse_lib:parse_url(ProxyUrl), - Params = - [ - {proxy_host, Host}, - {proxy_port, Port} - ] ++ - case is_list(User) andalso is_list(Passwd) of - false -> - []; - true -> - [{proxy_user, User}, {proxy_password, Passwd}] - end, - case Protocol of - socks5 -> - [proxy_to_socks5(Param) || Param <- Params]; - _ -> - Params - end. - --spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}. -proxy_to_socks5({proxy_host, Val}) -> - {socks5_host, Val}; -proxy_to_socks5({proxy_port, Val}) -> - {socks5_port, Val}; -proxy_to_socks5({proxy_user, Val}) -> - {socks5_user, Val}; -proxy_to_socks5({proxy_password, Val}) -> - {socks5_password, Val}. - --spec ssl_params([_]) -> [_]. 
-ssl_params(Url) -> - case ibrowse_lib:parse_url(Url) of - #url{protocol = https} -> - Depth = config:get_integer( - "replicator", - "ssl_certificate_max_depth", - 3 - ), - VerifyCerts = config:get_boolean( - "replicator", - "verify_ssl_certificates", - false - ), - CertFile = config:get("replicator", "cert_file", undefined), - KeyFile = config:get("replicator", "key_file", undefined), - Password = config:get("replicator", "password", undefined), - SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts)], - SslOpts1 = - case CertFile /= undefined andalso KeyFile /= undefined of - true -> - case Password of - undefined -> - [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts; - _ -> - [ - {certfile, CertFile}, - {keyfile, KeyFile}, - {password, Password} - ] ++ SslOpts - end; - false -> - SslOpts - end, - [{is_ssl, true}, {ssl_options, SslOpts1}]; - #url{protocol = http} -> - [] - end. - --spec ssl_verify_options(true | false) -> [_]. -ssl_verify_options(true) -> - CAFile = config:get("replicator", "ssl_trusted_certificates_file"), - [{verify, verify_peer}, {cacertfile, CAFile}]; -ssl_verify_options(false) -> - [{verify, verify_none}]. - -spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}. before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) -> Doc; +before_doc_update(#doc{} = Doc, _Db, ?REPLICATED_CHANGES) -> + % Skip internal replicator updates + Doc; before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) -> - #user_ctx{ - roles = Roles, - name = Name - } = couch_db:get_user_ctx(Db), - case lists:member(<<"_replicator">>, Roles) of + #user_ctx{roles = Roles, name = Name} = couch_db:get_user_ctx(Db), + IsReplicator = lists:member(<<"_replicator">>, Roles), + Doc1 = case IsReplicator of + true -> Doc; + false -> before_doc_update_owner(get_value(?OWNER, Body), Name, Db, Doc) + end, + IsFailed = get_value(<<"_replication_state">>, Body) =:= <<"failed">>, + case IsReplicator orelse Doc1#doc.deleted orelse IsFailed of true -> - Doc; + ok; false -> - case couch_util:get_value(?OWNER, Body) of - undefined -> - Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; - Name -> - Doc; - Other -> - case (catch couch_db:check_is_admin(Db)) of - ok when Other =:= null -> - Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; - ok -> - Doc; - _ -> - throw( - {forbidden, - <<"Can't update replication documents", " from other users.">>} - ) - end + try + couch_replicator_parse:parse_rep_doc(Doc1#doc.body) + catch + throw:{bad_rep_doc, Error} -> + throw({forbidden, Error}) end + end, + Doc1. + +before_doc_update_owner(undefined, Name, _Db, #doc{body = {Body}} = Doc) -> + Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; +before_doc_update_owner(Name, Name, _Db, #doc{} = Doc) -> + Doc; +before_doc_update_owner(Other, Name, Db, #doc{body = {Body}} = Doc) -> + case (catch couch_db:check_is_admin(Db)) of + ok when Other =:= null -> + Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; + ok -> + Doc; + _ -> + Err = <<"Can't update replication documents from other users.">>, + throw({forbidden, Err}) end. -spec after_doc_read(#doc{}, Db :: any()) -> #doc{}. 
@@ -707,22 +278,12 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) -> ok -> Doc; _ -> - case couch_util:get_value(?OWNER, Body) of + case get_value(?OWNER, Body) of Name -> Doc; _Other -> - Source = strip_credentials( - couch_util:get_value( - <<"source">>, - Body - ) - ), - Target = strip_credentials( - couch_util:get_value( - <<"target">>, - Body - ) - ), + Source = strip_credentials(get_value(<<"source">>, Body)), + Target = strip_credentials(get_value(<<"target">>, Body)), NewBody0 = ?replace(Body, <<"source">>, Source), NewBody = ?replace(NewBody0, <<"target">>, Target), #doc{revs = {Pos, [_ | Revs]}} = Doc, @@ -765,6 +326,15 @@ error_reason({error, Reason}) -> error_reason(Reason) -> to_binary(Reason). +to_binary(Val) -> + couch_util:to_binary(Val). + +get_value(Key, Props) -> + couch_util:get_value(Key, Props). + +get_json_value(Key, Obj) -> + couch_replicator_utils:get_json_value(Key, Obj). + -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl"). diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl index 939070b95..86fe1f26e 100644 --- a/src/couch_replicator/src/couch_replicator_ids.erl +++ b/src/couch_replicator/src/couch_replicator_ids.erl @@ -196,10 +196,10 @@ winning_revs_generates_new_id(_) -> {<<"source">>, <<"http://foo.example.bar">>}, {<<"target">>, <<"http://bar.example.foo">>} ], - Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}), + Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}), Id1 = replication_id(Rep1), RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, true}], - Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}), + Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}), Id2 = replication_id(Rep2), ?assertNotEqual(Id1, Id2). @@ -208,10 +208,10 @@ winning_revs_false_same_as_undefined(_) -> {<<"source">>, <<"http://foo.example.bar">>}, {<<"target">>, <<"http://bar.example.foo">>} ], - Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}), + Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}), Id1 = replication_id(Rep1), RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, false}], - Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}), + Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}), Id2 = replication_id(Rep2), ?assertEqual(Id1, Id2). diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl deleted file mode 100644 index 4f4369075..000000000 --- a/src/couch_replicator/src/couch_replicator_js_functions.hrl +++ /dev/null @@ -1,183 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. 
- --define(REP_DB_DOC_VALIDATE_FUN, <<" - function(newDoc, oldDoc, userCtx) { - function reportError(error_msg) { - log('Error writing document `' + newDoc._id + - '\\' to the replicator database: ' + error_msg); - throw({forbidden: error_msg}); - } - - function validateEndpoint(endpoint, fieldName) { - if ((typeof endpoint !== 'string') && - ((typeof endpoint !== 'object') || (endpoint === null))) { - - reportError('The `' + fieldName + '\\' property must exist' + - ' and be either a string or an object.'); - } - - if (typeof endpoint === 'object') { - if ((typeof endpoint.url !== 'string') || !endpoint.url) { - reportError('The url property must exist in the `' + - fieldName + '\\' field and must be a non-empty string.'); - } - - if ((typeof endpoint.auth !== 'undefined') && - ((typeof endpoint.auth !== 'object') || - endpoint.auth === null)) { - - reportError('`' + fieldName + - '.auth\\' must be a non-null object.'); - } - - if ((typeof endpoint.headers !== 'undefined') && - ((typeof endpoint.headers !== 'object') || - endpoint.headers === null)) { - - reportError('`' + fieldName + - '.headers\\' must be a non-null object.'); - } - } - } - - var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0); - var isAdmin = (userCtx.roles.indexOf('_admin') >= 0); - - if (isReplicator) { - // Always let replicator update the replication document - return; - } - - if (newDoc._replication_state === 'failed') { - // Skip validation in case when we update the document with the - // failed state. In this case it might be malformed. However, - // replicator will not pay attention to failed documents so this - // is safe. - return; - } - - if (!newDoc._deleted) { - validateEndpoint(newDoc.source, 'source'); - validateEndpoint(newDoc.target, 'target'); - - if ((typeof newDoc.create_target !== 'undefined') && - (typeof newDoc.create_target !== 'boolean')) { - - reportError('The `create_target\\' field must be a boolean.'); - } - - if ((typeof newDoc.winning_revs_only !== 'undefined') && - (typeof newDoc.winning_revs_only !== 'boolean')) { - - reportError('The `winning_revs_only\\' field must be a boolean.'); - } - - if ((typeof newDoc.continuous !== 'undefined') && - (typeof newDoc.continuous !== 'boolean')) { - - reportError('The `continuous\\' field must be a boolean.'); - } - - if ((typeof newDoc.doc_ids !== 'undefined') && - !isArray(newDoc.doc_ids)) { - - reportError('The `doc_ids\\' field must be an array of strings.'); - } - - if ((typeof newDoc.selector !== 'undefined') && - (typeof newDoc.selector !== 'object')) { - - reportError('The `selector\\' field must be an object.'); - } - - if ((typeof newDoc.filter !== 'undefined') && - ((typeof newDoc.filter !== 'string') || !newDoc.filter)) { - - reportError('The `filter\\' field must be a non-empty string.'); - } - - if ((typeof newDoc.doc_ids !== 'undefined') && - (typeof newDoc.selector !== 'undefined')) { - - reportError('`doc_ids\\' field is incompatible with `selector\\'.'); - } - - if ( ((typeof newDoc.doc_ids !== 'undefined') || - (typeof newDoc.selector !== 'undefined')) && - (typeof newDoc.filter !== 'undefined') ) { - - reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.'); - } - - if ((typeof newDoc.query_params !== 'undefined') && - ((typeof newDoc.query_params !== 'object') || - newDoc.query_params === null)) { - - reportError('The `query_params\\' field must be an object.'); - } - - if (newDoc.user_ctx) { - var user_ctx = newDoc.user_ctx; - - if ((typeof user_ctx !== 'object') || (user_ctx === null)) { 
- reportError('The `user_ctx\\' property must be a ' + - 'non-null object.'); - } - - if (!(user_ctx.name === null || - (typeof user_ctx.name === 'undefined') || - ((typeof user_ctx.name === 'string') && - user_ctx.name.length > 0))) { - - reportError('The `user_ctx.name\\' property must be a ' + - 'non-empty string or null.'); - } - - if (!isAdmin && (user_ctx.name !== userCtx.name)) { - reportError('The given `user_ctx.name\\' is not valid'); - } - - if (user_ctx.roles && !isArray(user_ctx.roles)) { - reportError('The `user_ctx.roles\\' property must be ' + - 'an array of strings.'); - } - - if (!isAdmin && user_ctx.roles) { - for (var i = 0; i < user_ctx.roles.length; i++) { - var role = user_ctx.roles[i]; - - if (typeof role !== 'string' || role.length === 0) { - reportError('Roles must be non-empty strings.'); - } - if (userCtx.roles.indexOf(role) === -1) { - reportError('Invalid role (`' + role + - '\\') in the `user_ctx\\''); - } - } - } - } else { - if (!isAdmin) { - reportError('The `user_ctx\\' property is missing (it is ' + - 'optional for admins only).'); - } - } - } else { - if (!isAdmin) { - if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) { - reportError('Replication documents can only be deleted by ' + - 'admins or by the users who created them.'); - } - } - } - } -">>). diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_parse.erl similarity index 65% copy from src/couch_replicator/src/couch_replicator_docs.erl copy to src/couch_replicator/src/couch_replicator_parse.erl index a60f1a1e1..a584d0306 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_parse.erl @@ -10,183 +10,20 @@ % License for the specific language governing permissions and limitations under % the License. --module(couch_replicator_docs). +-module(couch_replicator_parse). -export([ parse_rep_doc/1, parse_rep_doc/2, parse_rep_db/3, parse_rep_doc_without_id/1, - parse_rep_doc_without_id/2, - before_doc_update/3, - after_doc_read/2, - ensure_rep_ddoc_exists/1, - ensure_cluster_rep_ddoc_exists/1, - remove_state_fields/2, - update_doc_completed/3, - update_failed/3, - update_rep_id/1, - update_triggered/2, - update_error/2 + parse_rep_doc_without_id/2 ]). -include_lib("couch/include/couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). --include_lib("mem3/include/mem3.hrl"). -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). --include("couch_replicator_js_functions.hrl"). - --import(couch_util, [ - get_value/2, - get_value/3, - to_binary/1 -]). - --import(couch_replicator_utils, [ - get_json_value/2, - get_json_value/3 -]). - --define(REP_DB_NAME, <<"_replicator">>). --define(REP_DESIGN_DOC, <<"_design/_replicator">>). --define(OWNER, <<"owner">>). --define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}). --define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})). - -remove_state_fields(DbName, DocId) -> - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, undefined}, - {<<"_replication_state_time">>, undefined}, - {<<"_replication_state_reason">>, undefined}, - {<<"_replication_id">>, undefined}, - {<<"_replication_stats">>, undefined} - ]). - --spec update_doc_completed(binary(), binary(), [_]) -> any(). 
-update_doc_completed(DbName, DocId, Stats) -> - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, <<"completed">>}, - {<<"_replication_state_reason">>, undefined}, - {<<"_replication_stats">>, {Stats}} - ]), - couch_stats:increment_counter([ - couch_replicator, - docs, - completed_state_updates - ]). - --spec update_failed(binary(), binary(), any()) -> any(). -update_failed(DbName, DocId, Error) -> - Reason = error_reason(Error), - couch_log:error( - "Error processing replication doc `~s` from `~s`: ~s", - [DocId, DbName, Reason] - ), - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, <<"failed">>}, - {<<"_replication_stats">>, undefined}, - {<<"_replication_state_reason">>, Reason} - ]), - couch_stats:increment_counter([ - couch_replicator, - docs, - failed_state_updates - ]). - --spec update_triggered(#rep{}, rep_id()) -> ok. -update_triggered(Rep, {Base, Ext}) -> - #rep{ - db_name = DbName, - doc_id = DocId - } = Rep, - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, <<"triggered">>}, - {<<"_replication_state_reason">>, undefined}, - {<<"_replication_id">>, iolist_to_binary([Base, Ext])}, - {<<"_replication_stats">>, undefined} - ]), - ok. - --spec update_error(#rep{}, any()) -> ok. -update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> - Reason = error_reason(Error), - BinRepId = - case RepId of - {Base, Ext} -> - iolist_to_binary([Base, Ext]); - _Other -> - null - end, - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, <<"error">>}, - {<<"_replication_state_reason">>, Reason}, - {<<"_replication_stats">>, undefined}, - {<<"_replication_id">>, BinRepId} - ]), - ok. - --spec ensure_rep_ddoc_exists(binary()) -> ok. -ensure_rep_ddoc_exists(RepDb) -> - case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of - true -> - ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC); - false -> - ok - end. - --spec ensure_rep_ddoc_exists(binary(), binary()) -> ok. -ensure_rep_ddoc_exists(RepDb, DDocId) -> - case open_rep_doc(RepDb, DDocId) of - {not_found, no_db_file} -> - %% database was deleted. - ok; - {not_found, _Reason} -> - DocProps = replication_design_doc_props(DDocId), - DDoc = couch_doc:from_json_obj({DocProps}), - couch_log:notice("creating replicator ddoc ~p", [RepDb]), - {ok, _Rev} = save_rep_doc(RepDb, DDoc); - {ok, Doc} -> - Latest = replication_design_doc_props(DDocId), - {Props0} = couch_doc:to_json_obj(Doc, []), - {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0), - case compare_ejson({Props}, {Latest}) of - true -> - ok; - false -> - LatestWithRev = [{<<"_rev">>, Rev} | Latest], - DDoc = couch_doc:from_json_obj({LatestWithRev}), - couch_log:notice("updating replicator ddoc ~p", [RepDb]), - try - {ok, _} = save_rep_doc(RepDb, DDoc) - catch - throw:conflict -> - %% ignore, we'll retry next time - ok - end - end - end, - ok. - --spec ensure_cluster_rep_ddoc_exists(binary()) -> ok. -ensure_cluster_rep_ddoc_exists(RepDb) -> - DDocId = ?REP_DESIGN_DOC, - [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId), - ensure_rep_ddoc_exists(DbShard, DDocId). - --spec compare_ejson({[_]}, {[_]}) -> boolean(). -compare_ejson(EJson1, EJson2) -> - EjsonSorted1 = couch_replicator_filters:ejsort(EJson1), - EjsonSorted2 = couch_replicator_filters:ejsort(EJson2), - EjsonSorted1 == EjsonSorted2. - --spec replication_design_doc_props(binary()) -> [_]. -replication_design_doc_props(DDocId) -> - [ - {<<"_id">>, DDocId}, - {<<"language">>, <<"javascript">>}, - {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} - ]. 
% Note: parse_rep_doc can handle filtered replications. During parsing of the % replication doc it will make possibly remote http requests to the source @@ -304,88 +141,6 @@ update_rep_id(Rep) -> RepId = couch_replicator_ids:replication_id(Rep), Rep#rep{id = RepId}. -update_rep_doc(RepDbName, RepDocId, KVs) -> - update_rep_doc(RepDbName, RepDocId, KVs, 1). - -update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) -> - try - case open_rep_doc(RepDbName, RepDocId) of - {ok, LastRepDoc} -> - update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2); - _ -> - ok - end - catch - throw:conflict -> - Msg = "Conflict when updating replication doc `~s`. Retrying.", - couch_log:error(Msg, [RepDocId]), - ok = timer:sleep(couch_rand:uniform(erlang:min(128, Wait)) * 100), - update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2) - end; -update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) -> - NewRepDocBody = lists:foldl( - fun - ({K, undefined}, Body) -> - lists:keydelete(K, 1, Body); - ({<<"_replication_state">> = K, State} = KV, Body) -> - case get_json_value(K, Body) of - State -> - Body; - _ -> - Body1 = lists:keystore(K, 1, Body, KV), - Timestamp = couch_replicator_utils:iso8601(os:timestamp()), - lists:keystore( - <<"_replication_state_time">>, - 1, - Body1, - {<<"_replication_state_time">>, Timestamp} - ) - end; - ({K, _V} = KV, Body) -> - lists:keystore(K, 1, Body, KV) - end, - RepDocBody, - KVs - ), - case NewRepDocBody of - RepDocBody -> - ok; - _ -> - % Might not succeed - when the replication doc is deleted right - % before this update (not an error, ignore). - save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}}) - end. - -open_rep_doc(DbName, DocId) -> - ioq:maybe_set_io_priority({system, DbName}), - case couch_db:open_int(DbName, [?CTX, sys_db]) of - {ok, Db} -> - try - couch_db:open_doc(Db, DocId, [ejson_body]) - after - couch_db:close(Db) - end; - Else -> - Else - end. - -save_rep_doc(DbName, Doc) -> - ioq:maybe_set_io_priority({system, DbName}), - {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]), - try - couch_db:update_doc(Db, Doc, []) - catch - % User can accidentally write a VDU which prevents _replicator from - % updating replication documents. Avoid crashing replicator and thus - % preventing all other replication jobs on the node from running. - throw:{forbidden, Reason} -> - Msg = "~p VDU function preventing doc update to ~s ~s ~p", - couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]), - {ok, forbidden} - after - couch_db:close(Db) - end. - -spec rep_user_ctx({[_]}) -> #user_ctx{}. rep_user_ctx({RepDoc}) -> case get_json_value(<<"user_ctx">>, RepDoc) of @@ -666,104 +421,20 @@ ssl_verify_options(true) -> ssl_verify_options(false) -> [{verify, verify_none}]. --spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}. 
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
-    Doc;
-before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
-    #user_ctx{
-        roles = Roles,
-        name = Name
-    } = couch_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
-        true ->
-            Doc;
-        false ->
-            case couch_util:get_value(?OWNER, Body) of
-                undefined ->
-                    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                Name ->
-                    Doc;
-                Other ->
-                    case (catch couch_db:check_is_admin(Db)) of
-                        ok when Other =:= null ->
-                            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                        ok ->
-                            Doc;
-                        _ ->
-                            throw(
-                                {forbidden,
-                                    <<"Can't update replication documents", " from other users.">>}
-                            )
-                    end
-            end
-    end.
+get_value(Key, Props) ->
+    couch_util:get_value(Key, Props).
 
--spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-after_doc_read(#doc{body = {Body}} = Doc, Db) ->
-    #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
-    case (catch couch_db:check_is_admin(Db)) of
-        ok ->
-            Doc;
-        _ ->
-            case couch_util:get_value(?OWNER, Body) of
-                Name ->
-                    Doc;
-                _Other ->
-                    Source = strip_credentials(
-                        couch_util:get_value(
-                            <<"source">>,
-                            Body
-                        )
-                    ),
-                    Target = strip_credentials(
-                        couch_util:get_value(
-                            <<"target">>,
-                            Body
-                        )
-                    ),
-                    NewBody0 = ?replace(Body, <<"source">>, Source),
-                    NewBody = ?replace(NewBody0, <<"target">>, Target),
-                    #doc{revs = {Pos, [_ | Revs]}} = Doc,
-                    NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-                    NewRevId = couch_db:new_revid(NewDoc),
-                    NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
-            end
-    end.
+get_value(Key, Props, Default) ->
+    couch_util:get_value(Key, Props, Default).
 
--spec strip_credentials
-    (undefined) -> undefined;
-    (binary()) -> binary();
-    ({[_]}) -> {[_]}.
-strip_credentials(undefined) ->
-    undefined;
-strip_credentials(Url) when is_binary(Url) ->
-    re:replace(
-        Url,
-        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
-        "http\\1://\\2",
-        [{return, binary}]
-    );
-strip_credentials({Props0}) ->
-    Props1 = lists:keydelete(<<"headers">>, 1, Props0),
-    % Strip "auth" just like headers, for replication plugins it can be a place
-    % to stash credential that are not necessarily in headers
-    Props2 = lists:keydelete(<<"auth">>, 1, Props1),
-    {Props2}.
-
-error_reason({shutdown, Error}) ->
-    error_reason(Error);
-error_reason({bad_rep_doc, Reason}) ->
-    to_binary(Reason);
-error_reason({error, {Error, Reason}}) when
-    is_atom(Error), is_binary(Reason)
-->
-    to_binary(io_lib:format("~s: ~s", [Error, Reason]));
-error_reason({error, Reason}) ->
-    to_binary(Reason);
-error_reason(Reason) ->
-    to_binary(Reason).
+to_binary(Val) ->
+    couch_util:to_binary(Val).
+
+get_json_value(Key, Obj) ->
+    couch_replicator_utils:get_json_value(Key, Obj).
+
+get_json_value(Key, Obj, Default) ->
+    couch_replicator_utils:get_json_value(Key, Obj, Default).
 
 -ifdef(TEST).
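For anyone who wants to poke at the relocated API in a shell, a minimal usage
sketch (the endpoint URLs here are made up; function names and behavior are as
in the couch_replicator_parse hunks above):

    %% Parsing now lives in couch_replicator_parse. The arity-1 variant
    %% returns the #rep{} record directly (records need couch_replicator.hrl).
    RepDoc = {[
        {<<"source">>, <<"http://adm:[email protected]/db1">>},
        {<<"target">>, <<"http://adm:[email protected]/db2">>},
        {<<"continuous">>, true}
    ]},
    %% Source/target come back as #httpdb{} records; invalid input throws
    %% {bad_rep_doc, Reason}, and local (non-URL) endpoints are rejected
    %% via local_endpoints_not_supported in parse_rep_db/3.
    Rep = couch_replicator_parse:parse_rep_doc_without_id(RepDoc).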
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 1ba933a5e..06d3052be 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -1149,8 +1149,8 @@ scheduler_job_format_status_test() -> Target = <<"http://u:p@h2/d2">>, Rep = #rep{ id = {"base", "+ext"}, - source = couch_replicator_docs:parse_rep_db(Source, [], []), - target = couch_replicator_docs:parse_rep_db(Target, [], []), + source = couch_replicator_parse:parse_rep_db(Source, [], []), + target = couch_replicator_parse:parse_rep_db(Target, [], []), options = [{create_target, true}], doc_id = <<"mydoc">>, db_name = <<"mydb">> diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index e4a2cd12f..36700d9d5 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -13,7 +13,6 @@ -module(couch_replicator_utils). -export([ - parse_rep_doc/2, replication_id/2, sum_stats/2, is_deleted/1, @@ -95,9 +94,6 @@ replication_id(Rep, Version) -> sum_stats(S1, S2) -> couch_replicator_stats:sum_stats(S1, S2). -parse_rep_doc(Props, UserCtx) -> - couch_replicator_docs:parse_rep_doc(Props, UserCtx). - -spec iso8601(erlang:timestamp()) -> binary(). iso8601({_Mega, _Sec, _Micro} = Timestamp) -> {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp), @@ -351,7 +347,7 @@ normalize_rep_test_() -> {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]}, {<<"other_field">>, <<"some_value">>} ]}, - Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1), + Rep1 = couch_replicator_parse:parse_rep_doc_without_id(EJson1), EJson2 = {[ {<<"other_field">>, <<"unrelated">>}, @@ -360,7 +356,7 @@ normalize_rep_test_() -> {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]}, {<<"other_field2">>, <<"unrelated2">>} ]}, - Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2), + Rep2 = couch_replicator_parse:parse_rep_doc_without_id(EJson2), ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2)) end) }. 
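One migration note, since the couch_replicator_utils:parse_rep_doc/2
pass-through is removed above: external callers (plugins, custom tooling)
would switch to the new module directly, exactly as the test updates below do.
A sketch, with RepObject/UserCtx shaped like the tests' arguments:

    %% Before this commit (shim in couch_replicator_utils, now removed):
    %% {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, UserCtx),
    %% After; same arguments, same {ok, #rep{}} return:
    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, UserCtx),
    ok = couch_replicator_scheduler:add_job(Rep).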
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl index 2d5ef96b1..df8074f1f 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl @@ -304,7 +304,7 @@ replicate(Source, Target) -> {<<"target">>, db_url(Target)}, {<<"continuous">>, true} ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER), ok = couch_replicator_scheduler:add_job(Rep), couch_replicator_scheduler:reschedule(), Pid = couch_replicator_test_helper:get_pid(Rep#rep.id), diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl index 6bdb4ecb2..7ba6bc69d 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -226,7 +226,7 @@ replicate(Source, Target) -> % Low connection timeout so _changes feed gets restarted quicker {<<"connection_timeout">>, 3000} ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER), ok = couch_replicator_scheduler:add_job(Rep), couch_replicator_scheduler:reschedule(), {ok, Rep#rep.id}. diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl index 3468cda73..758c44f2b 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl @@ -50,7 +50,7 @@ parse_rep_doc_without_proxy(_) -> {<<"source">>, <<"http://unproxied.com">>}, {<<"target">>, <<"http://otherunproxied.com">>} ]}, - Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc), + Rep = couch_replicator_parse:parse_rep_doc(NoProxyDoc), ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined), ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined). @@ -62,7 +62,7 @@ parse_rep_doc_with_proxy(_) -> {<<"target">>, <<"http://otherunproxied.com">>}, {<<"proxy">>, ProxyURL} ]}, - Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc), + Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc), ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)), ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL)). @@ -76,7 +76,7 @@ parse_rep_source_target_proxy(_) -> {<<"source_proxy">>, SrcProxyURL}, {<<"target_proxy">>, TgtProxyURL} ]}, - Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc), + Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc), ?assertEqual( (Rep#rep.source)#httpdb.proxy_url, binary_to_list(SrcProxyURL) @@ -96,7 +96,7 @@ mutually_exclusive_proxy_and_source_proxy(_) -> ]}, ?assertThrow( {bad_rep_doc, _}, - couch_replicator_docs:parse_rep_doc(ProxyDoc) + couch_replicator_parse:parse_rep_doc(ProxyDoc) ). mutually_exclusive_proxy_and_target_proxy(_) -> @@ -109,5 +109,5 @@ mutually_exclusive_proxy_and_target_proxy(_) -> ]}, ?assertThrow( {bad_rep_doc, _}, - couch_replicator_docs:parse_rep_doc(ProxyDoc) + couch_replicator_parse:parse_rep_doc(ProxyDoc) ). 
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl index 04c665af5..f413e5cf4 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl @@ -214,7 +214,7 @@ replicate(Source, Target) -> {<<"target">>, db_url(Target)}, {<<"continuous">>, true} ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER), ok = couch_replicator_scheduler:add_job(Rep), couch_replicator_scheduler:reschedule(), Pid = couch_replicator_test_helper:get_pid(Rep#rep.id), diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl index f30bdb1cd..f862527f4 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl @@ -173,7 +173,7 @@ replicate(Source, Target) -> ). replicate({[_ | _]} = RepObject) -> - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER), ok = couch_replicator_scheduler:add_job(Rep), couch_replicator_scheduler:reschedule(), Pid = get_pid(Rep#rep.id),
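To close out the commit message above with the user-visible effect (a sketch
based on the hunks above; the document below is hypothetical): invalid
_replicator docs are now rejected at write time by the before_doc_update hook
rather than by the deleted JavaScript validate_doc_update, while writes made
with the _replicator role, deleted docs, and docs already in the "failed"
state skip validation, matching the old VDU's escape hatches:

    BadDoc = {[
        {<<"source">>, <<"http://adm:[email protected]/db1">>},
        {<<"target">>, <<"http://adm:[email protected]/db2">>},
        {<<"proxy">>, <<"http://proxy.local:8080">>},
        {<<"source_proxy">>, <<"http://proxy2.local:8080">>}
    ]},
    %% `proxy` conflicts with `source_proxy`, so parsing throws
    %% {bad_rep_doc, Reason}; before_doc_update rethrows that as
    %% {forbidden, Reason} when the doc is written to a _replicator db.
    ?assertThrow({bad_rep_doc, _}, couch_replicator_parse:parse_rep_doc(BadDoc)).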
