This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 70e1c563b36 [Chore](runtime-filter) enlarge sync filter size rpc
timeout limit (#37103) (#37225)
70e1c563b36 is described below
commit 70e1c563b360b51e013624fb34ca1f175d2c4d78
Author: Pxl <[email protected]>
AuthorDate: Wed Jul 3 21:02:26 2024 +0800
[Chore](runtime-filter) enlarge sync filter size rpc timeout limit (#37103)
(#37225)
pick from #37103
---
be/src/common/config.cpp | 1 +
be/src/exprs/runtime_filter.cpp | 4 ++--
be/src/exprs/runtime_filter.h | 2 +-
be/src/exprs/runtime_filter_slots.h | 2 +-
regression-test/suites/query_p0/join/test_join5.groovy | 1 +
5 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index fe811165c17..f276487d152 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -246,6 +246,7 @@ DEFINE_Int32(doris_scanner_thread_pool_queue_size,
"102400");
// default thrift client connect timeout(in seconds)
DEFINE_mInt32(thrift_connect_timeout_seconds, "3");
DEFINE_mInt32(fetch_rpc_timeout_seconds, "30");
+
// default thrift client retry interval (in milliseconds)
DEFINE_mInt64(thrift_client_retry_interval_ms, "1000");
// max message size of thrift request
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1271ec39156..3f8a19f1b16 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1054,7 +1054,7 @@ public:
: Base(req, callback), _dependency(std::move(dependency)) {}
};
-Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
+Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t
local_filter_size) {
DCHECK(is_producer());
if (_need_local_merge) {
@@ -1105,7 +1105,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t
local_filter_size) {
request->set_filter_size(local_filter_size);
request->set_filter_id(_filter_id);
- callback->cntl_->set_timeout_ms(wait_time_ms());
+ callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout())
* 1000);
stub->send_filter_size(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(),
closure.get());
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index ee6897be322..e8c5bbfd872 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -230,7 +230,7 @@ public:
// push filter to remote node or push down it to scan_node
Status publish(bool publish_local = false);
- Status send_filter_size(uint64_t local_filter_size);
+ Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);
RuntimeFilterType type() const { return _runtime_filter_type; }
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index b5b04a1ebac..0bf8a33f9f2 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -55,7 +55,7 @@ public:
// send_filter_size may call dependency->sub(), so we call
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
for (auto* runtime_filter : _runtime_filters) {
if (runtime_filter->need_sync_filter_size()) {
-
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+ RETURN_IF_ERROR(runtime_filter->send_filter_size(state,
hash_table_size));
}
}
return Status::OK();
diff --git a/regression-test/suites/query_p0/join/test_join5.groovy
b/regression-test/suites/query_p0/join/test_join5.groovy
index 62be496372d..4323575870f 100644
--- a/regression-test/suites/query_p0/join/test_join5.groovy
+++ b/regression-test/suites/query_p0/join/test_join5.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("test_join5", "query,p0") {
+ sql "set runtime_filter_wait_time_ms = 5"
def DBname = "regression_test_join5"
sql "DROP DATABASE IF EXISTS ${DBname}"
sql "CREATE DATABASE IF NOT EXISTS ${DBname}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]