This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6a85f46ff33 [refactor](move-memtable) rename open_stream_sink rpc to
open_load_stream (#25883)
6a85f46ff33 is described below
commit 6a85f46ff33a6480d6ffe2b27f7fe8422d0e85fd
Author: Kaijie Chen <[email protected]>
AuthorDate: Sun Oct 29 10:07:14 2023 +0800
[refactor](move-memtable) rename open_stream_sink rpc to open_load_stream
(#25883)
---
be/src/common/config.cpp | 4 ++--
be/src/common/config.h | 4 ++--
be/src/service/internal_service.cpp | 4 ++--
be/src/service/internal_service.h | 2 +-
be/src/vec/sink/load_stream_stub.cpp | 6 +++---
be/src/vec/sink/load_stream_stub.h | 2 +-
be/test/runtime/load_stream_test.cpp | 10 +++++-----
gensrc/proto/internal_service.proto | 2 +-
8 files changed, 17 insertions(+), 17 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5b6e393feed..deb99682fed 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -744,8 +744,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio,
"0.1");
DEFINE_Bool(share_delta_writers, "true");
// number of brpc stream per load
DEFINE_Int32(num_streams_per_load, "5");
-// timeout for open stream sink rpc in ms
-DEFINE_Int64(open_stream_sink_timeout_ms, "500");
+// timeout for open load stream rpc in ms
+DEFINE_Int64(open_load_stream_timeout_ms, "500");
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to
exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 71c0a1e12c4..9855ba0ffec 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -801,8 +801,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
DECLARE_Bool(share_delta_writers);
// number of brpc stream per load
DECLARE_Int32(num_streams_per_load);
-// timeout for open stream sink rpc in ms
-DECLARE_Int64(open_stream_sink_timeout_ms);
+// timeout for open load stream rpc in ms
+DECLARE_Int64(open_load_stream_timeout_ms);
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to
exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 611e45edc1e..8f9ed8c0d09 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -355,7 +355,7 @@ void
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
}
}
-void PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController*
controller,
const POpenStreamSinkRequest*
request,
POpenStreamSinkResponse* response,
google::protobuf::Closure* done) {
@@ -365,7 +365,7 @@ void
PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController* con
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
brpc::StreamOptions stream_options;
- LOG(INFO) << "open stream sink, load_id = " << request->load_id()
+ LOG(INFO) << "open load stream, load_id = " << request->load_id()
<< ", src_id = " << request->src_id();
for (const auto& req : request->tablets()) {
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index db0ee07581e..d5712e654cf 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -92,7 +92,7 @@ public:
PTabletWriterOpenResult* response,
google::protobuf::Closure* done) override;
- void open_stream_sink(google::protobuf::RpcController* controller,
+ void open_load_stream(google::protobuf::RpcController* controller,
const POpenStreamSinkRequest* request,
POpenStreamSinkResponse* response,
google::protobuf::Closure* done) override;
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 821c46dc433..60ef352ddbb 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -101,7 +101,7 @@ LoadStreamStub::~LoadStreamStub() {
}
}
-// open_stream_sink
+// open_load_stream
// tablets means
Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>*
client_cache,
const NodeInfo& node_info, int64_t txn_id,
@@ -124,7 +124,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
if (int ret = StreamCreate(&_stream_id, cntl, &opt)) {
return Status::Error<true>(ret, "Failed to create stream");
}
- cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
+ cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
POpenStreamSinkRequest request;
*request.mutable_load_id() = _load_id;
request.set_src_id(_src_id);
@@ -138,7 +138,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
// use "pooled" connection to avoid conflicts between streaming rpc and
regular rpc,
// see: https://github.com/apache/brpc/issues/392
const auto& stub = client_cache->get_new_client_no_cache(host_port,
"baidu_std", "pooled");
- stub->open_stream_sink(&cntl, &request, &response, nullptr);
+ stub->open_load_stream(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_unique<TabletSchema>();
tablet_schema->init_from_pb(resp.tablet_schema());
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index bd3ddec0d65..60a48427618 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -145,7 +145,7 @@ public:
#endif
~LoadStreamStub();
- // open_stream_sink
+ // open_load_stream
Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const
NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, bool
enable_profile);
diff --git a/be/test/runtime/load_stream_test.cpp
b/be/test/runtime/load_stream_test.cpp
index 043c9a10d74..c733d787222 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -347,7 +347,7 @@ public:
StreamService(LoadStreamMgr* load_stream_mgr)
: _sd(brpc::INVALID_STREAM_ID),
_load_stream_mgr(load_stream_mgr) {}
virtual ~StreamService() { brpc::StreamClose(_sd); };
- virtual void open_stream_sink(google::protobuf::RpcController*
controller,
+ virtual void open_load_stream(google::protobuf::RpcController*
controller,
const POpenStreamSinkRequest* request,
POpenStreamSinkResponse* response,
google::protobuf::Closure* done) {
@@ -453,11 +453,11 @@ public:
auto ptablet = request.add_tablets();
ptablet->set_tablet_id(NORMAL_TABLET_ID);
ptablet->set_index_id(NORMAL_INDEX_ID);
- stub.open_stream_sink(&_cntl, &request, &response, nullptr);
+ stub.open_load_stream(&_cntl, &request, &response, nullptr);
if (_cntl.Failed()) {
- std::cerr << "open_stream_sink failed" << std::endl;
- LOG(ERROR) << "Fail to open stream sink " << _cntl.ErrorText();
- return Status::InternalError("Fail to open stream sink");
+ std::cerr << "open_load_stream failed" << std::endl;
+ LOG(ERROR) << "Fail to open load stream " << _cntl.ErrorText();
+ return Status::InternalError("Fail to open load stream");
}
return Status::OK();
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 8f0dc34e9d8..5881e5bf2a4 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -796,7 +796,7 @@ service PBackendService {
rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns
(PCancelPlanFragmentResult);
rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
rpc tablet_writer_open(PTabletWriterOpenRequest) returns
(PTabletWriterOpenResult);
- rpc open_stream_sink(POpenStreamSinkRequest) returns
(POpenStreamSinkResponse);
+ rpc open_load_stream(POpenStreamSinkRequest) returns
(POpenStreamSinkResponse);
rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns
(PTabletWriterAddBlockResult);
rpc tablet_writer_add_block_by_http(PEmptyRequest) returns
(PTabletWriterAddBlockResult);
rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns
(PTabletWriterCancelResult);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]