This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 563c3f75ffd [feature](move-memtable) share delta writer v2 among sinks 
(#24066)
563c3f75ffd is described below

commit 563c3f75ffd498c08a41cc9aaaef78755bae80d7
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Sep 13 14:39:29 2023 +0800

    [feature](move-memtable) share delta writer v2 among sinks (#24066)
---
 be/src/common/config.cpp                        |   6 +-
 be/src/common/config.h                          |   6 +-
 be/src/runtime/exec_env.cpp                     |   4 +
 be/src/runtime/exec_env.h                       |  13 ++-
 be/src/runtime/exec_env_init.cpp                |   6 +
 be/src/vec/sink/delta_writer_v2_pool.cpp        |  87 ++++++++++++++
 be/src/vec/sink/delta_writer_v2_pool.h          | 111 ++++++++++++++++++
 be/src/vec/sink/load_stream_stub.cpp            |   4 +
 be/src/vec/sink/load_stream_stub.h              |  13 ++-
 be/src/vec/sink/load_stream_stub_pool.cpp       |  56 +++++++++
 be/src/vec/sink/load_stream_stub_pool.h         | 101 ++++++++++++++++
 be/src/vec/sink/vtablet_sink_v2.cpp             | 149 ++++++++++++------------
 be/src/vec/sink/vtablet_sink_v2.h               |  12 +-
 be/test/vec/exec/delta_writer_v2_pool_test.cpp  |  87 ++++++++++++++
 be/test/vec/exec/load_stream_stub_pool_test.cpp |  54 +++++++++
 15 files changed, 619 insertions(+), 90 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index eecc4a11185..7b10bc72123 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -730,7 +730,11 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, 
"1048576");
 // In most cases, it does not need to be modified.
 DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
 
-// number of brpc stream per OlapTableSinkV2
+// share brpc streams when memtable_on_sink_node = true
+DEFINE_Bool(share_load_streams, "true");
+// share delta writers when memtable_on_sink_node = true
+DEFINE_Bool(share_delta_writers, "true");
+// number of brpc streams per OlapTableSinkV2 (per load if share_load_streams = true)
 DEFINE_Int32(num_streams_per_sink, "5");
 // timeout for open stream sink rpc in ms
 DEFINE_Int64(open_stream_sink_timeout_ms, "500");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 83b7dbd95fc..eefcbb9b16a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -789,7 +789,11 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes);
 // In most cases, it does not need to be modified.
 DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
 
-// number of brpc stream per OlapTableSinkV2
+// share brpc streams when memtable_on_sink_node = true
+DECLARE_Bool(share_load_streams);
+// share delta writers when memtable_on_sink_node = true
+DECLARE_Bool(share_delta_writers);
+// number of brpc streams per OlapTableSinkV2 (per load if share_load_streams = true)
 DECLARE_Int32(num_streams_per_sink);
 // timeout for open stream sink rpc in ms
 DECLARE_Int64(open_stream_sink_timeout_ms);
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 27986f5de31..aab83fba5f0 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -29,9 +29,13 @@
 #include "time.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "vec/sink/delta_writer_v2_pool.h"
