morningman commented on a change in pull request #6916:
URL: https://github.com/apache/incubator-doris/pull/6916#discussion_r739636432
##########
File path: be/src/runtime/data_stream_sender.cpp
##########
@@ -105,17 +105,18 @@ Status DataStreamSender::Channel::init(RuntimeState*
state) {
_brpc_request.set_be_number(_be_number);
_brpc_timeout_ms = std::min(3600, state->query_options().query_timeout) *
1000;
- if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
- _brpc_stub =
- state->exec_env()->brpc_stub_cache()->get_stub("127.0.0.1",
_brpc_dest_addr.port);
- } else {
- _brpc_stub =
state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr);
- }
// In bucket shuffle join will set fragment_instance_id (-1, -1)
// to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
// so the empty channel not need call function close_internal()
_need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo
!= -1);
+ if (_need_close) {
+ _brpc_stub =
state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr);
+ if (!_brpc_stub) {
+ LOG(WARNING) << "Get rpc stub failed, dest_addr=" <<
_brpc_dest_addr;
+ return Status::InternalError("get rpc stub failed");
Review comment:
Please return a more useful error message here.
##########
File path: be/src/util/brpc_stub_cache.h
##########
@@ -34,25 +34,32 @@ class BrpcStubCache {
BrpcStubCache();
virtual ~BrpcStubCache();
- virtual PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) {
- std::lock_guard<SpinLock> l(_lock);
+ virtual std::shared_ptr<PBackendService_Stub> get_stub(const
butil::EndPoint& endpoint) {
+ std::lock_guard<std::mutex> l(_mutex);
auto stub_ptr = _stub_map.seek(endpoint);
if (stub_ptr != nullptr) {
- return *stub_ptr;
+ if (available(*stub_ptr, endpoint)) {
Review comment:
It is not good to check availability every time we call `get_stub`. I think
it should be checked only when an error happens.
##########
File path: be/src/util/brpc_stub_cache.cpp
##########
@@ -24,15 +24,12 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT)
BrpcStubCache::BrpcStubCache() {
_stub_map.init(239);
REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_mutex);
Review comment:
Why not use SpinLock?
##########
File path: docs/zh-CN/administrator-guide/http-actions/check-reset-rpc-cache.md
##########
@@ -0,0 +1,46 @@
+---
+{
+ "title": "检查和重置连接缓存",
Review comment:
You need to add these new docs to the sidebar.
##########
File path: be/src/runtime/runtime_filter_mgr.cpp
##########
@@ -229,9 +229,10 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
request_fragment_id->set_hi(targets[i].target_fragment_instance_id.hi);
request_fragment_id->set_lo(targets[i].target_fragment_instance_id.lo);
- PBackendService_Stub* stub =
ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
- targets[i].target_fragment_instance_addr);
- VLOG_NOTICE << "send filter " <<
rpc_contexts[i]->request.filter_id()
+ std::shared_ptr<PBackendService_Stub> stub(
+ ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
+ targets[i].target_fragment_instance_addr));
+ LOG(INFO) << "send filter " << rpc_contexts[i]->request.filter_id()
Review comment:
Do not use INFO-level logging here.
##########
File path: be/src/exec/tablet_sink.cpp
##########
@@ -368,6 +370,10 @@ void NodeChannel::try_send_batch() {
if (row_batch->num_rows() > 0) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
row_batch->serialize(request.mutable_row_batch());
+ if (request.row_batch().ByteSizeLong() >=
double(config::brpc_max_body_size) * 0.95f) {
Review comment:
ByteSizeLong() recursively calls ByteSizeLong() on all embedded
messages, so it is expensive.
And does this message really help with debugging? First, there is no context
info in the log. Second, what should we do if we see this message?
##########
File path: be/src/util/brpc_stub_cache.h
##########
@@ -34,25 +34,32 @@ class BrpcStubCache {
BrpcStubCache();
virtual ~BrpcStubCache();
- virtual PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) {
- std::lock_guard<SpinLock> l(_lock);
+ virtual std::shared_ptr<PBackendService_Stub> get_stub(const
butil::EndPoint& endpoint) {
+ std::lock_guard<std::mutex> l(_mutex);
Review comment:
Why not use SpinLock?
##########
File path: be/src/exprs/runtime_filter_rpc.cpp
##########
@@ -39,7 +39,12 @@ struct IRuntimeFilter::rpc_context {
Status IRuntimeFilter::push_to_remote(RuntimeState* state, const
TNetworkAddress* addr) {
DCHECK(is_producer());
DCHECK(_rpc_context == nullptr);
- PBackendService_Stub* stub =
state->exec_env()->brpc_stub_cache()->get_stub(*addr);
+ std::shared_ptr<PBackendService_Stub> stub(
+ state->exec_env()->brpc_stub_cache()->get_stub(*addr));
+ if (!stub) {
+ LOG(WARNING) << "Get rpc stub failed, host=" << addr->hostname << ",
port=" << addr->port;
+ return Status::InternalError("get rpc stub failed");
Review comment:
This should return more useful info in the error log, such as the IP and the reason.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]