This is an automated email from the ASF dual-hosted git repository.

ronny pushed a commit to branch nouveau4win
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 10ba2012382ae3ed1a58fa8974c3c9678391fd34
Author: Jan Lehnardt <[email protected]>
AuthorDate: Wed Jul 19 14:32:03 2023 +0200

    feat: make rexi timeouts and replication batches for shard split topoff configurable
---
 rel/overlay/etc/default.ini       |  2 ++
 src/mem3/src/mem3_rep.erl         | 41 ++++++++++++++++++++++++++-------------
 src/mem3/src/mem3_reshard_job.erl | 13 +++++++++++--
 src/mem3/src/mem3_rpc.erl         |  3 +++
 4 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 2903e7603..bb440e0aa 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -373,6 +373,8 @@ hash_algorithms = sha256, sha
 ;buffer_count = 2000
 ;server_per_node = true
 ;stream_limit = 5
+;shard_split_timeout_msec = 600000
+;shard_split_topoff_batch_size = 500
 
 ; Use a single message to kill a group of remote workers. This feature is
 ; available starting with 3.0. When performing a rolling upgrade from 2.x to
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index b452fd2fa..ed79e69cb 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -41,7 +41,8 @@
     filter,
     db,
     hashfun,
-    incomplete_ranges
+    incomplete_ranges,
+    rexi_timeout
 }).
 
 -record(tgt, {
@@ -54,6 +55,8 @@
     remaining = 0
 }).
 
+-define(DEFAULT_REXI_TIMEOUT, 600000).
+
 go(Source, Target) ->
     go(Source, Target, []).
 
@@ -90,6 +93,11 @@ go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 ->
                 "incomplete_ranges",
                 false
             ),
+            RexiTimeout =
+                case proplists:get_value(rexi_timeout, Opts) of
+                    T when is_integer(T), T > 0 -> T;
+                    _ -> ?DEFAULT_REXI_TIMEOUT
+                end,
             Filter = proplists:get_value(filter, Opts),
             Acc = #acc{
                 batch_size = BatchSize,
@@ -97,7 +105,8 @@ go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 ->
                 source = Source,
                 targets = Targets,
                 filter = Filter,
-                incomplete_ranges = IncompleteRanges
+                incomplete_ranges = IncompleteRanges,
+                rexi_timeout = RexiTimeout
             },
             go(Acc);
         false ->
@@ -606,16 +615,16 @@ changes_append_fdi(
             )
     end.
 
-replicate_batch_multi(#acc{targets = Targets0, seq = Seq, db = Db} = Acc) ->
+replicate_batch_multi(#acc{targets = Targets0, seq = Seq, db = Db, rexi_timeout = Timeout} = Acc) ->
     Targets = maps:map(
         fun(_, #tgt{} = T) ->
-            replicate_batch(T, Db, Seq)
+            replicate_batch(T, Db, Seq, Timeout)
         end,
         Targets0
     ),
     {ok, Acc#acc{targets = Targets, revcount = 0}}.
 
-replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq) ->
+replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq, Timeout) ->
     #shard{node = Node, name = Name} = TgtShard,
     case find_missing_revs(Target) of
         [] ->
@@ -624,7 +633,7 @@ replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq) ->
             lists:map(
                 fun(Chunk) ->
                     Docs = open_docs(Db, Infos, Chunk),
-                    ok = save_on_target(Node, Name, Docs)
+                    ok = save_on_target(Node, Name, Docs, Timeout)
                 end,
                 chunk_revs(Missing)
             )
@@ -694,13 +703,19 @@ open_docs(Db, Infos, Missing) ->
         Missing
     ).
 
-save_on_target(Node, Name, Docs) ->
-    mem3_rpc:update_docs(Node, Name, Docs, [
-        ?REPLICATED_CHANGES,
-        full_commit,
-        ?ADMIN_CTX,
-        {io_priority, {internal_repl, Name}}
-    ]),
+save_on_target(Node, Name, Docs, Timeout) ->
+    mem3_rpc:update_docs(
+        Node,
+        Name,
+        Docs,
+        [
+            ?REPLICATED_CHANGES,
+            full_commit,
+            ?ADMIN_CTX,
+            {io_priority, {internal_repl, Name}}
+        ],
+        Timeout
+    ),
     ok.
 
 purge_on_target(Node, Name, PurgeInfos) ->
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index 4c93bf98f..aa5b028d1 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -49,7 +49,8 @@
 -include("mem3_reshard.hrl").
 
 % Batch size for internal replication topoffs
--define(INTERNAL_REP_BATCH_SIZE, 2000).
+-define(INTERNAL_REP_BATCH_SIZE, 500).
+-define(DEFAULT_REXI_TIMEOUT, 600000).
 
 % The list of possible job states. The order of this
 % list is important as a job will progress linearly
@@ -406,8 +407,16 @@ topoff_impl(#job{source = #shard{} = Source, target = Targets}) ->
     couch_log:notice("~p topoff ~p", [?MODULE, shardsstr(Source, Targets)]),
     check_source_exists(Source, topoff),
     check_targets_exist(Targets, topoff),
+    Timeout = config:get_integer("rexi", "shard_split_timeout_msec", ?DEFAULT_REXI_TIMEOUT),
+    BatchSize = config:get_integer(
+        "rexi", "shard_split_topoff_batch_size", ?INTERNAL_REP_BATCH_SIZE
+    ),
     TMap = maps:from_list([{R, T} || #shard{range = R} = T <- Targets]),
-    Opts = [{batch_size, ?INTERNAL_REP_BATCH_SIZE}, {batch_count, all}],
+    Opts = [
+        {batch_size, BatchSize},
+        {batch_count, all},
+        {rexi_timeout, Timeout}
+    ],
     case mem3_rep:go(Source, TMap, Opts) of
         {ok, Count} ->
             Args = [?MODULE, shardsstr(Source, Targets), Count],
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 468bdee21..60c24e1d4 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -18,6 +18,7 @@
     find_common_seq/4,
     get_missing_revs/4,
     update_docs/4,
+    update_docs/5,
     pull_replication/1,
     load_checkpoint/4,
     load_checkpoint/5,
@@ -61,6 +62,8 @@ get_missing_revs(Node, DbName, IdsRevs, Options) ->
 
 update_docs(Node, DbName, Docs, Options) ->
     rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+update_docs(Node, DbName, Docs, Options, Timeout) ->
+    rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}, 
Timeout).
 
 load_checkpoint(Node, DbName, SourceNode, SourceUUID, <<>>) ->
     % Upgrade clause for a mixed cluster for old nodes that don't have

Reply via email to