This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new cb9a1d33c feat(duplication): make the task code for incremental
loading from private logs configurable (#2184)
cb9a1d33c is described below
commit cb9a1d33c5929971ee2ae7b6483b04369f24532d
Author: ninsmiracle <[email protected]>
AuthorDate: Fri Mar 7 11:25:11 2025 +0800
feat(duplication): make the task code for incremental loading from private
logs configurable (#2184)
https://github.com/apache/incubator-pegasus/issues/2183
We can make the task code configurable, allowing the thread priority of
incremental loading from private logs to be adjusted from **LOW** to
**COMMON**, thereby enabling support for low-latency real-time duplication.
---
src/replica/duplication/replica_duplicator.cpp | 24 +++++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git a/src/replica/duplication/replica_duplicator.cpp
b/src/replica/duplication/replica_duplicator.cpp
index d45626e52..1029e658b 100644
--- a/src/replica/duplication/replica_duplicator.cpp
+++ b/src/replica/duplication/replica_duplicator.cpp
@@ -35,8 +35,10 @@
#include "load_from_private_log.h"
#include "replica/mutation_log.h"
#include "replica/replica.h"
+#include "task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
METRIC_DEFINE_counter(replica,
@@ -44,8 +46,15 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kMutations,
"The number of confirmed mutations for dup");
-namespace dsn {
-namespace replication {
+DSN_DEFINE_string(
+ replication,
+ dup_load_plog_task,
+ "LPC_REPLICATION_LONG_LOW",
+ "The task code for incremental loading from private logs while
duplicating. Tasks with "
+ "TASK_PRIORITY_HIGH are not recommended.");
+DSN_TAG_VARIABLE(dup_load_plog_task, FT_MUTABLE);
+
+namespace dsn::replication {
replica_duplicator::replica_duplicator(const duplication_entry &ent, replica
*r)
: replica_base(r),
@@ -161,7 +170,13 @@ void replica_duplicator::start_dup_log()
_load = std::make_unique<load_mutation>(this, _replica,
_load_private.get());
from(*_load).link(*_ship).link(*_load);
- fork(*_load_private, LPC_REPLICATION_LONG_LOW, 0).link(*_ship);
+ auto dup_load_plog_task =
dsn::task_code::try_get(FLAGS_dup_load_plog_task, TASK_CODE_INVALID);
+ if (dup_load_plog_task == TASK_CODE_INVALID) {
+ dup_load_plog_task = LPC_REPLICATION_LONG_LOW;
+ LOG_ERROR_PREFIX("invalid dup_load_plog_task ({}), set it to
LPC_REPLICATION_LONG_LOW",
+ FLAGS_dup_load_plog_task);
+ }
+ fork(*_load_private, dup_load_plog_task, 0).link(*_ship);
run_pipeline();
}
@@ -317,5 +332,4 @@ void replica_duplicator::set_duplication_plog_checking(bool
checking)
_replica->set_duplication_plog_checking(checking);
}
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]