This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8b061c70551c7a4381c3568c3ca55c9a5f97ca86
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Fri Apr 19 13:05:19 2024 +0800

    [Enhancement](group commit) Add fault injection case for group commit
---
 be/src/runtime/group_commit_mgr.cpp                | 11 +++++-
 ...oup_commit_async_wal_msg_fault_injection.groovy | 40 +++++++++++++++++++++-
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 6d8873602d3..38e599180e7 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -333,6 +333,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
                                                    RuntimeState* state) {
     Status st;
     Status result_status;
+    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
+                    { status = Status::InternalError(""); });
     if (status.ok()) {
         // commit txn
         TLoadTxnCommitRequest request;
@@ -368,6 +370,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
                 },
                 10000L);
         result_status = Status::create<false>(result.status);
+        DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", 
{
+            std ::string msg = "abort txn";
+            LOG(INFO) << "debug promise set: " << msg;
+            
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
+                    Status ::InternalError(msg));
+            return status;
+        });
     }
     std::shared_ptr<LoadBlockQueue> load_block_queue;
     {
@@ -392,7 +401,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
     // status: exec_plan_fragment result
     // st: commit txn rpc status
     // result_status: commit txn result
-    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
+    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_st",
                     { st = Status::InternalError(""); });
     if (status.ok() && st.ok() &&
         (result_status.ok() || 
result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
diff --git 
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
index cdf537749cc..c2523c49092 100644
--- 
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
@@ -74,7 +74,7 @@ 
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
     exception = false;
         try {
             
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
-            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st")
             streamLoad {
                 table "${tableName}"
                 set 'column_separator', ','
@@ -88,6 +88,44 @@ 
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
             logger.info(e.getMessage())
             assertTrue(e.getMessage().contains('estimated wal bytes 0 Bytes'))
             exception = true;
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st")
+            assertTrue(exception)
+        }
+
+        // test group commit abort txn
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k`) 
+        BUCKETS 5 
+        properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    exception = false;
+        try {
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', ','
+                set 'group_commit', 'async_mode'
+                unset 'label'
+                file 'group_commit_wal_msg.csv'
+                time 10000 
+            }
+            assertFalse(true);
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('abort txn'))
+            exception = true;
         } finally {
             
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
             
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to