This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5ece07ab8c9ae4b1763aa792743a969579cb1529 Author: yiguolei <[email protected]> AuthorDate: Tue May 14 14:51:51 2024 +0800 [faultinjection](test) add some fault injection in pipeline task method --- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 26 +++++++++++++++++++++++--- be/src/util/debug_points.h | 3 ++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index af1e3912eb4..2f1abf47901 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -77,7 +77,10 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_prepare_timer); - + DBUG_EXECUTE_IF("fault_inject::PipelineXTask::prepare", { + Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task prepare failed"); + return status; + }); { // set sink local state LocalSinkStateInfo info {_task_idx, @@ -131,6 +134,11 @@ Status PipelineXTask::_extract_dependencies() { _finish_dependencies.push_back(fin_dep); } } + DBUG_EXECUTE_IF("fault_inject::PipelineXTask::_extract_dependencies", { + Status status = Status::Error<INTERNAL_ERROR>( + "fault_inject pipeline_task _extract_dependencies failed"); + return status; + }); { auto* local_state = _state->get_sink_local_state(); _write_dependencies = local_state->dependencies(); @@ -195,6 +203,11 @@ Status PipelineXTask::_open() { RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state)); RETURN_IF_ERROR(_extract_dependencies()); _block = doris::vectorized::Block::create_unique(); + + DBUG_EXECUTE_IF("fault_inject::PipelineXTask::open", { + Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task open failed"); + return status; + }); _opened = true; return Status::OK(); } @@ -204,7 +217,10 @@ Status PipelineXTask::execute(bool* eos) { SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); int64_t time_spent = 0; - + DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", { + Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task execute failed"); + return status; + }); ThreadCpuStopWatch cpu_time_stop_watch; cpu_time_stop_watch.start(); Defer defer {[&]() { @@ -275,7 +291,11 @@ Status PipelineXTask::execute(bool* eos) { RETURN_IF_ERROR(_sink->revoke_memory(_state)); continue; } - + DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", { + Status status = + Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed"); + return status; + }); // Pull block from operator chain if (!_dry_run) { SCOPED_TIMER(_get_block_timer); diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h index 0c1607907e6..8c1fe308012 100644 --- a/be/src/util/debug_points.h +++ b/be/src/util/debug_points.h @@ -32,12 +32,13 @@ #include "fmt/format.h" // more usage can see 'util/debug_points_test.cpp' +// using {} around code, to avoid duplicate variable name #define DBUG_EXECUTE_IF(debug_point_name, code) \ if (UNLIKELY(config::enable_debug_points)) { \ auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ if (dp) { \ [[maybe_unused]] auto DP_NAME = debug_point_name; \ - code; \ + { code; } \ } \ } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
