github-actions[bot] commented on code in PR #18828:
URL: https://github.com/apache/doris/pull/18828#discussion_r1172292015
##########
be/src/runtime/fragment_mgr.h:
##########
@@ -128,6 +128,8 @@ class FragmentMgr : public RestMonitorIface {
Status apply_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
+ Status apply_filterv2(const PPublishFilterRequestV2* request, const char*
data);
Review Comment:
warning: unknown type name 'PPublishFilterRequestV2'; did you mean
'PPublishFilterRequest'? [clang-diagnostic-error]
```suggestion
Status apply_filterv2(const PPublishFilterRequest* request, const char*
data);
```
**be/src/runtime/fragment_mgr.h:56:** 'PPublishFilterRequest' declared here
```cpp
class PPublishFilterRequest;
^
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1152,10 +1152,53 @@ Status FragmentMgr::apply_filter(const
PPublishFilterRequest* request,
return runtime_filter_mgr->update_filter(request, attach_data);
}
+Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
const char* data) {
Review Comment:
warning: out-of-line definition of 'apply_filterv2' does not match any
declaration in 'doris::FragmentMgr' [clang-diagnostic-error]
```cpp
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
const char* data) {
^
```
##########
be/src/runtime/runtime_filter_mgr.h:
##########
@@ -79,6 +79,8 @@ class RuntimeFilterMgr {
// update filter by remote
Status update_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* data);
+ Status update_filter(const PPublishFilterRequestV2* request, const char*
data,
Review Comment:
warning: unknown type name 'PPublishFilterRequestV2'; did you mean
'PPublishFilterRequest'? [clang-diagnostic-error]
```suggestion
Status update_filter(const PPublishFilterRequest* request, const char*
data,
```
**be/src/runtime/runtime_filter_mgr.h:42:** 'PPublishFilterRequest' declared
here
```cpp
class PPublishFilterRequest;
^
```
##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -131,6 +132,17 @@ Status RuntimeFilterMgr::update_filter(const
PPublishFilterRequest* request,
return real_filter->update_filter(&params);
}
+Status RuntimeFilterMgr::update_filter(const PPublishFilterRequestV2* request,
Review Comment:
warning: out-of-line definition of 'update_filter' does not match any
declaration in 'doris::RuntimeFilterMgr' [clang-diagnostic-error]
```cpp
Status RuntimeFilterMgr::update_filter(const PPublishFilterRequestV2*
request,
^
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1152,10 +1152,53 @@
return runtime_filter_mgr->update_filter(request, attach_data);
}
+Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
const char* data) {
+ bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+
+ const auto& fragment_instance_ids = request->fragment_instance_ids();
+ for (size_t i = 0; i < fragment_instance_ids.size(); i++) {
+ UniqueId fragment_instance_id = fragment_instance_ids[i];
+ TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
+
+ std::shared_ptr<FragmentExecState> fragment_state;
+ std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+ RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+ if (is_pipeline) {
+ std::unique_lock<std::mutex> lock(_lock);
+ auto iter = _pipeline_map.find(tfragment_instance_id);
+ if (iter == _pipeline_map.end()) {
+ VLOG_CRITICAL << "unknown.... fragment-id:" <<
fragment_instance_id;
+ return Status::InvalidArgument("fragment-id: {}",
fragment_instance_id.to_string());
+ }
+ pip_context = iter->second;
+
+ DCHECK(pip_context != nullptr);
+ runtime_filter_mgr =
pip_context->get_runtime_state()->runtime_filter_mgr();
+ } else {
+ std::unique_lock<std::mutex> lock(_lock);
+ auto iter = _fragment_map.find(tfragment_instance_id);
+ if (iter == _fragment_map.end()) {
+ VLOG_CRITICAL << "unknown.... fragment-id:" <<
fragment_instance_id;
+ return Status::InvalidArgument("fragment-id: {}",
fragment_instance_id.to_string());
+ }
+ fragment_state = iter->second;
+
+ DCHECK(fragment_state != nullptr);
+ runtime_filter_mgr =
fragment_state->executor()->runtime_state()->runtime_filter_mgr();
+ }
+
+ // TODO: Update once per BE
+ RETURN_IF_ERROR(runtime_filter_mgr->update_filter(request, data,
fragment_instance_id));
Review Comment:
warning: no matching member function for call to 'update_filter'
[clang-diagnostic-error]
```cpp
RETURN_IF_ERROR(runtime_filter_mgr->update_filter(request, data,
fragment_instance_id));
^
```
**be/src/common/status.h:509:** expanded from macro 'RETURN_IF_ERROR'
```cpp
Status _status_ = (stmt); \
^
```
**be/src/runtime/runtime_filter_mgr.h:81:** candidate function not viable:
no known conversion from 'const doris::PPublishFilterRequestV2 *' to 'const
doris::PPublishFilterRequest *' for 1st argument
```cpp
Status update_filter(const PPublishFilterRequestV2* request, const char*
data,
^
```
**be/src/runtime/runtime_filter_mgr.h:79:** candidate function not viable:
requires 2 arguments, but 3 were provided
```cpp
Status update_filter(const PPublishFilterRequest* request,
^
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]