This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f8ef25bb10 [enhancement](load) lazy-open necessary partitions when 
load (#18874)
f8ef25bb10 is described below

commit f8ef25bb10b9e9c1318939e8678506f019848edc
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sun May 14 16:09:55 2023 +0800

    [enhancement](load) lazy-open necessary partitions when load (#18874)
---
 be/src/common/config.cpp                         |   3 +
 be/src/common/config.h                           |   3 +
 be/src/runtime/load_channel.cpp                  |  26 +++-
 be/src/runtime/load_channel.h                    |   4 +
 be/src/runtime/load_channel_mgr.cpp              |  13 ++
 be/src/runtime/load_channel_mgr.h                |   2 +
 be/src/runtime/tablets_channel.cpp               | 177 ++++++++++++++++++++---
 be/src/runtime/tablets_channel.h                 |  12 +-
 be/src/service/internal_service.cpp              |  23 +++
 be/src/service/internal_service.h                |   4 +
 be/src/vec/sink/vtablet_sink.cpp                 | 100 ++++++++++++-
 be/src/vec/sink/vtablet_sink.h                   |   9 ++
 be/test/vec/exec/vtablet_sink_test.cpp           |   8 +
 docs/en/docs/admin-manual/config/be-config.md    |   6 +
 docs/zh-CN/docs/admin-manual/config/be-config.md |   6 +
 gensrc/proto/internal_service.proto              |  11 ++
 16 files changed, 380 insertions(+), 27 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 38cbcfa51a..0d316afccb 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -447,6 +447,9 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, 
"1200");
 // the timeout of a rpc to open the tablet writer in remote BE.
 // short operation time, can set a short timeout
 DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60");
+// Enables the lazy open partition feature; the default value is true.
+// It must be set to false while BEs of different versions are deployed together during an upgrade.
+DEFINE_mBool(enable_lazy_open_partition, "true");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DEFINE_mBool(tablet_writer_ignore_eovercrowded, "false");
 DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 614bd272f6..f4fbbdcde0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -481,6 +481,9 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec);
 // the timeout of a rpc to open the tablet writer in remote BE.
 // short operation time, can set a short timeout
 DECLARE_Int32(tablet_writer_open_rpc_timeout_sec);
