This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 329d57fdd7c [regression](move-memtable) test LoadStream on_idle_timeout (#29354)
329d57fdd7c is described below
commit 329d57fdd7cc68304a74bb8c3389e693403b458d
Author: zhengyu <[email protected]>
AuthorDate: Wed Jan 3 14:07:51 2024 +0800
[regression](move-memtable) test LoadStream on_idle_timeout (#29354)
Signed-off-by: freemandealer <[email protected]>
---
be/src/common/config.cpp | 2 +-
be/src/runtime/load_stream.cpp | 2 +-
be/src/service/internal_service.cpp | 3 +-
be/src/vec/sink/load_stream_stub.cpp | 5 ++
.../test_load_stream_fault_injection.groovy | 59 +++++++++++++++++++++-
5 files changed, 66 insertions(+), 5 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ddb235130bc..f3e8df44e17 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -777,7 +777,7 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min
// idle timeout for load stream in ms
-DEFINE_Int64(load_stream_idle_timeout_ms, "600000");
+DEFINE_mInt64(load_stream_idle_timeout_ms, "600000");
// brpc streaming max_buf_size in bytes
DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
// brpc streaming messages_in_batch
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 3aad2566575..1bc7e7b6637 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -454,7 +454,7 @@ Status LoadStream::_append_data(const PStreamHeader&
header, butil::IOBuf* data)
IndexStreamSharedPtr index_stream;
int64_t index_id = header.index_id();
- DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_indexid",
+ DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
{ index_id = UNKNOWN_ID_FOR_TEST; });
auto it = _index_streams_map.find(index_id);
if (it == _index_streams_map.end()) {
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index a92775a6f6e..d27db896ccc 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -391,8 +391,7 @@ void
PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
}
stream_options.handler = load_stream.get();
- // TODO : set idle timeout
- // stream_options.idle_timeout_ms =
+ stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms;
StreamId streamid;
if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 9ad8d8805ca..939c88e0b65 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -21,6 +21,7 @@
#include "olap/rowset/rowset_writer.h"
#include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
#include "util/network_util.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
@@ -330,6 +331,10 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf)
{
int ret;
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+ DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.delay_before_send", {
+ int64_t delay_ms = dp->param<int64>("delay_ms", 1000);
+ bthread_usleep(delay_ms * 1000);
+ });
ret = brpc::StreamWrite(_stream_id, buf);
}
DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed",
{ ret = EPIPE; });
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
index 09f271fead5..f58c1226fd8 100644
--- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
@@ -67,6 +67,52 @@ suite("load_stream_fault_injection", "nonConcurrent") {
file "baseall.txt"
}
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string:[:]]
+
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_param = { paramName, paramValue ->
+ // for each be node, set paramName=paramValue
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def reset_be_param = { paramName ->
+ // for each be node, reset paramName to its previously saved value
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def original_value = backendId_to_params.get(id).get(paramName)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
original_value))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def get_be_param = { paramName ->
+ // for each be node, get the current param value and save it
+ def paramValue = ""
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ // get the config value from be
+ def (code, out, err) = curl("GET",
String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort,
paramName))
+ assertTrue(code == 0)
+ assertTrue(out.contains(paramName))
+ // parsing
+ def resultList = parseJson(out)[0]
+ assertTrue(resultList.size() == 4)
+ // get original value
+ paramValue = resultList[2]
+ backendId_to_params.get(id, [:]).put(paramName, paramValue)
+ }
+ }
+
def load_with_injection = { injection, expect_errmsg ->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
@@ -110,10 +156,21 @@ suite("load_stream_fault_injection", "nonConcurrent") {
// LoadStream add_segment meet unknown segid in request header
load_with_injection("TabletStream.add_segment.unknown_segid", "")
// LoadStream append_data meet unknown index id in request header
- load_with_injection("abletStream.add_segment.unknown_indexid", "")
+ load_with_injection("TabletStream._append_data.unknown_indexid", "")
// LoadStream dispatch meet unknown load id
load_with_injection("LoadStream._dispatch.unknown_loadid", "")
// LoadStream dispatch meet unknown src id
load_with_injection("LoadStream._dispatch.unknown_srcid", "")
+
+ // LoadStream meets StreamRPC idle timeout
+ get_be_param("load_stream_idle_timeout_ms")
+ set_be_param("load_stream_idle_timeout_ms", 500)
+ try {
+ load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "")
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ } finally {
+ reset_be_param("load_stream_idle_timeout_ms")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]