Author: fdmanana
Date: Fri Jul 22 00:04:30 2011
New Revision: 1149412

URL: http://svn.apache.org/viewvc?rev=1149412&view=rev
Log:
Refactor/simplify replicator

For each worker (doc copier) process there used to be a
corresponding rev finder process. A rev finder dequeued rows
from the _changes queue, asked the target which of those
document IDs and revisions were missing, and queued the
missing ones into a second queue, from which the worker
processes dequeued up to N elements at a time.

This model has been simplified: the workers now dequeue
directly from the _changes rows queue and check for
themselves which document IDs and revisions are missing in
the target. The result is a simpler model with no
performance penalty - there is still enough parallelism to
remain as efficient as before.
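
In outline, each worker now does something like the following
(an illustrative sketch, not the committed code; the real worker
lives in couch_replicator_worker.erl below, batches are actually
handed out by a small changes manager process, and find_missing/2
and copy_docs/3 here are simplified placeholders):

    worker_loop(ChangesQueue, Source, Target, BatchSize) ->
        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
        closed ->
            ok;
        {ok, Changes} ->
            % the worker itself asks the target which revs it lacks
            MissingIdRevs = find_missing(Changes, Target),
            % and copies only the missing ones from source to target
            copy_docs(MissingIdRevs, Source, Target),
            worker_loop(ChangesQueue, Source, Target, BatchSize)
        end.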


Added:
    couchdb/trunk/src/couchdb/couch_replicator_worker.erl
Removed:
    couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
    couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl
Modified:
    couchdb/trunk/src/couchdb/Makefile.am
    couchdb/trunk/src/couchdb/couch_replicator.erl
    couchdb/trunk/test/etap/001-load.t

Modified: couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=1149412&r1=1149411&r2=1149412&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/Makefile.am (original)
+++ couchdb/trunk/src/couchdb/Makefile.am Fri Jul 22 00:04:30 2011
@@ -73,8 +73,7 @@ source_files = \
     couch_replication_manager.erl \
     couch_replication_notifier.erl \
     couch_replicator.erl \
-    couch_replicator_doc_copier.erl \
-    couch_replicator_rev_finder.erl \
+    couch_replicator_worker.erl \
     couch_replicator_utils.erl \
     couch_secondary_sup.erl \
     couch_server.erl \
@@ -143,8 +142,7 @@ compiled_files = \
     couch_replication_manager.beam \
     couch_replication_notifier.beam \
     couch_replicator.beam \
-    couch_replicator_doc_copier.beam \
-    couch_replicator_rev_finder.beam \
+    couch_replicator_worker.beam \
     couch_replicator_utils.beam \
     couch_secondary_sup.beam \
     couch_server.beam \

Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1149412&r1=1149411&r2=1149412&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Fri Jul 22 00:04:30 2011
@@ -58,10 +58,9 @@
     src_starttime,
     tgt_starttime,
     timer, % checkpoint timer
-    missing_revs_queue,
     changes_queue,
+    changes_manager,
     changes_reader,
-    missing_rev_finders,
     workers,
     stats = #rep_stats{},
     session_id,
@@ -217,41 +216,28 @@ do_init(#rep{options = Options, id = {Ba
         source_seq = SourceCurSeq
     } = State = init_state(Rep),
 
-    CopiersCount = get_value(worker_processes, Options),
-    RevFindersCount = CopiersCount,
+    NumWorkers = get_value(worker_processes, Options),
     BatchSize = get_value(worker_batch_size, Options),
-    {ok, MissingRevsQueue} = couch_work_queue:new([
-        {multi_workers, true},
-        {max_items, trunc(CopiersCount * 2.0)}
-    ]),
     {ok, ChangesQueue} = couch_work_queue:new([
-        {multi_workers, true},
-        {max_items, trunc(BatchSize * RevFindersCount * 2.0)}
+        {max_items, trunc(BatchSize * NumWorkers * 2.0)}
     ]),
     % This starts the _changes reader process. It adds the changes from
     % the source db to the ChangesQueue.
-    ChangesReader = spawn_changes_reader(
-        StartSeq, Source, ChangesQueue, Options),
-    % This starts the missing rev finders. They check the target for changes
-    % in the ChangesQueue to see if they exist on the target or not. If not,
-    % adds them to MissingRevsQueue.
-    MissingRevFinders = lists:map(
-        fun(_) ->
-            {ok, Pid} = couch_replicator_rev_finder:start_link(
-                self(), Target, ChangesQueue, MissingRevsQueue, BatchSize),
-            Pid
-        end,
-        lists:seq(1, RevFindersCount)),
-    % This starts the doc copy processes. They fetch documents from the
-    % MissingRevsQueue and copy them from the source to the target database.
+    ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options),
+    % Changes manager - responsible for dequeuing batches from the changes queue
+    % and delivering them to the worker processes.
+    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+    % This starts the worker processes. They ask the changes manager for a
+    % batch of _changes rows to process, check which revs are missing in the
+    % target and, for the missing ones, copy them from the source to the
+    % target.
     MaxConns = get_value(http_connections, Options),
     Workers = lists:map(
         fun(_) ->
-            {ok, Pid} = couch_replicator_doc_copier:start_link(
-                self(), Source, Target, MissingRevsQueue, MaxConns),
+            {ok, Pid} = couch_replicator_worker:start_link(
+                self(), Source, Target, ChangesManager, MaxConns),
             Pid
         end,
-        lists:seq(1, CopiersCount)),
+        lists:seq(1, NumWorkers)),
 
     couch_task_status:add_task(
         "Replication",
@@ -275,7 +261,7 @@ do_init(#rep{options = Options, id = {Ba
         "~c~p HTTP connections~n"
         "~ca connection timeout of ~p milliseconds~n"
         "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, CopiersCount, $\t, BatchSize, $\t,
+        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
             MaxConns, $\t, get_value(connection_timeout, Options),
             $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
             case StartSeq of
@@ -285,16 +271,14 @@ do_init(#rep{options = Options, id = {Ba
                 io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
             end]),
 
-    ?LOG_DEBUG("Missing rev finder pids are: ~p", [MissingRevFinders]),
     ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
 
     couch_replication_manager:replication_started(Rep),
 
     {ok, State#rep_state{
-            missing_revs_queue = MissingRevsQueue,
             changes_queue = ChangesQueue,
+            changes_manager = ChangesManager,
             changes_reader = ChangesReader,
-            missing_rev_finders = MissingRevFinders,
             workers = Workers
         }
     }.
@@ -315,12 +299,12 @@ handle_info({'EXIT', Pid, Reason}, #rep_
     ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
     {stop, changes_reader_died, cancel_timer(State)};
 
-handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
-    {noreply, St};
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
+    {noreply, State};
 
-handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) ->
-    ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]),
-    {stop, missing_revs_queue_died, cancel_timer(St)};
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
+    ?LOG_ERROR("ChangesManager process died with reason: ~p", [Reason]),
+    {stop, changes_manager_died, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
     {noreply, State};
@@ -329,53 +313,26 @@ handle_info({'EXIT', Pid, Reason}, #rep_
     ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
     {stop, changes_queue_died, cancel_timer(State)};
 
-handle_info({'EXIT', Pid, normal}, State) ->
-    #rep_state{
-        workers = Workers,
-        missing_rev_finders = RevFinders,
-        missing_revs_queue = RevsQueue
-    } = State,
-    case lists:member(Pid, RevFinders) of
-    false ->
-        case lists:member(Pid, Workers) of
-        false ->
-            {stop, {unknown_process_died, Pid, normal}, State};
-        true ->
-            case Workers -- [Pid] of
-            [] ->
-                do_last_checkpoint(State);
-            Workers2 ->
-                {noreply, State#rep_state{workers = Workers2}}
-            end
-        end;
-    true ->
-        case RevFinders -- [Pid] of
-        [] ->
-            couch_work_queue:close(RevsQueue),
-            {noreply, State#rep_state{missing_rev_finders = []}};
-        RevFinders2 ->
-            {noreply, State#rep_state{missing_rev_finders = RevFinders2}}
-        end
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+    case Workers -- [Pid] of
+    Workers ->
+        {stop, {unknown_process_died, Pid, normal}, State};
+    [] ->
+        catch unlink(State#rep_state.changes_manager),
+        catch exit(State#rep_state.changes_manager, kill),
+        do_last_checkpoint(State);
+    Workers2 ->
+        {noreply, State#rep_state{workers = Workers2}}
     end;
 
-handle_info({'EXIT', Pid, Reason}, State) ->
-    #rep_state{
-        workers = Workers,
-        missing_rev_finders = RevFinders
-    } = State,
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
     State2 = cancel_timer(State),
     case lists:member(Pid, Workers) of
     false ->
-        case lists:member(Pid, RevFinders) of
-        false ->
-            {stop, {unknown_process_died, Pid, Reason}, State2};
-        true ->
-            ?LOG_ERROR("RevsFinder ~p died with reason: ~p", [Pid, Reason]),
-            {stop, {revs_finder_died, Pid, Reason}, State2}
-        end;
+        {stop, {unknown_process_died, Pid, Reason}, State2};
     true ->
-        ?LOG_ERROR("DocCopier ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {doc_copier_died, Pid, Reason}, State2}
+        ?LOG_ERROR("Worker ~p died with reason: ~p", [Pid, Reason]),
+        {stop, {worker_died, Pid, Reason}, State2}
     end.
 
 
@@ -592,12 +549,10 @@ spawn_changes_reader(StartSeq, #httpdb{}
     spawn_link(fun() ->
         put(last_seq, StartSeq),
         put(retries_left, Db#httpdb.retries),
-        put(row_ts, 1),
         read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
     end);
 spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
     spawn_link(fun() ->
-        put(row_ts, 1),
         read_changes(StartSeq, Db, ChangesQueue, Options)
     end).
 
@@ -605,9 +560,7 @@ read_changes(StartSeq, Db, ChangesQueue,
     try
         couch_api_wrap:changes_since(Db, all_docs, StartSeq,
             fun(#doc_info{high_seq = Seq} = DocInfo) ->
-                Ts = get(row_ts),
-                ok = couch_work_queue:queue(ChangesQueue, {Ts, DocInfo}),
-                put(row_ts, Ts + 1),
+                ok = couch_work_queue:queue(ChangesQueue, DocInfo),
                 put(last_seq, Seq)
             end, Options),
         couch_work_queue:close(ChangesQueue)
@@ -635,6 +588,27 @@ read_changes(StartSeq, Db, ChangesQueue,
     end.
 
 
+spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
+    spawn_link(fun() ->
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
+    end).
+
+changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
+    receive
+    {get_changes, From} ->
+        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+        closed ->
+            From ! {closed, self()};
+        {ok, Changes} ->
+            #doc_info{high_seq = Seq} = lists:last(Changes),
+            ReportSeq = {Ts, Seq},
+            ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
+            From ! {changes, self(), Changes, ReportSeq}
+        end,
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+    end.
+
+
 checkpoint_interval(_State) ->
     5000.
 

Added: couchdb/trunk/src/couchdb/couch_replicator_worker.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_worker.erl?rev=1149412&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_worker.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_worker.erl Fri Jul 22 00:04:30 2011
@@ -0,0 +1,521 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_worker).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/5]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+% TODO: maybe make both buffer max sizes configurable
+-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024).   % for remote targets
+-define(DOC_BUFFER_LEN, 10).                 % for local targets, # of documents
+-define(MAX_BULK_ATT_SIZE, 64 * 1024).
+-define(MAX_BULK_ATTS_PER_DOC, 8).
+
+-import(couch_replicator_utils, [
+    open_db/1,
+    close_db/1,
+    start_db_compaction_notifier/2,
+    stop_db_compaction_notifier/1
+]).
+-import(couch_util, [
+    to_binary/1,
+    get_value/2,
+    get_value/3
+]).
+
+
+-record(batch, {
+    docs = [],
+    size = 0
+}).
+
+-record(state, {
+    loop,
+    max_parallel_conns,
+    source,
+    target,
+    readers = [],
+    writer = nil,
+    pending_fetch = nil,
+    flush_waiter = nil,
+    stats = #rep_stats{},
+    source_db_compaction_notifier = nil,
+    target_db_compaction_notifier = nil,
+    batch = #batch{}
+}).
+
+
+
+start_link(Cp, #db{} = Source, Target, ChangesManager, _MaxConns) ->
+    Pid = spawn_link(fun() ->
+        queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
+    end),
+    {ok, Pid};
+
+start_link(Cp, Source, Target, ChangesManager, MaxConns) ->
+    gen_server:start_link(
+        ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []).
+
+
+init({Cp, Source, Target, ChangesManager, MaxConns}) ->
+    process_flag(trap_exit, true),
+    Parent = self(),
+    LoopPid = spawn_link(fun() ->
+        queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
+    end),
+    State = #state{
+        max_parallel_conns = MaxConns,
+        loop = LoopPid,
+        source = open_db(Source),
+        target = open_db(Target),
+        source_db_compaction_notifier =
+            start_db_compaction_notifier(Source, self()),
+        target_db_compaction_notifier =
+            start_db_compaction_notifier(Target, self())
+    },
+    {ok, State}.
+
+
+handle_call({fetch_doc, {_Id, Revs, _PAs} = Params}, {Pid, _} = From,
+    #state{loop = Pid, readers = Readers, pending_fetch = nil,
+        stats = Stats, source = Src, target = Tgt,
+        max_parallel_conns = MaxConns} = State) ->
+    Stats2 = Stats#rep_stats{
+        missing_checked = Stats#rep_stats.missing_checked + length(Revs),
+        missing_found = Stats#rep_stats.missing_found + length(Revs)
+    },
+    case length(Readers) of
+    Size when Size < MaxConns ->
+        Reader = spawn_doc_reader(Src, Tgt, Params),
+        NewState = State#state{
+            stats = Stats2,
+            readers = [Reader | Readers]
+        },
+        {reply, ok, NewState};
+    _ ->
+        NewState = State#state{
+            stats = Stats2,
+            pending_fetch = {From, Params}
+        },
+        {noreply, NewState}
+    end;
+
+handle_call({batch_doc, Doc}, From, State) ->
+    gen_server:reply(From, ok),
+    {noreply, maybe_flush_docs(Doc, State)};
+
+handle_call({doc_flushed, true}, _From, #state{stats = Stats} = State) ->
+    NewStats = Stats#rep_stats{
+        docs_read = Stats#rep_stats.docs_read + 1,
+        docs_written = Stats#rep_stats.docs_written + 1
+    },
+    {reply, ok, State#state{stats = NewStats}};
+
+handle_call({doc_flushed, false}, _From, #state{stats = Stats} = State) ->
+    NewStats = Stats#rep_stats{
+        docs_read = Stats#rep_stats.docs_read + 1,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + 1
+    },
+    {reply, ok, State#state{stats = NewStats}};
+
+handle_call({add_write_stats, Written, Failed}, _From,
+    #state{stats = Stats} = State) ->
+    NewStats = Stats#rep_stats{
+        docs_written = Stats#rep_stats.docs_written + Written,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + Failed
+    },
+    {reply, ok, State#state{stats = NewStats}};
+
+handle_call(flush, {Pid, _} = From,
+    #state{loop = Pid, writer = nil, flush_waiter = nil,
+        target = Target, batch = Batch} = State) ->
+    State2 = case State#state.readers of
+    [] ->
+        State#state{writer = spawn_writer(Target, Batch)};
+    _ ->
+        State
+    end,
+    {noreply, State2#state{flush_waiter = From}}.
+
+
+handle_cast({db_compacted, DbName},
+    #state{source = #db{name = DbName} = Source} = State) ->
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+    #state{target = #db{name = DbName} = Target} = State) ->
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#state{target = NewTarget}};
+
+handle_cast(Msg, State) ->
+    {stop, {unexpected_async_call, Msg}, State}.
+
+
+handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
+    #state{
+        batch = #batch{docs = []}, readers = [], writer = nil,
+        pending_fetch = nil, flush_waiter = nil
+    } = State,
+    {stop, normal, State};
+
+handle_info({'EXIT', Pid, normal}, #state{writer = Pid} = State) ->
+    {noreply, after_full_flush(State)};
+
+handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
+    #state{
+        readers = Readers, writer = Writer, batch = Batch,
+        source = Source, target = Target,
+        pending_fetch = Fetch, flush_waiter = FlushWaiter
+    } = State,
+    case Readers -- [Pid] of
+    Readers ->
+        {noreply, State};
+    Readers2 ->
+        State2 = case Fetch of
+        nil ->
+            case (FlushWaiter =/= nil) andalso (Writer =:= nil) andalso
+                (Readers2 =:= [])  of
+            true ->
+                State#state{
+                    readers = Readers2,
+                    writer = spawn_writer(Target, Batch)
+                };
+            false ->
+                State#state{readers = Readers2}
+            end;
+        {From, FetchParams} ->
+            Reader = spawn_doc_reader(Source, Target, FetchParams),
+            gen_server:reply(From, ok),
+            State#state{
+                readers = [Reader | Readers2],
+                pending_fetch = nil
+            }
+        end,
+        {noreply, State2}
+    end;
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+   {stop, {process_died, Pid, Reason}, State}.
+
+
+terminate(_Reason, State) ->
+    close_db(State#state.source),
+    close_db(State#state.target),
+    stop_db_compaction_notifier(State#state.source_db_compaction_notifier),
+    stop_db_compaction_notifier(State#state.target_db_compaction_notifier).
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
+    ChangesManager ! {get_changes, self()},
+    receive
+    {closed, ChangesManager} ->
+        ok;
+    {changes, ChangesManager, Changes, ReportSeq} ->
+        Target2 = open_db(Target),
+        {IdRevs, NotMissingCount} = find_missing(Changes, Target2),
+        ok = gen_server:cast(Cp, {report_seq, ReportSeq}),
+        case Source of
+        #db{} ->
+            Source2 = open_db(Source),
+            Stats = local_process_batch(
+                IdRevs, Source2, Target2, #batch{}, #rep_stats{}),
+            close_db(Source2);
+        #httpdb{} ->
+            remote_process_batch(IdRevs, Parent),
+            {ok, Stats} = gen_server:call(Parent, flush, infinity)
+        end,
+        close_db(Target2),
+        Stats2 = Stats#rep_stats{
+            missing_checked = Stats#rep_stats.missing_checked + NotMissingCount
+        },
+        ok = gen_server:cast(Cp, {report_seq_done, ReportSeq, Stats2}),
+        ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]),
+        queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
+    end.
+
+
+local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats) ->
+    Stats;
+
+local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
+    case Target of
+    #httpdb{} ->
+        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
+    #db{} ->
+        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
+    end,
+    {Written, WriteFailures} = flush_docs(Target, Docs),
+    Stats#rep_stats{
+        docs_written = Stats#rep_stats.docs_written + Written,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
+    };
+
+local_process_batch([IdRevs | Rest], Source, Target, Batch, Stats) ->
+    {_Id, Revs, _PAs} = IdRevs,
+    {ok, {_, DocList, Written0, WriteFailures0}} = fetch_doc(
+        Source, IdRevs, fun local_doc_handler/2, {Target, [], 0, 0}),
+    Read = length(DocList) + Written0 + WriteFailures0,
+    {Batch2, Written, WriteFailures} = lists:foldl(
+        fun(Doc, {Batch0, W0, F0}) ->
+            {Batch1, W, F} = maybe_flush_docs(Target, Batch0, Doc),
+            {Batch1, W0 + W, F0 + F}
+        end,
+        {Batch, Written0, WriteFailures0}, DocList),
+    Stats2 = Stats#rep_stats{
+        missing_checked = Stats#rep_stats.missing_checked + length(Revs),
+        missing_found = Stats#rep_stats.missing_found + length(Revs),
+        docs_read = Stats#rep_stats.docs_read + Read,
+        docs_written = Stats#rep_stats.docs_written + Written,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
+    },
+    local_process_batch(Rest, Source, Target, Batch2, Stats2).
+
+
+remote_process_batch([], _Parent) ->
+    ok;
+
+remote_process_batch([{Id, Revs, PAs} | Rest], Parent) ->
+    % When the source is a remote database, we fetch a single document revision
+    % per HTTP request. This is mostly to facilitate retrying of HTTP requests
+    % due to transient network failures. It also helps avoid exceeding the
+    % maximum URL length allowed by proxies and Mochiweb.
+    lists:foreach(
+        fun(Rev) ->
+            ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity)
+        end,
+        Revs),
+    remote_process_batch(Rest, Parent).
+
+
+spawn_doc_reader(Source, Target, FetchParams) ->
+    Parent = self(),
+    spawn_link(fun() ->
+        Source2 = open_db(Source),
+        fetch_doc(
+            Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
+        close_db(Source2)
+    end).
+
+
+fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
+    try
+        couch_api_wrap:open_doc_revs(
+            Source, Id, Revs, [{atts_since, PAs}], DocHandler, Acc)
+    catch
+    throw:{missing_stub, _} ->
+        ?LOG_ERROR("Retrying fetch and update of document `~p` due to out of "
+            "sync attachment stubs. Missing revisions are: ~s",
+            [Id, couch_doc:revs_to_strs(Revs)]),
+        couch_api_wrap:open_doc_revs(Source, Id, Revs, [], DocHandler, Acc)
+    end.
+
+
+local_doc_handler({ok, Doc}, {Target, DocList, W, F}) ->
+    case batch_doc(Doc) of
+    true ->
+        {ok, {Target, [Doc | DocList], W, F}};
+    false ->
+        ?LOG_DEBUG("Worker flushing doc with attachments", []),
+        Target2 = open_db(Target),
+        Success = (flush_doc(Target2, Doc) =:= ok),
+        close_db(Target2),
+        case Success of
+        true ->
+            {ok, {Target, DocList, W + 1, F}};
+        false ->
+            {ok, {Target, DocList, W, F + 1}}
+        end
+    end;
+local_doc_handler(_, Acc) ->
+    {ok, Acc}.
+
+
+remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
+    ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
+    {ok, Acc};
+remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
+    % Immediately flush documents with attachments received from a remote
+    % source. The data property of each attachment is a function that starts
+    % streaming the attachment data from the remote source, therefore it's
+    % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
+    ?LOG_DEBUG("Worker flushing doc with attachments", []),
+    Target2 = open_db(Target),
+    Success = (flush_doc(Target2, Doc) =:= ok),
+    ok = gen_server:call(Parent, {doc_flushed, Success}, infinity),
+    close_db(Target2),
+    case Success of
+    true ->
+        {ok, Acc};
+    false ->
+        {skip, Acc}
+    end;
+remote_doc_handler(_, Acc) ->
+    {ok, Acc}.
+
+
+spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
+    case {Target, Size > 0} of
+    {#httpdb{}, true} ->
+        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
+    {#db{}, true} ->
+        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]);
+    _ ->
+        ok
+    end,
+    Parent = self(),
+    spawn_link(
+        fun() ->
+            Target2 = open_db(Target),
+            {Written, Failed} = flush_docs(Target2, DocList),
+            close_db(Target2),
+            ok = gen_server:call(
+                Parent, {add_write_stats, Written, Failed}, infinity)
+        end).
+
+
+after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
+    gen_server:reply(Waiter, {ok, Stats}),
+    State#state{
+        stats = #rep_stats{},
+        flush_waiter = nil,
+        writer = nil,
+        batch = #batch{}
+    }.
+
+
+maybe_flush_docs(Doc, #state{target = Target, batch = Batch,
+        stats = Stats} = State) ->
+    {Batch2, W, F} = maybe_flush_docs(Target, Batch, Doc),
+    Stats2 = Stats#rep_stats{
+        docs_read = Stats#rep_stats.docs_read + 1,
+        docs_written = Stats#rep_stats.docs_written + W,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + F
+    },
+    State#state{
+        stats = Stats2,
+        batch = Batch2
+    }.
+
+
+maybe_flush_docs(#httpdb{} = Target,
+    #batch{docs = DocAcc, size = SizeAcc} = Batch, Doc) ->
+    case batch_doc(Doc) of
+    false ->
+        ?LOG_DEBUG("Worker flushing doc with attachments", []),
+        case flush_doc(Target, Doc) of
+        ok ->
+            {Batch, 1, 0};
+        _ ->
+            {Batch, 0, 1}
+        end;
+    true ->
+        JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
+        case SizeAcc + iolist_size(JsonDoc) of
+        SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
+            ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [SizeAcc2]),
+            {Written, Failed} = flush_docs(Target, [JsonDoc | DocAcc]),
+            {#batch{}, Written, Failed};
+        SizeAcc2 ->
+            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, 0, 0}
+        end
+    end;
+
+maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
+    case SizeAcc + 1 of
+    SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
+        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [SizeAcc2]),
+        {Written, Failed} = flush_docs(Target, [Doc | DocAcc]),
+        {#batch{}, Written, Failed};
+    SizeAcc2 ->
+        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, 0, 0}
+    end.
+
+
+batch_doc(#doc{atts = []}) ->
+    true;
+batch_doc(#doc{atts = Atts}) ->
+    (length(Atts) =< ?MAX_BULK_ATTS_PER_DOC) andalso
+        lists:all(
+            fun(#att{disk_len = L, data = Data}) ->
+                (L =< ?MAX_BULK_ATT_SIZE) andalso (Data =/= stub)
+            end, Atts).
+
+
+flush_docs(_Target, []) ->
+    {0, 0};
+
+flush_docs(Target, DocList) ->
+    {ok, Errors} = couch_api_wrap:update_docs(
+        Target, DocList, [delay_commit], replicated_changes),
+    DbUri = couch_api_wrap:db_uri(Target),
+    lists:foreach(
+        fun({Props}) ->
+            ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+                " to target database `~s`. Error: `~s`, reason: `~s`.",
+                [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
+                    get_value(error, Props, ""), get_value(reason, Props, "")])
+        end, Errors),
+    {length(DocList) - length(Errors), length(Errors)}.
+
+flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
+    try couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+    {ok, _} ->
+        ok;
+    Error ->
+        ?LOG_ERROR("Replicator: error writing document `~s` to `~s`: ~s",
+            [Id, couch_api_wrap:db_uri(Target), couch_util:to_binary(Error)]),
+        Error
+    catch
+    throw:{missing_stub, _} = MissingStub ->
+        throw(MissingStub);
+    throw:{Error, Reason} ->
+        ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+            " to target database `~s`. Error: `~s`, reason: `~s`.",
+            [Id, couch_doc:rev_to_str({Pos, RevId}),
+                couch_api_wrap:db_uri(Target), to_binary(Error), to_binary(Reason)]),
+        {error, Error};
+    throw:Err ->
+        ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+            " to target database `~s`. Error: `~s`.",
+            [Id, couch_doc:rev_to_str({Pos, RevId}),
+                couch_api_wrap:db_uri(Target), to_binary(Err)]),
+        {error, Err}
+    end.
+
+
+find_missing(DocInfos, Target) ->
+    {IdRevs, AllRevsCount} = lists:foldr(
+        fun(#doc_info{id = Id, revs = RevsInfo}, {IdRevAcc, CountAcc}) ->
+            Revs = [Rev || #rev_info{rev = Rev} <- RevsInfo],
+            {[{Id, Revs} | IdRevAcc], CountAcc + length(Revs)}
+        end,
+        {[], 0}, DocInfos),
+    {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
+    MissingRevsCount = lists:foldl(
+        fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
+        0, Missing),
+    {Missing, AllRevsCount - MissingRevsCount}.

Modified: couchdb/trunk/test/etap/001-load.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/001-load.t?rev=1149412&r1=1149411&r2=1149412&view=diff
==============================================================================
--- couchdb/trunk/test/etap/001-load.t (original)
+++ couchdb/trunk/test/etap/001-load.t Fri Jul 22 00:04:30 2011
@@ -17,7 +17,7 @@
 
 main(_) ->
     test_util:init_code_path(),
-    etap:plan(51),
+    etap:plan(50),
     Modules = [
         couch_auth_cache,
         couch_api_wrap,
@@ -55,8 +55,7 @@ main(_) ->
         couch_replication_manager,
         couch_replication_notifier,
         couch_replicator,
-        couch_replicator_doc_copier,
-        couch_replicator_rev_finder,
+        couch_replicator_worker,
         couch_replicator_utils,
         couch_rep_sup,
         couch_server,

