This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d0da94e22b2 [refactor](streamload) refactor stream load executor (#25615)
d0da94e22b2 is described below
commit d0da94e22b20afdaaf9fba1316dce70a6060d95d
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Mon Oct 23 14:34:26 2023 +0800
[refactor](streamload) refactor stream load executor (#25615)
---
.../runtime/stream_load/stream_load_executor.cpp | 226 +++++++--------------
1 file changed, 74 insertions(+), 152 deletions(-)
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 32e4d76dc7c..3dd39e56f65 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -71,161 +71,83 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id="
<< ctx->txn_id
<< ", query_id=" <<
print_id(ctx->put_result.params.params.query_id);
Status st;
+ auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
+ if (ctx->group_commit) {
+ ctx->label = state->import_label();
+ ctx->txn_id = state->wal_id();
+ }
+ ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+ ctx->commit_infos = std::move(state->tablet_commit_infos());
+ if (status->ok()) {
+ ctx->number_total_rows = state->num_rows_load_total();
+ ctx->number_loaded_rows = state->num_rows_load_success();
+ ctx->number_filtered_rows = state->num_rows_load_filtered();
+ ctx->number_unselected_rows = state->num_rows_load_unselected();
+
+ int64_t num_selected_rows = ctx->number_total_rows -
ctx->number_unselected_rows;
+ if (!ctx->group_commit && num_selected_rows > 0 &&
+ (double)ctx->number_filtered_rows / num_selected_rows >
ctx->max_filter_ratio) {
+ // NOTE: Do not modify the error message here, for historical
reasons,
+ // some users may rely on this error message.
+ *status = Status::InternalError("too many filtered rows");
+ }
+ if (ctx->number_filtered_rows > 0 &&
!state->get_error_log_file_path().empty()) {
+ ctx->error_url =
to_load_error_http_path(state->get_error_log_file_path());
+ }
+
+ if (status->ok()) {
+
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
+ DorisMetrics::instance()->stream_load_rows_total->increment(
+ ctx->number_loaded_rows);
+ }
+ } else {
+ LOG(WARNING) << "fragment execute failed"
+ << ", query_id=" <<
UniqueId(ctx->put_result.params.params.query_id)
+ << ", err_msg=" << status->to_string() << ", " <<
ctx->brief();
+ // cancel body_sink, make sender known it
+ if (ctx->body_sink != nullptr) {
+ ctx->body_sink->cancel(status->to_string());
+ }
+
+ switch (ctx->load_src_type) {
+ // reset the stream load ctx's kafka commit offset
+ case TLoadSourceType::KAFKA:
+ ctx->kafka_info->reset_offset();
+ break;
+ default:
+ break;
+ }
+ }
+ ctx->write_data_cost_nanos = MonotonicNanos() -
ctx->start_write_data_nanos;
+ ctx->promise.set_value(*status);
+
+ if (!status->ok() && ctx->body_sink != nullptr) {
+ // In some cases, the load execution is exited early.
+ // For example, when max_filter_ratio is 0 and illegal data is
encountered
+ // during stream loading, the entire load process is terminated
early.
+ // However, the http connection may still be sending data to
stream_load_pipe
+ // and waiting for it to be consumed.
+ // Therefore, we need to actively cancel to end the pipe.
+ ctx->body_sink->cancel(status->to_string());
+ }
+
+ if (ctx->need_commit_self && ctx->body_sink != nullptr) {
+ if (ctx->body_sink->cancelled() || !status->ok()) {
+ ctx->status = *status;
+ this->rollback_txn(ctx.get());
+ } else {
+ static_cast<void>(this->commit_txn(ctx.get()));
+ }
+ }
+ };
+
if (ctx->put_result.__isset.params) {
- st = _exec_env->fragment_mgr()->exec_plan_fragment(
- ctx->put_result.params, [ctx, this](RuntimeState* state,
Status* status) {
- if (ctx->group_commit) {
- ctx->label = state->import_label();
- ctx->txn_id = state->wal_id();
- }
- ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
- ctx->commit_infos =
std::move(state->tablet_commit_infos());
- if (status->ok()) {
- ctx->number_total_rows = state->num_rows_load_total();
- ctx->number_loaded_rows =
state->num_rows_load_success();
- ctx->number_filtered_rows =
state->num_rows_load_filtered();
- ctx->number_unselected_rows =
state->num_rows_load_unselected();
-
- int64_t num_selected_rows =
- ctx->number_total_rows -
ctx->number_unselected_rows;
- if (!ctx->group_commit && num_selected_rows > 0 &&
- (double)ctx->number_filtered_rows /
num_selected_rows >
- ctx->max_filter_ratio) {
- // NOTE: Do not modify the error message here, for
historical reasons,
- // some users may rely on this error message.
- *status = Status::InternalError("too many filtered
rows");
- }
- if (ctx->number_filtered_rows > 0 &&
- !state->get_error_log_file_path().empty()) {
- ctx->error_url =
-
to_load_error_http_path(state->get_error_log_file_path());
- }
-
- if (status->ok()) {
-
DorisMetrics::instance()->stream_receive_bytes_total->increment(
- ctx->receive_bytes);
-
DorisMetrics::instance()->stream_load_rows_total->increment(
- ctx->number_loaded_rows);
- }
- } else {
- LOG(WARNING)
- << "fragment execute failed"
- << ", query_id=" <<
UniqueId(ctx->put_result.params.params.query_id)
- << ", err_msg=" << status->to_string() << ", "
<< ctx->brief();
- // cancel body_sink, make sender known it
- if (ctx->body_sink != nullptr) {
- ctx->body_sink->cancel(status->to_string());
- }
-
- switch (ctx->load_src_type) {
- // reset the stream load ctx's kafka commit offset
- case TLoadSourceType::KAFKA:
- ctx->kafka_info->reset_offset();
- break;
- default:
- break;
- }
- }
- ctx->write_data_cost_nanos = MonotonicNanos() -
ctx->start_write_data_nanos;
- ctx->promise.set_value(*status);
-
- if (!status->ok() && ctx->body_sink != nullptr) {
- // In some cases, the load execution is exited early.
- // For example, when max_filter_ratio is 0 and illegal
data is encountered
- // during stream loading, the entire load process is
terminated early.
- // However, the http connection may still be sending
data to stream_load_pipe
- // and waiting for it to be consumed.
- // Therefore, we need to actively cancel to end the
pipe.
- ctx->body_sink->cancel(status->to_string());
- }
-
- if (ctx->need_commit_self && ctx->body_sink != nullptr) {
- if (ctx->body_sink->cancelled() || !status->ok()) {
- ctx->status = *status;
- this->rollback_txn(ctx.get());
- } else {
- static_cast<void>(this->commit_txn(ctx.get()));
- }
- }
- });
+ st =
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params,
exec_fragment);
} else {
- st = _exec_env->fragment_mgr()->exec_plan_fragment(
- ctx->put_result.pipeline_params, [ctx, this](RuntimeState*
state, Status* status) {
- if (ctx->group_commit) {
- ctx->label = state->import_label();
- ctx->txn_id = state->wal_id();
- }
- ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
- ctx->commit_infos =
std::move(state->tablet_commit_infos());
- if (status->ok()) {
- ctx->number_total_rows = state->num_rows_load_total();
- ctx->number_loaded_rows =
state->num_rows_load_success();
- ctx->number_filtered_rows =
state->num_rows_load_filtered();
- ctx->number_unselected_rows =
state->num_rows_load_unselected();
-
- int64_t num_selected_rows =
- ctx->number_total_rows -
ctx->number_unselected_rows;
- if (!ctx->group_commit && num_selected_rows > 0 &&
- (double)ctx->number_filtered_rows /
num_selected_rows >
- ctx->max_filter_ratio) {
- // NOTE: Do not modify the error message here, for
historical reasons,
- // some users may rely on this error message.
- *status = Status::InternalError("too many filtered
rows");
- }
- if (ctx->number_filtered_rows > 0 &&
- !state->get_error_log_file_path().empty()) {
- ctx->error_url =
-
to_load_error_http_path(state->get_error_log_file_path());
- }
-
- if (status->ok()) {
-
DorisMetrics::instance()->stream_receive_bytes_total->increment(
- ctx->receive_bytes);
-
DorisMetrics::instance()->stream_load_rows_total->increment(
- ctx->number_loaded_rows);
- }
- } else {
- LOG(WARNING)
- << "fragment execute failed"
- << ", query_id=" <<
UniqueId(ctx->put_result.params.params.query_id)
- << ", err_msg=" << status->to_string() << ", "
<< ctx->brief();
- // cancel body_sink, make sender known it
- if (ctx->body_sink != nullptr) {
- ctx->body_sink->cancel(status->to_string());
- }
-
- switch (ctx->load_src_type) {
- // reset the stream load ctx's kafka commit offset
- case TLoadSourceType::KAFKA:
- ctx->kafka_info->reset_offset();
- break;
- default:
- break;
- }
- }
- ctx->write_data_cost_nanos = MonotonicNanos() -
ctx->start_write_data_nanos;
- ctx->promise.set_value(*status);
-
- if (!status->ok() && ctx->body_sink != nullptr) {
- // In some cases, the load execution is exited early.
- // For example, when max_filter_ratio is 0 and illegal
data is encountered
- // during stream loading, the entire load process is
terminated early.
- // However, the http connection may still be sending
data to stream_load_pipe
- // and waiting for it to be consumed.
- // Therefore, we need to actively cancel to end the
pipe.
- ctx->body_sink->cancel(status->to_string());
- }
-
- if (ctx->need_commit_self && ctx->body_sink != nullptr) {
- if (ctx->body_sink->cancelled() || !status->ok()) {
- ctx->status = *status;
- this->rollback_txn(ctx.get());
- } else {
- static_cast<void>(this->commit_txn(ctx.get()));
- }
- }
- });
+ st =
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
+ exec_fragment);
}
+
if (!st.ok()) {
// no need to check unref's return value
return st;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]