Author: fdmanana Date: Fri Jul 22 00:04:30 2011 New Revision: 1149412 URL: http://svn.apache.org/viewvc?rev=1149412&view=rev Log: Refactor/simplify replicator
For each worker (doc copier) process there used to be a rev finder process. A rev finder dequeues from the _changes rows queue, finds which document IDs and revisions are missing in the target and queues the IDs and revisions of those that are missing into a second queue. Finally, worker processes dequeue up to N elements from this queue. This model was reduced to have the workers dequeuing directly from the _changes rows queue and finding which document IDs and revisions are missing in the target. This makes the model simpler without suffering any performance penalty - there's still enough parallelism to continue to be as efficient as before. Added: couchdb/trunk/src/couchdb/couch_replicator_worker.erl Removed: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl Modified: couchdb/trunk/src/couchdb/Makefile.am couchdb/trunk/src/couchdb/couch_replicator.erl couchdb/trunk/test/etap/001-load.t Modified: couchdb/trunk/src/couchdb/Makefile.am URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=1149412&r1=1149411&r2=1149412&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/Makefile.am (original) +++ couchdb/trunk/src/couchdb/Makefile.am Fri Jul 22 00:04:30 2011 @@ -73,8 +73,7 @@ source_files = \ couch_replication_manager.erl \ couch_replication_notifier.erl \ couch_replicator.erl \ - couch_replicator_doc_copier.erl \ - couch_replicator_rev_finder.erl \ + couch_replicator_worker.erl \ couch_replicator_utils.erl \ couch_secondary_sup.erl \ couch_server.erl \ @@ -143,8 +142,7 @@ compiled_files = \ couch_replication_manager.beam \ couch_replication_notifier.beam \ couch_replicator.beam \ - couch_replicator_doc_copier.beam \ - couch_replicator_rev_finder.beam \ + couch_replicator_worker.beam \ couch_replicator_utils.beam \ couch_secondary_sup.beam \ couch_server.beam \ Modified: couchdb/trunk/src/couchdb/couch_replicator.erl 
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1149412&r1=1149411&r2=1149412&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator.erl (original) +++ couchdb/trunk/src/couchdb/couch_replicator.erl Fri Jul 22 00:04:30 2011 @@ -58,10 +58,9 @@ src_starttime, tgt_starttime, timer, % checkpoint timer - missing_revs_queue, changes_queue, + changes_manager, changes_reader, - missing_rev_finders, workers, stats = #rep_stats{}, session_id, @@ -217,41 +216,28 @@ do_init(#rep{options = Options, id = {Ba source_seq = SourceCurSeq } = State = init_state(Rep), - CopiersCount = get_value(worker_processes, Options), - RevFindersCount = CopiersCount, + NumWorkers = get_value(worker_processes, Options), BatchSize = get_value(worker_batch_size, Options), - {ok, MissingRevsQueue} = couch_work_queue:new([ - {multi_workers, true}, - {max_items, trunc(CopiersCount * 2.0)} - ]), {ok, ChangesQueue} = couch_work_queue:new([ - {multi_workers, true}, - {max_items, trunc(BatchSize * RevFindersCount * 2.0)} + {max_items, trunc(BatchSize * NumWorkers * 2.0)} ]), % This starts the _changes reader process. It adds the changes from % the source db to the ChangesQueue. - ChangesReader = spawn_changes_reader( - StartSeq, Source, ChangesQueue, Options), - % This starts the missing rev finders. They check the target for changes - % in the ChangesQueue to see if they exist on the target or not. If not, - % adds them to MissingRevsQueue. - MissingRevFinders = lists:map( - fun(_) -> - {ok, Pid} = couch_replicator_rev_finder:start_link( - self(), Target, ChangesQueue, MissingRevsQueue, BatchSize), - Pid - end, - lists:seq(1, RevFindersCount)), - % This starts the doc copy processes. They fetch documents from the - % MissingRevsQueue and copy them from the source to the target database. 
+ ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options), + % Changes manager - responsible for dequeing batches from the changes queue + % and deliver them to the worker processes. + ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize), + % This starts the worker processes. They ask the changes queue manager for a + % a batch of _changes rows to process -> check which revs are missing in the + % target, and for the missing ones, it copies them from the source to the target. MaxConns = get_value(http_connections, Options), Workers = lists:map( fun(_) -> - {ok, Pid} = couch_replicator_doc_copier:start_link( - self(), Source, Target, MissingRevsQueue, MaxConns), + {ok, Pid} = couch_replicator_worker:start_link( + self(), Source, Target, ChangesManager, MaxConns), Pid end, - lists:seq(1, CopiersCount)), + lists:seq(1, NumWorkers)), couch_task_status:add_task( "Replication", @@ -275,7 +261,7 @@ do_init(#rep{options = Options, id = {Ba "~c~p HTTP connections~n" "~ca connection timeout of ~p milliseconds~n" "~csocket options are: ~s~s", - [BaseId ++ Ext, $\t, CopiersCount, $\t, BatchSize, $\t, + [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t, MaxConns, $\t, get_value(connection_timeout, Options), $\t, io_lib:format("~p", [get_value(socket_options, Options)]), case StartSeq of @@ -285,16 +271,14 @@ do_init(#rep{options = Options, id = {Ba io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq]) end]), - ?LOG_DEBUG("Missing rev finder pids are: ~p", [MissingRevFinders]), ?LOG_DEBUG("Worker pids are: ~p", [Workers]), couch_replication_manager:replication_started(Rep), {ok, State#rep_state{ - missing_revs_queue = MissingRevsQueue, changes_queue = ChangesQueue, + changes_manager = ChangesManager, changes_reader = ChangesReader, - missing_rev_finders = MissingRevFinders, workers = Workers } }. 
@@ -315,12 +299,12 @@ handle_info({'EXIT', Pid, Reason}, #rep_ ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]), {stop, changes_reader_died, cancel_timer(State)}; -handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) -> - {noreply, St}; +handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> + {noreply, State}; -handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) -> - ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]), - {stop, missing_revs_queue_died, cancel_timer(St)}; +handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) -> + ?LOG_ERROR("ChangesManager process died with reason: ~p", [Reason]), + {stop, changes_manager_died, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> {noreply, State}; @@ -329,53 +313,26 @@ handle_info({'EXIT', Pid, Reason}, #rep_ ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]), {stop, changes_queue_died, cancel_timer(State)}; -handle_info({'EXIT', Pid, normal}, State) -> - #rep_state{ - workers = Workers, - missing_rev_finders = RevFinders, - missing_revs_queue = RevsQueue - } = State, - case lists:member(Pid, RevFinders) of - false -> - case lists:member(Pid, Workers) of - false -> - {stop, {unknown_process_died, Pid, normal}, State}; - true -> - case Workers -- [Pid] of - [] -> - do_last_checkpoint(State); - Workers2 -> - {noreply, State#rep_state{workers = Workers2}} - end - end; - true -> - case RevFinders -- [Pid] of - [] -> - couch_work_queue:close(RevsQueue), - {noreply, State#rep_state{missing_rev_finders = []}}; - RevFinders2 -> - {noreply, State#rep_state{missing_rev_finders = RevFinders2}} - end +handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> + case Workers -- [Pid] of + Workers -> + {stop, {unknown_process_died, Pid, normal}, State}; + [] -> + catch unlink(State#rep_state.changes_manager), + catch 
exit(State#rep_state.changes_manager, kill), + do_last_checkpoint(State); + Workers2 -> + {noreply, State#rep_state{workers = Workers2}} end; -handle_info({'EXIT', Pid, Reason}, State) -> - #rep_state{ - workers = Workers, - missing_rev_finders = RevFinders - } = State, +handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> State2 = cancel_timer(State), case lists:member(Pid, Workers) of false -> - case lists:member(Pid, RevFinders) of - false -> - {stop, {unknown_process_died, Pid, Reason}, State2}; - true -> - ?LOG_ERROR("RevsFinder ~p died with reason: ~p", [Pid, Reason]), - {stop, {revs_finder_died, Pid, Reason}, State2} - end; + {stop, {unknown_process_died, Pid, Reason}, State2}; true -> - ?LOG_ERROR("DocCopier ~p died with reason: ~p", [Pid, Reason]), - {stop, {doc_copier_died, Pid, Reason}, State2} + ?LOG_ERROR("Worker ~p died with reason: ~p", [Pid, Reason]), + {stop, {worker_died, Pid, Reason}, State2} end. @@ -592,12 +549,10 @@ spawn_changes_reader(StartSeq, #httpdb{} spawn_link(fun() -> put(last_seq, StartSeq), put(retries_left, Db#httpdb.retries), - put(row_ts, 1), read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options) end); spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) -> spawn_link(fun() -> - put(row_ts, 1), read_changes(StartSeq, Db, ChangesQueue, Options) end). @@ -605,9 +560,7 @@ read_changes(StartSeq, Db, ChangesQueue, try couch_api_wrap:changes_since(Db, all_docs, StartSeq, fun(#doc_info{high_seq = Seq} = DocInfo) -> - Ts = get(row_ts), - ok = couch_work_queue:queue(ChangesQueue, {Ts, DocInfo}), - put(row_ts, Ts + 1), + ok = couch_work_queue:queue(ChangesQueue, DocInfo), put(last_seq, Seq) end, Options), couch_work_queue:close(ChangesQueue) @@ -635,6 +588,27 @@ read_changes(StartSeq, Db, ChangesQueue, end. +spawn_changes_manager(Parent, ChangesQueue, BatchSize) -> + spawn_link(fun() -> + changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1) + end). 
+ +changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) -> + receive + {get_changes, From} -> + case couch_work_queue:dequeue(ChangesQueue, BatchSize) of + closed -> + From ! {closed, self()}; + {ok, Changes} -> + #doc_info{high_seq = Seq} = lists:last(Changes), + ReportSeq = {Ts, Seq}, + ok = gen_server:cast(Parent, {report_seq, ReportSeq}), + From ! {changes, self(), Changes, ReportSeq} + end, + changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1) + end. + + checkpoint_interval(_State) -> 5000. Added: couchdb/trunk/src/couchdb/couch_replicator_worker.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_worker.erl?rev=1149412&view=auto ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator_worker.erl (added) +++ couchdb/trunk/src/couchdb/couch_replicator_worker.erl Fri Jul 22 00:04:30 2011 @@ -0,0 +1,521 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_worker). +-behaviour(gen_server). + +% public API +-export([start_link/5]). + +% gen_server callbacks +-export([init/1, terminate/2, code_change/3]). +-export([handle_call/3, handle_cast/2, handle_info/2]). + +-include("couch_db.hrl"). +-include("couch_api_wrap.hrl"). +-include("couch_replicator.hrl"). + +% TODO: maybe make both buffer max sizes configurable +-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). % for remote targets +-define(DOC_BUFFER_LEN, 10). 
% for local targets, # of documents +-define(MAX_BULK_ATT_SIZE, 64 * 1024). +-define(MAX_BULK_ATTS_PER_DOC, 8). + +-import(couch_replicator_utils, [ + open_db/1, + close_db/1, + start_db_compaction_notifier/2, + stop_db_compaction_notifier/1 +]). +-import(couch_util, [ + to_binary/1, + get_value/2, + get_value/3 +]). + + +-record(batch, { + docs = [], + size = 0 +}). + +-record(state, { + loop, + max_parallel_conns, + source, + target, + readers = [], + writer = nil, + pending_fetch = nil, + flush_waiter = nil, + stats = #rep_stats{}, + source_db_compaction_notifier = nil, + target_db_compaction_notifier = nil, + batch = #batch{} +}). + + + +start_link(Cp, #db{} = Source, Target, ChangesManager, _MaxConns) -> + Pid = spawn_link(fun() -> + queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager) + end), + {ok, Pid}; + +start_link(Cp, Source, Target, ChangesManager, MaxConns) -> + gen_server:start_link( + ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []). + + +init({Cp, Source, Target, ChangesManager, MaxConns}) -> + process_flag(trap_exit, true), + Parent = self(), + LoopPid = spawn_link(fun() -> + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) + end), + State = #state{ + max_parallel_conns = MaxConns, + loop = LoopPid, + source = open_db(Source), + target = open_db(Target), + source_db_compaction_notifier = + start_db_compaction_notifier(Source, self()), + target_db_compaction_notifier = + start_db_compaction_notifier(Target, self()) + }, + {ok, State}. 
+ + +handle_call({fetch_doc, {_Id, Revs, _PAs} = Params}, {Pid, _} = From, + #state{loop = Pid, readers = Readers, pending_fetch = nil, + stats = Stats, source = Src, target = Tgt, + max_parallel_conns = MaxConns} = State) -> + Stats2 = Stats#rep_stats{ + missing_checked = Stats#rep_stats.missing_checked + length(Revs), + missing_found = Stats#rep_stats.missing_found + length(Revs) + }, + case length(Readers) of + Size when Size < MaxConns -> + Reader = spawn_doc_reader(Src, Tgt, Params), + NewState = State#state{ + stats = Stats2, + readers = [Reader | Readers] + }, + {reply, ok, NewState}; + _ -> + NewState = State#state{ + stats = Stats2, + pending_fetch = {From, Params} + }, + {noreply, NewState} + end; + +handle_call({batch_doc, Doc}, From, State) -> + gen_server:reply(From, ok), + {noreply, maybe_flush_docs(Doc, State)}; + +handle_call({doc_flushed, true}, _From, #state{stats = Stats} = State) -> + NewStats = Stats#rep_stats{ + docs_read = Stats#rep_stats.docs_read + 1, + docs_written = Stats#rep_stats.docs_written + 1 + }, + {reply, ok, State#state{stats = NewStats}}; + +handle_call({doc_flushed, false}, _From, #state{stats = Stats} = State) -> + NewStats = Stats#rep_stats{ + docs_read = Stats#rep_stats.docs_read + 1, + doc_write_failures = Stats#rep_stats.doc_write_failures + 1 + }, + {reply, ok, State#state{stats = NewStats}}; + +handle_call({add_write_stats, Written, Failed}, _From, + #state{stats = Stats} = State) -> + NewStats = Stats#rep_stats{ + docs_written = Stats#rep_stats.docs_written + Written, + doc_write_failures = Stats#rep_stats.doc_write_failures + Failed + }, + {reply, ok, State#state{stats = NewStats}}; + +handle_call(flush, {Pid, _} = From, + #state{loop = Pid, writer = nil, flush_waiter = nil, + target = Target, batch = Batch} = State) -> + State2 = case State#state.readers of + [] -> + State#state{writer = spawn_writer(Target, Batch)}; + _ -> + State + end, + {noreply, State2#state{flush_waiter = From}}. 
+ + +handle_cast({db_compacted, DbName}, + #state{source = #db{name = DbName} = Source} = State) -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#state{source = NewSource}}; + +handle_cast({db_compacted, DbName}, + #state{target = #db{name = DbName} = Target} = State) -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#state{target = NewTarget}}; + +handle_cast(Msg, State) -> + {stop, {unexpected_async_call, Msg}, State}. + + +handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) -> + #state{ + batch = #batch{docs = []}, readers = [], writer = nil, + pending_fetch = nil, flush_waiter = nil + } = State, + {stop, normal, State}; + +handle_info({'EXIT', Pid, normal}, #state{writer = Pid} = State) -> + {noreply, after_full_flush(State)}; + +handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> + #state{ + readers = Readers, writer = Writer, batch = Batch, + source = Source, target = Target, + pending_fetch = Fetch, flush_waiter = FlushWaiter + } = State, + case Readers -- [Pid] of + Readers -> + {noreply, State}; + Readers2 -> + State2 = case Fetch of + nil -> + case (FlushWaiter =/= nil) andalso (Writer =:= nil) andalso + (Readers2 =:= []) of + true -> + State#state{ + readers = Readers2, + writer = spawn_writer(Target, Batch) + }; + false -> + State#state{readers = Readers2} + end; + {From, FetchParams} -> + Reader = spawn_doc_reader(Source, Target, FetchParams), + gen_server:reply(From, ok), + State#state{ + readers = [Reader | Readers2], + pending_fetch = nil + } + end, + {noreply, State2} + end; + +handle_info({'EXIT', Pid, Reason}, State) -> + {stop, {process_died, Pid, Reason}, State}. + + +terminate(_Reason, State) -> + close_db(State#state.source), + close_db(State#state.target), + stop_db_compaction_notifier(State#state.source_db_compaction_notifier), + stop_db_compaction_notifier(State#state.target_db_compaction_notifier). + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ + +queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> + ChangesManager ! {get_changes, self()}, + receive + {closed, ChangesManager} -> + ok; + {changes, ChangesManager, Changes, ReportSeq} -> + Target2 = open_db(Target), + {IdRevs, NotMissingCount} = find_missing(Changes, Target2), + ok = gen_server:cast(Cp, {report_seq, ReportSeq}), + case Source of + #db{} -> + Source2 = open_db(Source), + Stats = local_process_batch( + IdRevs, Source2, Target2, #batch{}, #rep_stats{}), + close_db(Source2); + #httpdb{} -> + remote_process_batch(IdRevs, Parent), + {ok, Stats} = gen_server:call(Parent, flush, infinity) + end, + close_db(Target2), + Stats2 = Stats#rep_stats{ + missing_checked = Stats#rep_stats.missing_checked + NotMissingCount + }, + ok = gen_server:cast(Cp, {report_seq_done, ReportSeq, Stats2}), + ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]), + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) + end. + + +local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats) -> + Stats; + +local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size}, Stats) -> + case Target of + #httpdb{} -> + ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]); + #db{} -> + ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]) + end, + {Written, WriteFailures} = flush_docs(Target, Docs), + Stats#rep_stats{ + docs_written = Stats#rep_stats.docs_written + Written, + doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures + }; + +local_process_batch([IdRevs | Rest], Source, Target, Batch, Stats) -> + {_Id, Revs, _PAs} = IdRevs, + {ok, {_, DocList, Written0, WriteFailures0}} = fetch_doc( + Source, IdRevs, fun local_doc_handler/2, {Target, [], 0, 0}), + Read = length(DocList) + Written0 + WriteFailures0, + {Batch2, Written, WriteFailures} = lists:foldl( + fun(Doc, {Batch0, W0, F0}) -> + {Batch1, W, F} = maybe_flush_docs(Target, Batch0, Doc), + {Batch1, W0 + W, F0 + F} + end, + {Batch, Written0, 
WriteFailures0}, DocList), + Stats2 = Stats#rep_stats{ + missing_checked = Stats#rep_stats.missing_checked + length(Revs), + missing_found = Stats#rep_stats.missing_found + length(Revs), + docs_read = Stats#rep_stats.docs_read + Read, + docs_written = Stats#rep_stats.docs_written + Written, + doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures + }, + local_process_batch(Rest, Source, Target, Batch2, Stats2). + + +remote_process_batch([], _Parent) -> + ok; + +remote_process_batch([{Id, Revs, PAs} | Rest], Parent) -> + % When the source is a remote database, we fetch a single document revision + % per HTTP request. This is mostly to facilitate retrying of HTTP requests + % due to network transient failures. It also helps not exceeding the maximum + % URL length allowed by proxies and Mochiweb. + lists:foreach( + fun(Rev) -> + ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity) + end, + Revs), + remote_process_batch(Rest, Parent). + + +spawn_doc_reader(Source, Target, FetchParams) -> + Parent = self(), + spawn_link(fun() -> + Source2 = open_db(Source), + fetch_doc( + Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}), + close_db(Source2) + end). + + +fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) -> + try + couch_api_wrap:open_doc_revs( + Source, Id, Revs, [{atts_since, PAs}], DocHandler, Acc) + catch + throw:{missing_stub, _} -> + ?LOG_ERROR("Retrying fetch and update of document `~p` due to out of " + "sync attachment stubs. Missing revisions are: ~s", + [Id, couch_doc:revs_to_strs(Revs)]), + couch_api_wrap:open_doc_revs(Source, Id, Revs, [], DocHandler, Acc) + end. 
+ + +local_doc_handler({ok, Doc}, {Target, DocList, W, F}) -> + case batch_doc(Doc) of + true -> + {ok, {Target, [Doc | DocList], W, F}}; + false -> + ?LOG_DEBUG("Worker flushing doc with attachments", []), + Target2 = open_db(Target), + Success = (flush_doc(Target2, Doc) =:= ok), + close_db(Target2), + case Success of + true -> + {ok, {Target, DocList, W + 1, F}}; + false -> + {ok, {Target, DocList, W, F + 1}} + end + end; +local_doc_handler(_, Acc) -> + {ok, Acc}. + + +remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) -> + ok = gen_server:call(Parent, {batch_doc, Doc}, infinity), + {ok, Acc}; +remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) -> + % Immediately flush documents with attachments received from a remote + % source. The data property of each attachment is a function that starts + % streaming the attachment data from the remote source, therefore it's + % convenient to call it ASAP to avoid ibrowse inactivity timeouts. + ?LOG_DEBUG("Worker flushing doc with attachments", []), + Target2 = open_db(Target), + Success = (flush_doc(Target2, Doc) =:= ok), + ok = gen_server:call(Parent, {doc_flushed, Success}, infinity), + close_db(Target2), + case Success of + true -> + {ok, Acc}; + false -> + {skip, Acc} + end; +remote_doc_handler(_, Acc) -> + {ok, Acc}. + + +spawn_writer(Target, #batch{docs = DocList, size = Size}) -> + case {Target, Size > 0} of + {#httpdb{}, true} -> + ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]); + {#db{}, true} -> + ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]); + _ -> + ok + end, + Parent = self(), + spawn_link( + fun() -> + Target2 = open_db(Target), + {Written, Failed} = flush_docs(Target2, DocList), + close_db(Target2), + ok = gen_server:call( + Parent, {add_write_stats, Written, Failed}, infinity) + end). 
+ + +after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) -> + gen_server:reply(Waiter, {ok, Stats}), + State#state{ + stats = #rep_stats{}, + flush_waiter = nil, + writer = nil, + batch = #batch{} + }. + + +maybe_flush_docs(Doc, #state{target = Target, batch = Batch, + stats = Stats} = State) -> + {Batch2, W, F} = maybe_flush_docs(Target, Batch, Doc), + Stats2 = Stats#rep_stats{ + docs_read = Stats#rep_stats.docs_read + 1, + docs_written = Stats#rep_stats.docs_written + W, + doc_write_failures = Stats#rep_stats.doc_write_failures + F + }, + State#state{ + stats = Stats2, + batch = Batch2 + }. + + +maybe_flush_docs(#httpdb{} = Target, + #batch{docs = DocAcc, size = SizeAcc} = Batch, Doc) -> + case batch_doc(Doc) of + false -> + ?LOG_DEBUG("Worker flushing doc with attachments", []), + case flush_doc(Target, Doc) of + ok -> + {Batch, 1, 0}; + _ -> + {Batch, 0, 1} + end; + true -> + JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])), + case SizeAcc + iolist_size(JsonDoc) of + SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE -> + ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [SizeAcc2]), + {Written, Failed} = flush_docs(Target, [JsonDoc | DocAcc]), + {#batch{}, Written, Failed}; + SizeAcc2 -> + {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, 0, 0} + end + end; + +maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) -> + case SizeAcc + 1 of + SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN -> + ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [SizeAcc2]), + {Written, Failed} = flush_docs(Target, [Doc | DocAcc]), + {#batch{}, Written, Failed}; + SizeAcc2 -> + {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, 0, 0} + end. + + +batch_doc(#doc{atts = []}) -> + true; +batch_doc(#doc{atts = Atts}) -> + (length(Atts) =< ?MAX_BULK_ATTS_PER_DOC) andalso + lists:all( + fun(#att{disk_len = L, data = Data}) -> + (L =< ?MAX_BULK_ATT_SIZE) andalso (Data =/= stub) + end, Atts). 
+ + +flush_docs(_Target, []) -> + {0, 0}; + +flush_docs(Target, DocList) -> + {ok, Errors} = couch_api_wrap:update_docs( + Target, DocList, [delay_commit], replicated_changes), + DbUri = couch_api_wrap:db_uri(Target), + lists:foreach( + fun({Props}) -> + ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`, reason: `~s`.", + [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri, + get_value(error, Props, ""), get_value(reason, Props, "")]) + end, Errors), + {length(DocList) - length(Errors), length(Errors)}. + +flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) -> + try couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of + {ok, _} -> + ok; + Error -> + ?LOG_ERROR("Replicator: error writing document `~s` to `~s`: ~s", + [Id, couch_api_wrap:db_uri(Target), couch_util:to_binary(Error)]), + Error + catch + throw:{missing_stub, _} = MissingStub -> + throw(MissingStub); + throw:{Error, Reason} -> + ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`, reason: `~s`.", + [Id, couch_doc:rev_to_str({Pos, RevId}), + couch_api_wrap:db_uri(Target), to_binary(Error), to_binary(Reason)]), + {error, Error}; + throw:Err -> + ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`.", + [Id, couch_doc:rev_to_str({Pos, RevId}), + couch_api_wrap:db_uri(Target), to_binary(Err)]), + {error, Err} + end. 
+ + +find_missing(DocInfos, Target) -> + {IdRevs, AllRevsCount} = lists:foldr( + fun(#doc_info{id = Id, revs = RevsInfo}, {IdRevAcc, CountAcc}) -> + Revs = [Rev || #rev_info{rev = Rev} <- RevsInfo], + {[{Id, Revs} | IdRevAcc], CountAcc + length(Revs)} + end, + {[], 0}, DocInfos), + {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs), + MissingRevsCount = lists:foldl( + fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end, + 0, Missing), + {Missing, AllRevsCount - MissingRevsCount}. Modified: couchdb/trunk/test/etap/001-load.t URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/001-load.t?rev=1149412&r1=1149411&r2=1149412&view=diff ============================================================================== --- couchdb/trunk/test/etap/001-load.t (original) +++ couchdb/trunk/test/etap/001-load.t Fri Jul 22 00:04:30 2011 @@ -17,7 +17,7 @@ main(_) -> test_util:init_code_path(), - etap:plan(51), + etap:plan(50), Modules = [ couch_auth_cache, couch_api_wrap, @@ -55,8 +55,7 @@ main(_) -> couch_replication_manager, couch_replication_notifier, couch_replicator, - couch_replicator_doc_copier, - couch_replicator_rev_finder, + couch_replicator_worker, couch_replicator_utils, couch_rep_sup, couch_server,