This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8d046a49ba5 [refactor](move-memtable) simplify LoadStreamStub::open (#34488)
8d046a49ba5 is described below
commit 8d046a49ba5c4084a4933e9a8a8266179e5539d4
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed May 8 09:49:07 2024 +0800
[refactor](move-memtable) simplify LoadStreamStub::open (#34488)
---
be/src/vec/sink/load_stream_stub.cpp | 5 ++---
be/src/vec/sink/load_stream_stub.h | 5 ++---
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 ++++++------
3 files changed, 10 insertions(+), 12 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 78e1bc691cc..155ce2de349 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -141,8 +141,7 @@ LoadStreamStub::~LoadStreamStub() {
}
// open_load_stream
-Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
- BrpcClientCache<PBackendService_Stub>* client_cache,
+Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema,
int total_streams,
@@ -157,7 +156,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
opt.max_buf_size = config::load_stream_max_buf_size;
opt.idle_timeout_ms = idle_timeout_ms;
opt.messages_in_batch = config::load_stream_messages_in_batch;
- opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self);
+ opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, shared_from_this());
brpc::Controller cntl;
if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
delete opt.handler;
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 8ef40b84145..1f0d2e459d3 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -104,7 +104,7 @@ private:
std::weak_ptr<LoadStreamStub> _stub;
};
-class LoadStreamStub {
+class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
friend class LoadStreamReplyHandler;
public:
@@ -125,8 +125,7 @@ public:
~LoadStreamStub();
// open_load_stream
- Status open(std::shared_ptr<LoadStreamStub> self,
- BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
+ Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
int64_t idle_timeout_ms, bool enable_profile);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 603066154fb..f00626dd723 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -278,15 +278,15 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream
// get tablet schema from each backend only in the 1st stream
for (auto& stream : streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
- RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
- *node_info, _txn_id, *_schema, tablets_for_schema,
- _total_streams, idle_timeout_ms, _state->enable_profile()));
+ RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info,
+ _txn_id, *_schema, tablets_for_schema, _total_streams,
+ idle_timeout_ms, _state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : streams | std::ranges::views::drop(1)) {
- RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
- *node_info, _txn_id, *_schema, {}, _total_streams,
- idle_timeout_ms, _state->enable_profile()));
+ RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info,
+ _txn_id, *_schema, {}, _total_streams, idle_timeout_ms,
+ _state->enable_profile()));
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]