This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3d7163ea2cf [Enhancement](thread) FE fetch_data use brpc thread
locally in BE (#44928)
3d7163ea2cf is described below
commit 3d7163ea2cf0eaa13147f89c9c350141a2f24f9e
Author: zclllhhjj <[email protected]>
AuthorDate: Tue Dec 10 12:05:16 2024 +0800
[Enhancement](thread) FE fetch_data use brpc thread locally in BE (#44928)
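1. Run `fetch_data` locally in the brpc thread instead of submitting it to the heavy work pool
2. Add a gauge for the number of active threads in the fragment thread pool
3. Fix the wrong thread pool name in the log info of FifoThreadPool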
---
be/src/runtime/buffer_control_block.cpp | 6 ++---
be/src/runtime/buffer_control_block.h | 10 ++++----
be/src/runtime/fragment_mgr.cpp | 21 +++++++----------
be/src/runtime/fragment_mgr.h | 3 +--
be/src/service/internal_service.cpp | 14 ++++-------
be/src/util/doris_metrics.h | 1 +
be/src/util/threadpool.h | 35 ++++++++++++++++------------
be/src/util/work_thread_pool.hpp | 14 +++++------
be/src/vec/sink/writer/async_result_writer.h | 6 ++---
9 files changed, 53 insertions(+), 57 deletions(-)
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index 8c1ae79955f..6f4427746f8 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -30,12 +30,10 @@
#include <utility>
#include <vector>
-#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
#include "pipeline/dependency.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
-#include "util/string_util.h"
#include "util/thrift_util.h"
#include "vec/core/block.h"
@@ -149,8 +147,8 @@ void GetArrowResultBatchCtx::on_data(
delete this;
}
-BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size,
RuntimeState* state)
- : _fragment_id(id),
+BufferControlBlock::BufferControlBlock(TUniqueId id, int buffer_size,
RuntimeState* state)
+ : _fragment_id(std::move(id)),
_is_close(false),
_is_cancelled(false),
_buffer_limit(buffer_size),
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 249e1ba7652..9060007232e 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -21,10 +21,10 @@
#include <cctz/time_zone.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
-#include <stdint.h>
#include <atomic>
#include <condition_variable>
+#include <cstdint>
#include <deque>
#include <list>
#include <memory>
@@ -34,7 +34,6 @@
#include "common/status.h"
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
-#include "util/hash_util.hpp"
namespace google::protobuf {
class Closure;
@@ -98,13 +97,15 @@ struct GetArrowResultBatchCtx {
// buffer used for result customer and producer
class BufferControlBlock {
public:
- BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState*
state);
+ BufferControlBlock(TUniqueId id, int buffer_size, RuntimeState* state);
~BufferControlBlock();
Status init();
+ // Try to hand the result to an RPC waiting in _waiting_rpc; otherwise keep it waiting in _fe_result_batch_queue. Blocks are combined first to reduce the number of RPCs.
Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>&
result);
Status add_arrow_batch(RuntimeState* state,
std::shared_ptr<vectorized::Block>& result);
+ // If a Block is waiting in _fe_result_batch_queue, send it (via on_data); otherwise park the RPC in _waiting_rpc.
void get_batch(GetResultBatchCtx* ctx);
// for ArrowFlightBatchLocalReader
Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
@@ -150,7 +151,7 @@ protected:
const int _buffer_limit;
int64_t _packet_num;
- // blocking queue for batch
+ // Producer. Blocking queue of result batches waiting to be sent to FE through _waiting_rpc.
FeResultQueue _fe_result_batch_queue;
ArrowFlightResultQueue _arrow_flight_result_batch_queue;
// for arrow flight
@@ -163,6 +164,7 @@ protected:
// TODO, waiting for data will block pipeline, so use a request pool to
save requests waiting for data.
std::condition_variable _arrow_data_arrival;
+ // Consumer. RPCs from FE that are waiting for results; once _fe_result_batch_queue is filled, these RPCs can be answered.
std::deque<GetResultBatchCtx*> _waiting_rpc;
std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f96e4152500..b1bc42491b5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -34,17 +34,16 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <pthread.h>
-#include <stddef.h>
#include <sys/time.h>
#include <thrift/TApplicationException.h>
#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/transport/TTransportException.h>
-#include <time.h>
#include <unistd.h>
#include <algorithm>
-#include <atomic>
+#include <cstddef>
+#include <ctime>
#include "common/status.h"
// IWYU pragma: no_include <bits/chrono.h>
@@ -58,19 +57,16 @@
#include <unordered_set>
#include <utility>
-#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/utils.h"
-#include "gutil/strings/substitute.h"
#include "io/fs/stream_load_pipe.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/frontend_info.h"
-#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/primitive_type.h"
#include "runtime/query_context.h"
#include "runtime/runtime_filter_mgr.h"
@@ -89,24 +85,20 @@
#include "util/debug_points.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
-#include "util/hash_util.hpp"
-#include "util/mem_info.h"
#include "util/network_util.h"
-#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
-#include "util/url_coding.h"
#include "vec/runtime/shared_hash_table_controller.h"
-#include "vec/runtime/vdatetime_value.h"
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_num_active_threads,
MetricUnit::NOUNIT);
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr",
"prepare");
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
@@ -184,7 +176,7 @@ static Status _do_fetch_running_queries_rpc(const
FrontendInfo& fe_info,
}
// Avoid logic error in frontend.
- if (rpc_result.__isset.status == false || rpc_result.status.status_code !=
TStatusCode::OK) {
+ if (!rpc_result.__isset.status || rpc_result.status.status_code !=
TStatusCode::OK) {
LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
doris::to_string(rpc_result.status.status_code));
@@ -193,7 +185,7 @@ static Status _do_fetch_running_queries_rpc(const
FrontendInfo& fe_info,
doris::to_string(rpc_result.status.status_code));
}
- if (rpc_result.__isset.running_queries == false) {
+ if (!rpc_result.__isset.running_queries) {
return Status::InternalError("Failed to fetch running queries from {},
reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
"running_queries is not set");
@@ -254,6 +246,8 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
[this]() { return _thread_pool->get_queue_size(); });
+ REGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads,
+ [this]() { return _thread_pool->num_active_threads();
});
CHECK(s.ok()) << s.to_string();
}
@@ -262,6 +256,7 @@ FragmentMgr::~FragmentMgr() = default;
void FragmentMgr::stop() {
DEREGISTER_HOOK_METRIC(fragment_instance_count);
DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads);
_stop_background_threads_latch.count_down();
if (_cancel_thread) {
_cancel_thread->join();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 63d666788d0..e85fb07cba6 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -21,9 +21,8 @@
#include <gen_cpp/QueryPlanExtra_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
-#include <stdint.h>
-#include <condition_variable>
+#include <cstdint>
#include <functional>
#include <iosfwd>
#include <memory>
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 439f3f17faf..fb0b2f090bc 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -665,15 +665,11 @@ void
PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*c
void PInternalService::fetch_data(google::protobuf::RpcController* controller,
const PFetchDataRequest* request,
PFetchDataResult* result,
google::protobuf::Closure* done) {
- bool ret = _heavy_work_pool.try_offer([this, controller, request, result,
done]() {
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
- GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
- _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
- });
- if (!ret) {
- offer_failed(result, done, _heavy_work_pool);
- return;
- }
+ // fetch_data is a lightweight operation: when no data is ready, it enqueues the request instead of waiting in place.
+ // When data is ready, it is sent via brpc. The brpc service has its own queue, so this won't occupy the thread for long.
+ auto* cntl = static_cast<brpc::Controller*>(controller);
+ auto* ctx = new GetResultBatchCtx(cntl, result, done);
+ _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
void PInternalService::fetch_arrow_data(google::protobuf::RpcController*
controller,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 69516773deb..31b907eec9e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -202,6 +202,7 @@ public:
UIntGauge* send_batch_thread_pool_thread_num = nullptr;
UIntGauge* send_batch_thread_pool_queue_size = nullptr;
UIntGauge* fragment_thread_pool_queue_size = nullptr;
+ UIntGauge* fragment_thread_pool_num_active_threads = nullptr;
// Upload metrics
UIntGauge* upload_total_byte = nullptr;
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 9bd4a7246fb..f822c307aa6 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -20,12 +20,11 @@
#pragma once
-#include <limits.h>
-#include <stddef.h>
-
#include <boost/intrusive/detail/algo_type.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
+#include <climits>
+#include <cstddef>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <condition_variable>
@@ -50,7 +49,7 @@ class ThreadPoolToken;
class Runnable {
public:
virtual void run() = 0;
- virtual ~Runnable() {}
+ virtual ~Runnable() = default;
};
// ThreadPool takes a lot of arguments. We provide sane defaults with a
builder.
@@ -127,6 +126,9 @@ public:
return Status::OK();
}
+ ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
+ void operator=(const ThreadPoolBuilder&) = delete;
+
private:
friend class ThreadPool;
const std::string _name;
@@ -136,9 +138,6 @@ private:
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
std::chrono::milliseconds _idle_timeout;
- ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
- void operator=(const ThreadPoolBuilder&) = delete;
-
template <typename T>
static constexpr bool always_false_v = false;
};
@@ -256,13 +255,22 @@ public:
return _total_queued_tasks;
}
- std::vector<int> debug_info() {
+ std::vector<int> debug_info() const {
std::lock_guard<std::mutex> l(_lock);
std::vector<int> arr = {_num_threads,
static_cast<int>(_threads.size()), _min_threads,
_max_threads};
return arr;
}
+ std::string get_info() const {
+ std::lock_guard<std::mutex> l(_lock);
+ return fmt::format("ThreadPool(name={},
threads(active/pending)=({}/{}), queued_task={})",
+ _name, _active_threads, _num_threads_pending_start,
_total_queued_tasks);
+ }
+
+ ThreadPool(const ThreadPool&) = delete;
+ void operator=(const ThreadPool&) = delete;
+
private:
friend class ThreadPoolBuilder;
friend class ThreadPoolToken;
@@ -372,7 +380,7 @@ private:
//
// Protected by _lock.
struct IdleThread : public boost::intrusive::list_base_hook<> {
- explicit IdleThread() {}
+ explicit IdleThread() = default;
// Condition variable for "queue is not empty". Waiters wake up when a
new
// task is queued.
@@ -384,9 +392,6 @@ private:
// ExecutionMode::CONCURRENT token used by the pool for tokenless
submission.
std::unique_ptr<ThreadPoolToken> _tokenless;
-
- ThreadPool(const ThreadPool&) = delete;
- void operator=(const ThreadPool&) = delete;
};
// Entry point for token-based task submission and blocking for a particular
@@ -434,6 +439,9 @@ public:
return _entries.size();
}
+ ThreadPoolToken(const ThreadPoolToken&) = delete;
+ void operator=(const ThreadPoolToken&) = delete;
+
private:
// All possible token states. Legal state transitions:
// IDLE -> RUNNING: task is submitted via token
@@ -516,9 +524,6 @@ private:
int _num_submitted_tasks;
// Number of tasks which has not been submitted to the thread pool's queue.
int _num_unsubmitted_tasks;
-
- ThreadPoolToken(const ThreadPoolToken&) = delete;
- void operator=(const ThreadPoolToken&) = delete;
};
} // namespace doris
diff --git a/be/src/util/work_thread_pool.hpp b/be/src/util/work_thread_pool.hpp
index 00430ff7514..1da8a08f90d 100644
--- a/be/src/util/work_thread_pool.hpp
+++ b/be/src/util/work_thread_pool.hpp
@@ -18,7 +18,6 @@
#pragma once
#include <mutex>
-#include <thread>
#include "util/blocking_priority_queue.hpp"
#include "util/blocking_queue.hpp"
@@ -126,12 +125,13 @@ public:
}
std::string get_info() const {
- return fmt::format(
- "PriorityThreadPool(name={}, queue_size={}/{},
active_thread={}/{}, "
- "total_get_wait_time={}, total_put_wait_time={})",
- _name, get_queue_size(), _work_queue.get_capacity(),
_active_threads,
- _threads.size(), _work_queue.total_get_wait_time(),
- _work_queue.total_put_wait_time());
+ return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
+ fmt::format(
+ "(name={}, queue_size={}/{}, active_thread={}/{}, "
+ "total_get_wait_time={}, total_put_wait_time={})",
+ _name, get_queue_size(), _work_queue.get_capacity(),
_active_threads,
+ _threads.size(), _work_queue.total_get_wait_time(),
+ _work_queue.total_put_wait_time());
}
protected:
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index 513f2aa7984..2a90dd2dbd0 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -19,7 +19,7 @@
#include <concurrentqueue.h>
#include <condition_variable>
-#include <queue>
+#include <queue> // IWYU pragma: keep
#include "runtime/result_writer.h"
#include "vec/exprs/vexpr_fwd.h"
@@ -49,7 +49,7 @@ class Block;
* pipeline execution engine performance.
*
* The Sub class of AsyncResultWriter need to impl two virtual function
- * * Status open() the first time IO work like: create file/ connect
networking
+ * * Status open() the first time IO work like: create file/ connect
network
* * Status write() do the real IO work for block
*/
class AsyncResultWriter : public ResultWriter {
@@ -64,7 +64,7 @@ public:
virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
- // sink the block date to date queue, it is async
+ // sink the block data to data queue, it is async
Status sink(Block* block, bool eos);
// Add the IO thread task process block() to thread pool to dispose the IO
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]