This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new cf2e2113db8 branch-2.1: [fix](move-memtable) tolerate non-open streams in close wait #44680 (#45154)
cf2e2113db8 is described below
commit cf2e2113db89ebdf21569092cd007c109d7d6c65
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Dec 8 11:35:30 2024 +0800
branch-2.1: [fix](move-memtable) tolerate non-open streams in close wait #44680 (#45154)
Cherry-picked from #44680
Co-authored-by: Kaijie Chen <[email protected]>
---
be/src/vec/sink/load_stream_stub.cpp | 16 ++++++----------
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 +++++++++---
.../test_multi_replica_fault_injection.groovy | 13 ++++++++-----
3 files changed, 23 insertions(+), 18 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 672a0be44f7..e28345f1c93 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -219,11 +219,7 @@ Status LoadStreamStub::append_data(int64_t partition_id,
int64_t index_id, int64
add_failed_tablet(tablet_id, _status);
return _status;
}
- DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
- if (segment_id != 0) {
- return Status::OK();
- }
- });
+ DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK();
});
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
@@ -245,11 +241,7 @@ Status LoadStreamStub::add_segment(int64_t partition_id,
int64_t index_id, int64
add_failed_tablet(tablet_id, _status);
return _status;
}
- DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
- if (segment_id != 0) {
- return Status::OK();
- }
- });
+ DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK();
});
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
@@ -339,6 +331,10 @@ Status LoadStreamStub::wait_for_schema(int64_t
partition_id, int64_t index_id, i
Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
+ if (!_is_open.load()) {
+ // we don't need to close wait on non-open streams
+ return Status::OK();
+ }
if (!_is_closing.load()) {
return _status;
}
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index ea877a803c9..c7cb4885bcd 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -268,14 +268,20 @@ Status VTabletWriterV2::open(RuntimeState* state,
RuntimeProfile* profile) {
}
Status VTabletWriterV2::_open_streams() {
- bool fault_injection_skip_be = true;
+ int fault_injection_skip_be = 0;
bool any_backend = false;
bool any_success = false;
for (auto& [dst_id, _] : _tablets_for_node) {
auto streams = _load_stream_map->get_or_create(dst_id);
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
- if (fault_injection_skip_be) {
- fault_injection_skip_be = false;
+ if (fault_injection_skip_be < 1) {
+ fault_injection_skip_be++;
+ continue;
+ }
+ });
+ DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
+ if (fault_injection_skip_be < 2) {
+ fault_injection_skip_be++;
continue;
}
});
diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
index 2f6afd5ca69..d09983d52d0 100644
--- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -75,14 +75,15 @@ suite("test_multi_replica_fault_injection",
"nonConcurrent") {
file "baseall.txt"
}
- def load_with_injection = { injection, error_msg->
+ def load_with_injection = { injection, error_msg, success=false->
try {
sql "truncate table test"
GetDebugPoint().enableDebugPointForAllBEs(injection)
sql "insert into test select * from baseall where k1 <= 3"
+ assertTrue(success, String.format("Expected Exception '%s',
actual success", error_msg))
} catch(Exception e) {
logger.info(e.getMessage())
- assertTrue(e.getMessage().contains(error_msg))
+ assertTrue(e.getMessage().contains(error_msg), e.toString())
} finally {
GetDebugPoint().disableDebugPointForAllBEs(injection)
}
@@ -90,15 +91,17 @@ suite("test_multi_replica_fault_injection",
"nonConcurrent") {
// StreamSinkFileWriter appendv write segment failed one replica
// success
-
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
"sucess")
+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
"sucess", true)
// StreamSinkFileWriter appendv write segment failed two replica
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
"add segment failed")
// StreamSinkFileWriter appendv write segment failed all replica
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
"failed to send segment data to any replicas")
// test segment num check when LoadStreamStub missed tail segments
- load_with_injection("LoadStreamStub.only_send_segment_0", "segment num
mismatch")
+ load_with_injection("LoadStreamStub.skip_send_segment", "segment num
mismatch")
// test one backend open failure
- load_with_injection("VTabletWriterV2._open_streams.skip_one_backend",
"success")
+ load_with_injection("VTabletWriterV2._open_streams.skip_one_backend",
"success", true)
+ // test two backend open failure
+ load_with_injection("VTabletWriterV2._open_streams.skip_two_backends",
"not enough streams 1/3")
sql """ set enable_memtable_on_sink_node=false """
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]