Author: fdmanana
Date: Tue Nov 30 12:21:08 2010
New Revision: 1040486

URL: http://svn.apache.org/viewvc?rev=1040486&view=rev
Log:
Make by-doc-IDs replication use the builtin filter _doc_ids (reduces code size
and allows for continuous by-doc-IDs replication).
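
For illustration only (not part of this commit): a by-doc-IDs replication can
now also be continuous. A minimal sketch, assuming the test suite's
CouchDB.request() helper, hypothetical database names and doc IDs, and that
/_replicate is wired to this replicator in the branch:

  // Trigger a continuous replication restricted to a few doc IDs.
  // Database names and IDs below are hypothetical.
  var xhr = CouchDB.request("POST", "/_replicate", {
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({
      source: "test_suite_db_a",
      target: "test_suite_db_b",
      doc_ids: ["foo1", "foo%2Fbar"],  // percent-encoded IDs are still accepted
      continuous: true
    })
  });
  // For a continuous replication the response is only an acknowledgement;
  // stats such as docs_read are recorded later in the checkpoint history.
  var repResult = JSON.parse(xhr.responseText);
  T(repResult.ok === true);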

Modified:
    couchdb/branches/new_replicator/share/www/script/test/new_replication.js
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_changes.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl

Modified: couchdb/branches/new_replicator/share/www/script/test/new_replication.js
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/share/www/script/test/new_replication.js?rev=1040486&r1=1040485&r2=1040486&view=diff
==============================================================================
--- couchdb/branches/new_replicator/share/www/script/test/new_replication.js (original)
+++ couchdb/branches/new_replicator/share/www/script/test/new_replication.js Tue Nov 30 12:21:08 2010
@@ -732,12 +732,16 @@ couchTests.new_replication = function(de
       );
 
       total = doc_ids.length - num_inexistent_docs;
-      T(repResult.ok === true);
-      T(typeof repResult.start_time === "string");
-      T(typeof repResult.end_time === "string");
-      T(repResult.docs_read === total);
-      T(repResult.docs_written === total);
-      T(repResult.doc_write_failures === 0);
+      TEquals(true, repResult.ok);
+      if (total === 0) {
+        TEquals(true, repResult.no_changes);
+      } else {
+        TEquals('string', typeof repResult.start_time);
+        TEquals('string', typeof repResult.end_time);
+        TEquals(total, repResult.docs_read);
+        TEquals(total, repResult.docs_written);
+        TEquals(0, repResult.doc_write_failures);
+      }
 
       for (k = 0; k < doc_ids.length; k++) {
         id = decodeURIComponent(doc_ids[k]);
@@ -793,12 +797,16 @@ couchTests.new_replication = function(de
       );
 
       after_total = after_doc_ids.length - after_num_inexistent_docs;
-      T(repResult.ok === true);
-      T(typeof repResult.start_time === "string");
-      T(typeof repResult.end_time === "string");
-      T(repResult.docs_read === after_total);
-      T(repResult.docs_written === after_total);
-      T(repResult.doc_write_failures === 0);
+      TEquals(true, repResult.ok);
+      if (after_total === 0) {
+        TEquals(true, repResult.no_changes);
+      } else {
+        TEquals('string', typeof repResult.start_time);
+        TEquals('string', typeof repResult.end_time);
+        TEquals(after_total, repResult.docs_read);
+        TEquals(after_total, repResult.docs_written);
+        TEquals(0, repResult.doc_write_failures);
+      }
 
       for (k = 0; k < after_doc_ids.length; k++) {
         id = after_doc_ids[k];

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1040486&r1=1040485&r2=1040486&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Nov 30 12:21:08 2010
@@ -288,15 +288,26 @@ update_docs(Db, DocList, Options, Update
     {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
 
 
-changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Options) ->
-    QArgs = changes_q_args(
-        [{"style", atom_to_list(Style)}, {"since", integer_to_list(StartSeq)}],
-        Options),
+changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
+    UserFun, Options) ->
+    BaseQArgs = [
+        {"style", atom_to_list(Style)}, {"since", couch_util:to_list(StartSeq)}
+    ],
+    {QArgs, Method, Body, Headers} = case get_value(doc_ids, Options) of
+    undefined ->
+        QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
+        {QArgs1, get, [], Headers1};
+    DocIds ->
+        Headers2 = [{"Content-Type", "application/json"} | Headers1],
+        JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
+        {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
+    end,
     send_req(
         % Shouldn't be infinity, but somehow if it's not, issues arise
         % frequently with ibrowse.
         HttpDb#httpdb{timeout = infinity},
-        [{path, "_changes"}, {qs, QArgs}, {direct, true},
+        [{method, Method}, {path, "_changes"}, {qs, QArgs}, {direct, true},
+            {headers, Headers}, {body, Body},
             {ibrowse_options, [{stream_to, {self(), once}}]}],
         fun(200, _, DataStreamFun) ->
             case couch_util:get_value(continuous, Options, false) of
@@ -310,10 +321,16 @@ changes_since(#httpdb{} = HttpDb, Style,
             end
         end);
 changes_since(Db, Style, StartSeq, UserFun, Options) ->
+    Filter = case get_value(doc_ids, Options) of
+    undefined ->
+        ?b2l(get_value(filter, Options, <<>>));
+    _DocIds ->
+        "_doc_ids"
+    end,
     Args = #changes_args{
         style = Style,
         since = StartSeq,
-        filter = ?b2l(get_value(filter, Options, <<>>)),
+        filter = Filter,
         feed = case get_value(continuous, Options, false) of
             true ->
                 "continuous";
@@ -323,7 +340,7 @@ changes_since(Db, Style, StartSeq, UserF
         timeout = infinity
     },
     QueryParams = get_value(query_params, Options, {[]}),
-    Req = changes_json_req(Db, Args#changes_args.filter, QueryParams),
+    Req = changes_json_req(Db, Filter, QueryParams, Options),
     ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
     ChangesFeedFun(fun({change, Change, _}, _) ->
             UserFun(json_to_doc_info(Change));
@@ -334,7 +351,7 @@ changes_since(Db, Style, StartSeq, UserF
 
 % internal functions
 
-changes_q_args(BaseQS, Options) ->
+maybe_add_changes_filter_q_args(BaseQS, Options) ->
     case get_value(filter, Options) of
     undefined ->
         BaseQS;
@@ -359,9 +376,11 @@ changes_q_args(BaseQS, Options) ->
         [{"feed", "continuous"}, {"heartbeat", "10000"}]
     end.
 
-changes_json_req(_Db, "", _QueryParams) ->
+changes_json_req(_Db, "", _QueryParams, _Options) ->
     {[]};
-changes_json_req(Db, FilterName, {QueryParams}) ->
+changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
+    {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
+changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
     {ok, Info} = couch_db:get_db_info(Db),
     % simulate a request to db_name/_changes
     {[
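
For reference (not part of the diff): for a remote source, the changes_since/5
clause above now issues a POST to _changes using the builtin _doc_ids filter,
with the IDs in a JSON body. A minimal sketch of the equivalent request,
assuming the test suite's CouchDB.request() helper and a hypothetical database
name:

  // Hypothetical database and IDs; mirrors the query string, header and body
  // that couch_api_wrap builds when the doc_ids option is present.
  var xhr = CouchDB.request("POST", "/test_suite_db_a/_changes?filter=_doc_ids", {
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({doc_ids: ["foo1", "foo2"]})
  });
  var results = JSON.parse(xhr.responseText).results;
  // Only rows for the listed doc IDs are expected in 'results'.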

Modified: couchdb/branches/new_replicator/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_changes.erl?rev=1040486&r1=1040485&r2=1040486&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_changes.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_changes.erl Tue Nov 30 12:21:08 2010
@@ -123,6 +123,8 @@ os_filter_fun(FilterName, Style, Req, Db
             "filter parameter must be of the form `designname/filtername`"})
     end.
 
+builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) ->
+    filter_docids(couch_util:get_value(<<"doc_ids">>, Props), Style);
 builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) ->
     {Props} = couch_httpd:json_body_obj(Req),
     DocIds =  couch_util:get_value(<<"doc_ids">>, Props, nil),

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1040486&r1=1040485&r2=1040486&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Tue Nov 30 12:21:08 2010
@@ -74,12 +74,11 @@ replicate(#rep{id = RepId, options = Opt
 
 
 do_replication_loop(#rep{options = Options, source = Src} = Rep) ->
-    DocIds = get_value(doc_ids, Options),
     Continuous = get_value(continuous, Options, false),
-    Seq = case {DocIds, Continuous} of
-    {undefined, false} ->
+    Seq = case Continuous of
+    false ->
         last_seq(Src, Rep#rep.user_ctx);
-    _ ->
+    true ->
         undefined
     end,
     do_replication_loop(Rep, Seq).
@@ -209,7 +208,7 @@ do_init(#rep{options = Options} = Rep) -
         start_seq = StartSeq
     } = State = init_state(Rep),
 
-    {RevFindersCount, CopiersCount} = case ?l2i(
+    {RevFindersCount, CopiersCount} = case list_to_integer(
         couch_config:get("replicator", "worker_processes", "10")) of
     Small when Small < 2 ->
         ?LOG_ERROR("The number of worker processes for the replicator "
@@ -221,34 +220,20 @@ do_init(#rep{options = Options} = Rep) -
     {ok, MissingRevsQueue} = couch_work_queue:new([
         {multi_workers, true}, {max_items, trunc(CopiersCount * 1.5)}
     ]),
-
-    case get_value(doc_ids, Options) of
-    undefined ->
-        {ok, ChangesQueue} = couch_work_queue:new([
-            {multi_workers, true},
-            {max_items, trunc(RevFindersCount * 1.5) * ?REV_BATCH_SIZE}
-        ]),
-
-        % This starts the _changes reader process. It adds the changes from
-        % the source db to the ChangesQueue.
-        ChangesReader = spawn_changes_reader(
-            StartSeq, Source, ChangesQueue, Options),
-
-        % This starts the missing rev finders. They check the target for changes
-        % in the ChangesQueue to see if they exist on the target or not. If not,
-        % adds them to MissingRevsQueue.
-        MissingRevFinders =
-            couch_replicator_rev_finders:spawn_missing_rev_finders(
-                self(), Target, ChangesQueue, MissingRevsQueue,
-                RevFindersCount, ?REV_BATCH_SIZE);
-    DocIds ->
-        ChangesQueue = nil,
-        ChangesReader = nil,
-        MissingRevFinders =
-            couch_replicator_rev_finders:spawn_missing_rev_finders(self(),
-                Target, DocIds, MissingRevsQueue, RevFindersCount, ?REV_BATCH_SIZE)
-    end,
-
+    {ok, ChangesQueue} = couch_work_queue:new([
+        {multi_workers, true},
+        {max_items, trunc(RevFindersCount * 1.5) * ?REV_BATCH_SIZE}
+    ]),
+    % This starts the _changes reader process. It adds the changes from
+    % the source db to the ChangesQueue.
+    ChangesReader = spawn_changes_reader(
+        StartSeq, Source, ChangesQueue, Options),
+    % This starts the missing rev finders. They check the target for changes
+    % in the ChangesQueue to see if they exist on the target or not. If not,
+    % adds them to MissingRevsQueue.
+    MissingRevFinders = couch_replicator_rev_finders:spawn_missing_rev_finders(
+        self(), Target, ChangesQueue, MissingRevsQueue,
+        RevFindersCount, ?REV_BATCH_SIZE),
     % This starts the doc copy processes. They fetch documents from the
     % MissingRevsQueue and copy them from the source to the target database.
     DocCopiers = couch_replicator_doc_copiers:spawn_doc_copiers(
@@ -373,9 +358,10 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-terminate(normal, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+terminate(normal, #rep_state{rep_details = #rep{id = RepId},
+    checkpoint_history = CheckpointHistory} = State) ->
     terminate_cleanup(State),
-    couch_replication_notifier:notify({finished, RepId, get_result(State)});
+    couch_replication_notifier:notify({finished, RepId, CheckpointHistory});
 
 terminate(shutdown, State) ->
     % cancelled replication throught ?MODULE:end_replication/1
@@ -410,18 +396,13 @@ do_last_checkpoint(State) ->
     cancel_timer(State2).
 
 
-start_timer(#rep_state{rep_details = #rep{options = Options}} = State) ->
-    case get_value(doc_ids, Options) of
-    undefined ->
-        After = checkpoint_interval(State),
-        case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
-        {ok, Ref} ->
-            Ref;
-        Error ->
-            ?LOG_ERROR("Replicator, error scheduling checkpoint:  ~p", [Error]),
-            nil
-        end;
-    _DocIdList ->
+start_timer(State) ->
+    After = checkpoint_interval(State),
+    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
+    {ok, Ref} ->
+        Ref;
+    Error ->
+        ?LOG_ERROR("Replicator, error scheduling checkpoint:  ~p", [Error]),
         nil
     end.
 
@@ -433,21 +414,6 @@ cancel_timer(#rep_state{timer = Timer} =
     State#rep_state{timer = nil}.
 
 
-get_result(#rep_state{stats = Stats, rep_details = Rep} = State) ->
-    case get_value(doc_ids, Rep#rep.options) of
-    undefined ->
-        State#rep_state.checkpoint_history;
-    _DocIdList ->
-        {[
-            {<<"start_time">>, ?l2b(State#rep_state.rep_starttime)},
-            {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
-            {<<"docs_read">>, Stats#rep_stats.docs_read},
-            {<<"docs_written">>, Stats#rep_stats.docs_written},
-            {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
-        ]}
-    end.
-
-
 init_state(Rep) ->
     #rep{
         id = {BaseId, _Ext},
@@ -522,17 +488,20 @@ do_checkpoint(State) ->
         rep_starttime = ReplicationStartTime,
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
-        stats = Stats
+        stats = Stats,
+        rep_details = #rep{options = Options}
     } = State,
     case commit_to_both(Source, Target) of
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
        ?LOG_INFO("recording a checkpoint for ~p -> ~p at source update_seq ~p",
             [SourceName, TargetName, NewSeq]),
         SessionId = couch_uuids:random(),
+        StartTime = ?l2b(ReplicationStartTime),
+        EndTime = ?l2b(httpd_util:rfc1123_date()),
         NewHistoryEntry = {[
             {<<"session_id">>, SessionId},
-            {<<"start_time">>, list_to_binary(ReplicationStartTime)},
-            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+            {<<"start_time">>, StartTime},
+            {<<"end_time">>, EndTime},
             {<<"start_last_seq">>, StartSeq},
             {<<"end_last_seq">>, NewSeq},
             {<<"recorded_seq">>, NewSeq},
@@ -542,12 +511,29 @@ do_checkpoint(State) ->
             {<<"docs_written">>, Stats#rep_stats.docs_written},
             {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
         ]},
-        % limit history to 50 entries
-        NewRepHistory = {[
+        BaseHistory = [
             {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeq},
-            {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
-        ]},
+            {<<"source_last_seq">>, NewSeq}
+        ] ++ case get_value(doc_ids, Options) of
+        undefined ->
+            [];
+        _DocIds ->
+            % backwards compatibility with the result of a replication by
+            % doc IDs in versions 0.11.x and 1.0.x
+            % TODO: deprecate (use same history format, simplify code)
+            [
+                {<<"start_time">>, StartTime},
+                {<<"end_time">>, EndTime},
+                {<<"docs_read">>, Stats#rep_stats.docs_read},
+                {<<"docs_written">>, Stats#rep_stats.docs_written},
+                {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+            ]
+        end,
+        % limit history to 50 entries
+        NewRepHistory = {
+            BaseHistory ++
+            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+        },
 
         try
             {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source,

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1040486&r1=1040485&r2=1040486&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Tue Nov 30 12:21:08 2010
@@ -133,10 +133,10 @@ handle_info({'EXIT', Pid, normal}, #stat
     } = State,
     {stop, normal, State};
 
-handle_info({'EXIT', Pid, normal}, #state{writer = Pid, cp = Cp} = State) ->
-    Stats = State#state.stats,
+handle_info({'EXIT', Pid, normal}, #state{writer = Pid, cp = Cp,
+    stats = Stats, report_seq = ReportSeq} = State) ->
     ok = gen_server:cast(Cp, {add_stats, Stats}),
-    seq_done(Cp, State#state.report_seq),
+    ok = gen_server:cast(Cp, {seq_done, ReportSeq}),
     gen_server:reply(State#state.pending_flush, ok),
     NewState = State#state{
         report_seq = nil,
@@ -200,17 +200,6 @@ queue_fetch_loop(Parent, MissingRevsQueu
     case couch_work_queue:dequeue(MissingRevsQueue, 1) of
     closed ->
         ok;
-
-    {ok, [{doc_id, _} | _] = DocIdList} ->
-        lists:foreach(
-            fun({doc_id, Id}) ->
-                ok = gen_server:call(
-                    Parent, {fetch_doc, {Id, all, [], 0}}, infinity)
-            end,
-            DocIdList),
-        ok = gen_server:call(Parent, {flush, nil}, infinity),
-        queue_fetch_loop(Parent, MissingRevsQueue);
-
     {ok, [{ReportSeq, IdRevList}]} ->
         lists:foreach(
             fun({Id, Revs, PAs, Seq}) ->
@@ -315,9 +304,3 @@ flush_docs(Target, Doc) ->
     _ ->
         {0, 1}
     end.
-
-
-seq_done(_Cp, nil) ->
-    ok;
-seq_done(Cp, SeqDone) ->
-    ok = gen_server:cast(Cp, {seq_done, SeqDone}).

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1040486&r1=1040485&r2=1040486&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Tue Nov 30 12:21:08 2010
@@ -18,18 +18,6 @@
 
 
 
-spawn_missing_rev_finders(_, _, DocIds, MissingRevsQueue, _, _)
-    when is_list(DocIds) ->
-    lists:foreach(
-        fun(DocId) ->
-            % Ensure same behaviour as old replicator: accept a list of percent
-            % encoded doc IDs.
-            Id = ?l2b(couch_httpd:unquote(DocId)),
-            ok = couch_work_queue:queue(MissingRevsQueue, {doc_id, Id})
-        end, DocIds),
-    couch_work_queue:close(MissingRevsQueue),
-    [];
-
 spawn_missing_rev_finders(StatsProcess,
         Target, ChangesQueue, MissingRevsQueue, RevFindersCount, BatchSize) ->
     lists:map(

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl?rev=1040486&r1=1040485&r2=1040486&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl Tue Nov 30 12:21:08 2010
@@ -162,7 +162,10 @@ convert_options([{<<"filter">>, V} | R])
 convert_options([{<<"query_params">>, V} | R]) ->
     [{query_params, V} | convert_options(R)];
 convert_options([{<<"doc_ids">>, V} | R]) ->
-    [{doc_ids, V} | convert_options(R)];
+    % Ensure same behaviour as old replicator: accept a list of percent
+    % encoded doc IDs.
+    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
+    [{doc_ids, DocIds} | convert_options(R)];
 convert_options([_ | R]) -> % skip unknown option
     convert_options(R).
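
Side note (not part of the diff): the decoding above means a doc ID given as
"foo%2Fbar" in the doc_ids option refers to the document "foo/bar", which is
why the test compares against decodeURIComponent(id). A tiny sketch with
hypothetical IDs:

  // The server unquotes each ID with couch_httpd:unquote/1, so for
  // percent-escapes the client-side equivalent of the IDs it looks up is:
  var requested = ["foo1", "foo%2Fbar"];
  var looked_up = requested.map(function(id) { return decodeURIComponent(id); });
  // looked_up is ["foo1", "foo/bar"]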
 

