This is an automated email from the ASF dual-hosted git repository.
plat1ko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bb433c73257 [enhance](S3) Don't use aws's multithread utility in S3 FS
to suit newer C++ compilers (#38539)
bb433c73257 is described below
commit bb433c73257bca314ce29f5cfa47d4a184fef0eb
Author: AlexYue <[email protected]>
AuthorDate: Wed Jul 31 13:23:28 2024 +0800
[enhance](S3) Don't use aws's multithread utility in S3 FS to suit newer
C++ compilers (#38539)
---
be/src/common/config.cpp | 3 ++-
be/src/common/config.h | 3 ++-
be/src/io/fs/s3_file_system.cpp | 23 ++++++++++-------------
be/src/runtime/exec_env.h | 2 ++
be/src/runtime/exec_env_init.cpp | 6 ++++++
5 files changed, 22 insertions(+), 15 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f984621ec85..d0a2a5fa7e3 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -938,7 +938,8 @@ DEFINE_mInt32(cold_data_compaction_interval_sec, "1800");
DEFINE_String(tmp_file_dir, "tmp");
-DEFINE_Int32(s3_transfer_executor_pool_size, "2");
+DEFINE_Int32(min_s3_file_system_thread_num, "16");
+DEFINE_Int32(max_s3_file_system_thread_num, "64");
DEFINE_Bool(enable_time_lut, "true");
DEFINE_mBool(enable_simdjson_reader, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index fcfce74e7be..e117c824329 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -995,7 +995,8 @@ DECLARE_mInt32(confirm_unused_remote_files_interval_sec);
DECLARE_Int32(cold_data_compaction_thread_num);
DECLARE_mInt32(cold_data_compaction_interval_sec);
-DECLARE_Int32(s3_transfer_executor_pool_size);
+DECLARE_Int32(min_s3_file_system_thread_num);
+DECLARE_Int32(max_s3_file_system_thread_num);
DECLARE_Bool(enable_time_lut);
DECLARE_mBool(enable_simdjson_reader);
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 93f36429485..3905c4ddb1e 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -18,9 +18,8 @@
#include "io/fs/s3_file_system.h"
#include <fmt/format.h>
-#include <stddef.h>
-#include <algorithm>
+#include <cstddef>
#include "common/compiler_util.h" // IWYU pragma: keep
// IWYU pragma: no_include <bits/chrono.h>
@@ -32,7 +31,6 @@
#include <fstream> // IWYU pragma: keep
#include <future>
#include <memory>
-#include <sstream>
#include "common/config.h"
#include "common/logging.h"
@@ -46,7 +44,7 @@
#include "io/fs/s3_file_reader.h"
#include "io/fs/s3_file_writer.h"
#include "io/fs/s3_obj_storage_client.h"
-#include "util/bvar_helper.h"
+#include "runtime/exec_env.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
@@ -69,13 +67,6 @@ Result<std::string> get_key(const Path& full_path) {
return uri.get_key();
}
-// TODO(plat1ko): AwsTransferManager will be deprecated
-std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>&
default_executor() {
- static auto executor =
Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
- "default", config::s3_transfer_executor_pool_size);
- return executor;
-}
-
} // namespace
ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {}
@@ -383,13 +374,19 @@ Status S3FileSystem::batch_upload_impl(const
std::vector<Path>& local_files,
return Status::OK();
};
+ Status s = Status::OK();
std::vector<std::future<Status>> futures;
for (int i = 0; i < local_files.size(); ++i) {
auto task = std::make_shared<std::packaged_task<Status(size_t
idx)>>(upload_task);
futures.emplace_back(task->get_future());
- default_executor()->Submit([t = std::move(task), idx = i]() mutable {
(*t)(idx); });
+ auto st =
ExecEnv::GetInstance()->s3_file_system_thread_pool()->submit_func(
+ [t = std::move(task), idx = i]() mutable { (*t)(idx); });
+ // We shouldn't return immediately since the previous submitted tasks
might still be running in the thread pool
+ if (!st.ok()) {
+ s = st;
+ break;
+ }
}
- Status s = Status::OK();
for (auto&& f : futures) {
auto cur_s = f.get();
if (!cur_s.ok()) {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 89e5593c84b..65cf70bf568 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -204,6 +204,7 @@ public:
ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get();
}
ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get();
}
ThreadPool* non_block_close_thread_pool();
+ ThreadPool* s3_file_system_thread_pool() { return
_s3_file_system_thread_pool.get(); }
Status init_pipeline_task_scheduler();
void init_file_cache_factory();
@@ -381,6 +382,7 @@ private:
// Pool to use a new thread to release object
std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
std::unique_ptr<ThreadPool> _non_block_close_thread_pool;
+ std::unique_ptr<ThreadPool> _s3_file_system_thread_pool;
FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 6740f548761..dd8af20715f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -265,6 +265,10 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
.set_min_threads(config::min_nonblock_close_thread_num)
.set_max_threads(config::max_nonblock_close_thread_num)
.build(&_non_block_close_thread_pool));
+ static_cast<void>(ThreadPoolBuilder("S3FileSystemThreadPool")
+
.set_min_threads(config::min_s3_file_system_thread_num)
+
.set_max_threads(config::max_s3_file_system_thread_num)
+ .build(&_s3_file_system_thread_pool));
// NOTE: runtime query statistics mgr could be visited by query and daemon
thread
// so it should be created before all query begin and deleted after all
query and daemon thread stoppped
@@ -675,6 +679,7 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_join_node_thread_pool);
SAFE_SHUTDOWN(_lazy_release_obj_pool);
SAFE_SHUTDOWN(_non_block_close_thread_pool);
+ SAFE_SHUTDOWN(_s3_file_system_thread_pool);
SAFE_SHUTDOWN(_send_report_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);
@@ -720,6 +725,7 @@ void ExecEnv::destroy() {
_join_node_thread_pool.reset(nullptr);
_lazy_release_obj_pool.reset(nullptr);
_non_block_close_thread_pool.reset(nullptr);
+ _s3_file_system_thread_pool.reset(nullptr);
_send_report_thread_pool.reset(nullptr);
_send_table_stats_thread_pool.reset(nullptr);
_buffered_reader_prefetch_thread_pool.reset(nullptr);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]