This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new cb9a1d33c feat(duplication): make the task code for incremental 
loading from private logs configurable (#2184)
cb9a1d33c is described below

commit cb9a1d33c5929971ee2ae7b6483b04369f24532d
Author: ninsmiracle <[email protected]>
AuthorDate: Fri Mar 7 11:25:11 2025 +0800

    feat(duplication): make the task code for incremental loading from private 
logs configurable (#2184)
    
    https://github.com/apache/incubator-pegasus/issues/2183
    
    We can make the task code configurable, allowing the thread priority 
incremental
    loading from private logs of to be adjusted from **LOW** to **COMMON**, 
thereby
    enabling support for low-latency real-time duplication.
---
 src/replica/duplication/replica_duplicator.cpp | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/src/replica/duplication/replica_duplicator.cpp 
b/src/replica/duplication/replica_duplicator.cpp
index d45626e52..1029e658b 100644
--- a/src/replica/duplication/replica_duplicator.cpp
+++ b/src/replica/duplication/replica_duplicator.cpp
@@ -35,8 +35,10 @@
 #include "load_from_private_log.h"
 #include "replica/mutation_log.h"
 #include "replica/replica.h"
+#include "task/task_code.h"
 #include "utils/autoref_ptr.h"
 #include "utils/error_code.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
 
 METRIC_DEFINE_counter(replica,
@@ -44,8 +46,15 @@ METRIC_DEFINE_counter(replica,
                       dsn::metric_unit::kMutations,
                       "The number of confirmed mutations for dup");
 
-namespace dsn {
-namespace replication {
+DSN_DEFINE_string(
+    replication,
+    dup_load_plog_task,
+    "LPC_REPLICATION_LONG_LOW",
+    "The task code for incremental loading from private logs while 
duplicating. Tasks with "
+    "TASK_PRIORITY_HIGH are not recommended.");
+DSN_TAG_VARIABLE(dup_load_plog_task, FT_MUTABLE);
+
+namespace dsn::replication {
 
 replica_duplicator::replica_duplicator(const duplication_entry &ent, replica 
*r)
     : replica_base(r),
@@ -161,7 +170,13 @@ void replica_duplicator::start_dup_log()
     _load = std::make_unique<load_mutation>(this, _replica, 
_load_private.get());
 
     from(*_load).link(*_ship).link(*_load);
-    fork(*_load_private, LPC_REPLICATION_LONG_LOW, 0).link(*_ship);
+    auto dup_load_plog_task = 
dsn::task_code::try_get(FLAGS_dup_load_plog_task, TASK_CODE_INVALID);
+    if (dup_load_plog_task == TASK_CODE_INVALID) {
+        dup_load_plog_task = LPC_REPLICATION_LONG_LOW;
+        LOG_ERROR_PREFIX("invalid dup_load_plog_task ({}), set it to 
LPC_REPLICATION_LONG_LOW",
+                         FLAGS_dup_load_plog_task);
+    }
+    fork(*_load_private, dup_load_plog_task, 0).link(*_ship);
 
     run_pipeline();
 }
@@ -317,5 +332,4 @@ void replica_duplicator::set_duplication_plog_checking(bool 
checking)
     _replica->set_duplication_plog_checking(checking);
 }
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to