This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aa75f79fad [fix](executor)cancel exchange buffer rpc when query is
cancelled (#22226)
aa75f79fad is described below
commit aa75f79fad87ac845bb7552fd81ad83df6ab4759
Author: wangbo <[email protected]>
AuthorDate: Thu Jul 27 14:38:25 2023 +0800
[fix](executor)cancel exchange buffer rpc when query is cancelled (#22226)
When a brpc client makes a request to a server and the server does not respond
(and may never respond, e.g. because the BE restarted), the query itself can be
cancelled at once, but the ExchangeSinkBuffer cannot be cancelled until the RPC
times out. So when the query is cancelled, we want the ExchangeSinkBuffer to be
closed immediately as well.
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 29 +++++++++++++++++++++++++--
be/src/pipeline/exec/exchange_sink_buffer.h | 9 ++++++++-
2 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index e06bb24752..0a487922a2 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -22,7 +22,6 @@
#include <butil/iobuf_inl.h>
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
-#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <stddef.h>
@@ -72,11 +71,25 @@ bool ExchangeSinkBuffer::can_write() const {
return total_package_size <= max_package_size;
}
-bool ExchangeSinkBuffer::is_pending_finish() const {
+bool ExchangeSinkBuffer::is_pending_finish() {
+ //note(wb) angly implementation here, because operator couples the
scheduling logic
+ // graceful implementation maybe as follows:
+ // 1 make ExchangeSinkBuffer support try close which calls
brpc::StartCancel
+ // 2 make BlockScheduler calls tryclose when query is cancel
+ bool need_cancel = _context->is_canceled();
+
for (auto& pair : _instance_to_package_queue_mutex) {
std::unique_lock<std::mutex> lock(*(pair.second));
auto& id = pair.first;
if (!_instance_to_sending_by_pipeline.at(id)) {
+ // when pending finish, we need check whether current query is
cancelled
+ if (need_cancel && _instance_to_rpc_ctx.find(id) !=
_instance_to_rpc_ctx.end()) {
+ auto& rpc_ctx = _instance_to_rpc_ctx[id];
+ if (!rpc_ctx.is_cancelled) {
+ brpc::StartCancel(rpc_ctx._closure->cntl.call_id());
+ rpc_ctx.is_cancelled = true;
+ }
+ }
return true;
}
}
@@ -177,6 +190,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
brpc_request->set_allocated_block(request.block.get());
}
auto* closure = request.channel->get_closure(id, request.eos, nullptr);
+
+ ExchangeRpcContext rpc_ctx;
+ rpc_ctx._closure = closure;
+ rpc_ctx.is_cancelled = false;
+ _instance_to_rpc_ctx[id] = rpc_ctx;
+
closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
closure->addFailedHandler(
[&](const InstanceLoId& id, const std::string& err) {
_failed(id, err); });
@@ -221,6 +240,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
auto* closure = request.channel->get_closure(id, request.eos,
request.block_holder);
+
+ ExchangeRpcContext rpc_ctx;
+ rpc_ctx._closure = closure;
+ rpc_ctx.is_cancelled = false;
+ _instance_to_rpc_ctx[id] = rpc_ctx;
+
closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
closure->addFailedHandler(
[&](const InstanceLoId& id, const std::string& err) {
_failed(id, err); });
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index dcea246f91..c463656310 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -19,6 +19,7 @@
#include <brpc/controller.h>
#include <gen_cpp/data.pb.h>
+#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <parallel_hashmap/phmap.h>
#include <stdint.h>
@@ -153,6 +154,11 @@ private:
vectorized::BroadcastPBlockHolder* _data;
};
+struct ExchangeRpcContext {
+ SelfDeleteClosure<PTransmitDataResult>* _closure = nullptr;
+ bool is_cancelled = false;
+};
+
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
class ExchangeSinkBuffer {
public:
@@ -162,7 +168,7 @@ public:
Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
bool can_write() const;
- bool is_pending_finish() const;
+ bool is_pending_finish();
void close();
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t
receive_rpc_time);
void update_profile(RuntimeProfile* profile);
@@ -185,6 +191,7 @@ private:
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
+ phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext>
_instance_to_rpc_ctx;
std::atomic<bool> _is_finishing;
PUniqueId _query_id;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]