This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new eb88b2792 fix(bulkload): Sometime ingest will hang when encountering 
write throttling (#2157)
eb88b2792 is described below

commit eb88b27923aa11c644268f16236265eed525678d
Author: Pengfan Lu <[email protected]>
AuthorDate: Fri Dec 13 11:29:09 2024 +0800

    fix(bulkload): Sometime ingest will hang when encountering write throttling 
(#2157)
    
    Resolve issue https://github.com/apache/incubator-pegasus/issues/2156.
    
    Due to the flag _is_bulk_load_ingestion allowing only one 
RPC_RRDB_RRDB_BULK_LOAD RPC call, the throttling_controller::DELAY recall of 
on_client_write cannot execute RPC_RRDB_RRDB_BULK_LOAD again.
---
 src/replica/replica_2pc.cpp | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index 884c5c4dc..f0c108b7f 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -188,6 +188,16 @@ void replica::on_client_write(dsn::message_ex *request, 
bool ignore_throttling)
         return;
     }
 
+    if (static_cast<int32_t>(_primary_states.pc.hp_secondaries.size()) + 1 <
+        
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
+        response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
+        return;
+    }
+
+    if (!ignore_throttling && throttle_write_request(request)) {
+        return;
+    }
+
     if (request->rpc_code() == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
         auto cur_bulk_load_status = _bulk_loader->get_bulk_load_status();
         if (cur_bulk_load_status != bulk_load_status::BLS_DOWNLOADED &&
@@ -209,19 +219,9 @@ void replica::on_client_write(dsn::message_ex *request, 
bool ignore_throttling)
         _bulk_load_ingestion_start_time_ms = dsn_now_ms();
     }
 
-    if (static_cast<int32_t>(_primary_states.pc.hp_secondaries.size()) + 1 <
-        
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
-        response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
-        return;
-    }
-
-    if (!ignore_throttling && throttle_write_request(request)) {
-        return;
-    }
-
     LOG_DEBUG_PREFIX("got write request from {}", 
request->header->from_address);
     auto mu = _primary_states.write_queue.add_work(request->rpc_code(), 
request, this);
-    if (mu) {
+    if (mu != nullptr) {
         init_prepare(mu, false);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to