+// Enables the lazy open partition feature; the default value is true.
+// It must be set to false while BEs of different versions are deployed together during an upgrade.
+DECLARE_mBool(enable_lazy_open_partition);
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DECLARE_mBool(tablet_writer_ignore_eovercrowded);
 DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 2c047e912c..e26564962a 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -69,7 +69,8 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
         } else {
             // create a new tablets channel
             TabletsChannelKey key(params.id(), index_id);
-            channel.reset(new TabletsChannel(key, _load_id, _is_high_priority, 
_self_profile));
+            channel = std::make_shared<TabletsChannel>(key, _load_id, 
_is_high_priority,
+                                                       _self_profile);
             {
                 std::lock_guard<SpinLock> l(_tablets_channels_lock);
                 _tablets_channels.insert({index_id, channel});
@@ -84,6 +85,29 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
     return Status::OK();
 }
 
+// Opens the delta writers of one partition on the tablets channel that serves
+// params.index_id(). When this load channel has no tablets channel for that
+// index yet, one is created and registered first.
+Status LoadChannel::open_partition(const OpenPartitionRequest& params) {
+    const int64_t index_id = params.index_id();
+    std::shared_ptr<TabletsChannel> tablets_channel;
+    {
+        std::lock_guard<std::mutex> guard(_lock);
+        auto found = _tablets_channels.find(index_id);
+        if (found == _tablets_channels.end()) {
+            // No channel for this index yet: build one and publish it in the map.
+            TabletsChannelKey key(params.id(), index_id);
+            tablets_channel = std::make_shared<TabletsChannel>(key, _load_id, _is_high_priority,
+                                                               _self_profile);
+            std::lock_guard<SpinLock> map_guard(_tablets_channels_lock);
+            _tablets_channels.insert({index_id, tablets_channel});
+        } else {
+            tablets_channel = found->second;
+        }
+    }
+    // The writers themselves are opened after _lock has been released.
+    RETURN_IF_ERROR(tablets_channel->open_all_writers_for_partition(params));
+    return Status::OK();
+}
+
 Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& 
channel,
                                          bool& is_finished, const int64_t 
index_id) {
     std::lock_guard<std::mutex> l(_lock);
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index e57cd59407..3445786e58 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -41,10 +41,12 @@
 #include "util/spinlock.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
+//#include <gen_cpp/internal_service.pb.h>
 
 namespace doris {
 
 class PTabletWriterOpenRequest;
+class OpenPartitionRequest;
 
 // A LoadChannel manages tablets channels for all indexes
 // corresponding to a certain load job
@@ -57,6 +59,8 @@ public:
     // open a new load channel if not exist
     Status open(const PTabletWriterOpenRequest& request);
 
+    Status open_partition(const OpenPartitionRequest& params);
+
     // this batch must belong to a index in one transaction
     Status add_batch(const PTabletWriterAddBlockRequest& request,
                      PTabletWriterAddBlockResult* response);
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 424aca7bcf..524e04b0b8 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -134,6 +134,19 @@ Status LoadChannelMgr::open(const 
PTabletWriterOpenRequest& params) {
     return Status::OK();
 }
 
+// Routes an open partition request to the load channel identified by
+// params.id(). No load channel is created here: an unknown load id is reported
+// as an internal error.
+Status LoadChannelMgr::open_partition(const OpenPartitionRequest& params) {
+    UniqueId load_id(params.id());
+    // NOTE(review): the map is read here without taking any lock; confirm that
+    // _load_channels cannot be modified concurrently with this lookup.
+    auto found = _load_channels.find(load_id);
+    if (found == _load_channels.end()) {
+        return Status::InternalError("unknown load id, load id=" + load_id.to_string());
+    }
+    std::shared_ptr<LoadChannel> load_channel = found->second;
+    RETURN_IF_ERROR(load_channel->open_partition(params));
+    return Status::OK();
+}
+
 static void dummy_deleter(const CacheKey& key, void* value) {}
 
 Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& 
channel, bool& is_eof,
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index cad611b75b..663a395500 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -54,6 +54,8 @@ public:
     // open a new load channel if not exist
     Status open(const PTabletWriterOpenRequest& request);
 
+    Status open_partition(const OpenPartitionRequest& params);
+
     Status add_batch(const PTabletWriterAddBlockRequest& request,
                      PTabletWriterAddBlockResult* response);
 
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 36737b67e4..bcbc956ba3 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -107,12 +107,22 @@ Status TabletsChannel::open(const 
PTabletWriterOpenRequest& request) {
     _next_seqs.resize(_num_remaining_senders, 0);
     _closed_senders.Reset(_num_remaining_senders);
 
-    RETURN_IF_ERROR(_open_all_writers(request));
-
+    if (!config::enable_lazy_open_partition) {
+        RETURN_IF_ERROR(_open_all_writers(request));
+    } else {
+        _build_partition_tablets_relation(request);
+    }
     _state = kOpened;
     return Status::OK();
 }
 
+// Records, for every tablet listed in the open request, both directions of the
+// tablet <-> partition relation: partition id -> tablet ids and
+// tablet id -> partition id.
+void TabletsChannel::_build_partition_tablets_relation(const PTabletWriterOpenRequest& request) {
+    for (auto& tablet_info : request.tablets()) {
+        const auto partition_id = tablet_info.partition_id();
+        const auto tablet_id = tablet_info.tablet_id();
+        _partition_tablets_map[partition_id].emplace_back(tablet_id);
+        _tablet_partition_map[tablet_id] = partition_id;
+    }
+}
+
 Status TabletsChannel::close(
         LoadChannel* parent, int sender_id, int64_t backend_id, bool* finished,
         const google::protobuf::RepeatedField<int64_t>& partition_ids,
@@ -270,6 +280,7 @@ int64_t TabletsChannel::mem_consumption() {
     return write_mem_usage + flush_mem_usage;
 }
 
+// Old logic, used for opening all writers of all partitions.
 Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& 
request) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
@@ -281,9 +292,7 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request
         }
     }
     if (index_slots == nullptr) {
-        std::stringstream ss;
-        ss << "unknown index id, key=" << _key;
-        return Status::InternalError(ss.str());
+        Status::InternalError("unknown index id, key={}", _key.to_string());
     }
     for (auto& tablet : request.tablets()) {
         WriteRequest wrequest;
@@ -319,6 +328,117 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request
     return Status::OK();
 }
 
