This is an automated email from the ASF dual-hosted git repository.
nickva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 169643d9d Optimize replicator
169643d9d is described below
commit 169643d9dd23f381c7818324001b651861f6fd13
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue May 5 01:47:23 2026 -0400
Optimize replicator
There are two related optimizations:
Use gen casts to queue docs for _bulk_docs in replication workers instead of
gen calls.
Previously, the queue_fetch_loop in the worker used a gen_server call to add
each
doc to the parent's (worker's) batch. Even though it was an immediate
gen_server return, it was still a synchronous call, and if the worker was in
the
middle of a _bulk_docs flush, it couldn't reply to the queue_fetch_loop and
would stall it for that moment. The optimization is to use casts instead. This
way,
the fetch loop can continue fetching (running bulk_gets) without having to
periodically wait for individual batch_doc calls. The bulk_docs pending
queue
is still properly bounded, as at the end of the fetch loop there is a
`gen_server:call(Parent, flush, infinity)` call which will properly
propagate backpressure from a slow target.
The second optimization is to increase the bulk_docs worker memory limit a
bit, from
512KB to 4MB. Previously the 512KB limit restricted the _bulk_docs batch size
to less than 512KB regardless of what the user set as the replicator
worker
batch size. This way we'll still limit the maximum batch size to 4MB, but
since
it's a higher limit, the user's replicator batch size will have a wider
effective range (in other words, if some users want to set a
batch size of 2500, they can now).
A quick benchmark with a script [1] replicating 100k 2KB docs shows a local
speedup from about 30 to about 22 seconds (before and after, respectively):
```
./rep_bench.py --ndocs 100000 --doc-size 2048 --source-url
http://localhost:15984 --target-url http://localhost:25984
source: http://localhost:15984/rep_bench_source
target: http://localhost:25984 (db prefix: rep_bench_target)
coordinator: http://localhost:15984 poll: 2.0s http_timeout: 600s
docs: 100000 doc_size: 2048 n_jobs: 1
== source ==
loading 100000 docs into rep_bench_source (doc_size=2048, batch=500)...
loaded 100000 docs in 23.8s, size 27.0MB
setting applied: {('replicator', 'startup_jitter'): '0', ('replicator',
'interval'): '1000'}
== bench ==
wall=30.28s
job 00 elapsed= 30.27s docs_read= 100000 docs_written= 100000
missing= 100000
```
```
./rep_bench.py --ndocs 100000 --doc-size 2048 --source-url
http://localhost:15984 --target-url http://localhost:25984
source: http://localhost:15984/rep_bench_source
target: http://localhost:25984 (db prefix: rep_bench_target)
coordinator: http://localhost:15984 poll: 2.0s http_timeout: 600s
docs: 100000 doc_size: 2048 n_jobs: 1
== source ==
loading 100000 docs into rep_bench_source (doc_size=2048, batch=500)...
loaded 100000 docs in 24.2s, size 26.9MB
setting applied: {('replicator', 'startup_jitter'): '0', ('replicator',
'interval'): '1000'}
== bench ==
wall=22.11s
job 00 elapsed= 22.10s docs_read= 100000 docs_written= 100000
missing= 100000
```
[1] https://gist.github.com/nickva/2a49f6e624208c45dc0dafdd935a4aae
---
src/couch_replicator/src/couch_replicator_worker.erl | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl
b/src/couch_replicator/src/couch_replicator_worker.erl
index 528ff0f3f..441947aec 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -24,7 +24,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--define(DOC_BUFFER_BYTE_SIZE, 512 * 1024).
+-define(DOC_BUFFER_BYTE_SIZE, 4 * 1024 * 1024).
-define(STATS_DELAY_SEC, 10).
-define(MISSING_DOC_RETRY_MSEC, 2000).
@@ -148,9 +148,6 @@ handle_call(
},
{noreply, NewState}
end;
-handle_call({batch_doc, Doc}, From, State) ->
- gen_server:reply(From, ok),
- {noreply, maybe_flush_docs(Doc, State)};
handle_call(
flush,
{Pid, _} = From,
@@ -171,6 +168,8 @@ handle_call(
end,
{noreply, State2#state{flush_waiter = From}}.
+handle_cast({batch_doc, Doc}, State) ->
+ {noreply, maybe_flush_docs(Doc, State)};
handle_cast({sum_stats, IncStats}, #state{stats = Stats} = State) ->
SumStats = couch_replicator_utils:sum_stats(Stats, IncStats),
{noreply, maybe_report_stats(State#state{stats = SumStats})};
@@ -314,7 +313,7 @@ queue_fetch_loop(#fetch_st{} = St) ->
IdRevs1 = maps:without(maps:keys(DDocIdRevs), IdRevs),
{Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs1, Parent,
BgSt),
BatchFun = fun({_, #doc{} = Doc}) ->
- ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
+ gen_server:cast(Parent, {batch_doc, Doc})
end,
lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
% Individually upload docs with attachments.
@@ -461,7 +460,7 @@ remote_doc_handler({ok, #doc{atts = [_ | _]} = Doc}, Acc) ->
couch_log:debug("Worker flushing doc with attachments", []),
doc_handler_flush_doc(Doc, Acc);
remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
- ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
+ gen_server:cast(Parent, {batch_doc, Doc}),
{ok, Acc};
remote_doc_handler({{not_found, missing}, _}, _Acc) ->
throw(missing_doc).