This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a85f46ff33 [refactor](move-memtable) rename open_stream_sink rpc to open_load_stream (#25883)
6a85f46ff33 is described below

commit 6a85f46ff33a6480d6ffe2b27f7fe8422d0e85fd
Author: Kaijie Chen <[email protected]>
AuthorDate: Sun Oct 29 10:07:14 2023 +0800

    [refactor](move-memtable) rename open_stream_sink rpc to open_load_stream (#25883)
---
 be/src/common/config.cpp             |  4 ++--
 be/src/common/config.h               |  4 ++--
 be/src/service/internal_service.cpp  |  4 ++--
 be/src/service/internal_service.h    |  2 +-
 be/src/vec/sink/load_stream_stub.cpp |  6 +++---
 be/src/vec/sink/load_stream_stub.h   |  2 +-
 be/test/runtime/load_stream_test.cpp | 10 +++++-----
 gensrc/proto/internal_service.proto  |  2 +-
 8 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5b6e393feed..deb99682fed 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -744,8 +744,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, 
"0.1");
 DEFINE_Bool(share_delta_writers, "true");
 // number of brpc stream per load
 DEFINE_Int32(num_streams_per_load, "5");
-// timeout for open stream sink rpc in ms
-DEFINE_Int64(open_stream_sink_timeout_ms, "500");
+// timeout for open load stream rpc in ms
+DEFINE_Int64(open_load_stream_timeout_ms, "500");
 
 // max send batch parallelism for OlapTableSink
 // The value set by the user for send_batch_parallelism is not allowed to 
exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 71c0a1e12c4..9855ba0ffec 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -801,8 +801,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
 DECLARE_Bool(share_delta_writers);
 // number of brpc stream per load
 DECLARE_Int32(num_streams_per_load);
-// timeout for open stream sink rpc in ms
-DECLARE_Int64(open_stream_sink_timeout_ms);
+// timeout for open load stream rpc in ms
+DECLARE_Int64(open_load_stream_timeout_ms);
 
 // max send batch parallelism for OlapTableSink
 // The value set by the user for send_batch_parallelism is not allowed to 
exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 611e45edc1e..8f9ed8c0d09 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -355,7 +355,7 @@ void 
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
     }
 }
 
-void PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController* 
controller,
+void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* 
controller,
                                             const POpenStreamSinkRequest* 
request,
                                             POpenStreamSinkResponse* response,
                                             google::protobuf::Closure* done) {
@@ -365,7 +365,7 @@ void 
PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController* con
         brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
         brpc::StreamOptions stream_options;
 
-        LOG(INFO) << "open stream sink, load_id = " << request->load_id()
+        LOG(INFO) << "open load stream, load_id = " << request->load_id()
                   << ", src_id = " << request->src_id();
 
         for (const auto& req : request->tablets()) {
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index db0ee07581e..d5712e654cf 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -92,7 +92,7 @@ public:
                             PTabletWriterOpenResult* response,
                             google::protobuf::Closure* done) override;
 
-    void open_stream_sink(google::protobuf::RpcController* controller,
+    void open_load_stream(google::protobuf::RpcController* controller,
                           const POpenStreamSinkRequest* request, 
POpenStreamSinkResponse* response,
                           google::protobuf::Closure* done) override;
 
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 821c46dc433..60ef352ddbb 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -101,7 +101,7 @@ LoadStreamStub::~LoadStreamStub() {
     }
 }
 
-// open_stream_sink
+// open_load_stream
 // tablets means
 Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* 
client_cache,
                             const NodeInfo& node_info, int64_t txn_id,
@@ -124,7 +124,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     if (int ret = StreamCreate(&_stream_id, cntl, &opt)) {
         return Status::Error<true>(ret, "Failed to create stream");
     }
-    cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
+    cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
     POpenStreamSinkRequest request;
     *request.mutable_load_id() = _load_id;
     request.set_src_id(_src_id);
@@ -138,7 +138,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     // use "pooled" connection to avoid conflicts between streaming rpc and 
regular rpc,
     // see: https://github.com/apache/brpc/issues/392
     const auto& stub = client_cache->get_new_client_no_cache(host_port, 
"baidu_std", "pooled");
-    stub->open_stream_sink(&cntl, &request, &response, nullptr);
+    stub->open_load_stream(&cntl, &request, &response, nullptr);
     for (const auto& resp : response.tablet_schemas()) {
         auto tablet_schema = std::make_unique<TabletSchema>();
         tablet_schema->init_from_pb(resp.tablet_schema());
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index bd3ddec0d65..60a48427618 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -145,7 +145,7 @@ public:
 #endif
             ~LoadStreamStub();
 
-    // open_stream_sink
+    // open_load_stream
     Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
                 int64_t txn_id, const OlapTableSchemaParam& schema,
                 const std::vector<PTabletID>& tablets_for_schema, bool 
enable_profile);
diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp
index 043c9a10d74..c733d787222 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -347,7 +347,7 @@ public:
         StreamService(LoadStreamMgr* load_stream_mgr)
                 : _sd(brpc::INVALID_STREAM_ID), 
_load_stream_mgr(load_stream_mgr) {}
         virtual ~StreamService() { brpc::StreamClose(_sd); };
-        virtual void open_stream_sink(google::protobuf::RpcController* 
controller,
+        virtual void open_load_stream(google::protobuf::RpcController* 
controller,
                                       const POpenStreamSinkRequest* request,
                                       POpenStreamSinkResponse* response,
                                       google::protobuf::Closure* done) {
@@ -453,11 +453,11 @@ public:
             auto ptablet = request.add_tablets();
             ptablet->set_tablet_id(NORMAL_TABLET_ID);
             ptablet->set_index_id(NORMAL_INDEX_ID);
-            stub.open_stream_sink(&_cntl, &request, &response, nullptr);
+            stub.open_load_stream(&_cntl, &request, &response, nullptr);
             if (_cntl.Failed()) {
-                std::cerr << "open_stream_sink failed" << std::endl;
-                LOG(ERROR) << "Fail to open stream sink " << _cntl.ErrorText();
-                return Status::InternalError("Fail to open stream sink");
+                std::cerr << "open_load_stream failed" << std::endl;
+                LOG(ERROR) << "Fail to open load stream " << _cntl.ErrorText();
+                return Status::InternalError("Fail to open load stream");
             }
 
             return Status::OK();
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 8f0dc34e9d8..5881e5bf2a4 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -796,7 +796,7 @@ service PBackendService {
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns 
(PCancelPlanFragmentResult);
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns 
(PTabletWriterOpenResult);
-    rpc open_stream_sink(POpenStreamSinkRequest) returns 
(POpenStreamSinkResponse);
+    rpc open_load_stream(POpenStreamSinkRequest) returns 
(POpenStreamSinkResponse);
     rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns 
(PTabletWriterAddBlockResult);
     rpc tablet_writer_add_block_by_http(PEmptyRequest) returns 
(PTabletWriterAddBlockResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns 
(PTabletWriterCancelResult);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to