This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 32b36d3c9c7 [refactor](move-memtable) rename proto OpenStreamSink to OpenLoadStream (#26527)
32b36d3c9c7 is described below
commit 32b36d3c9c73a34fb2308b879fdb62e9f054cad6
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 7 22:41:20 2023 +0800
[refactor](move-memtable) rename proto OpenStreamSink to OpenLoadStream (#26527)
---
be/src/runtime/load_stream.cpp | 2 +-
be/src/runtime/load_stream.h | 2 +-
be/src/runtime/load_stream_mgr.cpp | 2 +-
be/src/runtime/load_stream_mgr.h | 2 +-
be/src/service/internal_service.cpp | 4 ++--
be/src/service/internal_service.h | 2 +-
be/src/vec/sink/load_stream_stub.cpp | 4 ++--
be/test/runtime/load_stream_test.cpp | 8 ++++----
gensrc/proto/internal_service.proto | 6 +++---
9 files changed, 16 insertions(+), 16 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index daca91e978c..9d1722c1726 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -242,7 +242,7 @@ LoadStream::~LoadStream() {
LOG(INFO) << "load stream is deconstructed " << *this;
}
-Status LoadStream::init(const POpenStreamSinkRequest* request) {
+Status LoadStream::init(const POpenLoadStreamRequest* request) {
_txn_id = request->txn_id();
_schema = std::make_shared<OlapTableSchemaParam>();
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 317dc510861..fe7d90d502e 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -106,7 +106,7 @@ public:
LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool
enable_profile);
~LoadStream();
- Status init(const POpenStreamSinkRequest* request);
+ Status init(const POpenLoadStreamRequest* request);
void add_source(int64_t src_id) {
std::lock_guard lock_guard(_lock);
diff --git a/be/src/runtime/load_stream_mgr.cpp
b/be/src/runtime/load_stream_mgr.cpp
index d1af7b60d3f..b3553046aec 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -45,7 +45,7 @@ LoadStreamMgr::~LoadStreamMgr() {
_file_writer_thread_pool->shutdown();
}
-Status LoadStreamMgr::open_load_stream(const POpenStreamSinkRequest* request,
+Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request,
LoadStreamSharedPtr& load_stream) {
UniqueId load_id(request->load_id());
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index da3ae98a42e..30d4ed069c4 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -39,7 +39,7 @@ public:
FifoThreadPool* light_work_pool);
~LoadStreamMgr();
- Status open_load_stream(const POpenStreamSinkRequest* request,
+ Status open_load_stream(const POpenLoadStreamRequest* request,
LoadStreamSharedPtr& load_stream);
void clear_load(UniqueId loadid);
std::unique_ptr<ThreadPoolToken> new_token() {
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index cfaef4895b3..dba01f1b668 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -355,8 +355,8 @@ void
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
}
void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController*
controller,
- const POpenStreamSinkRequest*
request,
- POpenStreamSinkResponse* response,
+ const POpenLoadStreamRequest*
request,
+ POpenLoadStreamResponse* response,
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([this, controller, request,
response, done]() {
signal::set_signal_task_id(request->load_id());
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 3ef14b6c265..ca28c8b8b06 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -94,7 +94,7 @@ public:
google::protobuf::Closure* done) override;
void open_load_stream(google::protobuf::RpcController* controller,
- const POpenStreamSinkRequest* request,
POpenStreamSinkResponse* response,
+ const POpenLoadStreamRequest* request,
POpenLoadStreamResponse* response,
google::protobuf::Closure* done) override;
void tablet_writer_add_block(google::protobuf::RpcController* controller,
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 60ef352ddbb..a953747f27b 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -125,7 +125,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
return Status::Error<true>(ret, "Failed to create stream");
}
cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
- POpenStreamSinkRequest request;
+ POpenLoadStreamRequest request;
*request.mutable_load_id() = _load_id;
request.set_src_id(_src_id);
request.set_txn_id(txn_id);
@@ -134,7 +134,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
for (auto& tablet : tablets_for_schema) {
*request.add_tablets() = tablet;
}
- POpenStreamSinkResponse response;
+ POpenLoadStreamResponse response;
// use "pooled" connection to avoid conflicts between streaming rpc and
regular rpc,
// see: https://github.com/apache/brpc/issues/392
const auto& stub = client_cache->get_new_client_no_cache(host_port,
"baidu_std", "pooled");
diff --git a/be/test/runtime/load_stream_test.cpp
b/be/test/runtime/load_stream_test.cpp
index 17c8b3707ae..bdd0ace9a8b 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -348,8 +348,8 @@ public:
: _sd(brpc::INVALID_STREAM_ID),
_load_stream_mgr(load_stream_mgr) {}
virtual ~StreamService() { brpc::StreamClose(_sd); };
virtual void open_load_stream(google::protobuf::RpcController*
controller,
- const POpenStreamSinkRequest* request,
- POpenStreamSinkResponse* response,
+ const POpenLoadStreamRequest* request,
+ POpenLoadStreamResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
std::unique_ptr<PStatus> status = std::make_unique<PStatus>();
@@ -438,8 +438,8 @@ public:
return Status::InternalError("Fail to create stream");
}
- POpenStreamSinkRequest request;
- POpenStreamSinkResponse response;
+ POpenLoadStreamRequest request;
+ POpenLoadStreamResponse response;
PUniqueId id;
id.set_hi(1);
id.set_lo(1);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 99f4464fd59..9a36916317f 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -742,7 +742,7 @@ message PGroupCommitInsertResponse {
optional int64 filtered_rows = 5;
}
-message POpenStreamSinkRequest {
+message POpenLoadStreamRequest {
optional PUniqueId load_id = 1;
optional int64 txn_id = 2;
optional int64 src_id = 3;
@@ -757,7 +757,7 @@ message PTabletSchemaWithIndex {
optional bool enable_unique_key_merge_on_write = 3;
}
-message POpenStreamSinkResponse {
+message POpenLoadStreamResponse {
optional PStatus status = 1;
repeated PTabletSchemaWithIndex tablet_schemas = 2;
}
@@ -798,7 +798,7 @@ service PBackendService {
rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns
(PCancelPlanFragmentResult);
rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
rpc tablet_writer_open(PTabletWriterOpenRequest) returns
(PTabletWriterOpenResult);
- rpc open_load_stream(POpenStreamSinkRequest) returns
(POpenStreamSinkResponse);
+ rpc open_load_stream(POpenLoadStreamRequest) returns
(POpenLoadStreamResponse);
rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns
(PTabletWriterAddBlockResult);
rpc tablet_writer_add_block_by_http(PEmptyRequest) returns
(PTabletWriterAddBlockResult);
rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns
(PTabletWriterCancelResult);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]