+#include "vec/sink/load_stream_stub_pool.h"
 
 namespace doris {
 
+ExecEnv::ExecEnv() = default;
+
 ExecEnv::~ExecEnv() {
     destroy();
 }
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index fd54d441666..d52e7cd3612 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -50,6 +50,10 @@ class TaskScheduler;
 namespace taskgroup {
 class TaskGroupManager;
 }
+namespace stream_load {
+class DeltaWriterV2Pool;
+class LoadStreamStubPool;
+} // namespace stream_load
 namespace io {
 class S3FileBufferPool;
 class FileCacheFactory;
@@ -234,6 +238,11 @@ public:
     vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
     std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
 
+    stream_load::LoadStreamStubPool* load_stream_stub_pool() {
+        return _load_stream_stub_pool.get();
+    }
+    stream_load::DeltaWriterV2Pool* delta_writer_v2_pool() { return 
_delta_writer_v2_pool.get(); }
+
     void wait_for_all_tasks_done();
 
     void update_frontends(const std::vector<TFrontendInfo>& new_infos);
@@ -257,7 +266,7 @@ public:
     }
 
 private:
-    ExecEnv() = default;
+    ExecEnv();
 
     [[nodiscard]] Status _init(const std::vector<StorePath>& store_paths);
     void _destroy();
@@ -334,6 +343,8 @@ private:
     // To save meta info of external file, such as parquet footer.
     FileMetaCache* _file_meta_cache = nullptr;
     std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
+    std::unique_ptr<stream_load::LoadStreamStubPool> _load_stream_stub_pool;
+    std::unique_ptr<stream_load::DeltaWriterV2Pool> _delta_writer_v2_pool;
 
     std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
     std::shared_mutex _zone_cache_rw_lock;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index bcdbf498013..24234a90d95 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -92,6 +92,8 @@
 #include "util/timezone_utils.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/vdata_stream_mgr.h"
+#include "vec/sink/delta_writer_v2_pool.h"
+#include "vec/sink/load_stream_stub_pool.h"
 
 #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && 
!defined(LEAK_SANITIZER) && \
         !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
@@ -207,6 +209,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
     _group_commit_mgr = new GroupCommitMgr(this);
     _file_meta_cache = new 
FileMetaCache(config::max_external_file_meta_cache_num);
     _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
+    _load_stream_stub_pool = 
std::make_unique<stream_load::LoadStreamStubPool>();
+    _delta_writer_v2_pool = std::make_unique<stream_load::DeltaWriterV2Pool>();
 
     _backend_client_cache->init_metrics("backend");
     _frontend_client_cache->init_metrics("frontend");
@@ -543,6 +547,8 @@ void ExecEnv::destroy() {
     _deregister_metrics();
     SAFE_DELETE(_load_channel_mgr);
     _memtable_memory_limiter.reset(nullptr);
+    _load_stream_stub_pool.reset();
+    _delta_writer_v2_pool.reset();
 
     // shared_ptr maybe no need to be reset
     // _brpc_iobuf_block_memory_tracker.reset();
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp 
b/be/src/vec/sink/delta_writer_v2_pool.cpp
new file mode 100644
index 00000000000..c1066174a8c
--- /dev/null
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/delta_writer_v2_pool.h"
+
+#include "olap/delta_writer_v2.h"
+
+namespace doris {
+class TExpr;
+
+namespace stream_load {
+
+DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), 
_use_cnt(1) {}
+
+DeltaWriterV2Map::~DeltaWriterV2Map() = default;
+
+DeltaWriterV2* DeltaWriterV2Map::get_or_create(int64_t tablet_id,
+                                               std::function<DeltaWriterV2*()> 
creator) {
+    _map.lazy_emplace(tablet_id, [&](const 
TabletToDeltaWriterV2Map::constructor& ctor) {
+        ctor(tablet_id, creator());
+    });
+    return _map.at(tablet_id).get();
+}
+
+Status DeltaWriterV2Map::close() {
+    if (--_use_cnt > 0) {
+        return Status::OK();
+    }
+    Status status = Status::OK();
+    _map.for_each([&status](auto& entry) {
+        if (status.ok()) {
+            status = entry.second->close();
+        }
+    });
+    if (!status.ok()) {
+        return status;
+    }
+    _map.for_each([&status](auto& entry) {
+        if (status.ok()) {
+            status = entry.second->close_wait();
+        }
+    });
+    return status;
+}
+
+void DeltaWriterV2Map::cancel(Status status) {
+    _map.for_each([&status](auto& entry) { 
entry.second->cancel_with_status(status); });
+}
+
+DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
+
+DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
+
+std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId 
load_id) {
+    UniqueId id {load_id};
+    std::lock_guard<std::mutex> lock(_mutex);
+    std::shared_ptr<DeltaWriterV2Map> map = _pool[id].lock();
+    if (map) {
+        map->grab();
+        return map;
+    }
+    auto deleter = [this](DeltaWriterV2Map* m) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        _pool.erase(m->unique_id());
+        delete m;
+    };
+    map = std::shared_ptr<DeltaWriterV2Map>(new DeltaWriterV2Map(id), deleter);
+    _pool[id] = map;
+    return map;
+}
+
+} // namespace stream_load
+} // namespace doris
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h 
b/be/src/vec/sink/delta_writer_v2_pool.h
new file mode 100644
index 00000000000..d0328d7d2b5
--- /dev/null
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <brpc/controller.h>
+#include <bthread/types.h>
+#include <butil/errno.h>
+#include <fmt/format.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/callback.h>
+#include <parallel_hashmap/phmap.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+// IWYU pragma: no_include <bits/chrono.h>
+
+#include <chrono> // IWYU pragma: keep
+#include <functional>
+#include <initializer_list>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <queue>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class DeltaWriterV2;
+
+namespace stream_load {
+
+class DeltaWriterV2Map {
+public:
+    DeltaWriterV2Map(UniqueId load_id);
+
+    ~DeltaWriterV2Map();
+
+    void grab() { ++_use_cnt; }
+
+    // get or create delta writer for the given tablet, memory is managed by 
DeltaWriterV2Map
+    DeltaWriterV2* get_or_create(int64_t tablet_id, 
std::function<DeltaWriterV2*()> creator);
+
+    // close all delta writers in this DeltaWriterV2Map if there are no other users
+    Status close();
+
+    // cancel all delta writers in this DeltaWriterV2Map
+    void cancel(Status status);
+
+    UniqueId unique_id() const { return _load_id; }
+
+    size_t size() const { return _map.size(); }
+
+private:
+    using TabletToDeltaWriterV2Map = phmap::parallel_flat_hash_map<
+            int64_t, std::unique_ptr<DeltaWriterV2>, std::hash<int64_t>, 
std::equal_to<int64_t>,
+            std::allocator<phmap::Pair<const int64_t, 
std::unique_ptr<DeltaWriterV2>>>, 4,
+            std::mutex>;
+
+    UniqueId _load_id;
+    TabletToDeltaWriterV2Map _map;
+    std::atomic<int> _use_cnt;
+};
+
+class DeltaWriterV2Pool {
+public:
+    DeltaWriterV2Pool();
+
+    ~DeltaWriterV2Pool();
+
+    std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id);
+
+    size_t size() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _pool.size();
+    }
+
+private:
+    std::mutex _mutex;
+    std::unordered_map<UniqueId, std::weak_ptr<DeltaWriterV2Map>> _pool;
+};
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 63bea0b6c73..052aa1f256b 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -108,6 +108,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                             const NodeInfo& node_info, int64_t txn_id,
                             const OlapTableSchemaParam& schema,
                             const std::vector<PTabletID>& tablets_for_schema, 
