This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 20c58e46d4f [opt](memory) add BE-level cache for load tablet schemas
(#63508)
20c58e46d4f is described below
commit 20c58e46d4fc6cea133704ae4b5d4cd547880880
Author: hui lai <[email protected]>
AuthorDate: Mon Jun 8 11:10:50 2026 +0800
[opt](memory) add BE-level cache for load tablet schemas (#63508)
PR #58476 reuses `TabletColumn` and `TabletIndex` objects in the load
path, but each load writer can still build and hold its own
`TabletSchema` object. For wide tables with many tablets or buckets,
duplicated schema-level metadata, vectors, and lookup maps still consume
noticeable memory.
This PR adds a BE-level cache for fully built load `TabletSchema`
objects. `BaseRowsetBuilder` and `DeltaWriterV2` first look the schema up
in the cache; on a miss they build it, set all load-specific schema
fields, and only then insert it so that later writers can reuse it.
The cache key includes the serialized schema plus fields that are not
part of `TabletSchemaPB`, such as `table_id`, `db_id`, and
`auto_increment_column`, to avoid sharing a schema object across
incompatible load contexts.
### Test
60 concurrent stream loads, each with 512 tablets:
before optimization:
<img width="934" height="604" alt="image"
src="https://github.com/user-attachments/assets/49eb4e5b-a940-4db5-a440-0940e5ea3ba4"
/>
after optimization (`_build_current_tablet_schema` consumes almost no
memory):
<img width="610" height="568" alt="image"
src="https://github.com/user-attachments/assets/67a34e73-5bd1-4fd9-897f-bcd8f4aa2966"
/>
---
be/src/load/delta_writer/delta_writer_v2.cpp | 42 ++++---
be/src/storage/rowset_builder.cpp | 38 +++++--
be/src/storage/tablet/tablet_schema_cache.cpp | 88 +++++++++++++++
be/src/storage/tablet/tablet_schema_cache.h | 17 +++
be/src/storage/tablet_info.h | 2 +-
.../storage/tablet/tablet_schema_cache_test.cpp | 125 +++++++++++++++++++++
6 files changed, 284 insertions(+), 28 deletions(-)
diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp
b/be/src/load/delta_writer/delta_writer_v2.cpp
index c99c7ebe614..c117e2b3180 100644
--- a/be/src/load/delta_writer/delta_writer_v2.cpp
+++ b/be/src/load/delta_writer/delta_writer_v2.cpp
@@ -52,6 +52,7 @@
#include "storage/storage_engine.h"
#include "storage/tablet/tablet_manager.h"
#include "storage/tablet/tablet_schema.h"
+#include "storage/tablet/tablet_schema_cache.h"
#include "storage/tablet_info.h"
#include "util/brpc_client_cache.h"
#include "util/brpc_closure.h"
@@ -214,28 +215,39 @@ Status DeltaWriterV2::cancel_with_status(const Status&
st) {
Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam*
table_schema_param,
const TabletSchema&
ori_tablet_schema) {
- _tablet_schema->copy_from(ori_tablet_schema);
// find the right index id
- int i = 0;
- auto indexes = table_schema_param->indexes();
- for (; i < indexes.size(); i++) {
- if (indexes[i]->index_id == index_id) {
+ const OlapTableIndexSchema* index_schema = nullptr;
+ for (const auto* schema : table_schema_param->indexes()) {
+ if (schema->index_id == index_id) {
+ index_schema = schema;
break;
}
}
- if (!indexes.empty() && !indexes[i]->columns.empty() &&
- indexes[i]->columns[0]->unique_id() >= 0) {
- _tablet_schema->build_current_tablet_schema(
- index_id, static_cast<int32_t>(table_schema_param->version()),
indexes[i],
- ori_tablet_schema);
+ auto cache_key = TabletSchemaCache::build_load_schema_cache_key(
+ index_id, table_schema_param, ori_tablet_schema, index_schema);
+ auto cached_schema =
TabletSchemaCache::instance()->lookup_schema(cache_key);
+ if (cached_schema.first != nullptr) {
+ _tablet_schema = cached_schema.second;
+ TabletSchemaCache::instance()->release(cached_schema.first);
+ } else {
+ _tablet_schema->copy_from(ori_tablet_schema);
+ if (index_schema != nullptr && !index_schema->columns.empty() &&
+ index_schema->columns[0]->unique_id() >= 0) {
+ _tablet_schema->build_current_tablet_schema(
+ index_id,
static_cast<int32_t>(table_schema_param->version()), index_schema,
+ ori_tablet_schema);
+ }
+ _tablet_schema->set_table_id(table_schema_param->table_id());
+ _tablet_schema->set_db_id(table_schema_param->db_id());
+ if (table_schema_param->is_partial_update()) {
+
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
+ }
+ auto inserted_schema =
TabletSchemaCache::instance()->insert(cache_key, _tablet_schema);
+ _tablet_schema = inserted_schema.second;
+ TabletSchemaCache::instance()->release(inserted_schema.first);
}
- _tablet_schema->set_table_id(table_schema_param->table_id());
- _tablet_schema->set_db_id(table_schema_param->db_id());
- if (table_schema_param->is_partial_update()) {
-
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
- }
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
RETURN_IF_ERROR(_partial_update_info->init(
diff --git a/be/src/storage/rowset_builder.cpp
b/be/src/storage/rowset_builder.cpp
index 2f2e83669a5..1be1d0873dc 100644
--- a/be/src/storage/rowset_builder.cpp
+++ b/be/src/storage/rowset_builder.cpp
@@ -51,6 +51,7 @@
#include "storage/tablet/tablet_manager.h"
#include "storage/tablet/tablet_meta.h"
#include "storage/tablet/tablet_schema.h"
+#include "storage/tablet/tablet_schema_cache.h"
#include "storage/tablet_info.h"
#include "storage/txn/txn_manager.h"
#include "util/brpc_client_cache.h"
@@ -242,6 +243,7 @@ Status RowsetBuilder::init() {
// build tablet schema in request level
RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id,
_req.table_schema_param.get(),
*_tablet->tablet_schema()));
+ context.tablet_schema = _tablet_schema;
context.mow_context = mow_context;
@@ -441,14 +443,30 @@ Status BaseRowsetBuilder::_build_current_tablet_schema(
}
}
- if (index_schema != nullptr && !index_schema->columns.empty() &&
- index_schema->columns[0]->unique_id() >= 0) {
- _tablet_schema->shawdow_copy_without_columns(ori_tablet_schema);
- _tablet_schema->build_current_tablet_schema(
- index_id, cast_set<int32_t>(table_schema_param->version()),
index_schema,
- ori_tablet_schema);
+ auto cache_key = TabletSchemaCache::build_load_schema_cache_key(
+ index_id, table_schema_param, ori_tablet_schema, index_schema);
+ auto cached_schema =
TabletSchemaCache::instance()->lookup_schema(cache_key);
+ if (cached_schema.first != nullptr) {
+ _tablet_schema = cached_schema.second;
+ TabletSchemaCache::instance()->release(cached_schema.first);
} else {
- _tablet_schema->copy_from(ori_tablet_schema);
+ if (index_schema != nullptr && !index_schema->columns.empty() &&
+ index_schema->columns[0]->unique_id() >= 0) {
+ _tablet_schema->shawdow_copy_without_columns(ori_tablet_schema);
+ _tablet_schema->build_current_tablet_schema(
+ index_id,
cast_set<int32_t>(table_schema_param->version()), index_schema,
+ ori_tablet_schema);
+ } else {
+ _tablet_schema->copy_from(ori_tablet_schema);
+ }
+ _tablet_schema->set_table_id(table_schema_param->table_id());
+ _tablet_schema->set_db_id(table_schema_param->db_id());
+ if (table_schema_param->is_partial_update()) {
+
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
+ }
+ auto inserted_schema =
TabletSchemaCache::instance()->insert(cache_key, _tablet_schema);
+ _tablet_schema = inserted_schema.second;
+ TabletSchemaCache::instance()->release(inserted_schema.first);
}
if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version())
{
// After schema change, should include extracted column
@@ -468,11 +486,6 @@ Status BaseRowsetBuilder::_build_current_tablet_schema(
}
}
- _tablet_schema->set_table_id(table_schema_param->table_id());
- _tablet_schema->set_db_id(table_schema_param->db_id());
- if (table_schema_param->is_partial_update()) {
-
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
- }
// set partial update columns info
if (is_data_builder()) {
_partial_update_info = std::make_shared<PartialUpdateInfo>();
@@ -565,6 +578,7 @@ Status RowBinlogRowsetBuilder::init() {
RETURN_IF_ERROR(_build_current_tablet_schema(
_req.index_id, _req.table_schema_param.get(),
*std::dynamic_pointer_cast<Tablet>(_tablet)->row_binlog_tablet_schema()));
+ context.tablet_schema = _tablet_schema;
context.write_binlog_opt().enable = true;
_rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false));
diff --git a/be/src/storage/tablet/tablet_schema_cache.cpp
b/be/src/storage/tablet/tablet_schema_cache.cpp
index 93fdd7709c7..0d3a242d56f 100644
--- a/be/src/storage/tablet/tablet_schema_cache.cpp
+++ b/be/src/storage/tablet/tablet_schema_cache.cpp
@@ -21,8 +21,13 @@
#include <glog/logging.h>
#include <json2pb/pb_to_json.h>
+#include <cstdint>
+#include <type_traits>
+#include <utility>
+
#include "bvar/bvar.h"
#include "storage/tablet/tablet_schema.h"
+#include "storage/tablet_info.h"
#include "util/sha.h"
bvar::Adder<int64_t> g_tablet_schema_cache_count("tablet_schema_cache_count");
@@ -39,6 +44,17 @@ static std::string get_key_signature(const std::string&
origin) {
return std::string {digest.digest().data(), digest.digest().length()};
}
+// Appends the bytes of the integral `value` to `*key`, copied exactly as the value is stored
+// in memory (sizeof(T) bytes). Only integral types are accepted.
+template <typename T>
+static void append_cache_key_value(std::string* key, T value) {
+    static_assert(std::is_integral_v<T>);
+    key->append(reinterpret_cast<const char*>(&value), sizeof(value));
+}
+
+// Appends `value` to `*key`, preceded by its size written as a uint64_t, so that
+// variable-length fields appended one after another keep distinct boundaries.
+static void append_cache_key_string(std::string* key, const std::string& value) {
+    append_cache_key_value(key, static_cast<uint64_t>(value.size()));
+    key->append(value);
+}
+
std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const
std::string& key) {
std::string key_signature = get_key_signature(key);
auto* lru_handle = lookup(key_signature);
@@ -64,6 +80,78 @@ std::pair<Cache::Handle*, TabletSchemaSPtr>
TabletSchemaCache::insert(const std:
return std::make_pair(lru_handle, tablet_schema_ptr);
}
+// Stores an already built `tablet_schema` under `key`, or returns the schema that is already
+// cached under that key.
+//
+// Unlike insert(const std::string&), this overload hands `key` to the LRU cache unchanged; it
+// does not run get_key_signature() on it.
+//
+// Returns the cache handle together with the schema found or stored under `key`. The callers
+// in this change pass the handle to release() after copying the shared pointer.
+std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(
+        const std::string& key, TabletSchemaSPtr tablet_schema) {
+    auto* lru_handle = lookup(key);
+    TabletSchemaSPtr tablet_schema_ptr;
+    if (lru_handle) {
+        // Key already present: return the cached object; the passed-in schema is not stored.
+        auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
+        tablet_schema_ptr = value->tablet_schema;
+        g_tablet_schema_cache_hit_count << 1;
+    } else {
+        DCHECK(tablet_schema != nullptr);
+        auto* value = new CacheValue;
+        tablet_schema_ptr = std::move(tablet_schema);
+        value->tablet_schema = tablet_schema_ptr;
+        // The column count and mem_size() of the schema are passed along with the entry.
+        lru_handle = LRUCachePolicy::insert(key, value, tablet_schema_ptr->num_columns(),
+                                            tablet_schema_ptr->mem_size(), CachePriority::NORMAL);
+        g_tablet_schema_cache_count << 1;
+        g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns();
+    }
+    DCHECK(lru_handle != nullptr);
+    return std::make_pair(lru_handle, tablet_schema_ptr);
+}
+
+// Looks `key` up without inserting anything. On a hit, bumps the hit counter and returns the
+// cache handle plus the cached schema; on a miss, returns {nullptr, nullptr}.
+std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::lookup_schema(
+        const std::string& key) {
+    auto* handle = lookup(key);
+    if (handle != nullptr) {
+        auto* cached = (CacheValue*)LRUCachePolicy::value(handle);
+        g_tablet_schema_cache_hit_count << 1;
+        return {handle, cached->tablet_schema};
+    }
+    return {nullptr, nullptr};
+}
+
+// Builds the cache key for a load-path TabletSchema.
+//
+// The key material is, in order: a fixed "load_schema_v2" tag, index_id, table_id, db_id, the
+// schema param version, the serialized source schema, the auto-increment column name (empty
+// unless the load is a partial update), whether the index schema provides a usable column list,
+// and the serialized index schema when one was found. The concatenation is then reduced with
+// get_key_signature(), so the returned string is the digest, not the raw material.
+//
+// `table_schema_param` must not be null; `index_schema` may be null when no index matched.
+std::string TabletSchemaCache::build_load_schema_cache_key(
+        int64_t index_id, const OlapTableSchemaParam* table_schema_param,
+        const TabletSchema& ori_tablet_schema, const OlapTableIndexSchema* index_schema) {
+    DCHECK(table_schema_param != nullptr);
+    std::string cache_key;
+    cache_key.append("load_schema_v2");
+    append_cache_key_value(&cache_key, index_id);
+    append_cache_key_value(&cache_key, table_schema_param->table_id());
+    append_cache_key_value(&cache_key, table_schema_param->db_id());
+    append_cache_key_value(&cache_key, table_schema_param->version());
+
+    TabletSchemaPB ori_schema_pb;
+    ori_tablet_schema.to_schema_pb(&ori_schema_pb);
+    append_cache_key_string(&cache_key,
+                            TabletSchema::deterministic_string_serialize(ori_schema_pb));
+    if (ori_tablet_schema.num_variant_columns() > 0) {
+        // Variant schemas carry path set info outside TabletSchemaPB, so do not share them
+        // across different source TabletSchema objects unless that metadata is serialized.
+        // NOTE(review): the key uses the object's address; if a source schema is destroyed and
+        // another one is later allocated at the same address, the keys could collide - confirm
+        // that source schema lifetimes rule this out.
+        append_cache_key_value(&cache_key, reinterpret_cast<uintptr_t>(&ori_tablet_schema));
+    }
+
+    // Only partial-update loads contribute the auto-increment column; others append "".
+    std::string auto_increment_column;
+    if (table_schema_param->is_partial_update()) {
+        auto_increment_column = table_schema_param->auto_increment_coulumn();
+    }
+    append_cache_key_string(&cache_key, auto_increment_column);
+
+    // Same condition the builders use to decide whether to call build_current_tablet_schema().
+    const bool has_current_schema = index_schema != nullptr && !index_schema->columns.empty() &&
+                                    index_schema->columns[0]->unique_id() >= 0;
+    append_cache_key_value(&cache_key, has_current_schema);
+    if (index_schema != nullptr) {
+        POlapTableIndexSchema index_schema_pb;
+        index_schema->to_protobuf(&index_schema_pb);
+        append_cache_key_string(&cache_key,
+                                TabletSchema::deterministic_string_serialize(index_schema_pb));
+    }
+    return get_key_signature(cache_key);
+}
+
void TabletSchemaCache::release(Cache::Handle* lru_handle) {
LRUCachePolicy::release(lru_handle);
}
diff --git a/be/src/storage/tablet/tablet_schema_cache.h
b/be/src/storage/tablet/tablet_schema_cache.h
index 41fa287d6af..fd097dacac8 100644
--- a/be/src/storage/tablet/tablet_schema_cache.h
+++ b/be/src/storage/tablet/tablet_schema_cache.h
@@ -17,12 +17,19 @@
#pragma once
+#include <cstdint>
+#include <string>
+#include <utility>
+
#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"
#include "storage/tablet/tablet_fwd.h"
namespace doris {
+class OlapTableSchemaParam;
+struct OlapTableIndexSchema;
+
class TabletSchemaCache : public LRUCachePolicy {
public:
using LRUCachePolicy::insert;
@@ -45,6 +52,16 @@ public:
std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key);
+ std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key,
+ TabletSchemaSPtr
tablet_schema);
+
+ std::pair<Cache::Handle*, TabletSchemaSPtr> lookup_schema(const
std::string& key);
+
+ static std::string build_load_schema_cache_key(int64_t index_id,
+ const OlapTableSchemaParam*
table_schema_param,
+ const TabletSchema&
ori_tablet_schema,
+ const OlapTableIndexSchema*
index_schema);
+
void release(Cache::Handle*);
private:
diff --git a/be/src/storage/tablet_info.h b/be/src/storage/tablet_info.h
index 70d187ebac9..c0caf8bb49a 100644
--- a/be/src/storage/tablet_info.h
+++ b/be/src/storage/tablet_info.h
@@ -105,7 +105,7 @@ public:
return _unique_key_update_mode ==
UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
}
- std::set<std::string> partial_update_input_columns() const {
+ const std::set<std::string>& partial_update_input_columns() const {
return _partial_update_input_columns;
}
PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const {
diff --git a/be/test/storage/tablet/tablet_schema_cache_test.cpp
b/be/test/storage/tablet/tablet_schema_cache_test.cpp
new file mode 100644
index 00000000000..76d122d6b51
--- /dev/null
+++ b/be/test/storage/tablet/tablet_schema_cache_test.cpp
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/tablet/tablet_schema_cache.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "storage/tablet/tablet_schema.h"
+#include "storage/tablet_info.h"
+
+namespace doris {
+
+// Test helper: builds a 4-byte INT key column with the given unique id and name.
+static TabletColumn create_test_column(int32_t unique_id, const std::string& name,
+                                       bool is_nullable = false) {
+    TabletColumn column;
+    column.set_unique_id(unique_id);
+    column.set_name(name);
+    column.set_type(FieldType::OLAP_FIELD_TYPE_INT);
+    column.set_is_key(true);
+    column.set_is_nullable(is_nullable);
+    column.set_length(4);
+    column.set_index_length(4);
+    return column;
+}
+
+// Test helper: initializes `param` from a POlapTableSchemaParam that carries only db_id,
+// table_id and version, and asserts that init() succeeds.
+static void set_schema_param(OlapTableSchemaParam* param, int64_t table_id = 20, int64_t db_id = 10,
+                             int64_t version = 3) {
+    POlapTableSchemaParam pschema;
+    pschema.set_db_id(db_id);
+    pschema.set_table_id(table_id);
+    pschema.set_version(version);
+    Status st = param->init(pschema);
+    ASSERT_TRUE(st.ok()) << st;
+}
+
+// The load-schema cache key must change when the index schema's columns change, when no index
+// schema is supplied, or when the table id differs, and must be stable for identical inputs.
+TEST(TabletSchemaCacheTest, LoadSchemaCacheKeyUsesFullIndexSchema) {
+    TabletSchema ori_schema;
+    ori_schema.set_schema_version(1);
+    ori_schema.append_column(create_test_column(1, "k1"));
+
+    OlapTableSchemaParam param;
+    set_schema_param(&param);
+    TabletColumn column1 = create_test_column(1, "k1");
+    TabletColumn column2 = create_test_column(1, "k1_changed");
+
+    // Two index schemas that differ only in the name of their single column.
+    OlapTableIndexSchema index_schema1;
+    index_schema1.index_id = 100;
+    index_schema1.schema_hash = 200;
+    index_schema1.columns.push_back(&column1);
+
+    OlapTableIndexSchema index_schema2;
+    index_schema2.index_id = 100;
+    index_schema2.schema_hash = 200;
+    index_schema2.columns.push_back(&column2);
+
+    auto key1 =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, ori_schema, &index_schema1);
+    auto key2 =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, ori_schema, &index_schema2);
+    EXPECT_NE(key1, key2);
+
+    auto same_key =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, ori_schema, &index_schema1);
+    EXPECT_EQ(key1, same_key);
+
+    // Without an index schema the key differs from key1 but is still deterministic.
+    auto fallback_key =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, ori_schema, nullptr);
+    EXPECT_NE(key1, fallback_key);
+    auto same_fallback_key =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, ori_schema, nullptr);
+    EXPECT_EQ(fallback_key, same_fallback_key);
+
+    // A different table id must produce a different key.
+    OlapTableSchemaParam other_table_param;
+    set_schema_param(&other_table_param, 21);
+    auto other_table_key = TabletSchemaCache::build_load_schema_cache_key(
+            100, &other_table_param, ori_schema, &index_schema1);
+    EXPECT_NE(key1, other_table_key);
+}
+
+// A schema inserted under a load-schema key must be returned, as the same object, by a
+// following lookup_schema() with that key.
+TEST(TabletSchemaCacheTest, InsertAndLookupLoadSchema) {
+    TabletSchema ori_schema;
+    ori_schema.set_schema_version(1);
+    ori_schema.append_column(create_test_column(1, "k1"));
+
+    OlapTableSchemaParam param;
+    set_schema_param(&param, 30);
+    TabletColumn column = create_test_column(1, "k1");
+    OlapTableIndexSchema index_schema;
+    index_schema.index_id = 100;
+    index_schema.schema_hash = 200;
+    index_schema.columns.push_back(&column);
+
+    auto cache_key =
+            TabletSchemaCache::build_load_schema_cache_key(100, &param, ori_schema, &index_schema);
+    auto tablet_schema = std::make_shared<TabletSchema>();
+    tablet_schema->copy_from(ori_schema);
+
+    // The test expects the key to be absent, so insert() stores and returns this very object.
+    auto inserted = TabletSchemaCache::instance()->insert(cache_key, tablet_schema);
+    EXPECT_EQ(tablet_schema.get(), inserted.second.get());
+    TabletSchemaCache::instance()->release(inserted.first);
+
+    auto cached = TabletSchemaCache::instance()->lookup_schema(cache_key);
+    ASSERT_NE(nullptr, cached.first);
+    EXPECT_EQ(tablet_schema.get(), cached.second.get());
+    TabletSchemaCache::instance()->release(cached.first);
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]