+// Fallback used on the write path: because the open partition rpc is issued
+// without blocking the sender, a write request may arrive before that rpc does.
+// In that case the writers of the whole partition owning `tablet_id` are opened
+// here, so that the delta writer exists before data is appended.
+//
+// Returns an error when the index of this channel is not found in the schema,
+// when `tablet_id` (or its partition) is not known to this channel, or when a
+// delta writer fails to open.
+//
+// NOTE(review): add_batch() calls this while already holding
+// _tablet_writers_lock, and the same lock is taken again below; confirm that
+// SpinLock tolerates this or drop one of the two acquisitions.
+template <typename TabletWriterAddRequest>
+Status TabletsChannel::_open_all_writers_for_partition(const int64_t& tablet_id,
+                                                       const TabletWriterAddRequest& request) {
+    std::vector<SlotDescriptor*>* index_slots = nullptr;
+    int32_t schema_hash = 0;
+    for (auto& index : _schema->indexes()) {
+        if (index->index_id == _index_id) {
+            index_slots = &index->slots;
+            schema_hash = index->schema_hash;
+            break;
+        }
+    }
+    if (index_slots == nullptr) {
+        // The status has to be returned: continuing would hand null slots to the writers.
+        return Status::InternalError("unknown index id, key={}", _key.to_string());
+    }
+    // find() is used instead of operator[] so that an unknown id does not insert
+    // a default entry into the lookup maps.
+    auto partition_it = _tablet_partition_map.find(tablet_id);
+    if (partition_it == _tablet_partition_map.end()) {
+        return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
+    }
+    const int64_t partition_id = partition_it->second;
+    auto tablets_it = _partition_tablets_map.find(partition_id);
+    if (tablets_it == _partition_tablets_map.end() || tablets_it->second.empty()) {
+        return Status::InternalError("no tablets recorded for partition={}, tablet={}",
+                                     partition_id, tablet_id);
+    }
+    const auto& tablets = tablets_it->second;
+    VLOG_DEBUG << fmt::format(
+            "write tablet={}, open writers for all tablets in partition={}, total writers num={}",
+            tablet_id, partition_id, tablets.size());
+    for (auto& tablet : tablets) {
+        WriteRequest wrequest;
+        wrequest.index_id = request.index_id();
+        wrequest.tablet_id = tablet;
+        wrequest.schema_hash = schema_hash;
+        wrequest.write_type = WriteType::LOAD;
+        wrequest.txn_id = _txn_id;
+        wrequest.partition_id = partition_id;
+        wrequest.load_id = request.id();
+        wrequest.tuple_desc = _tuple_desc;
+        wrequest.slots = index_slots;
+        wrequest.is_high_priority = _is_high_priority;
+        wrequest.table_schema_param = _schema;
+
+        {
+            std::lock_guard<SpinLock> l(_tablet_writers_lock);
+
+            // Skip tablets whose writer was already opened (e.g. by the open partition rpc).
+            if (_tablet_writers.find(tablet) == _tablet_writers.end()) {
+                DeltaWriter* writer = nullptr;
+                auto st = DeltaWriter::open(&wrequest, &writer, _load_id);
+                if (!st.ok()) {
+                    auto err_msg = fmt::format(
+                            "open delta writer failed, tablet_id={}"
+                            ", txn_id={}, partition_id={}, err={}",
+                            tablet, _txn_id, partition_id, st.to_string());
+                    LOG(WARNING) << err_msg;
+                    return Status::InternalError(err_msg);
+                }
+                _tablet_writers.emplace(tablet, writer);
+                // Exactly one writer was opened, so the global counter grows by one.
+                _s_tablet_writer_count += 1;
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// Called by the open partition rpc: opens a delta writer for every tablet
+// listed in `request` that does not have one yet.
+//
+// Returns an error when the index of this channel is not found in the schema
+// or when a delta writer fails to open.
+Status TabletsChannel::open_all_writers_for_partition(const OpenPartitionRequest& request) {
+    std::vector<SlotDescriptor*>* index_slots = nullptr;
+    int32_t schema_hash = 0;
+    for (auto& index : _schema->indexes()) {
+        if (index->index_id == _index_id) {
+            index_slots = &index->slots;
+            schema_hash = index->schema_hash;
+            break;
+        }
+    }
+    if (index_slots == nullptr) {
+        // The status has to be returned: continuing would hand null slots to the writers.
+        return Status::InternalError("unknown index id, key={}", _key.to_string());
+    }
+    for (auto& tablet : request.tablets()) {
+        WriteRequest wrequest;
+        wrequest.index_id = request.index_id();
+        wrequest.tablet_id = tablet.tablet_id();
+        wrequest.schema_hash = schema_hash;
+        wrequest.write_type = WriteType::LOAD;
+        wrequest.txn_id = _txn_id;
+        wrequest.partition_id = tablet.partition_id();
+        wrequest.load_id = request.id();
+        wrequest.tuple_desc = _tuple_desc;
+        wrequest.slots = index_slots;
+        wrequest.is_high_priority = _is_high_priority;
+        wrequest.table_schema_param = _schema;
+
+        {
+            std::lock_guard<SpinLock> l(_tablet_writers_lock);
+
+            // Skip tablets whose writer already exists (e.g. opened from the write path).
+            if (_tablet_writers.find(tablet.tablet_id()) == _tablet_writers.end()) {
+                DeltaWriter* writer = nullptr;
+                auto st = DeltaWriter::open(&wrequest, &writer, _load_id);
+                if (!st.ok()) {
+                    auto err_msg = fmt::format(
+                            "open delta writer failed, tablet_id={}"
+                            ", txn_id={}, partition_id={}, err={}",
+                            tablet.tablet_id(), _txn_id, tablet.partition_id(), st.to_string());
+                    LOG(WARNING) << err_msg;
+                    return Status::InternalError(err_msg);
+                }
+                _tablet_writers.emplace(tablet.tablet_id(), writer);
+                // Exactly one writer was opened, so the global counter grows by one.
+                _s_tablet_writer_count += 1;
+            }
+        }
+    }
+    return Status::OK();
+}
+
 Status TabletsChannel::cancel() {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
@@ -388,23 +508,36 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
                                  std::function<Status(DeltaWriter * writer)> 
write_func) {
         google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
                 response->mutable_tablet_errors();
-        auto tablet_writer_it = _tablet_writers.find(tablet_id);
-        if (tablet_writer_it == _tablet_writers.end()) {
-            return Status::InternalError("unknown tablet to append data, 
tablet={}", tablet_id);
-        }
-        Status st = write_func(tablet_writer_it->second);
-        if (!st.ok()) {
-            auto err_msg =
-                    fmt::format("tablet writer write failed, tablet_id={}, 
txn_id={}, err={}",
-                                tablet_id, _txn_id, st.to_string());
-            LOG(WARNING) << err_msg;
-            PTabletError* error = tablet_errors->Add();
-            error->set_tablet_id(tablet_id);
-            error->set_msg(err_msg);
-            tablet_writer_it->second->cancel_with_status(st);
-            _add_broken_tablet(tablet_id);
-            // continue write to other tablet.
-            // the error will return back to sender.
+        {
+            std::lock_guard<SpinLock> l(_tablet_writers_lock);
+            auto tablet_writer_it = _tablet_writers.find(tablet_id);
+            if (tablet_writer_it == _tablet_writers.end()) {
+                if (!config::enable_lazy_open_partition) {
+                    return Status::InternalError("unknown tablet to append 
data, tablet={}",
+                                                 tablet_id);
+                } else {
+                    RETURN_IF_ERROR(_open_all_writers_for_partition(tablet_id, 
request));
+                    tablet_writer_it = _tablet_writers.find(tablet_id);
+                    if (tablet_writer_it == _tablet_writers.end()) {
+                        return Status::InternalError("unknown tablet to append 
data, tablet={}",
+                                                     tablet_id);
+                    }
+                }
+            }
+            Status st = write_func(tablet_writer_it->second);
+            if (!st.ok()) {
+                auto err_msg =
+                        fmt::format("tablet writer write failed, tablet_id={}, 
txn_id={}, err={}",
+                                    tablet_id, _txn_id, st.to_string());
+                LOG(WARNING) << err_msg;
+                PTabletError* error = tablet_errors->Add();
+                error->set_tablet_id(tablet_id);
+                error->set_msg(err_msg);
+                tablet_writer_it->second->cancel_with_status(st);
+                _add_broken_tablet(tablet_id);
+                // continue write to other tablet.
+                // the error will return back to sender.
+            }
         }
         return Status::OK();
     };
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 3a897060a6..4cca4ac6ef 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -57,6 +57,7 @@ class PTabletInfo;
 class PTabletWriterOpenRequest;
 class PUniqueId;
 class TupleDescriptor;
+class OpenPartitionRequest;
 
 struct TabletsChannelKey {
     UniqueId id;
@@ -89,6 +90,9 @@ public:
 
     Status open(const PTabletWriterOpenRequest& request);
 
+    // Open all writers of the specified partition
+    Status open_all_writers_for_partition(const OpenPartitionRequest& request);
+
     // no-op when this channel has been closed or cancelled
     Status add_batch(const PTabletWriterAddBlockRequest& request,
                      PTabletWriterAddBlockResult* response);
@@ -124,16 +128,18 @@ private:
     template <typename Request>
     Status _get_current_seq(int64_t& cur_seq, const Request& request);
 
+    template <typename TabletWriterAddRequest>
+    Status _open_all_writers_for_partition(const int64_t& tablet_id,
+                                           const TabletWriterAddRequest& 
request);
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
-    bool _try_to_wait_flushing();
-
     // deal with DeltaWriter close_wait(), add tablet to list for return.
     void _close_wait(DeltaWriter* writer,
                      google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec,
                      google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_error,
                      PSlaveTabletNodes slave_tablet_nodes, const bool 
write_single_replica);
+    void _build_partition_tablets_relation(const PTabletWriterOpenRequest& 
request);
 
     void _add_broken_tablet(int64_t tablet_id);
     bool _is_broken_tablet(int64_t tablet_id);
@@ -170,6 +176,8 @@ private:
     // status to return when operate on an already closed/cancelled channel
     // currently it's OK.
     Status _close_status;
+    std::map<int64, std::vector<int64>> _partition_tablets_map;
+    std::map<int64, int64> _tablet_partition_map;
 
     // tablet_id -> TabletChannel
     std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index aee46ae6a4..80d02f6d58 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -245,6 +245,29 @@ void 
PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
     }
 }
 
+// RPC entry point for opening the writers of one partition. The work is queued
+// on the light work pool; `done` is always run, either by the queued task or
+// right here when the task could not be queued.
+void PInternalServiceImpl::open_partition(google::protobuf::RpcController* controller,
+                                          const OpenPartitionRequest* request,
+                                          OpenPartitionResult* response,
+                                          google::protobuf::Closure* done) {
+    auto open_task = [this, request, response, done]() {
+        // Completes the rpc when the task finishes, whatever the outcome.
+        brpc::ClosureGuard closure_guard(done);
+        VLOG_RPC << "partition open"
+                 << ", index_id=" << request->index_id();
+        auto open_status = _exec_env->load_channel_mgr()->open_partition(*request);
+        if (!open_status.ok()) {
+            LOG(WARNING) << "load channel open failed, message=" << open_status
+                         << ", index_ids=" << request->index_id();
+        }
+        open_status.to_protobuf(response->mutable_status());
+    };
+    if (_light_work_pool.try_offer(open_task)) {
+        return;
+    }
+    // The pool rejected the task: answer the caller immediately with CANCELLED.
+    LOG(WARNING) << "fail to offer request to the work pool";
+    brpc::ClosureGuard closure_guard(done);
+    response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+    response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+}
+
 void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* 
controller,
                                               const PExecPlanFragmentRequest* 
request,
                                               PExecPlanFragmentResult* 
response,
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 22042196e2..dae16604ad 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -86,6 +86,10 @@ public:
                             PTabletWriterOpenResult* response,
                             google::protobuf::Closure* done) override;
 
+    void open_partition(google::protobuf::RpcController* controller,
+                        const OpenPartitionRequest* request, 
OpenPartitionResult* response,
+                        google::protobuf::Closure* done) override;
+
     void tablet_writer_add_block(google::protobuf::RpcController* controller,
                                  const PTabletWriterAddBlockRequest* request,
                                  PTabletWriterAddBlockResult* response,
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 29e5e7a4df..a80758833e 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -29,6 +29,7 @@
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/data.pb.h>
 #include <gen_cpp/internal_service.pb.h>
+#include <google/protobuf/stubs/common.h>
 #include <opentelemetry/nostd/shared_ptr.h>
 #include <sys/param.h>
 #include <sys/types.h>
@@ -51,6 +52,7 @@
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
+#include "service/brpc.h"
 #include "util/binary_cast.hpp"
 #include "util/brpc_client_cache.h"
 #include "util/debug/sanitizer_scopes.h"
@@ -91,6 +93,40 @@ class TExpr;
 
 namespace stream_load {
 
+// Completion callback of one asynchronous open_partition rpc. It holds the
+// brpc controller and the response object of the call, plus the node channel
+// and index channel to notify when the call fails.
+class OpenPartitionClosure : public google::protobuf::Closure {
+public:
+    OpenPartitionClosure(VNodeChannel* vnode_channel, IndexChannel* index_channel,
+                         int64_t partition_id)
+            : vnode_channel(vnode_channel),
+              index_channel(index_channel),
+              partition_id(partition_id) {}
+
+    ~OpenPartitionClosure() override = default;
+
+    // Called when the rpc completes. Only a failure reported by the controller
+    // is handled: the node channel is cancelled and marked as failed on its
+    // index channel. `result` is not inspected here.
+    void Run() override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+        if (!cntl.Failed()) {
+            return;
+        }
+        std::stringstream err;
+        err << "failed to open partition, error=" << berror(this->cntl.ErrorCode())
+            << ", error_text=" << this->cntl.ErrorText();
+        const std::string err_msg = err.str();
+        LOG(WARNING) << err_msg << " " << vnode_channel->channel_info();
+        vnode_channel->cancel("Open partition error");
+        index_channel->mark_as_failed(
+                vnode_channel->node_id(), vnode_channel->host(),
+                fmt::format("{}, open failed, err: {}", vnode_channel->channel_info(), err_msg),
+                -1);
+    }
+
+    // Blocks until the rpc tracked by `cntl` has finished.
+    void join() { brpc::Join(cntl.call_id()); }
+
+    brpc::Controller cntl;
+    OpenPartitionResult result;
+    VNodeChannel* vnode_channel;
+    IndexChannel* index_channel;
+    int64_t partition_id;
+};
+
 IndexChannel::~IndexChannel() {
     if (_where_clause != nullptr) {
         _where_clause->close(_parent->_state);
@@ -353,9 +389,14 @@ Status VNodeChannel::open_wait() {
            << ", error_text=" << _open_closure->cntl.ErrorText();
         _cancelled = true;
         LOG(WARNING) << ss.str() << " " << channel_info();
+        auto error_code = _open_closure->cntl.ErrorCode();
+        auto error_text = _open_closure->cntl.ErrorText();
+        if (_open_closure->unref()) {
+            delete _open_closure;
+        }
+        _open_closure = nullptr;
         return Status::InternalError("failed to open tablet writer, error={}, 
error_text={}",
-                                     berror(_open_closure->cntl.ErrorCode()),
-                                     _open_closure->cntl.ErrorText());
+                                     berror(error_code), error_text);
     }
     Status status(_open_closure->result.status());
     if (_open_closure->unref()) {
@@ -473,6 +514,37 @@ Status VNodeChannel::open_wait() {
     return status;
 }
 
+// Sends an open_partition rpc to this node for all of its tablets that belong
+// to `partition_id`. The call is issued with a completion closure and is not
+// waited for here; the closure is kept in _open_partition_closures, which
+// close_wait() joins.
+void VNodeChannel::open_partition(int64_t partition_id) {
+    _timeout_watch.reset();
+    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
+    OpenPartitionRequest request;
+    // Copy the load id into the request. This avoids lending a separately owned
+    // object through set_allocated_id()/release_id(), which leaves two owners of
+    // the same pointer until release_id() runs.
+    *request.mutable_id() = _parent->_load_id;
+    request.set_index_id(_index_channel->_index_id);
+    for (auto& tablet : _all_tablets) {
+        if (partition_id == tablet.partition_id) {
+            auto ptablet = request.add_tablets();
+            ptablet->set_partition_id(tablet.partition_id);
+            ptablet->set_tablet_id(tablet.tablet_id);
+        }
+    }
+
+    auto open_partition_closure =
+            std::make_unique<OpenPartitionClosure>(this, _index_channel, partition_id);
+
+    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time();
+    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
+        remain_ms = config::min_load_rpc_timeout_ms;
+    }
+    open_partition_closure->cntl.set_timeout_ms(remain_ms);
+
+    // Register the closure before the rpc is issued, so that it is already owned
+    // by this channel for the whole time the call is in flight.
+    OpenPartitionClosure* closure = open_partition_closure.get();
+    _open_partition_closures.insert(std::move(open_partition_closure));
+    _stub->open_partition(&closure->cntl, &request, &closure->result, closure);
+}
+
 Status VNodeChannel::add_block(vectorized::Block* block, const Payload* 
payload, bool is_append) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     if (payload->second.empty()) {
@@ -809,6 +881,9 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
 
 Status VNodeChannel::close_wait(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
+    for (auto& open_partition_closure : _open_partition_closures) {
+        open_partition_closure->join();
+    }
     // set _is_closed to true finally
     Defer set_closed {[&]() {
         std::lock_guard<std::mutex> l(_closed_lock);
@@ -1063,6 +1138,22 @@ Status VOlapTableSink::open(RuntimeState* state) {
     return Status::OK();
 }
 
+// Asks every node channel that hosts a tablet of `partition` to open that
+// partition. Each partition id is handled at most once per sink: ids are
+// remembered in _opened_partitions.
+void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
+    // insert() reports whether the id was newly added, so one call both checks
+    // and records the partition.
+    if (!_opened_partitions.insert(partition->id).second) {
+        return;
+    }
+    for (size_t j = 0; j < partition->indexes.size(); ++j) {
+        const auto& channels_by_tablet = _channels[j]->_channels_by_tablet;
+        for (const auto& tid : partition->indexes[j].tablets) {
+            auto channels_it = channels_by_tablet.find(tid);
+            if (channels_it == channels_by_tablet.end()) {
+                // No node channel is registered for this tablet; dereferencing
+                // the end iterator would be undefined behaviour.
+                continue;
+            }
+            for (const auto& channel : channels_it->second) {
+                channel->open_partition(partition->id);
+            }
+        }
+    }
+}
+
 void VOlapTableSink::_send_batch_process(RuntimeState* state) {
     SCOPED_TIMER(_non_blocking_send_timer);
     SCOPED_ATTACH_TASK(state);
@@ -1229,6 +1320,11 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
         }
         // each row
         _generate_row_distribution_payload(channel_to_payload, partition, 
tablet_index, i, 1);
+        // open partition
+        if (config::enable_lazy_open_partition) {
+            // async open operation, does not block the send operation
+            _open_partition(partition);
+        }
     }
     // Random distribution and the block belongs to a single tablet, we could 
optimize to append the whole
     // block into node channel.
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 61d64a2bdb..8300ea3e5f 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -85,6 +85,8 @@ class VExprContext;
 
 namespace stream_load {
 
+class OpenPartitionClosure;
+
 // The counter of add_batch rpc of a single node
 struct AddBatchCounter {
     // total execution time of a add_batch rpc
@@ -211,6 +213,8 @@ public:
 
     void open();
 
+    void open_partition(int64_t partition_id);
+
     Status init(RuntimeState* state);
 
     Status open_wait();
@@ -312,6 +316,7 @@ protected:
 
     std::shared_ptr<PBackendService_Stub> _stub = nullptr;
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
+    std::unordered_set<std::unique_ptr<OpenPartitionClosure>> 
_open_partition_closures;
 
     std::vector<TTabletWithPartition> _all_tablets;
     // map from tablet_id to node_id where slave replicas locate in
@@ -495,6 +500,8 @@ private:
                        const VOlapTablePartition** partition, uint32_t& 
tablet_index,
                        bool& stop_processing, bool& is_continue);
 
+    void _open_partition(const VOlapTablePartition* partition);
+
     std::shared_ptr<MemTracker> _mem_tracker;
 
     ObjectPool* _pool;
@@ -599,6 +606,8 @@ private:
     std::vector<vectorized::VExprContext*> _output_vexpr_ctxs;
 
     RuntimeState* _state = nullptr;
+
+    std::unordered_set<int64_t> _opened_partitions;
 };
 
 } // namespace stream_load
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp 
b/be/test/vec/exec/vtablet_sink_test.cpp
index 15ee6b79a2..aa995e671a 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -302,6 +302,14 @@ public:
         status.to_protobuf(response->mutable_status());
     }
 
+    // Test stub for the open_partition rpc: replies immediately with a
+    // default-constructed status.
+    void open_partition(google::protobuf::RpcController* controller,
+                        const OpenPartitionRequest* request, OpenPartitionResult* response,
+                        google::protobuf::Closure* done) override {
+        // The guard runs `done` when this scope is left.
+        brpc::ClosureGuard guard(done);
+        Status reply_status;
+        reply_status.to_protobuf(response->mutable_status());
+    }
+
     void tablet_writer_add_block(google::protobuf::RpcController* controller,
                                  const PTabletWriterAddBlockRequest* request,
                                  PTabletWriterAddBlockResult* response,
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index 80b33c7f11..1dfeb93e5c 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -1215,6 +1215,12 @@ Metrics: 
{"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 * Description: Used to ignore brpc error '[E1011]The server is overcrowded' 
when writing data.
 * Default value: false
 
+#### `enable_lazy_open_partition`
+
+* Type: bool
+* Description: When loading data, most partitions may not need to be written, so lazy opening is used to open only the partitions that are actually written. It must be set to false while BEs of different versions are deployed together during an upgrade.
+* Default value: true
+
 #### `streaming_load_rpc_max_alive_time_sec`
 
 * Description: The lifetime of TabletsChannel. If the channel does not receive 
any data at this time, the channel will be deleted.
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 613900b9ea..c229edcfe0 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -1231,6 +1231,12 @@ Metrics: 
{"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
   - 当遇到'[E1011]The server is 
overcrowded'的错误时,可以调整配置项`brpc_socket_max_unwritten_bytes`,但这个配置项不能动态调整。所以可通过设置此项为`true`来临时避免写失败。注意,此配置项只影响写流程,其他的rpc请求依旧会检查是否overcrowded。
 * 默认值:false
 
+#### `enable_lazy_open_partition`
+
+* 类型:bool
+* 
描述:导入时大部分partition可能都不需要写入,可以使用延迟打开的方式只打开需要写入的partition。升级版本出现混合部署的时候,需要设置为false。
+* 默认值:true
+
 #### `streaming_load_rpc_max_alive_time_sec`
 
 * 描述:TabletsChannel 的存活时间。如果此时通道没有收到任何数据, 通道将被删除。
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index f66090a77b..3b905897c8 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -90,6 +90,16 @@ message PTabletWriterOpenResult {
     required PStatus status = 1;
 };
 
+message OpenPartitionRequest { // request of the open_partition rpc
+    required PUniqueId id = 1; // id of the load this request belongs to
+    required int64 index_id = 2; // index whose tablets channel handles the request
+    repeated PTabletWithPartition tablets = 3; // tablets (with their partition id) to open writers for
+}
+
+message OpenPartitionResult { // response of the open_partition rpc
+    required PStatus status = 1; // outcome of opening the partition on the receiving BE
+}
+
 // add batch to tablet writer
 message PTabletWriterAddBatchRequest {
     required PUniqueId id = 1;
@@ -640,6 +650,7 @@ service PBackendService {
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns 
(PCancelPlanFragmentResult);
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns 
(PTabletWriterOpenResult);
+    rpc open_partition(OpenPartitionRequest) returns (OpenPartitionResult);
     rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns 
(PTabletWriterAddBlockResult);
     rpc tablet_writer_add_block_by_http(PEmptyRequest) returns 
(PTabletWriterAddBlockResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns 
(PTabletWriterCancelResult);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to