bool enable_profile) {
+    _num_open++;
     std::unique_lock<bthread::Mutex> lock(_mutex);
     if (_is_init) {
         return Status::OK();
@@ -188,6 +189,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id, 
int64_t index_id, int64
 
 // CLOSE_LOAD
 Status LoadStreamStub::close_load(const std::vector<PTabletID>& 
tablets_to_commit) {
+    if (--_num_open > 0) {
+        return Status::OK();
+    }
     PStreamHeader header;
     *header.mutable_load_id() = _load_id;
     header.set_src_id(_src_id);
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 8a1ae79c529..20cf5fc02ae 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -26,6 +26,7 @@
 #include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
+#include <parallel_hashmap/phmap.h>
 #include <stddef.h>
 #include <stdint.h>
 
@@ -71,8 +72,14 @@ class LoadStreamStub;
 
 struct SegmentStatistics;
 
-using IndexToTabletSchema = std::unordered_map<int64_t, 
std::shared_ptr<TabletSchema>>;
-using IndexToEnableMoW = std::unordered_map<int64_t, bool>;
+using IndexToTabletSchema = phmap::parallel_flat_hash_map<
+        int64_t, std::shared_ptr<TabletSchema>, std::hash<int64_t>, 
std::equal_to<int64_t>,
+        std::allocator<phmap::Pair<const int64_t, 
std::shared_ptr<TabletSchema>>>, 4, std::mutex>;
+
+using IndexToEnableMoW =
+        phmap::parallel_flat_hash_map<int64_t, bool, std::hash<int64_t>, 
std::equal_to<int64_t>,
+                                      std::allocator<phmap::Pair<const 
int64_t, bool>>, 4,
+                                      std::mutex>;
 
 class LoadStreamStub {
 private:
@@ -175,6 +182,8 @@ protected:
     bthread::Mutex _mutex;
     bthread::ConditionVariable _close_cv;
 
+    std::atomic<int> _num_open;
+
     std::mutex _buffer_mutex;
     std::mutex _send_mutex;
     butil::IOBuf _buffer;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp 
b/be/src/vec/sink/load_stream_stub_pool.cpp
new file mode 100644
index 00000000000..848d038b2f9
--- /dev/null
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/load_stream_stub_pool.h"
+
+#include "vec/sink/load_stream_stub.h"
+
+namespace doris {
+class TExpr;
+
+namespace stream_load {
+
+LoadStreamStubPool::LoadStreamStubPool() = default;
+
+LoadStreamStubPool::~LoadStreamStubPool() = default;
+std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, 
int64_t src_id,
+                                                           int64_t dst_id) {
+    auto key = std::make_pair(UniqueId(load_id), dst_id);
+    std::lock_guard<std::mutex> lock(_mutex);
+    std::shared_ptr<Streams> streams = _pool[key].lock();
+    if (streams) {
+        return streams;
+    }
+    int32_t num_streams = std::max(1, config::num_streams_per_sink);
+    auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub 
{load_id, src_id});
+    auto deleter = [this, key](Streams* s) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        _pool.erase(key);
+        _template_stubs.erase(key.first);
+        delete s;
+    };
+    streams = std::shared_ptr<Streams>(new Streams(), deleter);
+    for (int32_t i = 0; i < num_streams; i++) {
+        // copy construct, internal tablet schema map will be shared among all 
stubs
+        streams->emplace_back(new LoadStreamStub {*it->second});
+    }
+    _pool[key] = streams;
+    return streams;
+}
+
+} // namespace stream_load
+} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.h 
b/be/src/vec/sink/load_stream_stub_pool.h
new file mode 100644
index 00000000000..ae550340d25
--- /dev/null
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <brpc/controller.h>
+#include <bthread/types.h>
+#include <butil/errno.h>
+#include <fmt/format.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/callback.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+// IWYU pragma: no_include <bits/chrono.h>
+#include <chrono> // IWYU pragma: keep
+#include <functional>
+#include <initializer_list>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <queue>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "exec/data_sink.h"
+#include "exec/tablet_info.h"
+#include "gutil/ref_counted.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "runtime/thread_context.h"
+#include "runtime/types.h"
+#include "util/countdown_latch.h"
+#include "util/runtime_profile.h"
+#include "util/stopwatch.hpp"
+#include "vec/columns/column.h"
+#include "vec/common/allocator.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+
+namespace doris {
+
+class LoadStreamStub;
+
+namespace stream_load {
+
+using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
+
+class LoadStreamStubPool {
+public:
+    LoadStreamStubPool();
+
+    ~LoadStreamStubPool();
+
+    std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, 
int64_t dst_id);
+
+    size_t size() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _pool.size();
+    }
+
+    // for UT only
+    size_t templates_size() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _template_stubs.size();
+    }
+
+private:
+    std::mutex _mutex;
+    std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> 
_template_stubs;
+    std::unordered_map<std::pair<UniqueId, int64_t>, std::weak_ptr<Streams>> 
_pool;
+};
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp 
b/be/src/vec/sink/vtablet_sink_v2.cpp
index a88f05b88d8..ebf50d222c3 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -30,6 +30,7 @@
 #include <algorithm>
 #include <execution>
 #include <mutex>
