github-actions[bot] commented on code in PR #27726:
URL: https://github.com/apache/doris/pull/27726#discussion_r1434962037
##########
be/src/olap/wal_manager.cpp:
##########
@@ -80,9 +117,43 @@
}
RETURN_IF_ERROR(scan_wals(wal_dir));
}
- return Thread::create(
- "WalMgr", "replay_wal", [this]() {
static_cast<void>(this->replay()); },
- &_replay_thread);
+ return Status::OK();
+}
+
+Status WalManager::_init_wal_disk_info() {
Review Comment:
warning: method '_init_wal_disk_info' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:101:
```diff
- Status _init_wal_disk_info();
+ static Status _init_wal_disk_info();
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -66,6 +72,37 @@
}
Status WalManager::init() {
+ RETURN_IF_ERROR(_init_wal_dirs_conf());
+ RETURN_IF_ERROR(_init_wal_dirs());
+ RETURN_IF_ERROR(_init_wal_disk_info());
+ return Thread::create(
+ "WalMgr", "replay_wal", [this]() {
static_cast<void>(this->replay()); },
+ &_replay_thread);
+}
+
+Status WalManager::_init_wal_dirs_conf() {
+ std::vector<std::string> tmp_dirs;
+ if (_wal_dirs.empty()) {
+ // default case.
+ for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) {
+ tmp_dirs.emplace_back(path.path + "/wal");
+ }
+ } else {
+ // user config must be absolute path.
+ for (const std::string& wal_dir : _wal_dirs) {
+ if (std::filesystem::path(wal_dir).is_absolute()) {
+ tmp_dirs.emplace_back(wal_dir);
+ } else {
+ return Status::InternalError(
+ "BE config group_commit_replay_wal_dir has to be
absolute path!");
+ }
+ }
+ }
+ _wal_dirs = tmp_dirs;
+ return Status::OK();
+}
+
+Status WalManager::_init_wal_dirs() {
Review Comment:
warning: method '_init_wal_dirs' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:100:
```diff
- Status _init_wal_dirs();
+ static Status _init_wal_dirs();
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -410,4 +473,114 @@
return Status::OK();
}
+bool WalManager::is_wal_disk_space_enough() {
+ // if all disks space usage < 80%
+ std::shared_lock l(_wal_disk_info_lock);
+ for (const auto& it : _wal_disk_info_map) {
+ size_t limit = it.second->limit;
+ size_t available = it.second->available();
+ if (available >= limit * 0.8) {
+ return true;
+ }
+ }
+ return false;
+}
+
+const string& WalManager::get_min_disk_usage_wal_dir() {
+ std::shared_lock l(_wal_disk_info_lock);
+ return _wal_dirs.size() == 1
+ ? _wal_dirs[0]
+ : *std::min_element(_wal_dirs.begin(), _wal_dirs.end(),
+ [this](const std::string& dir1, const
std::string& dir2) {
+ return
_wal_disk_info_map[dir1]->available() <
+
_wal_disk_info_map[dir2]->available();
+ });
+}
+
+const string& WalManager::get_random_wal_dir() {
+ std::shared_lock l(_wal_disk_info_lock);
+ return _wal_disk_info_map.size() == 1
+ ? _wal_disk_info_map.begin()->first
+ : std::next(_wal_disk_info_map.begin(), rand() %
_wal_disk_info_map.size())
+ ->first;
+}
+
+size_t WalManager::get_max_available_size() {
+ std::shared_lock l(_wal_disk_info_lock);
+ return _wal_disk_info_map.size() == 1
+ ? _wal_disk_info_map.begin()->second->available()
+ : std::max_element(_wal_disk_info_map.begin(),
_wal_disk_info_map.end(),
+ [](const auto& map1, const auto& map2) {
+ return map1.second->available() <
+ map2.second->available();
+ })
+ ->second->available();
+}
+
+Status WalManager::update_wal_disk_info_map(std::string wal_dir, size_t limit,
size_t used,
+ size_t pre_allocated, bool
is_add_pre_allocated) {
+ if (_wal_disk_info_map.find(wal_dir) != _wal_disk_info_map.end()) {
+ std::unique_lock l(_wal_disk_info_lock);
+ if (limit != static_cast<size_t>(-1)) {
+ _wal_disk_info_map[wal_dir]->limit = limit;
+ } else {
+ size_t available_bytes;
+ size_t disk_capacity_bytes;
+ RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(
+ wal_dir, &disk_capacity_bytes, &available_bytes));
+ bool is_percent = true;
+ int64_t wal_disk_limit = ParseUtil::parse_mem_spec(
+ config::group_commit_wal_max_disk_limit, -1,
available_bytes, &is_percent);
+ if (wal_disk_limit <= 0) {
+ return Status::InternalError("Disk full! Please check your
disk usage!");
+ }
+ size_t wal_dir_size = 0;
+
RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir,
&wal_dir_size));
+ _wal_disk_info_map[wal_dir]->limit = wal_disk_limit;
+ }
+ if (used != static_cast<size_t>(-1)) {
+ _wal_disk_info_map[wal_dir]->used = used;
+ } else {
+ size_t wal_dir_size = 0;
+
RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir,
&wal_dir_size));
+ _wal_disk_info_map[wal_dir]->used = wal_dir_size;
+ }
+ if (pre_allocated != static_cast<size_t>(-1)) {
+ if (is_add_pre_allocated) {
+ _wal_disk_info_map[wal_dir]->pre_allocated += pre_allocated;
+ } else {
+ _wal_disk_info_map[wal_dir]->pre_allocated -= pre_allocated;
+ }
+ }
+ } else {
+ return Status::InternalError("Can not find wal dir in wal disk info
map.");
+ }
+ return Status::OK();
+}
+
+Status WalManager::get_wal_disk_available_size(const std::string& wal_dir,
Review Comment:
warning: method 'get_wal_disk_available_size' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status WalManager::get_wal_disk_available_size(const std::string&
wal_dir,
```
##########
be/src/runtime/group_commit_mgr.cpp:
##########
@@ -495,4 +505,70 @@
}
return Status::OK();
}
+
+bool LoadBlockQueue::is_wal_disk_space_enough(
+ const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const
TUniqueId& load_id,
+ bool is_blocks_contain_all_load_data) {
+ size_t blocks_size = 0;
+ for (auto block : blocks) {
+ blocks_size += block->bytes();
+ }
+ size_t content_length = 0;
+ Status st =
ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id,
&content_length);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to get load info!";
+ }
+ st = ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to remove load info!";
+ }
+ size_t pre_allocated = is_blocks_contain_all_load_data
+ ? blocks_size
+ : (blocks_size > content_length ?
blocks_size : content_length);
+ auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
+ size_t available_bytes = 0;
+ {
+ st = wal_mgr->get_wal_disk_available_size(wal_base_path,
&available_bytes);
+ if (!st.ok()) {
+ LOG(WARNING) << "get wal disk available size filed!";
+ }
+ }
+ if (pre_allocated < available_bytes) {
+ st = wal_mgr->update_wal_disk_info_map(wal_base_path, -1, -1,
pre_allocated);
+ if (!st.ok()) {
+ LOG(WARNING) << "update wal disk info map failed, reason: " <<
st.to_string();
+ }
+ block_queue_pre_allocated.fetch_add(pre_allocated);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t
content_length) {
+ std::unique_lock l(_load_info_lock);
+ if (_load_id_to_content_length_map.find(load_id) !=
_load_id_to_content_length_map.end()) {
+ return Status::InternalError("load id already exists!");
+ }
+ _load_id_to_content_length_map.insert(std::make_pair(load_id,
content_length));
+ return Status::OK();
+}
+
+Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t*
content_length) {
+ std::shared_lock l(_load_info_lock);
+ if (_load_id_to_content_length_map.find(load_id) ==
_load_id_to_content_length_map.end()) {
+ return Status::InternalError("can not find load id!");
+ }
+ *content_length = _load_id_to_content_length_map[load_id];
+ return Status::OK();
+}
+
+Status GroupCommitMgr::remove_load_info(TUniqueId load_id) {
Review Comment:
warning: method 'remove_load_info' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/group_commit_mgr.h:157:
```diff
- Status remove_load_info(TUniqueId load_id);
+ static Status remove_load_info(TUniqueId load_id);
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -410,4 +473,114 @@
return Status::OK();
}
+bool WalManager::is_wal_disk_space_enough() {
+ // if all disks space usage < 80%
+ std::shared_lock l(_wal_disk_info_lock);
+ for (const auto& it : _wal_disk_info_map) {
+ size_t limit = it.second->limit;
+ size_t available = it.second->available();
+ if (available >= limit * 0.8) {
+ return true;
+ }
+ }
+ return false;
+}
+
+const string& WalManager::get_min_disk_usage_wal_dir() {
+ std::shared_lock l(_wal_disk_info_lock);
+ return _wal_dirs.size() == 1
+ ? _wal_dirs[0]
+ : *std::min_element(_wal_dirs.begin(), _wal_dirs.end(),
+ [this](const std::string& dir1, const
std::string& dir2) {
+ return
_wal_disk_info_map[dir1]->available() <
+
_wal_disk_info_map[dir2]->available();
+ });
+}
+
+const string& WalManager::get_random_wal_dir() {
+ std::shared_lock l(_wal_disk_info_lock);
+ return _wal_disk_info_map.size() == 1
+ ? _wal_disk_info_map.begin()->first
+ : std::next(_wal_disk_info_map.begin(), rand() %
_wal_disk_info_map.size())
+ ->first;
+}
+
+size_t WalManager::get_max_available_size() {
+ std::shared_lock l(_wal_disk_info_lock);
+ return _wal_disk_info_map.size() == 1
+ ? _wal_disk_info_map.begin()->second->available()
+ : std::max_element(_wal_disk_info_map.begin(),
_wal_disk_info_map.end(),
+ [](const auto& map1, const auto& map2) {
+ return map1.second->available() <
+ map2.second->available();
+ })
+ ->second->available();
+}
+
+Status WalManager::update_wal_disk_info_map(std::string wal_dir, size_t limit,
size_t used,
Review Comment:
warning: method 'update_wal_disk_info_map' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status WalManager::update_wal_disk_info_map(std::string wal_dir,
size_t limit, size_t used,
```
##########
be/src/runtime/group_commit_mgr.cpp:
##########
@@ -495,4 +505,70 @@ Status LoadBlockQueue::close_wal() {
}
return Status::OK();
}
+
+bool LoadBlockQueue::is_wal_disk_space_enough(
+ const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const
TUniqueId& load_id,
+ bool is_blocks_contain_all_load_data) {
+ size_t blocks_size = 0;
+ for (auto block : blocks) {
+ blocks_size += block->bytes();
+ }
+ size_t content_length = 0;
+ Status st =
ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id,
&content_length);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to get load info!";
+ }
+ st = ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to remove load info!";
+ }
+ size_t pre_allocated = is_blocks_contain_all_load_data
+ ? blocks_size
+ : (blocks_size > content_length ?
blocks_size : content_length);
+ auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
+ size_t available_bytes = 0;
+ {
+ st = wal_mgr->get_wal_disk_available_size(wal_base_path,
&available_bytes);
+ if (!st.ok()) {
+ LOG(WARNING) << "get wal disk available size filed!";
+ }
+ }
+ if (pre_allocated < available_bytes) {
+ st = wal_mgr->update_wal_disk_info_map(wal_base_path, -1, -1,
pre_allocated);
+ if (!st.ok()) {
+ LOG(WARNING) << "update wal disk info map failed, reason: " <<
st.to_string();
+ }
+ block_queue_pre_allocated.fetch_add(pre_allocated);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t
content_length) {
Review Comment:
warning: method 'update_load_info' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/group_commit_mgr.h:155:
```diff
- Status update_load_info(TUniqueId load_id, size_t content_length);
+ static Status update_load_info(TUniqueId load_id, size_t
content_length);
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -66,6 +72,37 @@ void WalManager::stop() {
}
Status WalManager::init() {
+ RETURN_IF_ERROR(_init_wal_dirs_conf());
+ RETURN_IF_ERROR(_init_wal_dirs());
+ RETURN_IF_ERROR(_init_wal_disk_info());
+ return Thread::create(
+ "WalMgr", "replay_wal", [this]() {
static_cast<void>(this->replay()); },
+ &_replay_thread);
+}
+
+Status WalManager::_init_wal_dirs_conf() {
Review Comment:
warning: method '_init_wal_dirs_conf' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:99:
```diff
- Status _init_wal_dirs_conf();
+ static Status _init_wal_dirs_conf();
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -410,4 +473,114 @@
return Status::OK();
}
+bool WalManager::is_wal_disk_space_enough() {
Review Comment:
warning: method 'is_wal_disk_space_enough' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:93:
```diff
- bool is_wal_disk_space_enough();
+ static bool is_wal_disk_space_enough();
```
##########
be/src/runtime/group_commit_mgr.cpp:
##########
@@ -495,4 +505,70 @@
}
return Status::OK();
}
+
+bool LoadBlockQueue::is_wal_disk_space_enough(
+ const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const
TUniqueId& load_id,
+ bool is_blocks_contain_all_load_data) {
+ size_t blocks_size = 0;
+ for (auto block : blocks) {
+ blocks_size += block->bytes();
+ }
+ size_t content_length = 0;
+ Status st =
ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id,
&content_length);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to get load info!";
+ }
+ st = ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to remove load info!";
+ }
+ size_t pre_allocated = is_blocks_contain_all_load_data
+ ? blocks_size
+ : (blocks_size > content_length ?
blocks_size : content_length);
+ auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
+ size_t available_bytes = 0;
+ {
+ st = wal_mgr->get_wal_disk_available_size(wal_base_path,
&available_bytes);
+ if (!st.ok()) {
+ LOG(WARNING) << "get wal disk available size filed!";
+ }
+ }
+ if (pre_allocated < available_bytes) {
+ st = wal_mgr->update_wal_disk_info_map(wal_base_path, -1, -1,
pre_allocated);
+ if (!st.ok()) {
+ LOG(WARNING) << "update wal disk info map failed, reason: " <<
st.to_string();
+ }
+ block_queue_pre_allocated.fetch_add(pre_allocated);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t
content_length) {
+ std::unique_lock l(_load_info_lock);
+ if (_load_id_to_content_length_map.find(load_id) !=
_load_id_to_content_length_map.end()) {
+ return Status::InternalError("load id already exists!");
+ }
+ _load_id_to_content_length_map.insert(std::make_pair(load_id,
content_length));
+ return Status::OK();
+}
+
+Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t*
content_length) {
Review Comment:
warning: method 'get_load_info' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/group_commit_mgr.h:156:
```diff
- Status get_load_info(TUniqueId load_id, size_t* content_length);
+ static Status get_load_info(TUniqueId load_id, size_t* content_length);
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -345,31 +419,20 @@
}
}
-Status WalManager::delete_wal(int64_t wal_id) {
+Status WalManager::delete_wal(int64_t wal_id, size_t
block_queue_pre_allocated) {
Review Comment:
warning: method 'delete_wal' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:63:
```diff
- Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated =
0);
+ static Status delete_wal(int64_t wal_id, size_t
block_queue_pre_allocated = 0);
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -410,4 +473,114 @@
return Status::OK();
}
+bool WalManager::is_wal_disk_space_enough() {
+ // if all disks space usage < 80%
+ std::shared_lock l(_wal_disk_info_lock);
+ for (const auto& it : _wal_disk_info_map) {
+ size_t limit = it.second->limit;
+ size_t available = it.second->available();
+ if (available >= limit * 0.8) {
+ return true;
+ }
+ }
+ return false;
+}
+
+const string& WalManager::get_min_disk_usage_wal_dir() {
+ std::shared_lock l(_wal_disk_info_lock);
+ return _wal_dirs.size() == 1
+ ? _wal_dirs[0]
+ : *std::min_element(_wal_dirs.begin(), _wal_dirs.end(),
+ [this](const std::string& dir1, const
std::string& dir2) {
+ return
_wal_disk_info_map[dir1]->available() <
+
_wal_disk_info_map[dir2]->available();
+ });
+}
+
+const string& WalManager::get_random_wal_dir() {
+ std::shared_lock l(_wal_disk_info_lock);
+ return _wal_disk_info_map.size() == 1
+ ? _wal_disk_info_map.begin()->first
+ : std::next(_wal_disk_info_map.begin(), rand() %
_wal_disk_info_map.size())
+ ->first;
+}
+
+size_t WalManager::get_max_available_size() {
Review Comment:
warning: method 'get_max_available_size' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:95:
```diff
- size_t get_max_available_size();
+ static size_t get_max_available_size();
```
##########
be/src/io/fs/local_file_system.cpp:
##########
@@ -172,6 +172,19 @@ Status LocalFileSystem::file_size_impl(const Path& file,
int64_t* file_size) con
return Status::OK();
}
+Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size)
{
Review Comment:
warning: method 'directory_size' can be made static
[readability-convert-member-functions-to-static]
be/src/io/fs/local_file_system.h:80:
```diff
- Status directory_size(const Path& dir_path, size_t* dir_size);
+ static Status directory_size(const Path& dir_path, size_t* dir_size);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]