This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 1091da5fb fix(duplication): deal with bulkload dup crash (#2101)
1091da5fb is described below
commit 1091da5fbec8ae383fc475f0a7ede12574c3df08
Author: ninsmiracle <[email protected]>
AuthorDate: Fri Jan 3 11:18:58 2025 +0800
fix(duplication): deal with bulkload dup crash (#2101)
https://github.com/apache/incubator-pegasus/issues/2100.
Make the replica server skip mutations with the `RPC_RRDB_RRDB_BULK_LOAD` RPC code when replaying the plog for duplication.
---
src/server/pegasus_mutation_duplicator.cpp | 39 +++++++++++++++++++++---------
1 file changed, 27 insertions(+), 12 deletions(-)
diff --git a/src/server/pegasus_mutation_duplicator.cpp
b/src/server/pegasus_mutation_duplicator.cpp
index 091d1247a..f31117435 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -35,6 +35,7 @@
#include "common/common.h"
#include "common/duplication_common.h"
#include "duplication_internal_types.h"
+#include "gutil/map_util.h"
#include "pegasus/client.h"
#include "pegasus_key_schema.h"
#include "rpc/rpc_message.h"
@@ -238,6 +239,13 @@ void
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
auto batch_request = std::make_unique<dsn::apps::duplicate_request>();
uint batch_count = 0;
uint batch_bytes = 0;
+ // The rpc codes should be ignored:
+ // - RPC_RRDB_RRDB_DUPLICATE: Now not supports duplicating the deuplicate
mutations to the
+ // remote cluster.
+ // - RPC_RRDB_RRDB_BULK_LOAD: Now not supports the control flow RPC.
+ const static std::set<int> ingnored_rpc_code =
{dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
+
dsn::apps::RPC_RRDB_RRDB_BULK_LOAD};
+
for (auto mut : muts) {
// mut: 0=timestamp, 1=rpc_code, 2=raw_message
batch_count++;
@@ -245,21 +253,28 @@ void
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
dsn::blob raw_message = std::get<2>(mut);
auto dreq = std::make_unique<dsn::apps::duplicate_request>();
- if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
- // ignore if it is a DUPLICATE
- // Because DUPLICATE comes from other clusters should not be
forwarded to any other
- // destinations. A DUPLICATE is meant to be targeting only one
cluster.
+ if (gutil::ContainsKey(ingnored_rpc_code, rpc_code)) {
+ // It it do not recommend to use bulkload and normal writing in
the same app,
+ // it may also cause inconsistency between actual data and
expected data
+ // And duplication will not dup the data of bulkload to backup
clusters,
+ // if you want to force use it, you can permit this risk in you
own way on the clusters
+ // you maintenance. For example, you can do bulkload both on
master-clusters and
+ // backup-cluster (with duplication enable) at the same time, but
this will inevitably
+ // cause data inconsistency problems.
+ if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
+ LOG_DEBUG_PREFIX("Ignore sending bulkload rpc when doing
duplication");
+ }
continue;
- } else {
- dsn::apps::duplicate_entry entry;
- entry.__set_raw_message(raw_message);
- entry.__set_task_code(rpc_code);
- entry.__set_timestamp(std::get<0>(mut));
-
entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
- batch_request->entries.emplace_back(std::move(entry));
- batch_bytes += raw_message.length();
}
+ dsn::apps::duplicate_entry entry;
+ entry.__set_raw_message(raw_message);
+ entry.__set_task_code(rpc_code);
+ entry.__set_timestamp(std::get<0>(mut));
+ entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
+ batch_request->entries.emplace_back(std::move(entry));
+ batch_bytes += raw_message.length();
+
if (batch_count == muts.size() || batch_bytes >=
FLAGS_duplicate_log_batch_bytes ||
batch_bytes >= dsn::replication::FLAGS_dup_max_allowed_write_size)
{
// since all the plog's mutations of replica belong to same gpid
though the hash of
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]