+#include <ranges>
 #include <string>
 #include <unordered_map>
 
@@ -54,7 +55,9 @@
 #include "util/uid_util.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/sink/delta_writer_v2_pool.h"
 #include "vec/sink/load_stream_stub.h"
+#include "vec/sink/load_stream_stub_pool.h"
 #include "vec/sink/vtablet_block_convertor.h"
 #include "vec/sink/vtablet_finder.h"
 
@@ -153,41 +156,51 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
     SCOPED_TIMER(_open_timer);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
-    _stream_pool_for_node = std::make_shared<NodeToStreams>();
-    _delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>();
+    if (config::share_delta_writers) {
+        _delta_writer_for_tablet =
+                
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(_load_id);
+    } else {
+        _delta_writer_for_tablet = 
std::make_shared<DeltaWriterV2Map>(_load_id);
+    }
     _build_tablet_node_mapping();
-    RETURN_IF_ERROR(_init_stream_pools());
+    RETURN_IF_ERROR(_open_streams(state->backend_id()));
 
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::_init_stream_pools() {
-    // stub template is for sharing internal schema map among all stubs
-    LoadStreamStub stub_template {_load_id, _sender_id};
-    for (auto& [node_id, _] : _tablets_for_node) {
-        auto node_info = _nodes_info->find_node(node_id);
+Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
+    for (auto& [dst_id, _] : _tablets_for_node) {
+        auto node_info = _nodes_info->find_node(dst_id);
         if (node_info == nullptr) {
-            return Status::InternalError("Unknown node {} in tablet location", 
node_id);
+            return Status::InternalError("Unknown node {} in tablet location", 
dst_id);
+        }
+        std::shared_ptr<Streams> streams;
+        if (config::share_load_streams) {
+            streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+                    _load_id, src_id, dst_id);
+        } else {
+            int32_t num_streams = std::max(1, config::num_streams_per_sink);
+            streams = std::make_shared<Streams>();
+            LoadStreamStub template_stub {_load_id, _sender_id};
+            for (int32_t i = 0; i < num_streams; i++) {
+                // copy construct, internal tablet schema map will be shared 
among all stubs
+                streams->emplace_back(new LoadStreamStub {template_stub});
+            }
         }
-        Streams& stream_pool = (*_stream_pool_for_node)[node_id];
-        RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool, 
stub_template));
-    }
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, Streams& 
stream_pool,
-                                           LoadStreamStub& stub_template) {
-    stream_pool.reserve(config::num_streams_per_sink);
-    for (int i = 0; i < config::num_streams_per_sink; ++i) {
-        // internal tablet schema map will be shared among all stubs
-        auto stream = std::make_unique<LoadStreamStub>(stub_template);
         // get tablet schema from each backend only in the 1st stream
-        const std::vector<PTabletID>& tablets_for_schema =
-                i == 0 ? _indexes_from_node[node_info.id] : 
std::vector<PTabletID> {};
-        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
node_info,
-                                     _txn_id, *_schema, tablets_for_schema,
-                                     _state->enable_profile()));
-        stream_pool.emplace_back(std::move(stream));
+        for (auto& stream : *streams | std::ranges::views::take(1)) {
+            const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
+            
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
+                                         *node_info, _txn_id, *_schema, 
tablets_for_schema,
+                                         _state->enable_profile()));
+        }
+        // for the remaining streams, open without getting tablet schema
+        for (auto& stream : *streams | std::ranges::views::drop(1)) {
+            
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
+                                         *node_info, _txn_id, *_schema, {},
+                                         _state->enable_profile()));
+        }
+        _streams_for_node[dst_id] = streams;
     }
     return Status::OK();
 }
