This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b3f69211b73 [refactor](wal) move group commit load content length to runtime state (#29188)
b3f69211b73 is described below
commit b3f69211b7313208b74fd00f664b1a08a2bea6f7
Author: abmdocrt <[email protected]>
AuthorDate: Tue Jan 2 15:53:38 2024 +0800
[refactor](wal) move group commit load content length to runtime state (#29188)
---
be/src/http/action/http_stream.cpp | 3 +-
be/src/http/action/stream_load.cpp | 3 +-
be/src/runtime/group_commit_mgr.cpp | 48 ++---------------------------
be/src/runtime/group_commit_mgr.h | 8 +----
be/src/runtime/plan_fragment_executor.cpp | 3 ++
be/src/runtime/runtime_state.h | 7 ++++-
be/src/vec/sink/group_commit_block_sink.cpp | 17 +++++++---
be/src/vec/sink/group_commit_block_sink.h | 1 +
gensrc/thrift/PaloInternalService.thrift | 2 ++
9 files changed, 31 insertions(+), 61 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index a9652707834..b97ce2976eb 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -345,8 +345,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
- ctx->id.to_thrift(), content_length));
+ ctx->put_result.params.__set_content_length(content_length);
}
}
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 6f6b7ad6b03..88e12e19dca 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -644,8 +644,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
- ctx->id.to_thrift(), content_length));
+ ctx->put_result.params.__set_content_length(content_length);
}
}
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 69f150b6c60..0c3589edbe0 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -498,33 +498,17 @@ Status LoadBlockQueue::close_wal() {
return Status::OK();
}
-bool LoadBlockQueue::has_enough_wal_disk_space(
- const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const
TUniqueId& load_id,
- bool is_blocks_contain_all_load_data) {
- size_t blocks_size = 0;
- for (auto block : blocks) {
- blocks_size += block->bytes();
- }
- size_t content_length = 0;
- Status st =
ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id,
&content_length);
- if (st.ok()) {
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id));
- } else {
- return Status::InternalError("can not find load id.");
- }
- size_t pre_allocated = is_blocks_contain_all_load_data
- ? blocks_size
- : (blocks_size > content_length ?
blocks_size : content_length);
+bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
size_t available_bytes = 0;
{
- st = wal_mgr->get_wal_dir_available_size(wal_base_path,
&available_bytes);
+ Status st = wal_mgr->get_wal_dir_available_size(wal_base_path,
&available_bytes);
if (!st.ok()) {
LOG(WARNING) << "get wal disk available size filed!";
}
}
if (pre_allocated < available_bytes) {
- st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path,
pre_allocated, true);
+ Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path,
pre_allocated, true);
if (!st.ok()) {
LOG(WARNING) << "update wal dir pre_allocated failed, reason: " <<
st.to_string();
}
@@ -534,30 +518,4 @@ bool LoadBlockQueue::has_enough_wal_disk_space(
return false;
}
}
-
-Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t
content_length) {
- std::unique_lock l(_load_info_lock);
- if (_load_id_to_content_length_map.find(load_id) ==
_load_id_to_content_length_map.end()) {
- _load_id_to_content_length_map.insert(std::make_pair(load_id,
content_length));
- }
- return Status::OK();
-}
-
-Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t*
content_length) {
- std::shared_lock l(_load_info_lock);
- if (_load_id_to_content_length_map.find(load_id) !=
_load_id_to_content_length_map.end()) {
- *content_length = _load_id_to_content_length_map[load_id];
- return Status::OK();
- }
- return Status::InternalError("can not find load id!");
-}
-
-Status GroupCommitMgr::remove_load_info(TUniqueId load_id) {
- std::unique_lock l(_load_info_lock);
- if (_load_id_to_content_length_map.find(load_id) ==
_load_id_to_content_length_map.end()) {
- return Status::InternalError("can not remove load id!");
- }
- _load_id_to_content_length_map.erase(load_id);
- return Status::OK();
-}
} // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index bf89aa2aa50..125256535fe 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -66,8 +66,7 @@ public:
WalManager* wal_manager, std::vector<TSlotDescriptor>&
slot_desc,
int be_exe_version);
Status close_wal();
- bool has_enough_wal_disk_space(const
std::vector<std::shared_ptr<vectorized::Block>>& blocks,
- const TUniqueId& load_id, bool
is_blocks_contain_all_load_data);
+ bool has_enough_wal_disk_space(size_t pre_allocated);
// 1s
static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
@@ -157,9 +156,6 @@ public:
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
int be_exe_version);
- Status update_load_info(TUniqueId load_id, size_t content_length);
- Status get_load_info(TUniqueId load_id, size_t* content_length);
- Status remove_load_info(TUniqueId load_id);
private:
ExecEnv* _exec_env = nullptr;
@@ -170,8 +166,6 @@ private:
std::unique_ptr<doris::ThreadPool> _thread_pool;
// memory consumption of all tables' load block queues, used for back
pressure.
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
- std::shared_mutex _load_info_lock;
- std::unordered_map<TUniqueId, size_t> _load_id_to_content_length_map;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index dcbf6346fdd..896178946e3 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -149,6 +149,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
if (request.__isset.wal_id) {
_runtime_state->set_wal_id(request.wal_id);
}
+ if (request.__isset.content_length) {
+ _runtime_state->set_content_length(request.content_length);
+ }
if (request.query_options.__isset.is_report_success) {
_is_report_success = request.query_options.is_report_success;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ee515007eb6..1470ec89776 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -255,7 +255,11 @@ public:
void set_wal_id(int64_t wal_id) { _wal_id = wal_id; }
- int64_t wal_id() { return _wal_id; }
+ int64_t wal_id() const { return _wal_id; }
+
+ void set_content_length(size_t content_length) { _content_length =
content_length; }
+
+ size_t content_length() const { return _content_length; }
const std::string& import_label() { return _import_label; }
@@ -659,6 +663,7 @@ private:
std::string _load_dir;
int64_t _load_job_id;
int64_t _wal_id = -1;
+ size_t _content_length = 0;
// mini load
int64_t _normal_row_number;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 277b9859bd5..75ffa13233e 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -66,8 +66,6 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
Status GroupCommitBlockSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
- RETURN_IF_ERROR(
-
ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(),
0));
_state = state;
// profile must add to state's object pool
@@ -240,8 +238,8 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
_db_id, _table_id, _base_schema_version, load_id,
_load_block_queue,
_state->be_exec_version()));
if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
- _group_commit_mode =
_load_block_queue->has_enough_wal_disk_space(
- _blocks, load_id,
is_blocks_contain_all_load_data)
+ size_t pre_allocated =
_pre_allocated(is_blocks_contain_all_load_data);
+ _group_commit_mode =
_load_block_queue->has_enough_wal_disk_space(pre_allocated)
? TGroupCommitMode::ASYNC_MODE
: TGroupCommitMode::SYNC_MODE;
if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
@@ -265,5 +263,16 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
return Status::OK();
}
+size_t GroupCommitBlockSink::_pre_allocated(bool
is_blocks_contain_all_load_data) {
+ size_t blocks_size = 0;
+ for (auto block : _blocks) {
+ blocks_size += block->bytes();
+ }
+ return is_blocks_contain_all_load_data
+ ? blocks_size
+ : (blocks_size > _state->content_length() ? blocks_size
+ :
_state->content_length());
+}
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index 3db4bdd31f8..9a57cb594e7 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -48,6 +48,7 @@ public:
private:
Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block>
block);
Status _add_blocks(RuntimeState* state, bool
is_blocks_contain_all_load_data);
+ size_t _pre_allocated(bool is_blocks_contain_all_load_data);
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index e07797b593d..8700a50790f 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -477,6 +477,8 @@ struct TExecPlanFragmentParams {
27: optional i32 total_load_streams
28: optional i32 num_local_sink
+
+ 29: optional i64 content_length
}
struct TExecPlanFragmentParamsList {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]