This is an automated email from the ASF dual-hosted git repository. ronny pushed a commit to branch nouveau4win in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 10ba2012382ae3ed1a58fa8974c3c9678391fd34 Author: Jan Lehnardt <[email protected]> AuthorDate: Wed Jul 19 14:32:03 2023 +0200 feat: make rexi timeouts and replication batches for shard split topoff configurable --- rel/overlay/etc/default.ini | 2 ++ src/mem3/src/mem3_rep.erl | 41 ++++++++++++++++++++++++++------------- src/mem3/src/mem3_reshard_job.erl | 13 +++++++++++-- src/mem3/src/mem3_rpc.erl | 3 +++ 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 2903e7603..bb440e0aa 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -373,6 +373,8 @@ hash_algorithms = sha256, sha ;buffer_count = 2000 ;server_per_node = true ;stream_limit = 5 +;shard_split_timeout_msec = 600000 +;shard_split_topoff_batch_size = 500 ; Use a single message to kill a group of remote workers. This feature is ; available starting with 3.0. When performing a rolling upgrade from 2.x to diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index b452fd2fa..ed79e69cb 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -41,7 +41,8 @@ filter, db, hashfun, - incomplete_ranges + incomplete_ranges, + rexi_timeout }). -record(tgt, { @@ -54,6 +55,8 @@ remaining = 0 }). +-define(DEFAULT_REXI_TIMEOUT, 600000). + go(Source, Target) -> go(Source, Target, []). @@ -90,6 +93,11 @@ go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 -> "incomplete_ranges", false ), + RexiTimeout = + case proplists:get_value(rexi_timeout, Opts) of + T when is_integer(T), T > 0 -> T; + _ -> ?DEFAULT_REXI_TIMEOUT + end, Filter = proplists:get_value(filter, Opts), Acc = #acc{ batch_size = BatchSize, @@ -97,7 +105,8 @@ go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 -> source = Source, targets = Targets, filter = Filter, - incomplete_ranges = IncompleteRanges + incomplete_ranges = IncompleteRanges, + rexi_timeout = RexiTimeout }, go(Acc); false -> @@ -606,16 +615,16 @@ changes_append_fdi( ) end. -replicate_batch_multi(#acc{targets = Targets0, seq = Seq, db = Db} = Acc) -> +replicate_batch_multi(#acc{targets = Targets0, seq = Seq, db = Db, rexi_timeout = Timeout} = Acc) -> Targets = maps:map( fun(_, #tgt{} = T) -> - replicate_batch(T, Db, Seq) + replicate_batch(T, Db, Seq, Timeout) end, Targets0 ), {ok, Acc#acc{targets = Targets, revcount = 0}}. -replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq) -> +replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq, Timeout) -> #shard{node = Node, name = Name} = TgtShard, case find_missing_revs(Target) of [] -> @@ -624,7 +633,7 @@ replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq) -> lists:map( fun(Chunk) -> Docs = open_docs(Db, Infos, Chunk), - ok = save_on_target(Node, Name, Docs) + ok = save_on_target(Node, Name, Docs, Timeout) end, chunk_revs(Missing) ) @@ -694,13 +703,19 @@ open_docs(Db, Infos, Missing) -> Missing ). -save_on_target(Node, Name, Docs) -> - mem3_rpc:update_docs(Node, Name, Docs, [ - ?REPLICATED_CHANGES, - full_commit, - ?ADMIN_CTX, - {io_priority, {internal_repl, Name}} - ]), +save_on_target(Node, Name, Docs, Timeout) -> + mem3_rpc:update_docs( + Node, + Name, + Docs, + [ + ?REPLICATED_CHANGES, + full_commit, + ?ADMIN_CTX, + {io_priority, {internal_repl, Name}} + ], + Timeout + ), ok. purge_on_target(Node, Name, PurgeInfos) -> diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl index 4c93bf98f..aa5b028d1 100644 --- a/src/mem3/src/mem3_reshard_job.erl +++ b/src/mem3/src/mem3_reshard_job.erl @@ -49,7 +49,8 @@ -include("mem3_reshard.hrl"). % Batch size for internal replication topoffs --define(INTERNAL_REP_BATCH_SIZE, 2000). +-define(INTERNAL_REP_BATCH_SIZE, 500). +-define(DEFAULT_REXI_TIMEOUT, 600000). % The list of possible job states. The order of this % list is important as a job will progress linearly @@ -406,8 +407,16 @@ topoff_impl(#job{source = #shard{} = Source, target = Targets}) -> couch_log:notice("~p topoff ~p", [?MODULE, shardsstr(Source, Targets)]), check_source_exists(Source, topoff), check_targets_exist(Targets, topoff), + Timeout = config:get_integer("rexi", "shard_split_timeout_msec", ?DEFAULT_REXI_TIMEOUT), + BatchSize = config:get_integer( + "rexi", "shard_split_topoff_batch_size", ?INTERNAL_REP_BATCH_SIZE + ), TMap = maps:from_list([{R, T} || #shard{range = R} = T <- Targets]), - Opts = [{batch_size, ?INTERNAL_REP_BATCH_SIZE}, {batch_count, all}], + Opts = [ + {batch_size, BatchSize}, + {batch_count, all}, + {rexi_timeout, Timeout} + ], case mem3_rep:go(Source, TMap, Opts) of {ok, Count} -> Args = [?MODULE, shardsstr(Source, Targets), Count], diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 468bdee21..60c24e1d4 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -18,6 +18,7 @@ find_common_seq/4, get_missing_revs/4, update_docs/4, + update_docs/5, pull_replication/1, load_checkpoint/4, load_checkpoint/5, @@ -61,6 +62,8 @@ get_missing_revs(Node, DbName, IdsRevs, Options) -> update_docs(Node, DbName, Docs, Options) -> rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}). +update_docs(Node, DbName, Docs, Options, Timeout) -> + rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}, Timeout). load_checkpoint(Node, DbName, SourceNode, SourceUUID, <<>>) -> % Upgrade clause for a mixed cluster for old nodes that don't have
