This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fa5e7008d5d [Chore](runtime-filter) enlarge sync filter size rpc
timeout limit (#37103)
fa5e7008d5d is described below
commit fa5e7008d5d918f30e0580ce7bda03512b3d6aec
Author: Pxl <[email protected]>
AuthorDate: Tue Jul 2 14:16:39 2024 +0800
[Chore](runtime-filter) enlarge sync filter size rpc timeout limit (#37103)
## Proposed changes
enlarge sync filter size rpc timeout limit
rf will failed when rpc timeout, so we need enlarge limit
```
sync filter size meet error, filter: RuntimeFilter: (id = 3, type =
in_or_bloomfilter, need_local_merge: false, is_broadcast: false,
build_bf_cardinality: true
```
---
be/src/common/config.cpp | 1 +
be/src/exprs/runtime_filter.cpp | 4 ++--
be/src/exprs/runtime_filter.h | 2 +-
be/src/exprs/runtime_filter_slots.h | 2 +-
regression-test/suites/query_p0/join/test_join5.groovy | 1 +
5 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 78afc756af8..92303473ad6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -255,6 +255,7 @@ DEFINE_Int32(doris_scanner_thread_pool_queue_size,
"102400");
// default thrift client connect timeout(in seconds)
DEFINE_mInt32(thrift_connect_timeout_seconds, "3");
DEFINE_mInt32(fetch_rpc_timeout_seconds, "30");
+
// default thrift client retry interval (in milliseconds)
DEFINE_mInt64(thrift_client_retry_interval_ms, "1000");
// max message size of thrift request
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index e38a80a143e..2dcfc97b096 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1054,7 +1054,7 @@ public:
: Base(req, callback), _dependency(std::move(dependency)) {}
};
-Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
+Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t
local_filter_size) {
DCHECK(is_producer());
if (_need_local_merge) {
@@ -1105,7 +1105,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t
local_filter_size) {
request->set_filter_size(local_filter_size);
request->set_filter_id(_filter_id);
- callback->cntl_->set_timeout_ms(wait_time_ms());
+ callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout())
* 1000);
stub->send_filter_size(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(),
closure.get());
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index a78d732b687..390a61bfe1a 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -229,7 +229,7 @@ public:
// push filter to remote node or push down it to scan_node
Status publish(bool publish_local = false);
- Status send_filter_size(uint64_t local_filter_size);
+ Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);
RuntimeFilterType type() const { return _runtime_filter_type; }
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index b5b04a1ebac..0bf8a33f9f2 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -55,7 +55,7 @@ public:
// send_filter_size may call dependency->sub(), so we call
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
for (auto* runtime_filter : _runtime_filters) {
if (runtime_filter->need_sync_filter_size()) {
-
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+ RETURN_IF_ERROR(runtime_filter->send_filter_size(state,
hash_table_size));
}
}
return Status::OK();
diff --git a/regression-test/suites/query_p0/join/test_join5.groovy
b/regression-test/suites/query_p0/join/test_join5.groovy
index 62be496372d..4323575870f 100644
--- a/regression-test/suites/query_p0/join/test_join5.groovy
+++ b/regression-test/suites/query_p0/join/test_join5.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("test_join5", "query,p0") {
+ sql "set runtime_filter_wait_time_ms = 5"
def DBname = "regression_test_join5"
sql "DROP DATABASE IF EXISTS ${DBname}"
sql "CREATE DATABASE IF NOT EXISTS ${DBname}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]