github-actions[bot] commented on code in PR #16639:
URL: https://github.com/apache/doris/pull/16639#discussion_r1111618765
##########
be/src/service/internal_service.cpp:
##########
@@ -423,200 +530,278 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
const PTabletKeyLookupRequest*
request,
PTabletKeyLookupResponse*
response,
google::protobuf::Closure* done) {
- [[maybe_unused]] brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
- brpc::ClosureGuard guard(done);
- Status st = _tablet_fetch_data(request, response);
- st.to_protobuf(response->mutable_status());
+ bool ret = _heavy_work_pool.offer([this, controller, request, response,
done]() {
+ [[maybe_unused]] brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
+ brpc::ClosureGuard guard(done);
+ Status st = _tablet_fetch_data(request, response);
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ }
}
void PInternalServiceImpl::get_info(google::protobuf::RpcController*
controller,
const PProxyRequest* request,
PProxyResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- // PProxyRequest is defined in gensrc/proto/internal_service.proto
- // Currently it supports 2 kinds of requests:
- // 1. get all kafka partition ids for given topic
- // 2. get all kafka partition offsets for given topic and timestamp.
- if (request->has_kafka_meta_request()) {
- const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
- if (!kafka_request.partition_id_for_latest_offsets().empty()) {
- // get latest offsets for specified partition ids
- std::vector<PIntegerPair> partition_offsets;
- Status st = _exec_env->routine_load_task_executor()
- ->get_kafka_latest_offsets_for_partitions(
- request->kafka_meta_request(),
&partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ bool ret = _heavy_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ // PProxyRequest is defined in gensrc/proto/internal_service.proto
+ // Currently it supports 2 kinds of requests:
+ // 1. get all kafka partition ids for given topic
+ // 2. get all kafka partition offsets for given topic and timestamp.
+ if (request->has_kafka_meta_request()) {
+ const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
+ if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+ // get latest offsets for specified partition ids
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_latest_offsets_for_partitions(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else if (!kafka_request.offset_times().empty()) {
- // if offset_times() has elements, which means this request is to get offset by timestamp.
- std::vector<PIntegerPair> partition_offsets;
- Status st =
-
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
- request->kafka_meta_request(), &partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else if (!kafka_request.offset_times().empty()) {
+ // if offset_times() has elements, which means this request is to get offset by timestamp.
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_partition_offsets_for_times(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else {
- // get partition ids of topic
- std::vector<int32_t> partition_ids;
- Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
- request->kafka_meta_request(), &partition_ids);
- if (st.ok()) {
- PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
- for (int32_t id : partition_ids) {
- kafka_result->add_partition_ids(id);
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else {
+ // get partition ids of topic
+ std::vector<int32_t> partition_ids;
+ Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+ request->kafka_meta_request(), &partition_ids);
+ if (st.ok()) {
+ PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
+ for (int32_t id : partition_ids) {
+ kafka_result->add_partition_ids(id);
+ }
}
+ st.to_protobuf(response->mutable_status());
+ return;
}
- st.to_protobuf(response->mutable_status());
- return;
}
+ Status::OK().to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
}
- Status::OK().to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::update_cache(google::protobuf::RpcController*
controller,
const PUpdateCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->update(request, response);
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->update(request, response);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->set_status(PCacheStatus::CANCELED);
Review Comment:
warning: no member named 'CANCELED' in 'doris::PCacheStatus' [clang-diagnostic-error]
```cpp
response->set_status(PCacheStatus::CANCELED);
                                   ^
```
##########
be/src/service/internal_service.cpp:
##########
@@ -423,200 +530,278 @@
const PTabletKeyLookupRequest*
request,
PTabletKeyLookupResponse*
response,
google::protobuf::Closure* done) {
- [[maybe_unused]] brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
- brpc::ClosureGuard guard(done);
- Status st = _tablet_fetch_data(request, response);
- st.to_protobuf(response->mutable_status());
+ bool ret = _heavy_work_pool.offer([this, controller, request, response,
done]() {
+ [[maybe_unused]] brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
+ brpc::ClosureGuard guard(done);
+ Status st = _tablet_fetch_data(request, response);
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ }
}
void PInternalServiceImpl::get_info(google::protobuf::RpcController*
controller,
const PProxyRequest* request,
PProxyResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- // PProxyRequest is defined in gensrc/proto/internal_service.proto
- // Currently it supports 2 kinds of requests:
- // 1. get all kafka partition ids for given topic
- // 2. get all kafka partition offsets for given topic and timestamp.
- if (request->has_kafka_meta_request()) {
- const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
- if (!kafka_request.partition_id_for_latest_offsets().empty()) {
- // get latest offsets for specified partition ids
- std::vector<PIntegerPair> partition_offsets;
- Status st = _exec_env->routine_load_task_executor()
- ->get_kafka_latest_offsets_for_partitions(
- request->kafka_meta_request(),
&partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ bool ret = _heavy_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ // PProxyRequest is defined in gensrc/proto/internal_service.proto
+ // Currently it supports 2 kinds of requests:
+ // 1. get all kafka partition ids for given topic
+ // 2. get all kafka partition offsets for given topic and timestamp.
+ if (request->has_kafka_meta_request()) {
+ const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
+ if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+ // get latest offsets for specified partition ids
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_latest_offsets_for_partitions(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else if (!kafka_request.offset_times().empty()) {
- // if offset_times() has elements, which means this request is to get offset by timestamp.
- std::vector<PIntegerPair> partition_offsets;
- Status st =
-
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
- request->kafka_meta_request(), &partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else if (!kafka_request.offset_times().empty()) {
+ // if offset_times() has elements, which means this request is to get offset by timestamp.
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_partition_offsets_for_times(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else {
- // get partition ids of topic
- std::vector<int32_t> partition_ids;
- Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
- request->kafka_meta_request(), &partition_ids);
- if (st.ok()) {
- PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
- for (int32_t id : partition_ids) {
- kafka_result->add_partition_ids(id);
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else {
+ // get partition ids of topic
+ std::vector<int32_t> partition_ids;
+ Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+ request->kafka_meta_request(), &partition_ids);
+ if (st.ok()) {
+ PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
+ for (int32_t id : partition_ids) {
+ kafka_result->add_partition_ids(id);
+ }
}
+ st.to_protobuf(response->mutable_status());
+ return;
}
- st.to_protobuf(response->mutable_status());
- return;
}
+ Status::OK().to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
}
- Status::OK().to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::update_cache(google::protobuf::RpcController*
controller,
const PUpdateCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->update(request, response);
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->update(request, response);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController*
controller,
const PFetchCacheRequest* request,
PFetchCacheResult* result,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->fetch(request, result);
+ bool ret = _heavy_work_pool.offer([this, request, result, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->fetch(request, result);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->set_status(PCacheStatus::CANCELED);
Review Comment:
warning: no member named 'CANCELED' in 'doris::PCacheStatus' [clang-diagnostic-error]
```cpp
result->set_status(PCacheStatus::CANCELED);
                                 ^
```
##########
be/src/service/internal_service.cpp:
##########
@@ -423,200 +530,278 @@
const PTabletKeyLookupRequest*
request,
PTabletKeyLookupResponse*
response,
google::protobuf::Closure* done) {
- [[maybe_unused]] brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
- brpc::ClosureGuard guard(done);
- Status st = _tablet_fetch_data(request, response);
- st.to_protobuf(response->mutable_status());
+ bool ret = _heavy_work_pool.offer([this, controller, request, response,
done]() {
+ [[maybe_unused]] brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
+ brpc::ClosureGuard guard(done);
+ Status st = _tablet_fetch_data(request, response);
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ }
}
void PInternalServiceImpl::get_info(google::protobuf::RpcController*
controller,
const PProxyRequest* request,
PProxyResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- // PProxyRequest is defined in gensrc/proto/internal_service.proto
- // Currently it supports 2 kinds of requests:
- // 1. get all kafka partition ids for given topic
- // 2. get all kafka partition offsets for given topic and timestamp.
- if (request->has_kafka_meta_request()) {
- const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
- if (!kafka_request.partition_id_for_latest_offsets().empty()) {
- // get latest offsets for specified partition ids
- std::vector<PIntegerPair> partition_offsets;
- Status st = _exec_env->routine_load_task_executor()
- ->get_kafka_latest_offsets_for_partitions(
- request->kafka_meta_request(),
&partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ bool ret = _heavy_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ // PProxyRequest is defined in gensrc/proto/internal_service.proto
+ // Currently it supports 2 kinds of requests:
+ // 1. get all kafka partition ids for given topic
+ // 2. get all kafka partition offsets for given topic and timestamp.
+ if (request->has_kafka_meta_request()) {
+ const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
+ if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+ // get latest offsets for specified partition ids
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_latest_offsets_for_partitions(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else if (!kafka_request.offset_times().empty()) {
- // if offset_times() has elements, which means this request is to get offset by timestamp.
- std::vector<PIntegerPair> partition_offsets;
- Status st =
-
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
- request->kafka_meta_request(), &partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else if (!kafka_request.offset_times().empty()) {
+ // if offset_times() has elements, which means this request is to get offset by timestamp.
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_partition_offsets_for_times(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else {
- // get partition ids of topic
- std::vector<int32_t> partition_ids;
- Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
- request->kafka_meta_request(), &partition_ids);
- if (st.ok()) {
- PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
- for (int32_t id : partition_ids) {
- kafka_result->add_partition_ids(id);
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else {
+ // get partition ids of topic
+ std::vector<int32_t> partition_ids;
+ Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+ request->kafka_meta_request(), &partition_ids);
+ if (st.ok()) {
+ PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
+ for (int32_t id : partition_ids) {
+ kafka_result->add_partition_ids(id);
+ }
}
+ st.to_protobuf(response->mutable_status());
+ return;
}
- st.to_protobuf(response->mutable_status());
- return;
}
+ Status::OK().to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
}
- Status::OK().to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::update_cache(google::protobuf::RpcController*
controller,
const PUpdateCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->update(request, response);
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->update(request, response);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController*
controller,
const PFetchCacheRequest* request,
PFetchCacheResult* result,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->fetch(request, result);
+ bool ret = _heavy_work_pool.offer([this, request, result, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->fetch(request, result);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::clear_cache(google::protobuf::RpcController*
controller,
const PClearCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->clear(request, response);
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->clear(request, response);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->set_status(PCacheStatus::CANCELED);
Review Comment:
warning: no member named 'CANCELED' in 'doris::PCacheStatus' [clang-diagnostic-error]
```cpp
response->set_status(PCacheStatus::CANCELED);
                                   ^
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]