Author: fdmanana Date: Tue Nov 30 12:21:08 2010 New Revision: 1040486 URL: http://svn.apache.org/viewvc?rev=1040486&view=rev Log: Make replication by-doc-IDs use the builtin filter _doc_ids (reduces code size and allows for continuous by-doc-IDs replication).
Modified: couchdb/branches/new_replicator/share/www/script/test/new_replication.js couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl couchdb/branches/new_replicator/src/couchdb/couch_changes.erl couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl Modified: couchdb/branches/new_replicator/share/www/script/test/new_replication.js URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/share/www/script/test/new_replication.js?rev=1040486&r1=1040485&r2=1040486&view=diff ============================================================================== --- couchdb/branches/new_replicator/share/www/script/test/new_replication.js (original) +++ couchdb/branches/new_replicator/share/www/script/test/new_replication.js Tue Nov 30 12:21:08 2010 @@ -732,12 +732,16 @@ couchTests.new_replication = function(de ); total = doc_ids.length - num_inexistent_docs; - T(repResult.ok === true); - T(typeof repResult.start_time === "string"); - T(typeof repResult.end_time === "string"); - T(repResult.docs_read === total); - T(repResult.docs_written === total); - T(repResult.doc_write_failures === 0); + T(true, repResult.ok); + if (total === 0) { + TEquals(true, repResult.no_changes); + } else { + TEquals('string', typeof repResult.start_time); + TEquals('string', typeof repResult.end_time); + TEquals(total, repResult.docs_read); + TEquals(total, repResult.docs_written); + TEquals(0, repResult.doc_write_failures); + } for (k = 0; k < doc_ids.length; k++) { id = decodeURIComponent(doc_ids[k]); @@ -793,12 +797,16 @@ couchTests.new_replication = function(de ); after_total = after_doc_ids.length - after_num_inexistent_docs; - T(repResult.ok === true); - T(typeof repResult.start_time === "string"); - T(typeof repResult.end_time === "string"); - T(repResult.docs_read === after_total); - T(repResult.docs_written === after_total); - T(repResult.doc_write_failures === 0); + TEquals(true, repResult.ok); + if (after_total === 0) { + TEquals(true, repResult.no_changes); + } else { + TEquals('string', typeof repResult.start_time); + TEquals('string', typeof repResult.end_time); + TEquals(after_total, repResult.docs_read); + TEquals(after_total, repResult.docs_written); + TEquals(0, repResult.doc_write_failures); + } for (k = 0; k < after_doc_ids.length; k++) { id = after_doc_ids[k]; Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1040486&r1=1040485&r2=1040486&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Nov 30 12:21:08 2010 @@ -288,15 +288,26 @@ update_docs(Db, DocList, Options, Update {ok, bulk_results_to_errors(DocList, Result, UpdateType)}. -changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Options) -> - QArgs = changes_q_args( - [{"style", atom_to_list(Style)}, {"since", integer_to_list(StartSeq)}], - Options), +changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq, + UserFun, Options) -> + BaseQArgs = [ + {"style", atom_to_list(Style)}, {"since", couch_util:to_list(StartSeq)} + ], + {QArgs, Method, Body, Headers} = case get_value(doc_ids, Options) of + undefined -> + QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options), + {QArgs1, get, [], Headers1}; + DocIds -> + Headers2 = [{"Content-Type", "application/json"} | Headers1], + JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}), + {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2} + end, send_req( % Shouldn't be infinity, but somehow if it's not, issues arise % frequently with ibrowse. HttpDb#httpdb{timeout = infinity}, - [{path, "_changes"}, {qs, QArgs}, {direct, true}, + [{method, Method}, {path, "_changes"}, {qs, QArgs}, {direct, true}, + {headers, Headers}, {body, Body}, {ibrowse_options, [{stream_to, {self(), once}}]}], fun(200, _, DataStreamFun) -> case couch_util:get_value(continuous, Options, false) of @@ -310,10 +321,16 @@ changes_since(#httpdb{} = HttpDb, Style, end end); changes_since(Db, Style, StartSeq, UserFun, Options) -> + Filter = case get_value(doc_ids, Options) of + undefined -> + ?b2l(get_value(filter, Options, <<>>)); + _DocIds -> + "_doc_ids" + end, Args = #changes_args{ style = Style, since = StartSeq, - filter = ?b2l(get_value(filter, Options, <<>>)), + filter = Filter, feed = case get_value(continuous, Options, false) of true -> "continuous"; @@ -323,7 +340,7 @@ changes_since(Db, Style, StartSeq, UserF timeout = infinity }, QueryParams = get_value(query_params, Options, {[]}), - Req = changes_json_req(Db, Args#changes_args.filter, QueryParams), + Req = changes_json_req(Db, Filter, QueryParams, Options), ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db), ChangesFeedFun(fun({change, Change, _}, _) -> UserFun(json_to_doc_info(Change)); @@ -334,7 +351,7 @@ changes_since(Db, Style, StartSeq, UserF % internal functions -changes_q_args(BaseQS, Options) -> +maybe_add_changes_filter_q_args(BaseQS, Options) -> case get_value(filter, Options) of undefined -> BaseQS; @@ -359,9 +376,11 @@ changes_q_args(BaseQS, Options) -> [{"feed", "continuous"}, {"heartbeat", "10000"}] end. -changes_json_req(_Db, "", _QueryParams) -> +changes_json_req(_Db, "", _QueryParams, _Options) -> {[]}; -changes_json_req(Db, FilterName, {QueryParams}) -> +changes_json_req(_Db, "_doc_ids", _QueryParams, Options) -> + {[{<<"doc_ids">>, get_value(doc_ids, Options)}]}; +changes_json_req(Db, FilterName, {QueryParams}, _Options) -> {ok, Info} = couch_db:get_db_info(Db), % simulate a request to db_name/_changes {[ Modified: couchdb/branches/new_replicator/src/couchdb/couch_changes.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_changes.erl?rev=1040486&r1=1040485&r2=1040486&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_changes.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_changes.erl Tue Nov 30 12:21:08 2010 @@ -123,6 +123,8 @@ os_filter_fun(FilterName, Style, Req, Db "filter parameter must be of the form `designname/filtername`"}) end. +builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) -> + filter_docids(couch_util:get_value(<<"doc_ids">>, Props), Style); builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) -> {Props} = couch_httpd:json_body_obj(Req), DocIds = couch_util:get_value(<<"doc_ids">>, Props, nil), Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1040486&r1=1040485&r2=1040486&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Tue Nov 30 12:21:08 2010 @@ -74,12 +74,11 @@ replicate(#rep{id = RepId, options = Opt do_replication_loop(#rep{options = Options, source = Src} = Rep) -> - DocIds = get_value(doc_ids, Options), Continuous = get_value(continuous, Options, false), - Seq = case {DocIds, Continuous} of - {undefined, false} -> + Seq = case Continuous of + false -> last_seq(Src, Rep#rep.user_ctx); - _ -> + true -> undefined end, do_replication_loop(Rep, Seq). @@ -209,7 +208,7 @@ do_init(#rep{options = Options} = Rep) - start_seq = StartSeq } = State = init_state(Rep), - {RevFindersCount, CopiersCount} = case ?l2i( + {RevFindersCount, CopiersCount} = case list_to_integer( couch_config:get("replicator", "worker_processes", "10")) of Small when Small < 2 -> ?LOG_ERROR("The number of worker processes for the replicator " @@ -221,34 +220,20 @@ do_init(#rep{options = Options} = Rep) - {ok, MissingRevsQueue} = couch_work_queue:new([ {multi_workers, true}, {max_items, trunc(CopiersCount * 1.5)} ]), - - case get_value(doc_ids, Options) of - undefined -> - {ok, ChangesQueue} = couch_work_queue:new([ - {multi_workers, true}, - {max_items, trunc(RevFindersCount * 1.5) * ?REV_BATCH_SIZE} - ]), - - % This starts the _changes reader process. It adds the changes from - % the source db to the ChangesQueue. - ChangesReader = spawn_changes_reader( - StartSeq, Source, ChangesQueue, Options), - - % This starts the missing rev finders. They check the target for changes - % in the ChangesQueue to see if they exist on the target or not. If not, - % adds them to MissingRevsQueue. - MissingRevFinders = - couch_replicator_rev_finders:spawn_missing_rev_finders( - self(), Target, ChangesQueue, MissingRevsQueue, - RevFindersCount, ?REV_BATCH_SIZE); - DocIds -> - ChangesQueue = nil, - ChangesReader = nil, - MissingRevFinders = - couch_replicator_rev_finders:spawn_missing_rev_finders(self(), - Target, DocIds, MissingRevsQueue, RevFindersCount, ?REV_BATCH_SIZE) - end, - + {ok, ChangesQueue} = couch_work_queue:new([ + {multi_workers, true}, + {max_items, trunc(RevFindersCount * 1.5) * ?REV_BATCH_SIZE} + ]), + % This starts the _changes reader process. It adds the changes from + % the source db to the ChangesQueue. + ChangesReader = spawn_changes_reader( + StartSeq, Source, ChangesQueue, Options), + % This starts the missing rev finders. They check the target for changes + % in the ChangesQueue to see if they exist on the target or not. If not, + % adds them to MissingRevsQueue. + MissingRevFinders = couch_replicator_rev_finders:spawn_missing_rev_finders( + self(), Target, ChangesQueue, MissingRevsQueue, + RevFindersCount, ?REV_BATCH_SIZE), % This starts the doc copy processes. They fetch documents from the % MissingRevsQueue and copy them from the source to the target database. DocCopiers = couch_replicator_doc_copiers:spawn_doc_copiers( @@ -373,9 +358,10 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -terminate(normal, #rep_state{rep_details = #rep{id = RepId}} = State) -> +terminate(normal, #rep_state{rep_details = #rep{id = RepId}, + checkpoint_history = CheckpointHistory} = State) -> terminate_cleanup(State), - couch_replication_notifier:notify({finished, RepId, get_result(State)}); + couch_replication_notifier:notify({finished, RepId, CheckpointHistory}); terminate(shutdown, State) -> % cancelled replication throught ?MODULE:end_replication/1 @@ -410,18 +396,13 @@ do_last_checkpoint(State) -> cancel_timer(State2). -start_timer(#rep_state{rep_details = #rep{options = Options}} = State) -> - case get_value(doc_ids, Options) of - undefined -> - After = checkpoint_interval(State), - case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of - {ok, Ref} -> - Ref; - Error -> - ?LOG_ERROR("Replicator, error scheduling checkpoint: ~p", [Error]), - nil - end; - _DocIdList -> +start_timer(State) -> + After = checkpoint_interval(State), + case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of + {ok, Ref} -> + Ref; + Error -> + ?LOG_ERROR("Replicator, error scheduling checkpoint: ~p", [Error]), nil end. @@ -433,21 +414,6 @@ cancel_timer(#rep_state{timer = Timer} = State#rep_state{timer = nil}. -get_result(#rep_state{stats = Stats, rep_details = Rep} = State) -> - case get_value(doc_ids, Rep#rep.options) of - undefined -> - State#rep_state.checkpoint_history; - _DocIdList -> - {[ - {<<"start_time">>, ?l2b(State#rep_state.rep_starttime)}, - {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())}, - {<<"docs_read">>, Stats#rep_stats.docs_read}, - {<<"docs_written">>, Stats#rep_stats.docs_written}, - {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures} - ]} - end. - - init_state(Rep) -> #rep{ id = {BaseId, _Ext}, @@ -522,17 +488,20 @@ do_checkpoint(State) -> rep_starttime = ReplicationStartTime, src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, - stats = Stats + stats = Stats, + rep_details = #rep{options = Options} } = State, case commit_to_both(Source, Target) of {SrcInstanceStartTime, TgtInstanceStartTime} -> ?LOG_INFO("recording a checkpoint for ~p -> ~p at source update_seq ~p", [SourceName, TargetName, NewSeq]), SessionId = couch_uuids:random(), + StartTime = ?l2b(ReplicationStartTime), + EndTime = ?l2b(httpd_util:rfc1123_date()), NewHistoryEntry = {[ {<<"session_id">>, SessionId}, - {<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, {<<"start_last_seq">>, StartSeq}, {<<"end_last_seq">>, NewSeq}, {<<"recorded_seq">>, NewSeq}, @@ -542,12 +511,29 @@ do_checkpoint(State) -> {<<"docs_written">>, Stats#rep_stats.docs_written}, {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures} ]}, - % limit history to 50 entries - NewRepHistory = {[ + BaseHistory = [ {<<"session_id">>, SessionId}, - {<<"source_last_seq">>, NewSeq}, - {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} - ]}, + {<<"source_last_seq">>, NewSeq} + ] ++ case get_value(doc_ids, Options) of + undefined -> + []; + _DocIds -> + % backwards compatibility with the result of a replication by + % doc IDs in versions 0.11.x and 1.0.x + % TODO: deprecate (use same history format, simplify code) + [ + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, + {<<"docs_read">>, Stats#rep_stats.docs_read}, + {<<"docs_written">>, Stats#rep_stats.docs_written}, + {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures} + ] + end, + % limit history to 50 entries + NewRepHistory = { + BaseHistory ++ + [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}] + }, try {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source, Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1040486&r1=1040485&r2=1040486&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Tue Nov 30 12:21:08 2010 @@ -133,10 +133,10 @@ handle_info({'EXIT', Pid, normal}, #stat } = State, {stop, normal, State}; -handle_info({'EXIT', Pid, normal}, #state{writer = Pid, cp = Cp} = State) -> - Stats = State#state.stats, +handle_info({'EXIT', Pid, normal}, #state{writer = Pid, cp = Cp, + stats = Stats, report_seq = ReportSeq} = State) -> ok = gen_server:cast(Cp, {add_stats, Stats}), - seq_done(Cp, State#state.report_seq), + ok = gen_server:cast(Cp, {seq_done, ReportSeq}), gen_server:reply(State#state.pending_flush, ok), NewState = State#state{ report_seq = nil, @@ -200,17 +200,6 @@ queue_fetch_loop(Parent, MissingRevsQueu case couch_work_queue:dequeue(MissingRevsQueue, 1) of closed -> ok; - - {ok, [{doc_id, _} | _] = DocIdList} -> - lists:foreach( - fun({doc_id, Id}) -> - ok = gen_server:call( - Parent, {fetch_doc, {Id, all, [], 0}}, infinity) - end, - DocIdList), - ok = gen_server:call(Parent, {flush, nil}, infinity), - queue_fetch_loop(Parent, MissingRevsQueue); - {ok, [{ReportSeq, IdRevList}]} -> lists:foreach( fun({Id, Revs, PAs, Seq}) -> @@ -315,9 +304,3 @@ flush_docs(Target, Doc) -> _ -> {0, 1} end. - - -seq_done(_Cp, nil) -> - ok; -seq_done(Cp, SeqDone) -> - ok = gen_server:cast(Cp, {seq_done, SeqDone}). Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1040486&r1=1040485&r2=1040486&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Tue Nov 30 12:21:08 2010 @@ -18,18 +18,6 @@ -spawn_missing_rev_finders(_, _, DocIds, MissingRevsQueue, _, _) - when is_list(DocIds) -> - lists:foreach( - fun(DocId) -> - % Ensure same behaviour as old replicator: accept a list of percent - % encoded doc IDs. - Id = ?l2b(couch_httpd:unquote(DocId)), - ok = couch_work_queue:queue(MissingRevsQueue, {doc_id, Id}) - end, DocIds), - couch_work_queue:close(MissingRevsQueue), - []; - spawn_missing_rev_finders(StatsProcess, Target, ChangesQueue, MissingRevsQueue, RevFindersCount, BatchSize) -> lists:map( Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl?rev=1040486&r1=1040485&r2=1040486&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl Tue Nov 30 12:21:08 2010 @@ -162,7 +162,10 @@ convert_options([{<<"filter">>, V} | R]) convert_options([{<<"query_params">>, V} | R]) -> [{query_params, V} | convert_options(R)]; convert_options([{<<"doc_ids">>, V} | R]) -> - [{doc_ids, V} | convert_options(R)]; + % Ensure same behaviour as old replicator: accept a list of percent + % encoded doc IDs. + DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V], + [{doc_ids, DocIds} | convert_options(R)]; convert_options([_ | R]) -> % skip unknown option convert_options(R).