This is an automated email from the ASF dual-hosted git repository. nickva pushed a commit to branch optimize-replicator in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit d0ae439af625939bb1624b42abc33fd84105b001 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Tue May 5 01:47:23 2026 -0400 Optimize replicator There are two related optimizations: The first is to use gen_server casts to queue docs for _bulk_docs in replication workers instead of gen_server calls. Previously, the queue_fetch_loop from the worker used a gen_server call to add each doc to the parent's (worker's) batch. Even though it was an immediate gen_server return, it was still a synchronous call, and if the worker was in the middle of a _bulk_docs flush, it couldn't reply to the queue_fetch_loop and would stall it for that moment. The optimization is to use casts instead. This way, the fetch loop can continue fetching (running bulk_gets) without having to periodically wait for individual batch_doc calls. The bulk_docs pending queue is still properly bounded, as at the end of the fetch loop there is a `gen_server:call(Parent, flush, infinity)` call which will properly backpropagate the pressure for a slow target. The second optimization is to increase the bulk_docs worker memory limit a bit from 500KB to 4MB. Previously, the 500KB limit always restricted the _bulk_docs batch size to less than 500KB regardless of what the user set as the replicator worker batch size. 
This way we'll still limit the maximum batch size to 4MB, but since it's a higher limit, the user's replicator batch size will have a wider effective range (in other words, if some users want to set a batch size of 2500, they now can). A quick benchmark with a script [1] replicating 100k 2KB docs shows a local speedup of 32 -> 22 seconds: ``` ./rep_bench.py --ndocs 100000 --doc-size 2048 --source-url http://localhost:15984 --target-url http://localhost:25984 source: http://localhost:15984/rep_bench_source target: http://localhost:25984 (db prefix: rep_bench_target) coordinator: http://localhost:15984 poll: 2.0s http_timeout: 600s docs: 100000 doc_size: 2048 n_jobs: 1 == source == loading 100000 docs into rep_bench_source (doc_size=2048, batch=500)... loaded 100000 docs in 23.8s, size 27.0MB setting applied: {('replicator', 'startup_jitter'): '0', ('replicator', 'interval'): '1000'} == bench == wall=30.28s job 00 elapsed= 30.27s docs_read= 100000 docs_written= 100000 missing= 100000 ``` ``` ./rep_bench.py --ndocs 100000 --doc-size 2048 --source-url http://localhost:15984 --target-url http://localhost:25984 source: http://localhost:15984/rep_bench_source target: http://localhost:25984 (db prefix: rep_bench_target) coordinator: http://localhost:15984 poll: 2.0s http_timeout: 600s docs: 100000 doc_size: 2048 n_jobs: 1 == source == loading 100000 docs into rep_bench_source (doc_size=2048, batch=500)... 
loaded 100000 docs in 24.2s, size 26.9MB setting applied: {('replicator', 'startup_jitter'): '0', ('replicator', 'interval'): '1000'} == bench == wall=22.11s job 00 elapsed= 22.10s docs_read= 100000 docs_written= 100000 missing= 100000 ``` [1] https://gist.github.com/nickva/2a49f6e624208c45dc0dafdd935a4aae --- src/couch_replicator/src/couch_replicator_worker.erl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 528ff0f3f..441947aec 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -24,7 +24,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). --define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). +-define(DOC_BUFFER_BYTE_SIZE, 4 * 1024 * 1024). -define(STATS_DELAY_SEC, 10). -define(MISSING_DOC_RETRY_MSEC, 2000). @@ -148,9 +148,6 @@ handle_call( }, {noreply, NewState} end; -handle_call({batch_doc, Doc}, From, State) -> - gen_server:reply(From, ok), - {noreply, maybe_flush_docs(Doc, State)}; handle_call( flush, {Pid, _} = From, @@ -171,6 +168,8 @@ handle_call( end, {noreply, State2#state{flush_waiter = From}}. 
+handle_cast({batch_doc, Doc}, State) -> + {noreply, maybe_flush_docs(Doc, State)}; handle_cast({sum_stats, IncStats}, #state{stats = Stats} = State) -> SumStats = couch_replicator_utils:sum_stats(Stats, IncStats), {noreply, maybe_report_stats(State#state{stats = SumStats})}; @@ -314,7 +313,7 @@ queue_fetch_loop(#fetch_st{} = St) -> IdRevs1 = maps:without(maps:keys(DDocIdRevs), IdRevs), {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs1, Parent, BgSt), BatchFun = fun({_, #doc{} = Doc}) -> - ok = gen_server:call(Parent, {batch_doc, Doc}, infinity) + gen_server:cast(Parent, {batch_doc, Doc}) end, lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))), % Individually upload docs with attachments. @@ -461,7 +460,7 @@ remote_doc_handler({ok, #doc{atts = [_ | _]} = Doc}, Acc) -> couch_log:debug("Worker flushing doc with attachments", []), doc_handler_flush_doc(Doc, Acc); remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) -> - ok = gen_server:call(Parent, {batch_doc, Doc}, infinity), + gen_server:cast(Parent, {batch_doc, Doc}), {ok, Acc}; remote_doc_handler({{not_found, missing}, _}, _Acc) -> throw(missing_doc).
