This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5806dae467e [fix](move-memtable) do not retry open streams (#41550)
(#41999)
5806dae467e is described below
commit 5806dae467ea2ba33e095e1d5abe02a6d967a3dc
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Oct 17 15:56:56 2024 +0800
[fix](move-memtable) do not retry open streams (#41550) (#41999)
backport #41550
---
be/src/vec/sink/load_stream_stub.cpp | 60 ++++++++++++++--------------
be/src/vec/sink/load_stream_stub.h | 6 +--
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +-
3 files changed, 33 insertions(+), 35 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index e29d64118b9..1d13ca4b903 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -135,7 +135,7 @@ LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t
src_id,
_is_incremental(incremental) {};
LoadStreamStub::~LoadStreamStub() {
- if (_is_init.load() && !_is_closed.load()) {
+ if (_is_open.load() && !_is_closed.load()) {
auto ret = brpc::StreamClose(_stream_id);
LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ?
"success" : "failed");
}
@@ -149,8 +149,9 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
int64_t idle_timeout_ms, bool enable_profile) {
std::unique_lock<bthread::Mutex> lock(_open_mutex);
if (_is_init.load()) {
- return _init_st;
+ return _status;
}
+ _is_init.store(true);
_dst_id = node_info.id;
brpc::StreamOptions opt;
opt.max_buf_size = config::load_stream_max_buf_size;
@@ -160,8 +161,8 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
brpc::Controller cntl;
if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
delete opt.handler;
- _init_st = Status::Error<true>(ret, "Failed to create stream");
- return _init_st;
+ _status = Status::Error<true>(ret, "Failed to create stream");
+ return _status;
}
cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
POpenLoadStreamRequest request;
@@ -174,8 +175,8 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
} else if (total_streams > 0) {
request.set_total_streams(total_streams);
} else {
- _init_st = Status::InternalError("total_streams should be greator than
0");
- return _init_st;
+ _status = Status::InternalError("total_streams should be greator than
0");
+ return _status;
}
request.set_idle_timeout_ms(idle_timeout_ms);
schema.to_protobuf(request.mutable_schema());
@@ -199,13 +200,13 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
if (cntl.Failed()) {
brpc::StreamClose(_stream_id);
- _init_st = Status::InternalError("Failed to connect to backend {}:
{}", _dst_id,
- cntl.ErrorText());
- return _init_st;
+ _status = Status::InternalError("Failed to connect to backend {}: {}",
_dst_id,
+ cntl.ErrorText());
+ return _status;
}
LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" <<
node_info.brpc_port
<< ", " << *this;
- _is_init.store(true);
+ _is_open.store(true);
return Status::OK();
}
@@ -213,9 +214,9 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int64_t segment_id, uint64_t offset,
std::span<const Slice> data,
bool segment_eos) {
- if (!_is_init.load()) {
- add_failed_tablet(tablet_id, _init_st);
- return _init_st;
+ if (!_is_open.load()) {
+ add_failed_tablet(tablet_id, _status);
+ return _status;
}
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
if (segment_id != 0) {
@@ -239,9 +240,9 @@ Status LoadStreamStub::append_data(int64_t partition_id,
int64_t index_id, int64
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int64_t segment_id, const
SegmentStatistics& segment_stat,
TabletSchemaSPtr flush_schema) {
- if (!_is_init.load()) {
- add_failed_tablet(tablet_id, _init_st);
- return _init_st;
+ if (!_is_open.load()) {
+ add_failed_tablet(tablet_id, _status);
+ return _status;
}
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
if (segment_id != 0) {
@@ -265,8 +266,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id,
int64_t index_id, int64
// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>&
tablets_to_commit) {
- if (!_is_init.load()) {
- return _init_st;
+ if (!_is_open.load()) {
+ return _status;
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
@@ -275,10 +276,10 @@ Status LoadStreamStub::close_load(const
std::vector<PTabletID>& tablets_to_commi
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
}
- _close_st = _encode_and_send(header);
- if (!_close_st.ok()) {
- LOG(WARNING) << "stream " << _stream_id << " close failed: " <<
_close_st;
- return _close_st;
+ _status = _encode_and_send(header);
+ if (!_status.ok()) {
+ LOG(WARNING) << "stream " << _stream_id << " close failed: " <<
_status;
+ return _status;
}
_is_closing.store(true);
return Status::OK();
@@ -286,8 +287,8 @@ Status LoadStreamStub::close_load(const
std::vector<PTabletID>& tablets_to_commi
// GET_SCHEMA
Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
- if (!_is_init.load()) {
- return _init_st;
+ if (!_is_open.load()) {
+ return _status;
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
@@ -309,8 +310,8 @@ Status LoadStreamStub::get_schema(const
std::vector<PTabletID>& tablets) {
Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int64_t timeout_ms) {
- if (!_is_init.load()) {
- return _init_st;
+ if (!_is_open.load()) {
+ return _status;
}
if (_tablet_schema_for_index->contains(index_id)) {
return Status::OK();
@@ -337,11 +338,8 @@ Status LoadStreamStub::wait_for_schema(int64_t
partition_id, int64_t index_id, i
Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
- if (!_is_init.load()) {
- return _init_st;
- }
if (!_is_closing.load()) {
- return _close_st;
+ return _status;
}
if (_is_closed.load()) {
return _check_cancel();
@@ -370,7 +368,7 @@ Status LoadStreamStub::close_wait(RuntimeState* state,
int64_t timeout_ms) {
void LoadStreamStub::cancel(Status reason) {
LOG(WARNING) << *this << " is cancelled because of " << reason;
- if (_is_init.load()) {
+ if (_is_open.load()) {
brpc::StreamClose(_stream_id);
}
{
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 048cf4dc60f..223babb42e3 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -195,7 +195,7 @@ public:
int64_t dst_id() const { return _dst_id; }
- bool is_inited() const { return _is_init.load(); }
+ bool is_open() const { return _is_open.load(); }
bool is_incremental() const { return _is_incremental; }
@@ -231,6 +231,7 @@ private:
protected:
std::atomic<bool> _is_init;
+ std::atomic<bool> _is_open;
std::atomic<bool> _is_closing;
std::atomic<bool> _is_closed;
std::atomic<bool> _is_cancelled;
@@ -240,8 +241,7 @@ protected:
brpc::StreamId _stream_id;
int64_t _src_id = -1; // source backend_id
int64_t _dst_id = -1; // destination backend_id
- Status _init_st = Status::InternalError<false>("Stream is not open");
- Status _close_st;
+ Status _status = Status::InternalError<false>("Stream is not open");
Status _cancel_st;
bthread::Mutex _open_mutex;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 273430c7948..16c11b1cf42 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -389,7 +389,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id,
int64_t partition_id,
VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id,
index_id, tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
auto stream = _load_stream_map->at(node_id)->at(_stream_index);
- for (int i = 1; i < _stream_per_node && !stream->is_inited(); i++) {
+ for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) {
stream = _load_stream_map->at(node_id)->at((_stream_index + i) %
_stream_per_node);
}
streams.emplace_back(std::move(stream));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]