@@ -238,7 +251,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, 
Streams& streams) {
         return Status::InternalError("unknown tablet location, tablet id = 
{}", tablet_id);
     }
     for (auto& node_id : location->node_ids) {
-        
streams.emplace_back(_stream_pool_for_node->at(node_id)[_stream_index]);
+        streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
     }
     _stream_index = (_stream_index + 1) % config::num_streams_per_sink;
     return Status::OK();
@@ -310,36 +323,27 @@ Status VOlapTableSinkV2::send(RuntimeState* state, 
vectorized::Block* input_bloc
 Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> 
block,
                                          int64_t tablet_id, const Rows& rows,
                                          const Streams& streams) {
-    DeltaWriterV2* delta_writer = nullptr;
-    {
-        auto it = _delta_writer_for_tablet->find(tablet_id);
-        if (it == _delta_writer_for_tablet->end()) {
-            VLOG_DEBUG << "Creating DeltaWriterV2 for Tablet(tablet id: " << 
tablet_id
-                       << ", index id: " << rows.index_id << ")";
-            WriteRequest req;
-            req.partition_id = rows.partition_id;
-            req.index_id = rows.index_id;
-            req.tablet_id = tablet_id;
-            req.txn_id = _txn_id;
-            req.load_id = _load_id;
-            req.tuple_desc = _output_tuple_desc;
-            req.is_high_priority = _is_high_priority;
-            req.table_schema_param = _schema.get();
-            for (auto& index : _schema->indexes()) {
-                if (index->index_id == rows.index_id) {
-                    req.slots = &index->slots;
-                    req.schema_hash = index->schema_hash;
-                    break;
-                }
+    DeltaWriterV2* delta_writer = 
_delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
+        WriteRequest req;
+        req.partition_id = rows.partition_id;
+        req.index_id = rows.index_id;
+        req.tablet_id = tablet_id;
+        req.txn_id = _txn_id;
+        req.load_id = _load_id;
+        req.tuple_desc = _output_tuple_desc;
+        req.is_high_priority = _is_high_priority;
+        req.table_schema_param = _schema.get();
+        for (auto& index : _schema->indexes()) {
+            if (index->index_id == rows.index_id) {
+                req.slots = &index->slots;
+                req.schema_hash = index->schema_hash;
+                break;
             }
-            DeltaWriterV2::open(&req, streams, &delta_writer, _profile);
-            _delta_writer_for_tablet->emplace(tablet_id, delta_writer);
-        } else {
-            VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " << 
tablet_id
-                       << ", index id: " << rows.index_id << ")";
-            delta_writer = it->second.get();
         }
-    }
+        DeltaWriterV2* delta_writer = nullptr;
+        DeltaWriterV2::open(&req, streams, &delta_writer, _profile);
+        return delta_writer;
+    });
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
         
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
@@ -352,12 +356,10 @@ Status 
VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
 Status VOlapTableSinkV2::_cancel(Status status) {
     LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
               << ", txn_id=" << _txn_id << ", due to error: " << status;
-
-    if (_delta_writer_for_tablet.use_count() == 1) {
-        std::for_each(std::begin(*_delta_writer_for_tablet), 
std::end(*_delta_writer_for_tablet),
-                      [&status](auto&& entry) { 
entry.second->cancel_with_status(status); });
+    if (_delta_writer_for_tablet) {
+        _delta_writer_for_tablet->cancel(status);
+        _delta_writer_for_tablet.reset();
     }
-    _delta_writer_for_tablet.reset();
     return Status::OK();
 }
 
@@ -382,37 +384,30 @@ Status VOlapTableSinkV2::close(RuntimeState* state, 
Status exec_status) {
 
         {
             SCOPED_TIMER(_close_writer_timer);
-            // close all delta writers
-            if (_delta_writer_for_tablet.use_count() == 1) {
-                std::for_each(std::begin(*_delta_writer_for_tablet),
-                              std::end(*_delta_writer_for_tablet),
-                              [](auto&& entry) { entry.second->close(); });
-                std::for_each(std::begin(*_delta_writer_for_tablet),
-                              std::end(*_delta_writer_for_tablet),
-                              [](auto&& entry) { entry.second->close_wait(); 
});
-            }
+            // close all delta writers if this is the last user
+            _delta_writer_for_tablet->close();
             _delta_writer_for_tablet.reset();
         }
 
         {
             // send CLOSE_LOAD to all streams, return ERROR if any
-            for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
-                RETURN_IF_ERROR(_close_load(stream_pool));
+            for (const auto& [_, streams] : _streams_for_node) {
+                RETURN_IF_ERROR(_close_load(*streams));
             }
         }
 
         {
             SCOPED_TIMER(_close_load_timer);
-            for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
-                for (const auto& stream : stream_pool) {
+            for (const auto& [_, streams] : _streams_for_node) {
+                for (const auto& stream : *streams) {
                     stream->close_wait();
                 }
             }
         }
 
         std::vector<TTabletCommitInfo> tablet_commit_infos;
-        for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
-            for (const auto& stream : stream_pool) {
+        for (const auto& [node_id, streams] : _streams_for_node) {
+            for (const auto& stream : *streams) {
                 for (auto tablet_id : stream->success_tablets()) {
                     TTabletCommitInfo commit_info;
                     commit_info.tabletId = tablet_id;
@@ -424,7 +419,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
         state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
                                             std::make_move_iterator(tablet_commit_infos.begin()),
                                             std::make_move_iterator(tablet_commit_infos.end()));
-        _stream_pool_for_node.reset();
+        _streams_for_node.clear();
 
         // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
         int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h
index 047377f4e28..bf983ba693d 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -80,10 +80,9 @@ namespace stream_load {
 class OlapTableBlockConvertor;
 class OlapTabletFinder;
 class VOlapTableSinkV2;
+class DeltaWriterV2Map;
 
-using DeltaWriterForTablet = std::unordered_map<int64_t, std::unique_ptr<DeltaWriterV2>>;
 using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-using NodeToStreams = std::unordered_map<int64_t, Streams>;
 using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>;
 using NodePartitionTabletMapping =
         std::unordered_map<int64_t, std::unordered_map<int64_t, std::unordered_set<int64_t>>>;
@@ -133,10 +132,7 @@ public:
     Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
 
 private:
-    Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool,
-                             LoadStreamStub& stub_template);
-
-    Status _init_stream_pools();
+    Status _open_streams(int64_t src_id);
 
     void _build_tablet_node_mapping();
 
@@ -215,9 +211,9 @@ private:
     std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
     std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
 
-    std::shared_ptr<NodeToStreams> _stream_pool_for_node;
+    std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
     size_t _stream_index = 0;
-    std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet;
+    std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
 
     std::atomic<int> _pending_streams {0};
 
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
new file mode 100644
index 00000000000..dfc3276ea75
--- /dev/null
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "vec/sink/delta_writer_v2_pool.h"
+
+#include <gtest/gtest.h>
+
+#include "olap/delta_writer_v2.h"
+
+namespace doris {
+
+namespace stream_load {
+
+class DeltaWriterV2PoolTest : public testing::Test { // stateless fixture grouping the pool/map tests
+public:
+    ~DeltaWriterV2PoolTest() override = default;
+    DeltaWriterV2PoolTest() = default;
+};
+
+TEST_F(DeltaWriterV2PoolTest, test_pool) { // one map per load id; entries vanish with the last handle
+    DeltaWriterV2Pool pool;
+    PUniqueId load_id;
+    load_id.set_hi(1);
+    load_id.set_lo(2); // populate lo explicitly so the id is {hi=1, lo=2}
+    PUniqueId load_id2;
+    load_id2.set_hi(1);
+    load_id2.set_lo(3); // same hi, different lo: must map to a different entry
+    auto map = pool.get_or_create(load_id);
+    auto map2 = pool.get_or_create(load_id2);
+    auto map3 = pool.get_or_create(load_id); // repeated id is expected to return the existing map
+    EXPECT_EQ(2, pool.size());
+    EXPECT_EQ(map, map3);
+    EXPECT_NE(map, map2);
+    map.reset();
+    map2.reset();
+    map3.reset();
+    EXPECT_EQ(0, pool.size()); // all handles released, so the pool is expected to be empty
+}
+
+TEST_F(DeltaWriterV2PoolTest, test_map) { // within one load's map, writers are keyed by tablet id
+    DeltaWriterV2Pool pool;
+    PUniqueId load_id;
+    load_id.set_hi(1);
+    load_id.set_lo(2); // populate lo explicitly so the id is {hi=1, lo=2}
+    auto map = pool.get_or_create(load_id);
+    EXPECT_EQ(1, pool.size());
+    WriteRequest req;
+    auto writer = map->get_or_create(100, [&req]() {
+        RuntimeProfile profile("test");
+        DeltaWriterV2* writer = nullptr; // defined value even if open() does not assign it
+        DeltaWriterV2::open(&req, {}, &writer, &profile);
+        return writer;
+    });
+    auto writer2 = map->get_or_create(101, [&req]() {
+        RuntimeProfile profile("test");
+        DeltaWriterV2* writer = nullptr; // defined value even if open() does not assign it
+        DeltaWriterV2::open(&req, {}, &writer, &profile);
+        return writer;
+    });
+    auto writer3 = map->get_or_create(100, [&req]() { // tablet 100 again: expected to reuse `writer`
+        RuntimeProfile profile("test");
+        DeltaWriterV2* writer = nullptr; // defined value even if open() does not assign it
+        DeltaWriterV2::open(&req, {}, &writer, &profile);
+        return writer;
+    });
+    EXPECT_EQ(2, map->size());
+    EXPECT_EQ(writer, writer3);
+    EXPECT_NE(writer, writer2);
+    map.reset();
+    EXPECT_EQ(0, pool.size()); // dropping the only map handle is expected to empty the pool
+}
+
+} // namespace stream_load
+} // namespace doris
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
new file mode 100644
index 00000000000..f1ccb70beeb
--- /dev/null
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "vec/sink/load_stream_stub_pool.h"
+
+#include <gtest/gtest.h>
+
+#include "vec/sink/load_stream_stub.h"
+
+namespace doris {
+
+namespace stream_load {
+
+class LoadStreamStubPoolTest : public testing::Test { // stateless fixture for the stub-pool test
+public:
+    ~LoadStreamStubPoolTest() override = default;
+    LoadStreamStubPoolTest() = default;
+};
+
+TEST_F(LoadStreamStubPoolTest, test) { // streams are shared per (load id, dst id); one template per load
+    LoadStreamStubPool pool;
+    int64_t src_id = 100;
+    PUniqueId load_id;
+    load_id.set_hi(1);
+    load_id.set_lo(2); // populate lo explicitly so the id is {hi=1, lo=2}
+    auto streams1 = pool.get_or_create(load_id, src_id, 101);
+    auto streams2 = pool.get_or_create(load_id, src_id, 102);
+    auto streams3 = pool.get_or_create(load_id, src_id, 101); // same dst as streams1: expected shared
+    EXPECT_EQ(2, pool.size());
+    EXPECT_EQ(1, pool.templates_size());
+    EXPECT_EQ(streams1, streams3);
+    EXPECT_NE(streams1, streams2);
+    streams1.reset();
+    streams2.reset();
+    streams3.reset();
+    EXPECT_EQ(0, pool.size()); // all handles released: no streams expected to remain
+    EXPECT_EQ(0, pool.templates_size());
+}
+
+} // namespace stream_load
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to