This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 1091da5fb fix(duplication): deal with bulkload dup crash (#2101)
1091da5fb is described below

commit 1091da5fbec8ae383fc475f0a7ede12574c3df08
Author: ninsmiracle <[email protected]>
AuthorDate: Fri Jan 3 11:18:58 2025 +0800

    fix(duplication): deal with bulkload dup crash (#2101)
    
    https://github.com/apache/incubator-pegasus/issues/2100.
    
    Make replica server skip `RPC_RRDB_RRDB_BULK_LOAD` code when replaying the plog.
---
 src/server/pegasus_mutation_duplicator.cpp | 39 +++++++++++++++++++++---------
 1 file changed, 27 insertions(+), 12 deletions(-)

diff --git a/src/server/pegasus_mutation_duplicator.cpp 
b/src/server/pegasus_mutation_duplicator.cpp
index 091d1247a..f31117435 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -35,6 +35,7 @@
 #include "common/common.h"
 #include "common/duplication_common.h"
 #include "duplication_internal_types.h"
+#include "gutil/map_util.h"
 #include "pegasus/client.h"
 #include "pegasus_key_schema.h"
 #include "rpc/rpc_message.h"
@@ -238,6 +239,13 @@ void 
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
     auto batch_request = std::make_unique<dsn::apps::duplicate_request>();
     uint batch_count = 0;
     uint batch_bytes = 0;
+    // The rpc codes that should be ignored:
+    // - RPC_RRDB_RRDB_DUPLICATE: duplicating the duplicate mutations to the
+    // remote cluster is not supported now.
+    // - RPC_RRDB_RRDB_BULK_LOAD: the control flow RPC is not supported now.
+    const static std::set<int> ingnored_rpc_code = 
{dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
+                                                    
dsn::apps::RPC_RRDB_RRDB_BULK_LOAD};
+
     for (auto mut : muts) {
         // mut: 0=timestamp, 1=rpc_code, 2=raw_message
         batch_count++;
@@ -245,21 +253,28 @@ void 
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
         dsn::blob raw_message = std::get<2>(mut);
         auto dreq = std::make_unique<dsn::apps::duplicate_request>();
 
-        if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
-            // ignore if it is a DUPLICATE
-            // Because DUPLICATE comes from other clusters should not be 
forwarded to any other
-            // destinations. A DUPLICATE is meant to be targeting only one 
cluster.
+        if (gutil::ContainsKey(ingnored_rpc_code, rpc_code)) {
+            // It is not recommended to use bulkload and normal writes in the same app,
+            // since it may cause inconsistency between the actual data and the
+            // expected data. Duplication will not dup the bulkload data to the backup
+            // clusters; if you want to use it anyway, you have to accept this risk in
+            // your own way on the clusters you maintain. For example, you can do
+            // bulkload on both the master cluster and the backup cluster (with
+            // duplication enabled) at the same time, but this will inevitably
+            // cause data inconsistency problems.
+            if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
+                LOG_DEBUG_PREFIX("Ignore sending bulkload rpc when doing 
duplication");
+            }
             continue;
-        } else {
-            dsn::apps::duplicate_entry entry;
-            entry.__set_raw_message(raw_message);
-            entry.__set_task_code(rpc_code);
-            entry.__set_timestamp(std::get<0>(mut));
-            
entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
-            batch_request->entries.emplace_back(std::move(entry));
-            batch_bytes += raw_message.length();
         }
 
+        dsn::apps::duplicate_entry entry;
+        entry.__set_raw_message(raw_message);
+        entry.__set_task_code(rpc_code);
+        entry.__set_timestamp(std::get<0>(mut));
+        entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
+        batch_request->entries.emplace_back(std::move(entry));
+        batch_bytes += raw_message.length();
+
         if (batch_count == muts.size() || batch_bytes >= 
FLAGS_duplicate_log_batch_bytes ||
             batch_bytes >= dsn::replication::FLAGS_dup_max_allowed_write_size) 
{
             // since all the plog's mutations of replica belong to same gpid 
though the hash of


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to