This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 20c58e46d4f [opt](memory) add BE-level cache for load tablet schemas 
(#63508)
20c58e46d4f is described below

commit 20c58e46d4fc6cea133704ae4b5d4cd547880880
Author: hui lai <[email protected]>
AuthorDate: Mon Jun 8 11:10:50 2026 +0800

    [opt](memory) add BE-level cache for load tablet schemas (#63508)
    
    PR #58476 reuses `TabletColumn` and `TabletIndex` objects in the load
    path, but each load writer can still build and hold its own
    `TabletSchema` object. For wide tables with many tablets or buckets,
    duplicated schema-level metadata, vectors, and lookup maps still consume
    noticeable memory.
    
    This PR adds a BE-level cache for fully built load `TabletSchema`
    objects. `BaseRowsetBuilder` and `DeltaWriterV2` reuse cached schemas
    after all load-specific schema fields are set.
    
    The cache key includes the serialized schema plus fields that are not
    part of `TabletSchemaPB`, such as `table_id`, `db_id`, and
    `auto_increment_column`, to avoid sharing a schema object across
    incompatible load contexts.
    
    ### Test
    
    60 concurrent stream loads, each with 512 tablets:
    
    before optimization:
    <img width="934" height="604" alt="image"
    src="https://github.com/user-attachments/assets/49eb4e5b-a940-4db5-a440-0940e5ea3ba4"
    />
    
    after optimization (`_build_current_tablet_schema` consumes almost no
    memory):
    <img width="610" height="568" alt="image"
    src="https://github.com/user-attachments/assets/67a34e73-5bd1-4fd9-897f-bcd8f4aa2966"
    />
---
 be/src/load/delta_writer/delta_writer_v2.cpp       |  42 ++++---
 be/src/storage/rowset_builder.cpp                  |  38 +++++--
 be/src/storage/tablet/tablet_schema_cache.cpp      |  88 +++++++++++++++
 be/src/storage/tablet/tablet_schema_cache.h        |  17 +++
 be/src/storage/tablet_info.h                       |   2 +-
 .../storage/tablet/tablet_schema_cache_test.cpp    | 125 +++++++++++++++++++++
 6 files changed, 284 insertions(+), 28 deletions(-)

diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp 
b/be/src/load/delta_writer/delta_writer_v2.cpp
index c99c7ebe614..c117e2b3180 100644
--- a/be/src/load/delta_writer/delta_writer_v2.cpp
+++ b/be/src/load/delta_writer/delta_writer_v2.cpp
@@ -52,6 +52,7 @@
 #include "storage/storage_engine.h"
 #include "storage/tablet/tablet_manager.h"
 #include "storage/tablet/tablet_schema.h"
+#include "storage/tablet/tablet_schema_cache.h"
 #include "storage/tablet_info.h"
 #include "util/brpc_client_cache.h"
 #include "util/brpc_closure.h"
@@ -214,28 +215,39 @@ Status DeltaWriterV2::cancel_with_status(const Status& 
st) {
 Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
                                                    const OlapTableSchemaParam* 
table_schema_param,
                                                    const TabletSchema& 
ori_tablet_schema) {
-    _tablet_schema->copy_from(ori_tablet_schema);
     // find the right index id
-    int i = 0;
-    auto indexes = table_schema_param->indexes();
-    for (; i < indexes.size(); i++) {
-        if (indexes[i]->index_id == index_id) {
+    const OlapTableIndexSchema* index_schema = nullptr;
+    for (const auto* schema : table_schema_param->indexes()) {
+        if (schema->index_id == index_id) {
+            index_schema = schema;
             break;
         }
     }
 
-    if (!indexes.empty() && !indexes[i]->columns.empty() &&
-        indexes[i]->columns[0]->unique_id() >= 0) {
-        _tablet_schema->build_current_tablet_schema(
-                index_id, static_cast<int32_t>(table_schema_param->version()), 
indexes[i],
-                ori_tablet_schema);
+    auto cache_key = TabletSchemaCache::build_load_schema_cache_key(
+            index_id, table_schema_param, ori_tablet_schema, index_schema);
+    auto cached_schema = 
TabletSchemaCache::instance()->lookup_schema(cache_key);
+    if (cached_schema.first != nullptr) {
+        _tablet_schema = cached_schema.second;
+        TabletSchemaCache::instance()->release(cached_schema.first);
+    } else {
+        _tablet_schema->copy_from(ori_tablet_schema);
+        if (index_schema != nullptr && !index_schema->columns.empty() &&
+            index_schema->columns[0]->unique_id() >= 0) {
+            _tablet_schema->build_current_tablet_schema(
+                    index_id, 
static_cast<int32_t>(table_schema_param->version()), index_schema,
+                    ori_tablet_schema);
+        }
+        _tablet_schema->set_table_id(table_schema_param->table_id());
+        _tablet_schema->set_db_id(table_schema_param->db_id());
+        if (table_schema_param->is_partial_update()) {
+            
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
+        }
+        auto inserted_schema = 
TabletSchemaCache::instance()->insert(cache_key, _tablet_schema);
+        _tablet_schema = inserted_schema.second;
+        TabletSchemaCache::instance()->release(inserted_schema.first);
     }
 
-    _tablet_schema->set_table_id(table_schema_param->table_id());
-    _tablet_schema->set_db_id(table_schema_param->db_id());
-    if (table_schema_param->is_partial_update()) {
-        
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
-    }
     // set partial update columns info
     _partial_update_info = std::make_shared<PartialUpdateInfo>();
     RETURN_IF_ERROR(_partial_update_info->init(
diff --git a/be/src/storage/rowset_builder.cpp 
b/be/src/storage/rowset_builder.cpp
index 2f2e83669a5..1be1d0873dc 100644
--- a/be/src/storage/rowset_builder.cpp
+++ b/be/src/storage/rowset_builder.cpp
@@ -51,6 +51,7 @@
 #include "storage/tablet/tablet_manager.h"
 #include "storage/tablet/tablet_meta.h"
 #include "storage/tablet/tablet_schema.h"
+#include "storage/tablet/tablet_schema_cache.h"
 #include "storage/tablet_info.h"
 #include "storage/txn/txn_manager.h"
 #include "util/brpc_client_cache.h"
@@ -242,6 +243,7 @@ Status RowsetBuilder::init() {
     // build tablet schema in request level
     RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, 
_req.table_schema_param.get(),
                                                  *_tablet->tablet_schema()));
+    context.tablet_schema = _tablet_schema;
 
     context.mow_context = mow_context;
 
@@ -441,14 +443,30 @@ Status BaseRowsetBuilder::_build_current_tablet_schema(
         }
     }
 
-    if (index_schema != nullptr && !index_schema->columns.empty() &&
-        index_schema->columns[0]->unique_id() >= 0) {
-        _tablet_schema->shawdow_copy_without_columns(ori_tablet_schema);
-        _tablet_schema->build_current_tablet_schema(
-                index_id, cast_set<int32_t>(table_schema_param->version()), 
index_schema,
-                ori_tablet_schema);
+    auto cache_key = TabletSchemaCache::build_load_schema_cache_key(
+            index_id, table_schema_param, ori_tablet_schema, index_schema);
+    auto cached_schema = 
TabletSchemaCache::instance()->lookup_schema(cache_key);
+    if (cached_schema.first != nullptr) {
+        _tablet_schema = cached_schema.second;
+        TabletSchemaCache::instance()->release(cached_schema.first);
     } else {
-        _tablet_schema->copy_from(ori_tablet_schema);
+        if (index_schema != nullptr && !index_schema->columns.empty() &&
+            index_schema->columns[0]->unique_id() >= 0) {
+            _tablet_schema->shawdow_copy_without_columns(ori_tablet_schema);
+            _tablet_schema->build_current_tablet_schema(
+                    index_id, 
cast_set<int32_t>(table_schema_param->version()), index_schema,
+                    ori_tablet_schema);
+        } else {
+            _tablet_schema->copy_from(ori_tablet_schema);
+        }
+        _tablet_schema->set_table_id(table_schema_param->table_id());
+        _tablet_schema->set_db_id(table_schema_param->db_id());
+        if (table_schema_param->is_partial_update()) {
+            
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
+        }
+        auto inserted_schema = 
TabletSchemaCache::instance()->insert(cache_key, _tablet_schema);
+        _tablet_schema = inserted_schema.second;
+        TabletSchemaCache::instance()->release(inserted_schema.first);
     }
     if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) 
{
         // After schema change, should include extracted column
@@ -468,11 +486,6 @@ Status BaseRowsetBuilder::_build_current_tablet_schema(
         }
     }
 
-    _tablet_schema->set_table_id(table_schema_param->table_id());
-    _tablet_schema->set_db_id(table_schema_param->db_id());
-    if (table_schema_param->is_partial_update()) {
-        
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
-    }
     // set partial update columns info
     if (is_data_builder()) {
         _partial_update_info = std::make_shared<PartialUpdateInfo>();
@@ -565,6 +578,7 @@ Status RowBinlogRowsetBuilder::init() {
     RETURN_IF_ERROR(_build_current_tablet_schema(
             _req.index_id, _req.table_schema_param.get(),
             
*std::dynamic_pointer_cast<Tablet>(_tablet)->row_binlog_tablet_schema()));
+    context.tablet_schema = _tablet_schema;
     context.write_binlog_opt().enable = true;
 
     _rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false));
diff --git a/be/src/storage/tablet/tablet_schema_cache.cpp 
b/be/src/storage/tablet/tablet_schema_cache.cpp
index 93fdd7709c7..0d3a242d56f 100644
--- a/be/src/storage/tablet/tablet_schema_cache.cpp
+++ b/be/src/storage/tablet/tablet_schema_cache.cpp
@@ -21,8 +21,13 @@
 #include <glog/logging.h>
 #include <json2pb/pb_to_json.h>
 
+#include <cstdint>
+#include <type_traits>
+#include <utility>
+
 #include "bvar/bvar.h"
 #include "storage/tablet/tablet_schema.h"
+#include "storage/tablet_info.h"
 #include "util/sha.h"
 
 bvar::Adder<int64_t> g_tablet_schema_cache_count("tablet_schema_cache_count");
@@ -39,6 +44,17 @@ static std::string get_key_signature(const std::string& 
origin) {
     return std::string {digest.digest().data(), digest.digest().length()};
 }
 
+template <typename T>
+static void append_cache_key_value(std::string* key, T value) {
+    static_assert(std::is_integral_v<T>);
+    key->append(reinterpret_cast<const char*>(&value), sizeof(value));
+}
+
+static void append_cache_key_string(std::string* key, const std::string& 
value) {
+    append_cache_key_value(key, static_cast<uint64_t>(value.size()));
+    key->append(value);
+}
+
 std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const 
std::string& key) {
     std::string key_signature = get_key_signature(key);
     auto* lru_handle = lookup(key_signature);
@@ -64,6 +80,78 @@ std::pair<Cache::Handle*, TabletSchemaSPtr> 
TabletSchemaCache::insert(const std:
     return std::make_pair(lru_handle, tablet_schema_ptr);
 }
 
+std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(
+        const std::string& key, TabletSchemaSPtr tablet_schema) {
+    auto* lru_handle = lookup(key);
+    TabletSchemaSPtr tablet_schema_ptr;
+    if (lru_handle) {
+        auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
+        tablet_schema_ptr = value->tablet_schema;
+        g_tablet_schema_cache_hit_count << 1;
+    } else {
+        DCHECK(tablet_schema != nullptr);
+        auto* value = new CacheValue;
+        tablet_schema_ptr = std::move(tablet_schema);
+        value->tablet_schema = tablet_schema_ptr;
+        lru_handle = LRUCachePolicy::insert(key, value, 
tablet_schema_ptr->num_columns(),
+                                            tablet_schema_ptr->mem_size(), 
CachePriority::NORMAL);
+        g_tablet_schema_cache_count << 1;
+        g_tablet_schema_cache_columns_count << 
tablet_schema_ptr->num_columns();
+    }
+    DCHECK(lru_handle != nullptr);
+    return std::make_pair(lru_handle, tablet_schema_ptr);
+}
+
+std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::lookup_schema(
+        const std::string& key) {
+    auto* lru_handle = lookup(key);
+    if (lru_handle == nullptr) {
+        return {nullptr, nullptr};
+    }
+    auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
+    g_tablet_schema_cache_hit_count << 1;
+    return {lru_handle, value->tablet_schema};
+}
+
+std::string TabletSchemaCache::build_load_schema_cache_key(
+        int64_t index_id, const OlapTableSchemaParam* table_schema_param,
+        const TabletSchema& ori_tablet_schema, const OlapTableIndexSchema* 
index_schema) {
+    DCHECK(table_schema_param != nullptr);
+    std::string cache_key;
+    cache_key.append("load_schema_v2");
+    append_cache_key_value(&cache_key, index_id);
+    append_cache_key_value(&cache_key, table_schema_param->table_id());
+    append_cache_key_value(&cache_key, table_schema_param->db_id());
+    append_cache_key_value(&cache_key, table_schema_param->version());
+
+    TabletSchemaPB ori_schema_pb;
+    ori_tablet_schema.to_schema_pb(&ori_schema_pb);
+    append_cache_key_string(&cache_key,
+                            
TabletSchema::deterministic_string_serialize(ori_schema_pb));
+    if (ori_tablet_schema.num_variant_columns() > 0) {
+        // Variant schemas carry path set info outside TabletSchemaPB, so do 
not share them
+        // across different source TabletSchema objects unless that metadata 
is serialized.
+        append_cache_key_value(&cache_key, 
reinterpret_cast<uintptr_t>(&ori_tablet_schema));
+    }
+
+    std::string auto_increment_column;
+    if (table_schema_param->is_partial_update()) {
+        auto_increment_column = table_schema_param->auto_increment_coulumn();
+    }
+    append_cache_key_string(&cache_key, auto_increment_column);
+
+    const bool has_current_schema = index_schema != nullptr && 
!index_schema->columns.empty() &&
+                                    index_schema->columns[0]->unique_id() >= 0;
+    append_cache_key_value(&cache_key, has_current_schema);
+    if (index_schema != nullptr) {
+        POlapTableIndexSchema index_schema_pb;
+        index_schema->to_protobuf(&index_schema_pb);
+        append_cache_key_string(&cache_key,
+                                
TabletSchema::deterministic_string_serialize(index_schema_pb));
+    }
+    return get_key_signature(cache_key);
+}
+
 void TabletSchemaCache::release(Cache::Handle* lru_handle) {
     LRUCachePolicy::release(lru_handle);
 }
diff --git a/be/src/storage/tablet/tablet_schema_cache.h 
b/be/src/storage/tablet/tablet_schema_cache.h
index 41fa287d6af..fd097dacac8 100644
--- a/be/src/storage/tablet/tablet_schema_cache.h
+++ b/be/src/storage/tablet/tablet_schema_cache.h
@@ -17,12 +17,19 @@
 
 #pragma once
 
+#include <cstdint>
+#include <string>
+#include <utility>
+
 #include "runtime/exec_env.h"
 #include "runtime/memory/lru_cache_policy.h"
 #include "storage/tablet/tablet_fwd.h"
 
 namespace doris {
 
+class OlapTableSchemaParam;
+struct OlapTableIndexSchema;
+
 class TabletSchemaCache : public LRUCachePolicy {
 public:
     using LRUCachePolicy::insert;
@@ -45,6 +52,16 @@ public:
 
     std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key);
 
+    std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key,
+                                                       TabletSchemaSPtr 
tablet_schema);
+
+    std::pair<Cache::Handle*, TabletSchemaSPtr> lookup_schema(const 
std::string& key);
+
+    static std::string build_load_schema_cache_key(int64_t index_id,
+                                                   const OlapTableSchemaParam* 
table_schema_param,
+                                                   const TabletSchema& 
ori_tablet_schema,
+                                                   const OlapTableIndexSchema* 
index_schema);
+
     void release(Cache::Handle*);
 
 private:
diff --git a/be/src/storage/tablet_info.h b/be/src/storage/tablet_info.h
index 70d187ebac9..c0caf8bb49a 100644
--- a/be/src/storage/tablet_info.h
+++ b/be/src/storage/tablet_info.h
@@ -105,7 +105,7 @@ public:
         return _unique_key_update_mode == 
UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
     }
 
-    std::set<std::string> partial_update_input_columns() const {
+    const std::set<std::string>& partial_update_input_columns() const {
         return _partial_update_input_columns;
     }
     PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const {
diff --git a/be/test/storage/tablet/tablet_schema_cache_test.cpp 
b/be/test/storage/tablet/tablet_schema_cache_test.cpp
new file mode 100644
index 00000000000..76d122d6b51
--- /dev/null
+++ b/be/test/storage/tablet/tablet_schema_cache_test.cpp
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/tablet/tablet_schema_cache.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "storage/tablet/tablet_schema.h"
+#include "storage/tablet_info.h"
+
+namespace doris {
+
+static TabletColumn create_test_column(int32_t unique_id, const std::string& 
name,
+                                       bool is_nullable = false) {
+    TabletColumn column;
+    column.set_unique_id(unique_id);
+    column.set_name(name);
+    column.set_type(FieldType::OLAP_FIELD_TYPE_INT);
+    column.set_is_key(true);
+    column.set_is_nullable(is_nullable);
+    column.set_length(4);
+    column.set_index_length(4);
+    return column;
+}
+
+static void set_schema_param(OlapTableSchemaParam* param, int64_t table_id = 
20, int64_t db_id = 10,
+                             int64_t version = 3) {
+    POlapTableSchemaParam pschema;
+    pschema.set_db_id(db_id);
+    pschema.set_table_id(table_id);
+    pschema.set_version(version);
+    Status st = param->init(pschema);
+    ASSERT_TRUE(st.ok()) << st;
+}
+
+TEST(TabletSchemaCacheTest, LoadSchemaCacheKeyUsesFullIndexSchema) {
+    TabletSchema ori_schema;
+    ori_schema.set_schema_version(1);
+    ori_schema.append_column(create_test_column(1, "k1"));
+
+    OlapTableSchemaParam param;
+    set_schema_param(&param);
+    TabletColumn column1 = create_test_column(1, "k1");
+    TabletColumn column2 = create_test_column(1, "k1_changed");
+
+    OlapTableIndexSchema index_schema1;
+    index_schema1.index_id = 100;
+    index_schema1.schema_hash = 200;
+    index_schema1.columns.push_back(&column1);
+
+    OlapTableIndexSchema index_schema2;
+    index_schema2.index_id = 100;
+    index_schema2.schema_hash = 200;
+    index_schema2.columns.push_back(&column2);
+
+    auto key1 =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, 
ori_schema, &index_schema1);
+    auto key2 =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, 
ori_schema, &index_schema2);
+    EXPECT_NE(key1, key2);
+
+    auto same_key =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, 
ori_schema, &index_schema1);
+    EXPECT_EQ(key1, same_key);
+
+    auto fallback_key =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, 
ori_schema, nullptr);
+    EXPECT_NE(key1, fallback_key);
+    auto same_fallback_key =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, 
ori_schema, nullptr);
+    EXPECT_EQ(fallback_key, same_fallback_key);
+
+    OlapTableSchemaParam other_table_param;
+    set_schema_param(&other_table_param, 21);
+    auto other_table_key = TabletSchemaCache::build_load_schema_cache_key(
+            100, &other_table_param, ori_schema, &index_schema1);
+    EXPECT_NE(key1, other_table_key);
+}
+
+TEST(TabletSchemaCacheTest, InsertAndLookupLoadSchema) {
+    TabletSchema ori_schema;
+    ori_schema.set_schema_version(1);
+    ori_schema.append_column(create_test_column(1, "k1"));
+
+    OlapTableSchemaParam param;
+    set_schema_param(&param, 30);
+    TabletColumn column = create_test_column(1, "k1");
+    OlapTableIndexSchema index_schema;
+    index_schema.index_id = 100;
+    index_schema.schema_hash = 200;
+    index_schema.columns.push_back(&column);
+
+    auto cache_key =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, 
ori_schema, &index_schema);
+    auto tablet_schema = std::make_shared<TabletSchema>();
+    tablet_schema->copy_from(ori_schema);
+
+    auto inserted = TabletSchemaCache::instance()->insert(cache_key, 
tablet_schema);
+    EXPECT_EQ(tablet_schema.get(), inserted.second.get());
+    TabletSchemaCache::instance()->release(inserted.first);
+
+    auto cached = TabletSchemaCache::instance()->lookup_schema(cache_key);
+    ASSERT_NE(nullptr, cached.first);
+    EXPECT_EQ(tablet_schema.get(), cached.second.get());
+    TabletSchemaCache::instance()->release(cached.first);
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to