This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8b061c70551c7a4381c3568c3ca55c9a5f97ca86 Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Fri Apr 19 13:05:19 2024 +0800 [Enhancement](group commit) Add fault injection case for group commit --- be/src/runtime/group_commit_mgr.cpp | 11 +++++- ...oup_commit_async_wal_msg_fault_injection.groovy | 40 +++++++++++++++++++++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 6d8873602d3..38e599180e7 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -333,6 +333,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ RuntimeState* state) { Status st; Status result_status; + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", + { status = Status::InternalError(""); }); if (status.ok()) { // commit txn TLoadTxnCommitRequest request; @@ -368,6 +370,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ }, 10000L); result_status = Status::create<false>(result.status); + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { + std ::string msg = "abort txn"; + LOG(INFO) << "debug promise set: " << msg; + ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value( + Status ::InternalError(msg)); + return status; + }); } std::shared_ptr<LoadBlockQueue> load_block_queue; { @@ -392,7 +401,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ // status: exec_plan_fragment result // st: commit txn rpc status // result_status: commit txn result - DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_st", { st = Status::InternalError(""); }); if (status.ok() && st.ok() && (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) { diff --git a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy index cdf537749cc..c2523c49092 100644 --- a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy @@ -74,7 +74,7 @@ suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") { exception = false; try { GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") - GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status") + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st") streamLoad { table "${tableName}" set 'column_separator', ',' @@ -88,6 +88,44 @@ suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") { logger.info(e.getMessage()) assertTrue(e.getMessage().contains('estimated wal bytes 0 Bytes')) exception = true; + } finally { + GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") + GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st") + assertTrue(exception) + } + + // test group commit abort txn + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 5 + properties("replication_num" = "1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + + exception = false; + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status") + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + file 'group_commit_wal_msg.csv' + time 10000 + } + assertFalse(true); + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains('abort txn')) + exception = true; } finally { GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org