This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0-preview by this
push:
new 68bee442d26 [cloud](Variant) Optimize FDB kv size for Variant rowsets
and update schema on compaction (#33298)
68bee442d26 is described below
commit 68bee442d262d10ca9b86a9d1764c39c94ab6d11
Author: lihangyu <[email protected]>
AuthorDate: Fri Apr 12 10:32:41 2024 +0800
[cloud](Variant) Optimize FDB kv size for Variant rowsets and update schema
on compaction (#33298)
Currently, Variant writes an `fdb kv` for each `rowset`'s `TabletSchema`
upon `commit_rowset`. This approach is due to the schema's granularity being at
the rowset level, with each rowset potentially having a unique schema. However,
when the number of rowsets exceeds 1000, `get_rowset` operations may face FDB
transaction timeouts due to large kv sizes (exceeding 100K), leading to read
times over 10ms. To address this, there's a need to optimize the kv size to
prevent timeouts, targeti [...]
The compaction process does not update the schema, potentially merging
multiple rowset schemas into a new one without accurately reflecting the
changes. This results in inconsistencies and incorrect schema representations
in the output rowset.
To reduce the space occupied by `TabletSchema`, which is largely consumed
by the `column` and `index` fields, a dictionary compression method is
proposed. By encoding these fields into dictionary keys, we can significantly
decrease the storage space required. This involves modifications to the
`RowsetMetaCloudPB` and the introduction of a `SchemaCloudDictionary` to manage
the dictionary keys and ensure efficient storage utilization.
```
message RowsetMetaCloudPB {
...
repeated int32 column_dict_key_list = 26;
...
}
message SchemaCloudDictionary {
map<int32, ColumnPB> column_dict = 1;
optional int64 current_column_dict_id = 2;
map<int32, TabletIndexPB> index_dict = 3;
optional int64 current_index_dict_id = 4;
}
```
---
be/src/olap/rowset/segment_v2/segment.cpp | 4 +-
be/src/olap/tablet_schema.cpp | 4 +
cloud/src/common/config.h | 3 +
cloud/src/meta-service/keys.cpp | 19 +-
cloud/src/meta-service/keys.h | 11 +-
cloud/src/meta-service/meta_service.cpp | 31 ++-
cloud/src/meta-service/meta_service_schema.cpp | 254 +++++++++++++++++++++
cloud/src/meta-service/meta_service_schema.h | 9 +
cloud/src/recycler/recycler.cpp | 38 +--
cloud/test/keys_test.cpp | 30 +++
cloud/test/schema_kv_test.cpp | 65 +++++-
gensrc/proto/olap_file.proto | 25 ++
.../data/variant_p0/concurrent_insert.out | 103 +++++++++
.../suites/variant_github_events_p0/load.groovy | 31 +++
.../suites/variant_p0/complexjson.groovy | 2 +-
.../suites/variant_p0/concurrent_insert.groovy | 54 +++++
16 files changed, 612 insertions(+), 71 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 9a41171bdd8..9d7b573ca98 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -416,7 +416,9 @@ Status Segment::_create_column_readers(const
SegmentFooterPB& footer) {
if (!column.has_path_info()) {
continue;
}
- auto iter =
column_path_to_footer_ordinal.find(*column.path_info_ptr());
+ auto path = column.has_path_info() ? *column.path_info_ptr()
+ :
vectorized::PathInData(column.name_lower_case());
+ auto iter = column_path_to_footer_ordinal.find(path);
if (iter == column_path_to_footer_ordinal.end()) {
continue;
}
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 2197d78c21d..33e18986b10 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -560,6 +560,10 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
_column_path->from_protobuf(column.column_path_info());
_parent_col_unique_id =
column.column_path_info().parrent_column_unique_id();
}
+ if (is_variant_type() && !column.has_column_path_info()) {
+ // set path info for variant root column, to prevent from missing
+ _column_path =
std::make_shared<vectorized::PathInData>(_col_name_lower_case);
+ }
for (auto& column_pb : column.sparse_columns()) {
TabletColumn column;
column.init_from_pb(column_pb);
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 5640aef2cf9..280faf9014c 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -125,6 +125,9 @@ CONF_mBool(snapshot_get_tablet_stats, "true");
// Value codec version
CONF_mInt16(meta_schema_value_version, "1");
+// Limit kv size of Schema SchemaDictKeyList, default 10MB
+CONF_mInt32(schema_dict_kv_size_limit, "10485760");
+
// For instance check interval
CONF_Int64(reserved_buffer_days, "3");
diff --git a/cloud/src/meta-service/keys.cpp b/cloud/src/meta-service/keys.cpp
index 6ad60d8aeac..01b4b990652 100644
--- a/cloud/src/meta-service/keys.cpp
+++ b/cloud/src/meta-service/keys.cpp
@@ -47,10 +47,10 @@ namespace doris::cloud {
[[maybe_unused]] static const char* META_KEY_INFIX_TABLET = "tablet";
[[maybe_unused]] static const char* META_KEY_INFIX_TABLET_IDX = "tablet_index";
[[maybe_unused]] static const char* META_KEY_INFIX_SCHEMA = "schema";
-[[maybe_unused]] static const char* META_KEY_INFIX_ROWSET_SCHEMA =
"rowset_schema";
[[maybe_unused]] static const char* META_KEY_INFIX_DELETE_BITMAP =
"delete_bitmap";
[[maybe_unused]] static const char* META_KEY_INFIX_DELETE_BITMAP_LOCK =
"delete_bitmap_lock";
[[maybe_unused]] static const char* META_KEY_INFIX_DELETE_BITMAP_PENDING =
"delete_bitmap_pending";
+[[maybe_unused]] static const char* META_KEY_SCHEMA_PB_DICTIONARY =
"tablet_schema_pb_dict";
[[maybe_unused]] static const char* RECYCLE_KEY_INFIX_INDEX = "index";
[[maybe_unused]] static const char* RECYCLE_KEY_INFIX_PART = "partition";
@@ -115,7 +115,7 @@ static void encode_prefix(const T& t, std::string* key) {
RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo,
RecycleTxnKeyInfo, RecycleStageKeyInfo,
StatsTabletKeyInfo, TableVersionKeyInfo,
JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
- CopyJobKeyInfo, CopyFileKeyInfo, MetaRowsetSchemaKeyInfo,
StorageVaultKeyInfo>);
+ CopyJobKeyInfo, CopyFileKeyInfo, StorageVaultKeyInfo,
MetaSchemaPBDictionaryInfo>);
key->push_back(CLOUD_USER_KEY_SPACE01);
// Prefixes for key families
@@ -131,7 +131,7 @@ static void encode_prefix(const T& t, std::string* key) {
|| std::is_same_v<T, MetaTabletKeyInfo>
|| std::is_same_v<T, MetaTabletIdxKeyInfo>
|| std::is_same_v<T, MetaSchemaKeyInfo>
- || std::is_same_v<T, MetaRowsetSchemaKeyInfo>
+ || std::is_same_v<T, MetaSchemaPBDictionaryInfo>
|| std::is_same_v<T, MetaDeleteBitmapInfo>
|| std::is_same_v<T, MetaDeleteBitmapUpdateLockInfo>
|| std::is_same_v<T, MetaPendingDeleteBitmapInfo>) {
@@ -282,13 +282,6 @@ void meta_schema_key(const MetaSchemaKeyInfo& in,
std::string* out) {
encode_int64(std::get<2>(in), out); // schema_version
}
-void meta_rowset_schema_key(const MetaRowsetSchemaKeyInfo& in, std::string*
out) {
- encode_prefix(in, out); // 0x01 "meta"
${instance_id}
- encode_bytes(META_KEY_INFIX_ROWSET_SCHEMA, out); // "rowset_schema"
- encode_int64(std::get<1>(in), out); // tablet_id
- encode_bytes(std::get<2>(in), out); // rowset_id
-}
-
void meta_delete_bitmap_key(const MetaDeleteBitmapInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "meta"
${instance_id}
encode_bytes(META_KEY_INFIX_DELETE_BITMAP, out); // "delete_bitmap"
@@ -312,6 +305,12 @@ void meta_pending_delete_bitmap_key(const
MetaPendingDeleteBitmapInfo& in, std::
encode_int64(std::get<1>(in), out); // table_id
}
+void meta_schema_pb_dictionary_key(const MetaSchemaPBDictionaryInfo& in,
std::string* out) {
+ encode_prefix(in, out); // 0x01 "meta"
${instance_id}
+ encode_bytes(META_KEY_SCHEMA_PB_DICTIONARY, out); //
"tablet_schema_pb_dict"
+ encode_int64(std::get<1>(in), out); // index_id
+}
+
//==============================================================================
// Recycle keys
//==============================================================================
diff --git a/cloud/src/meta-service/keys.h b/cloud/src/meta-service/keys.h
index 483332a133c..21cf2208de1 100644
--- a/cloud/src/meta-service/keys.h
+++ b/cloud/src/meta-service/keys.h
@@ -44,6 +44,7 @@
// 0x01 "meta" ${instance_id} "delete_bitmap_lock" ${table_id} ${partition_id}
-> DeleteBitmapUpdateLockPB
// 0x01 "meta" ${instance_id} "delete_bitmap_pending" ${table_id}
-> PendingDeleteBitmapPB
// 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id}
${version} ${segment_id} -> roaringbitmap
+// 0x01 "meta" ${instance_id} "tablet_schema_pb_dict" ${index_id}
-> SchemaCloudDictionary
//
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id}
${partition_id} ${tablet_id} -> TabletStatsPB
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id}
${partition_id} ${tablet_id} "data_size" -> int64
@@ -163,9 +164,6 @@ using JobRecycleKeyInfo = BasicKeyInfo<20 ,
std::tuple<std::string>>;
// 0:instance_id
1:index_id 2:schema_version
using MetaSchemaKeyInfo = BasicKeyInfo<21, std::tuple<std::string,
int64_t, int64_t>>;
-// 0:instance_id
1:tablet_id 2:rowset_id
-using MetaRowsetSchemaKeyInfo = BasicKeyInfo<21, std::tuple<std::string,
int64_t, std::string>>;
-
// 0:instance_id
1:tablet_id 2:rowest_id 3:version 4:seg_id
using MetaDeleteBitmapInfo = BasicKeyInfo<22 , std::tuple<std::string,
int64_t, std::string, int64_t, int64_t>>;
@@ -184,6 +182,9 @@ using StorageVaultKeyInfo = BasicKeyInfo<26,
std::tuple<std::string, std::string
// 0:instance_id 1:db_id
2:table_id
using TableVersionKeyInfo = BasicKeyInfo<27, std::tuple<std::string, int64_t,
int64_t>>;
+// 0:instance_id
1:index_id
+using MetaSchemaPBDictionaryInfo = BasicKeyInfo<28 , std::tuple<std::string,
int64_t>>;
+
void instance_key(const InstanceKeyInfo& in, std::string* out);
static inline std::string instance_key(const InstanceKeyInfo& in) {
std::string s; instance_key(in, &s); return s; }
@@ -213,19 +214,19 @@ void meta_rowset_tmp_key(const MetaRowsetTmpKeyInfo& in,
std::string* out);
void meta_tablet_idx_key(const MetaTabletIdxKeyInfo& in, std::string* out);
void meta_tablet_key(const MetaTabletKeyInfo& in, std::string* out);
void meta_schema_key(const MetaSchemaKeyInfo& in, std::string* out);
-void meta_rowset_schema_key(const MetaRowsetSchemaKeyInfo& in, std::string*
out);
void meta_delete_bitmap_key(const MetaDeleteBitmapInfo& in, std::string* out);
void meta_delete_bitmap_update_lock_key(const MetaDeleteBitmapUpdateLockInfo&
in, std::string* out);
void meta_pending_delete_bitmap_key(const MetaPendingDeleteBitmapInfo& in,
std::string* out);
+void meta_schema_pb_dictionary_key(const MetaSchemaPBDictionaryInfo& in,
std::string* out);
static inline std::string meta_rowset_key(const MetaRowsetKeyInfo& in) {
std::string s; meta_rowset_key(in, &s); return s; }
static inline std::string meta_rowset_tmp_key(const MetaRowsetTmpKeyInfo& in)
{ std::string s; meta_rowset_tmp_key(in, &s); return s; }
static inline std::string meta_tablet_idx_key(const MetaTabletIdxKeyInfo& in)
{ std::string s; meta_tablet_idx_key(in, &s); return s; }
static inline std::string meta_tablet_key(const MetaTabletKeyInfo& in) {
std::string s; meta_tablet_key(in, &s); return s; }
static inline std::string meta_schema_key(const MetaSchemaKeyInfo& in) {
std::string s; meta_schema_key(in, &s); return s; }
-static inline std::string meta_rowset_schema_key(const
MetaRowsetSchemaKeyInfo& in) { std::string s; meta_rowset_schema_key(in, &s);
return s; }
static inline std::string meta_delete_bitmap_key(const MetaDeleteBitmapInfo&
in) { std::string s; meta_delete_bitmap_key(in, &s); return s; }
static inline std::string meta_delete_bitmap_update_lock_key(const
MetaDeleteBitmapUpdateLockInfo& in) { std::string s;
meta_delete_bitmap_update_lock_key(in, &s); return s; }
static inline std::string meta_pending_delete_bitmap_key(const
MetaPendingDeleteBitmapInfo& in) { std::string s;
meta_pending_delete_bitmap_key(in, &s); return s; }
+static inline std::string meta_schema_pb_dictionary_key(const
MetaSchemaPBDictionaryInfo& in) { std::string s;
meta_schema_pb_dictionary_key(in, &s); return s; }
std::string recycle_key_prefix(std::string_view instance_id);
void recycle_index_key(const RecycleIndexKeyInfo& in, std::string* out);
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index e27b6b5b944..8bdcc0581b0 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -40,6 +40,7 @@
#include <memory>
#include <sstream>
#include <string>
+#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
@@ -1087,14 +1088,11 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
DCHECK(rowset_meta.tablet_schema().has_schema_version());
DCHECK_GE(rowset_meta.tablet_schema().schema_version(), 0);
rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version());
- std::string schema_key;
+ std::string schema_key = meta_schema_key(
+ {instance_id, rowset_meta.index_id(),
rowset_meta.schema_version()});
if (rowset_meta.has_variant_type_in_schema()) {
- // encodes schema in a seperate kv, since variant schema is
volatile
- schema_key = meta_rowset_schema_key(
- {instance_id, rowset_meta.tablet_id(),
rowset_meta.rowset_id_v2()});
- } else {
- schema_key = meta_schema_key(
- {instance_id, rowset_meta.index_id(),
rowset_meta.schema_version()});
+ write_schema_dict(code, msg, instance_id, txn.get(), &rowset_meta);
+ if (code != MetaServiceCode::OK) return;
}
put_schema_kv(code, msg, txn.get(), schema_key,
rowset_meta.tablet_schema());
if (code != MetaServiceCode::OK) return;
@@ -1439,8 +1437,12 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
}
rowset_meta.set_index_id(idx.index_id());
}
+ bool need_read_schema_dict = false;
auto arena = response->GetArena();
for (auto& rowset_meta : *response->mutable_rowset_meta()) {
+ if (rowset_meta.has_schema_dict_key_list()) {
+ need_read_schema_dict = true;
+ }
if (rowset_meta.has_tablet_schema()) continue;
if (!rowset_meta.has_schema_version()) {
code = MetaServiceCode::INVALID_ARGUMENT;
@@ -1450,15 +1452,6 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
rowset_meta.start_version(), rowset_meta.end_version());
return;
}
- if (rowset_meta.has_variant_type_in_schema()) {
- // get rowset schema kv
- auto key = meta_rowset_schema_key(
- {instance_id, idx.tablet_id(),
rowset_meta.rowset_id_v2()});
- if (!try_fetch_and_parse_schema(txn.get(), rowset_meta, key, code,
msg)) {
- return;
- }
- continue;
- }
if (auto it = version_to_schema.find(rowset_meta.schema_version());
it != version_to_schema.end()) {
if (arena != nullptr) {
@@ -1475,6 +1468,12 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
rowset_meta.mutable_tablet_schema());
}
}
+
+ if (need_read_schema_dict) {
+ read_schema_from_dict(code, msg, instance_id, idx.index_id(),
txn.get(),
+ response->mutable_rowset_meta());
+ if (code != MetaServiceCode::OK) return;
+ }
}
void MetaServiceImpl::get_tablet_stats(::google::protobuf::RpcController*
controller,
diff --git a/cloud/src/meta-service/meta_service_schema.cpp
b/cloud/src/meta-service/meta_service_schema.cpp
index 6f361fb9f76..ee69b1547cf 100644
--- a/cloud/src/meta-service/meta_service_schema.cpp
+++ b/cloud/src/meta-service/meta_service_schema.cpp
@@ -18,7 +18,17 @@
#include "meta-service/meta_service_schema.h"
#include <fmt/format.h>
+#include <gen_cpp/cloud.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <google/protobuf/map.h>
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/repeated_field.h>
+#include <algorithm>
+#include <cstdint>
+#include <type_traits>
+
+#include "common/config.h"
#include "common/logging.h"
#include "common/sync_point.h"
#include "common/util.h"
@@ -32,6 +42,8 @@ namespace config {
extern int16_t meta_schema_value_version;
}
+constexpr static const char* VARIANT_TYPE_NAME = "VARIANT";
+
void put_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
std::string_view schema_key, const
doris::TabletSchemaCloudPB& schema) {
TxnErrorCode err = cloud::key_exists(txn, schema_key);
@@ -118,5 +130,247 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
// TODO(plat1ko): Apply decompression based on value version
return buf.to_pb(schema);
}
+/**
+ * Processes dictionary items, mapping them to a dictionary key and adding the
key to rowset meta.
+ * If it's a new item, generates a new key and increments the item ID. This
function is also responsible
+ * for removing dynamic parts from the original RowsetMeta's TabletSchema to
ensure the stability of
+ * FDB schema key-value pairs.
+ *
+ * @param dict The schema cloud dictionary reference, used for storing and
managing schema dictionary data.
+ * @param item_dict A mapping from item unique identifiers to their protobuf
representations, used to find
+ * and process specific item data.
+ * @param result Pointer to the collection of result items. Stores filtered or
transformed items. Can be nullptr
+ * if collecting results is not required.
+ * @param items The collection of items to be processed. These items are
filtered and potentially added to the dictionary.
+ * @param filter A function to determine which items should be processed. If
it returns true, the item is processed.
+ * @param add_dict_key_fn A function to handle the logic when a new item is
added to the dictionary, such as updating metadata.
+ */
+template <typename ItemPB>
+void process_dictionary(SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>&
item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ const google::protobuf::RepeatedPtrField<ItemPB>&
items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn) {
+ if (items.empty()) {
+ return;
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Filter none extended items, mainly extended columns and
extended indexes
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it != reversed_dict.end()) {
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ } else {
+ // Add new dictionary key-value pair and update
current_xxx_dict_id.
+ int64_t current_dict_id = 0;
+ if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+ current_dict_id = dict.current_column_dict_id() + 1;
+ dict.set_current_column_dict_id(current_dict_id);
+ dict.mutable_column_dict()->emplace(current_dict_id, item);
+ }
+ if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+ current_dict_id = dict.current_index_dict_id() + 1;
+ dict.set_current_index_dict_id(current_dict_id);
+ dict.mutable_index_dict()->emplace(current_dict_id, item);
+ }
+ add_dict_key_fn(current_dict_id);
+ reversed_dict[serialized_key] = current_dict_id;
+ // LOG(INFO) << "Add dict key = " << current_dict_id << " dict
value = " << item.ShortDebugString();
+ }
+ }
+ // clear extended items to prevent writing them to fdb
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
+ }
+}
+
+// Writes schema dictionary metadata to RowsetMetaCloudPB.
+// Schema was extended in BE side, we need to reset schema to original
frontend schema and store
+// such restored schema in fdb. And also add extra dict key info to
RowsetMetaCloudPB.
+void write_schema_dict(MetaServiceCode& code, std::string& msg, const
std::string& instance_id,
+ Transaction* txn, RowsetMetaCloudPB* rowset_meta) {
+ std::stringstream ss;
+ // wrtie dict to rowset meta and update dict
+ SchemaCloudDictionary dict;
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, dict_key, &dict_val);
+ LOG(INFO) << "Retrieved column pb dictionary, index_id=" <<
rowset_meta->index_id()
+ << " key=" << hex(dict_key) << " error=" << err;
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK)
{
+ // Handle retrieval error.
+ ss << "Failed to retrieve column pb dictionary, instance_id=" <<
instance_id
+ << " table_id=" << rowset_meta->index_id() << " key=" <<
hex(dict_key)
+ << " error=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ return;
+ }
+ if (err == TxnErrorCode::TXN_OK && !dict_val.to_pb(&dict)) {
+ // Handle parse error.
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("Malformed tablet dictionary value, key={}",
hex(dict_key));
+ return;
+ }
+
+ // collect sparse columns and clear in parent column
+ google::protobuf::RepeatedPtrField<ColumnPB> sparse_columns;
+ for (auto& column_pb :
*rowset_meta->mutable_tablet_schema()->mutable_column()) {
+ if (column_pb.type() == VARIANT_TYPE_NAME &&
!column_pb.sparse_columns().empty()) {
+ // set parent_id for restore info
+ for (auto& sparse_col : *column_pb.mutable_sparse_columns()) {
+ sparse_col.set_parent_unique_id(column_pb.unique_id());
+ }
+ sparse_columns.Add(column_pb.sparse_columns().begin(),
+ column_pb.sparse_columns().end());
+ }
+ // clear sparse columns to prevent writing them to fdb
+ column_pb.clear_sparse_columns();
+ }
+ auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
+ // handle column dict
+ auto original_column_dict_id = dict.current_column_dict_id();
+ auto column_filter = [&](const doris::ColumnPB& col) -> bool { return
col.unique_id() >= 0; };
+ auto column_dict_adder = [&](int32_t key) {
dict_list->add_column_dict_key_list(key); };
+ process_dictionary<doris::ColumnPB>(
+ dict, dict.column_dict(),
rowset_meta->mutable_tablet_schema()->mutable_column(),
+ rowset_meta->tablet_schema().column(), column_filter,
column_dict_adder);
+
+ // handle sparse column dict
+ auto sparse_column_dict_adder = [&](int32_t key) {
+ dict_list->add_sparse_column_dict_key_list(key);
+ };
+ // not filter any
+ auto sparse_column_filter = [&](const doris::ColumnPB& col) -> bool {
return false; };
+ process_dictionary<doris::ColumnPB>(dict, dict.column_dict(), nullptr,
sparse_columns,
+ sparse_column_filter,
sparse_column_dict_adder);
+
+ // handle index info dict
+ auto original_index_dict_id = dict.current_index_dict_id();
+ auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool {
+ return index_pb.index_suffix_name().empty();
+ };
+ auto index_dict_adder = [&](int32_t key) {
dict_list->add_index_info_dict_key_list(key); };
+ process_dictionary<doris::TabletIndexPB>(
+ dict, dict.index_dict(),
rowset_meta->mutable_tablet_schema()->mutable_index(),
+ rowset_meta->tablet_schema().index(), index_filter,
index_dict_adder);
+
+ // Write back modified dictionaries.
+ if (original_index_dict_id != dict.current_index_dict_id() ||
+ original_column_dict_id != dict.current_column_dict_id()) {
+ // If dictionary was modified, serialize and save it.
+ std::string dict_val;
+ if (!dict.SerializeToString(&dict_val)) {
+ // Handle serialization error.
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "Failed to serialize dictionary for saving, txn_id=" <<
rowset_meta->txn_id();
+ msg = ss.str();
+ return;
+ }
+ // Limit the size of dict value
+ if (dict_val.size() > config::schema_dict_kv_size_limit) {
+ code = MetaServiceCode::KV_TXN_COMMIT_ERR;
+ ss << "Failed to write dictionary for saving, txn_id=" <<
rowset_meta->txn_id()
+ << ", reached the limited size threshold of SchemaDictKeyList "
+ << config::schema_dict_kv_size_limit;
+ msg = ss.str();
+ }
+ // splitting large values (>90*1000) into multiple KVs
+ cloud::put(txn, dict_key, dict_val, 0);
+ LOG(INFO) << "Dictionary saved, key=" << hex(dict_key)
+ << " txn_id=" << rowset_meta->txn_id() << " Dict size=" <<
dict.column_dict_size()
+ << ", Current column ID=" << dict.current_column_dict_id()
+ << ", Current index ID=" << dict.current_index_dict_id();
+ }
+}
+
+void read_schema_from_dict(MetaServiceCode& code, std::string& msg, const
std::string& instance_id,
+ int64_t index_id, Transaction* txn,
+
google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>* rowset_metas) {
+ std::stringstream ss;
+
+ // read dict if any rowset has dict key list
+ SchemaCloudDictionary dict;
+ std::string column_dict_key = meta_schema_pb_dictionary_key({instance_id,
index_id});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, column_dict_key, &dict_val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "internal error, failed to get dict ret=" << err;
+ msg = ss.str();
+ return;
+ }
+ if (err == TxnErrorCode::TXN_OK && !dict_val.to_pb(&dict)) [[unlikely]] {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = "failed to parse SchemaCloudDictionary";
+ return;
+ }
+ LOG(INFO) << "Get schema_dict, column size=" << dict.column_dict_size()
+ << ", index size=" << dict.index_dict_size();
+
+ auto fill_schema_with_dict = [&](RowsetMetaCloudPB* out) {
+ std::unordered_map<int32_t, ColumnPB*> unique_id_map;
+ //init map
+ for (ColumnPB& column :
*out->mutable_tablet_schema()->mutable_column()) {
+ unique_id_map[column.unique_id()] = &column;
+ }
+ // column info
+ for (size_t i = 0; i <
out->schema_dict_key_list().column_dict_key_list_size(); ++i) {
+ int dict_key = out->schema_dict_key_list().column_dict_key_list(i);
+ const ColumnPB& dict_val = dict.column_dict().at(dict_key);
+ ColumnPB& to_add = *out->mutable_tablet_schema()->add_column();
+ to_add = dict_val;
+ VLOG_DEBUG << "fill dict column " << dict_val.ShortDebugString();
+ }
+
+ // index info
+ for (size_t i = 0; i <
out->schema_dict_key_list().index_info_dict_key_list_size(); ++i) {
+ int dict_key =
out->schema_dict_key_list().index_info_dict_key_list(i);
+ const doris::TabletIndexPB& dict_val =
dict.index_dict().at(dict_key);
+ doris::TabletIndexPB& to_add =
*out->mutable_tablet_schema()->add_index();
+ to_add = dict_val;
+ VLOG_DEBUG << "fill dict index " << dict_val.ShortDebugString();
+ }
+
+ // sparse column info
+ for (size_t i = 0; i <
out->schema_dict_key_list().sparse_column_dict_key_list_size();
+ ++i) {
+ int dict_key =
out->schema_dict_key_list().sparse_column_dict_key_list(i);
+ const ColumnPB& dict_val = dict.column_dict().at(dict_key);
+
*unique_id_map.at(dict_val.parent_unique_id())->add_sparse_columns() = dict_val;
+ VLOG_DEBUG << "fill dict sparse column" <<
dict_val.ShortDebugString();
+ }
+ };
+
+ // fill rowsets's schema with dict info
+ for (auto& rowset_meta : *rowset_metas) {
+ if (rowset_meta.has_schema_dict_key_list()) {
+ fill_schema_with_dict(&rowset_meta);
+ }
+ }
+}
} // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_service_schema.h
b/cloud/src/meta-service/meta_service_schema.h
index 44fabeafd73..d44f01f9747 100644
--- a/cloud/src/meta-service/meta_service_schema.h
+++ b/cloud/src/meta-service/meta_service_schema.h
@@ -30,4 +30,13 @@ void put_schema_kv(MetaServiceCode& code, std::string& msg,
Transaction* txn,
// Return true if parse success
[[nodiscard]] bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema);
+// Writes schema dictionary metadata to RowsetMetaCloudPB
+void write_schema_dict(MetaServiceCode& code, std::string& msg, const
std::string& instance_id,
+ Transaction* txn, RowsetMetaCloudPB* rowset_meta);
+
+// Read schema from dictionary metadata, modified to rowset_metas
+void read_schema_from_dict(MetaServiceCode& code, std::string& msg, const
std::string& instance_id,
+ int64_t index_id, Transaction* txn,
+
google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>* rowset_metas);
+
} // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index ed39dd8df66..3089aeb66d6 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -1153,6 +1153,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
txn->remove(job_key_begin, job_key_end);
LOG(WARNING) << "remove job kv, begin=" << hex(job_key_begin) << " end="
<< hex(job_key_end);
std::string schema_key_begin, schema_key_end;
+ std::string schema_dict_key;
if (partition_id <= 0) {
// Delete schema kv of this index
meta_schema_key({instance_id_, index_id, 0}, &schema_key_begin);
@@ -1160,6 +1161,9 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
txn->remove(schema_key_begin, schema_key_end);
LOG(WARNING) << "remove schema kv, begin=" << hex(schema_key_begin)
<< " end=" << hex(schema_key_end);
+ meta_schema_pb_dictionary_key({instance_id_, index_id},
&schema_dict_key);
+ txn->remove(schema_dict_key);
+ LOG(WARNING) << "remove schema dict kv, key=" << hex(schema_dict_key);
}
TxnErrorCode err = txn->commit();
@@ -1336,11 +1340,6 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_,
tablet_id + 1, "", 0, 0});
txn->remove(delete_bitmap_start, delete_bitmap_end);
- // remove rowset schema
- std::string rowset_schema_start = meta_rowset_schema_key({instance_id_,
tablet_id, ""});
- std::string rowset_schema_end = meta_rowset_schema_key({instance_id_,
tablet_id + 1, ""});
- txn->remove(rowset_schema_start, rowset_schema_end);
-
TxnErrorCode err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id
<< ", err=" << err;
@@ -1401,7 +1400,6 @@ int InstanceRecycler::recycle_rowsets() {
});
std::vector<std::string> rowset_keys;
- std::vector<std::string> rowset_schema_keys;
std::vector<doris::RowsetMetaCloudPB> rowsets;
// Store keys of rowset recycled by background workers
@@ -1529,9 +1527,6 @@ int InstanceRecycler::recycle_rowsets() {
return -1;
}
} else {
- auto schema_key = meta_rowset_schema_key(
- {instance_id_, rowset_meta->tablet_id(),
rowset_meta->rowset_id_v2()});
- rowset_schema_keys.push_back(std::move(schema_key));
rowset_keys.push_back(std::string(k));
if (rowset_meta->num_segments() > 0) { // Skip empty rowset
rowsets.push_back(std::move(*rowset_meta));
@@ -1542,23 +1537,15 @@ int InstanceRecycler::recycle_rowsets() {
auto loop_done = [&]() -> int {
std::vector<std::string> rowset_keys_to_delete;
- std::vector<std::string> rowset_schema_keys_to_delete;
std::vector<doris::RowsetMetaCloudPB> rowsets_to_delete;
rowset_keys_to_delete.swap(rowset_keys);
- rowset_schema_keys_to_delete.swap(rowset_schema_keys);
rowsets_to_delete.swap(rowsets);
worker_pool->submit([&, rowset_keys_to_delete =
std::move(rowset_keys_to_delete),
- rowsets_to_delete = std::move(rowsets_to_delete),
- rowset_schema_keys_to_delete =
-
std::move(rowset_schema_keys_to_delete)]() {
+ rowsets_to_delete =
std::move(rowsets_to_delete)]() {
if (delete_rowset_data(rowsets_to_delete) != 0) {
LOG(WARNING) << "failed to delete rowset data, instance_id="
<< instance_id_;
return;
}
- if (txn_remove(txn_kv_.get(), rowset_schema_keys_to_delete) != 0) {
- LOG(WARNING) << "failed to delete recycle rowset kv,
instance_id=" << instance_id_;
- return;
- }
if (txn_remove(txn_kv_.get(), rowset_keys_to_delete) != 0) {
LOG(WARNING) << "failed to delete recycle rowset kv,
instance_id=" << instance_id_;
return;
@@ -1618,11 +1605,10 @@ int InstanceRecycler::recycle_tmp_rowsets() {
// Elements in `tmp_rowset_keys` has the same lifetime as `it`
std::vector<std::string_view> tmp_rowset_keys;
- std::vector<std::string> tmp_rowset_schema_keys;
std::vector<doris::RowsetMetaCloudPB> tmp_rowsets;
auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys,
&tmp_rowsets,
- &expired_rowset_size, &total_rowset_size,
&tmp_rowset_schema_keys,
+ &expired_rowset_size, &total_rowset_size,
this](std::string_view k, std::string_view v) ->
int {
++num_scanned;
total_rowset_size += v.size();
@@ -1652,11 +1638,6 @@ int InstanceRecycler::recycle_tmp_rowsets() {
tmp_rowset_keys.push_back(k);
return 0;
}
- if (rowset.has_variant_type_in_schema()) {
- auto schema_key = meta_rowset_schema_key(
- {instance_id_, rowset.tablet_id(), rowset.rowset_id_v2()});
- tmp_rowset_schema_keys.push_back(std::move(schema_key));
- }
// TODO(plat1ko): check rowset not referenced
LOG(INFO) << "delete rowset data, instance_id=" << instance_id_
<< " tablet_id=" << rowset.tablet_id() << " rowset_id=" <<
rowset.rowset_id_v2()
@@ -1670,8 +1651,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
return 0;
};
- auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled,
&tmp_rowset_schema_keys,
- this]() -> int {
+ auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled, this]()
-> int {
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
tmp_rowset_keys.clear();
tmp_rowsets.clear();
@@ -1680,10 +1660,6 @@ int InstanceRecycler::recycle_tmp_rowsets() {
LOG(WARNING) << "failed to delete tmp rowset data, instance_id="
<< instance_id_;
return -1;
}
- if (txn_remove(txn_kv_.get(), tmp_rowset_schema_keys) != 0) {
- LOG(WARNING) << "failed to delete tmp rowset schema kv,
instance_id=" << instance_id_;
- return -1;
- }
if (txn_remove(txn_kv_.get(), tmp_rowset_keys) != 0) {
LOG(WARNING) << "failed to delete tmp rowset kv, instance_id=" <<
instance_id_;
return -1;
diff --git a/cloud/test/keys_test.cpp b/cloud/test/keys_test.cpp
index ce7cbb0551e..a92a685cda4 100644
--- a/cloud/test/keys_test.cpp
+++ b/cloud/test/keys_test.cpp
@@ -21,6 +21,7 @@
#include <bthread/countdown_event.h>
#include <gtest/gtest.h>
+#include <cstdint>
#include <cstring>
#include <iostream>
#include <random>
@@ -1024,3 +1025,32 @@ TEST(KeysTest, DecodeKeysTest) {
ASSERT_TRUE(!pretty_key.empty()) << key;
std::cout << "\n" << pretty_key << std::endl;
}
+
+// Round-trip test for the schema-dictionary key introduced for Variant schema
+// compression: encode with meta_schema_pb_dictionary_key(), then decode each
+// component back and check the key layout is
+//   "meta" / instance_id / "tablet_schema_pb_dict" / index_id.
+TEST(KeysTest, MetaSchemaPBDictionaryTest) {
+ using namespace doris::cloud;
+ std::string instance_id = "instance_id_meta_dict";
+ int64_t index_id = 123456;
+
+ // 0:instance_id 1:index_id
+ MetaSchemaPBDictionaryInfo dict_key {instance_id, index_id};
+ std::string encoded_dict_key;
+ meta_schema_pb_dictionary_key(dict_key, &encoded_dict_key);
+ std::cout << hex(encoded_dict_key) << std::endl; // dump encoded key for debugging
+
+ std::string decoded_instance_id;
+ std::string decoded_prefix;
+ std::string decoded_meta_prefix;
+ int64_t decoded_index_id;
+ std::string_view key_sv(encoded_dict_key);
+ remove_user_space_prefix(&key_sv);
+ // Decode components in the same order they were encoded.
+ ASSERT_EQ(decode_bytes(&key_sv, &decoded_prefix), 0);
+ ASSERT_EQ(decode_bytes(&key_sv, &decoded_instance_id), 0);
+ ASSERT_EQ(decode_bytes(&key_sv, &decoded_meta_prefix), 0);
+ ASSERT_EQ(decode_int64(&key_sv, &decoded_index_id), 0);
+ ASSERT_TRUE(key_sv.empty()); // the entire key must be consumed
+
+ EXPECT_EQ("meta", decoded_prefix);
+ EXPECT_EQ("tablet_schema_pb_dict", decoded_meta_prefix);
+ EXPECT_EQ(instance_id, decoded_instance_id);
+ EXPECT_EQ(index_id, decoded_index_id);
+}
diff --git a/cloud/test/schema_kv_test.cpp b/cloud/test/schema_kv_test.cpp
index 8fc03b951ce..4d52533a710 100644
--- a/cloud/test/schema_kv_test.cpp
+++ b/cloud/test/schema_kv_test.cpp
@@ -280,7 +280,8 @@ static void commit_txn(MetaServiceProxy* meta_service,
int64_t db_id, int64_t tx
static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t
tablet_id,
const std::string& rowset_id,
int32_t schema_version,
- int64_t version = -1) {
+ int64_t version = -1,
+ const TabletSchemaCloudPB*
schema = nullptr) {
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // required
rowset.set_rowset_id_v2(rowset_id);
@@ -295,6 +296,10 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t
txn_id, int64_t tablet_id,
}
rowset.mutable_tablet_schema()->set_schema_version(schema_version);
rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK
+ if (schema != nullptr) {
+ rowset.mutable_tablet_schema()->CopyFrom(*schema);
+ rowset.mutable_tablet_schema()->set_schema_version(schema_version);
+ }
return rowset;
}
@@ -319,19 +324,54 @@ static void commit_rowset(MetaServiceProxy* meta_service,
const doris::RowsetMet
}
static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const
std::string& label,
- int64_t table_id, int64_t tablet_id, int32_t
schema_version) {
+ int64_t table_id, int64_t tablet_id, int32_t
schema_version,
+ const TabletSchemaCloudPB* schema = nullptr) {
int64_t txn_id = 0;
ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service, db_id, label, table_id,
txn_id));
CreateRowsetResponse res;
- auto rowset = create_rowset(txn_id, tablet_id, next_rowset_id(),
schema_version);
+ auto rowset = create_rowset(txn_id, tablet_id, next_rowset_id(),
schema_version, -1, schema);
+ rowset.set_has_variant_type_in_schema(schema != nullptr);
prepare_rowset(meta_service, rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
res.Clear();
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, rowset, res));
- ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label << ", msg="
<< res.status().msg();
commit_txn(meta_service, db_id, txn_id, label);
}
+// Test fixture builder: returns a TabletSchemaCloudPB (schema_version 3) with
+// a VARIANT column (unique_id 10) carrying one sparse sub-column, two
+// extracted sub-columns, and two tablet indexes (one with a suffix name).
+// NOTE(review): the sub-columns use unique_id -1 — presumably meaning "no
+// fixed column id" for variant-extracted columns; verify against BE convention.
+static TabletSchemaCloudPB getVariantSchema() {
+ TabletSchemaCloudPB schema;
+ schema.set_schema_version(3);
+ // columns
+ ColumnPB var;
+ var.set_type("VARIANT");
+ var.set_unique_id(10);
+ ColumnPB var_sub1;
+ var_sub1.set_type("INT");
+ var_sub1.set_unique_id(-1);
+ schema.add_column()->CopyFrom(var_sub1);
+ ColumnPB var_sub2;
+ var_sub2.set_type("DOUBLE");
+ var_sub2.set_unique_id(-1);
+ schema.add_column()->CopyFrom(var_sub2);
+ ColumnPB var_sparse_sub1;
+ var_sparse_sub1.set_type("DOUBLE");
+ var_sparse_sub1.set_unique_id(-1);
+ // sparse sub-column is attached to the variant column itself, not the schema
+ var.add_sparse_columns()->CopyFrom(var_sparse_sub1);
+ schema.add_column()->CopyFrom(var);
+
+ // indexes
+ doris::TabletIndexPB index1;
+ index1.set_index_id(111);
+ index1.set_index_suffix_name("aaabbbccc");
+ schema.add_index()->CopyFrom(index1);
+
+ doris::TabletIndexPB index2;
+ index2.set_index_id(222);
+ schema.add_index()->CopyFrom(index2);
+ return schema;
+}
+
TEST(DetachSchemaKVTest, RowsetTest) {
auto meta_service = get_meta_service();
// meta_service->resource_mgr().reset(); // Do not use resource manager
@@ -439,7 +479,8 @@ TEST(DetachSchemaKVTest, RowsetTest) {
auto insert_and_get_rowset = [&meta_service](int64_t table_id, int64_t
index_id,
int64_t partition_id, int64_t
tablet_id,
int label_base,
- google::protobuf::Arena*
arena = nullptr) {
+ google::protobuf::Arena*
arena = nullptr,
+ const TabletSchemaCloudPB*
schema = nullptr) {
config::write_schema_kv = false;
std::mt19937
rng(std::chrono::system_clock::now().time_since_epoch().count());
std::uniform_int_distribution<int> dist1(1, 4);
@@ -451,14 +492,14 @@ TEST(DetachSchemaKVTest, RowsetTest) {
schema_versions.push_back(dist1(rng));
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), db_id,
std::to_string(++label_base), table_id, tablet_id,
- schema_versions.back()));
+ schema_versions.back(),
schema));
}
config::write_schema_kv = true;
for (int i = 0; i < 15; ++i) {
schema_versions.push_back(dist2(rng));
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), db_id,
std::to_string(++label_base), table_id, tablet_id,
- schema_versions.back()));
+ schema_versions.back(),
schema));
}
// check get rowset response
auto get_rowset_res =
google::protobuf::Arena::CreateMessage<GetRowsetResponse>(arena);
@@ -481,11 +522,21 @@ TEST(DetachSchemaKVTest, RowsetTest) {
EXPECT_EQ(get_rowset_res->stats().num_rowsets(), 26);
EXPECT_EQ(get_rowset_res->stats().num_segments(), 25);
EXPECT_EQ(get_rowset_res->stats().data_size(), 250000);
+ if (schema != nullptr) {
+ auto schema_version =
get_rowset_res->rowset_meta(10).schema_version();
+
get_rowset_res->mutable_rowset_meta(10)->mutable_tablet_schema()->set_schema_version(3);
+
EXPECT_EQ(get_rowset_res->rowset_meta(10).tablet_schema().SerializeAsString(),
+ schema->SerializeAsString());
+
get_rowset_res->mutable_rowset_meta(10)->mutable_tablet_schema()->set_schema_version(
+ schema_version);
+ }
};
insert_and_get_rowset(10031, 10032, 10033, 10034, 300);
// use arena
google::protobuf::Arena arena;
insert_and_get_rowset(10041, 10042, 10043, 10044, 400, &arena);
+ TabletSchemaCloudPB schema = getVariantSchema();
+ insert_and_get_rowset(10051, 10052, 10053, 10054, 500, &arena, &schema);
}
TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 3ab783c6f8a..82a9011dc1c 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -133,6 +133,15 @@ message RowsetMetaPB {
optional bool has_variant_type_in_schema = 1005;
}
+// Per-rowset references into SchemaCloudDictionary: instead of embedding full
+// ColumnPB / TabletIndexPB messages, a rowset meta stores only the dictionary
+// keys of its columns and indexes, shrinking the fdb kv size for Variant
+// rowsets whose schemas differ per rowset.
+message SchemaDictKeyList {
+ // save the dict keys for column pb info
+ repeated int32 column_dict_key_list = 1;
+ // save the dict keys for tablet index pb info
+ repeated int32 index_info_dict_key_list = 2;
+ // save the dict keys for sparse column pb info
+ repeated int32 sparse_column_dict_key_list = 3;
+};
+
message RowsetMetaCloudPB {
required int64 rowset_id = 1; // Deprecated. Use rowset_id_v2 instead.
optional int64 partition_id = 2;
@@ -200,7 +209,11 @@ message RowsetMetaCloudPB {
// If enable_segments_file_size is false,
// the segments_file_size maybe is empty or error
optional bool enable_segments_file_size = 103;
+
+ // extra info for variants
optional bool has_variant_type_in_schema = 104;
+ // dict key lists for compress schema info
+ optional SchemaDictKeyList schema_dict_key_list = 105;
}
message SegmentStatisticsPB {
@@ -299,6 +312,18 @@ message ColumnPB {
// sparse column within a variant column
repeated ColumnPB sparse_columns = 21;
optional bool is_auto_increment = 22;
+ // only reference by variant sparse columns
+ optional int32 parent_unique_id = 23;
+}
+
+// Dictionary of Schema info, to reduce TabletSchemaCloudPB fdb kv size.
+// Rowset metas reference entries here by key via SchemaDictKeyList instead of
+// storing the full column/index messages inline.
+message SchemaCloudDictionary {
+ // dict key -> full ColumnPB definition
+ map<int32, ColumnPB> column_dict= 1;
+ // monotonically increasing counter for column_dict keys
+ optional int64 current_column_dict_id = 2;
+ // dict key -> full TabletIndexPB definition
+ map<int32, TabletIndexPB> index_dict = 3;
+ // monotonically increasing counter for index_dict keys
+ optional int64 current_index_dict_id = 4;
}
enum IndexType {
diff --git a/regression-test/data/variant_p0/concurrent_insert.out
b/regression-test/data/variant_p0/concurrent_insert.out
new file mode 100644
index 00000000000..610f877220f
--- /dev/null
+++ b/regression-test/data/variant_p0/concurrent_insert.out
@@ -0,0 +1,103 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_1 --
+0 {"k0":0,"x10":123} {"k0":0,"x10":123}
+0 {"k0":0,"x100":123} {"k0":0,"x100":123}
+0 {"k0":0,"x110":123} {"k0":0,"x110":123}
+0 {"k0":0,"x120":123} {"k0":0,"x120":123}
+0 {"k0":0,"x130":123} {"k0":0,"x130":123}
+0 {"k0":0,"x140":123} {"k0":0,"x140":123}
+0 {"k0":0,"x150":123} {"k0":0,"x150":123}
+0 {"k0":0,"x160":123} {"k0":0,"x160":123}
+0 {"k0":0,"x170":123} {"k0":0,"x170":123}
+0 {"k0":0,"x180":123} {"k0":0,"x180":123}
+0 {"k0":0,"x20":123} {"k0":0,"x20":123}
+0 {"k0":0,"x30":123} {"k0":0,"x30":123}
+0 {"k0":0,"x40":123} {"k0":0,"x40":123}
+0 {"k0":0,"x50":123} {"k0":0,"x50":123}
+0 {"k0":0,"x60":123} {"k0":0,"x60":123}
+0 {"k0":0,"x70":123} {"k0":0,"x70":123}
+0 {"k0":0,"x80":123} {"k0":0,"x80":123}
+0 {"k0":0,"x90":123} {"k0":0,"x90":123}
+1 {"k1":1,"x1":123} {"k1":1,"x1":123}
+1 {"k1":1,"x101":123} {"k1":1,"x101":123}
+1 {"k1":1,"x11":123} {"k1":1,"x11":123}
+1 {"k1":1,"x111":123} {"k1":1,"x111":123}
+1 {"k1":1,"x121":123} {"k1":1,"x121":123}
+1 {"k1":1,"x131":123} {"k1":1,"x131":123}
+1 {"k1":1,"x141":123} {"k1":1,"x141":123}
+1 {"k1":1,"x151":123} {"k1":1,"x151":123}
+1 {"k1":1,"x161":123} {"k1":1,"x161":123}
+1 {"k1":1,"x171":123} {"k1":1,"x171":123}
+1 {"k1":1,"x21":123} {"k1":1,"x21":123}
+1 {"k1":1,"x31":123} {"k1":1,"x31":123}
+1 {"k1":1,"x41":123} {"k1":1,"x41":123}
+1 {"k1":1,"x51":123} {"k1":1,"x51":123}
+1 {"k1":1,"x61":123} {"k1":1,"x61":123}
+1 {"k1":1,"x71":123} {"k1":1,"x71":123}
+1 {"k1":1,"x81":123} {"k1":1,"x81":123}
+1 {"k1":1,"x91":123} {"k1":1,"x91":123}
+2 {"k2":2,"x102":123} {"k2":2,"x102":123}
+2 {"k2":2,"x112":123} {"k2":2,"x112":123}
+2 {"k2":2,"x12":123} {"k2":2,"x12":123}
+2 {"k2":2,"x122":123} {"k2":2,"x122":123}
+2 {"k2":2,"x132":123} {"k2":2,"x132":123}
+2 {"k2":2,"x142":123} {"k2":2,"x142":123}
+2 {"k2":2,"x152":123} {"k2":2,"x152":123}
+2 {"k2":2,"x162":123} {"k2":2,"x162":123}
+2 {"k2":2,"x172":123} {"k2":2,"x172":123}
+2 {"k2":2,"x2":123} {"k2":2,"x2":123}
+2 {"k2":2,"x22":123} {"k2":2,"x22":123}
+2 {"k2":2,"x32":123} {"k2":2,"x32":123}
+2 {"k2":2,"x42":123} {"k2":2,"x42":123}
+2 {"k2":2,"x52":123} {"k2":2,"x52":123}
+2 {"k2":2,"x62":123} {"k2":2,"x62":123}
+2 {"k2":2,"x72":123} {"k2":2,"x72":123}
+2 {"k2":2,"x82":123} {"k2":2,"x82":123}
+2 {"k2":2,"x92":123} {"k2":2,"x92":123}
+3 {"k3":3,"x103":123} {"k3":3,"x103":123}
+3 {"k3":3,"x113":123} {"k3":3,"x113":123}
+3 {"k3":3,"x123":123} {"k3":3,"x123":123}
+3 {"k3":3,"x13":123} {"k3":3,"x13":123}
+3 {"k3":3,"x133":123} {"k3":3,"x133":123}
+3 {"k3":3,"x143":123} {"k3":3,"x143":123}
+3 {"k3":3,"x153":123} {"k3":3,"x153":123}
+3 {"k3":3,"x163":123} {"k3":3,"x163":123}
+3 {"k3":3,"x173":123} {"k3":3,"x173":123}
+3 {"k3":3,"x23":123} {"k3":3,"x23":123}
+3 {"k3":3,"x3":123} {"k3":3,"x3":123}
+3 {"k3":3,"x33":123} {"k3":3,"x33":123}
+3 {"k3":3,"x43":123} {"k3":3,"x43":123}
+3 {"k3":3,"x53":123} {"k3":3,"x53":123}
+3 {"k3":3,"x63":123} {"k3":3,"x63":123}
+3 {"k3":3,"x73":123} {"k3":3,"x73":123}
+3 {"k3":3,"x83":123} {"k3":3,"x83":123}
+3 {"k3":3,"x93":123} {"k3":3,"x93":123}
+4 {"k4":4,"x104":123} {"k4":4,"x104":123}
+4 {"k4":4,"x114":123} {"k4":4,"x114":123}
+4 {"k4":4,"x124":123} {"k4":4,"x124":123}
+4 {"k4":4,"x134":123} {"k4":4,"x134":123}
+4 {"k4":4,"x14":123} {"k4":4,"x14":123}
+4 {"k4":4,"x144":123} {"k4":4,"x144":123}
+4 {"k4":4,"x154":123} {"k4":4,"x154":123}
+4 {"k4":4,"x164":123} {"k4":4,"x164":123}
+4 {"k4":4,"x174":123} {"k4":4,"x174":123}
+4 {"k4":4,"x24":123} {"k4":4,"x24":123}
+4 {"k4":4,"x34":123} {"k4":4,"x34":123}
+4 {"k4":4,"x4":123} {"k4":4,"x4":123}
+4 {"k4":4,"x44":123} {"k4":4,"x44":123}
+4 {"k4":4,"x54":123} {"k4":4,"x54":123}
+4 {"k4":4,"x64":123} {"k4":4,"x64":123}
+4 {"k4":4,"x74":123} {"k4":4,"x74":123}
+4 {"k4":4,"x84":123} {"k4":4,"x84":123}
+4 {"k4":4,"x94":123} {"k4":4,"x94":123}
+5 {"k5":5,"x105":123} {"k5":5,"x105":123}
+5 {"k5":5,"x115":123} {"k5":5,"x115":123}
+5 {"k5":5,"x125":123} {"k5":5,"x125":123}
+5 {"k5":5,"x135":123} {"k5":5,"x135":123}
+5 {"k5":5,"x145":123} {"k5":5,"x145":123}
+5 {"k5":5,"x15":123} {"k5":5,"x15":123}
+5 {"k5":5,"x155":123} {"k5":5,"x155":123}
+5 {"k5":5,"x165":123} {"k5":5,"x165":123}
+5 {"k5":5,"x175":123} {"k5":5,"x175":123}
+5 {"k5":5,"x25":123} {"k5":5,"x25":123}
+
diff --git a/regression-test/suites/variant_github_events_p0/load.groovy
b/regression-test/suites/variant_github_events_p0/load.groovy
index 1f1b510e7a6..e01d64ce19a 100644
--- a/regression-test/suites/variant_github_events_p0/load.groovy
+++ b/regression-test/suites/variant_github_events_p0/load.groovy
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
suite("regression_test_variant_github_events_p0", "nonConcurrent"){
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
@@ -77,6 +79,35 @@ suite("regression_test_variant_github_events_p0",
"nonConcurrent"){
load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2022-11-07-10.json'}""")
load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2022-11-07-22.json'}""")
load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2022-11-07-23.json'}""")
+
+ def tablets = sql_return_maparray """ show tablets from github_events; """
+ // trigger compactions for all tablets in github_events
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ }
+
+ // wait for all compactions done
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" + out
+ ", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+
+
// TODO fix compaction issue, this case could be stable
qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int)
from github_events where cast(v["repo"]["name"] as string) =
'xpressengine/xe-core' order by 1;"""
qt_sql """select * from github_events where cast(v["repo"]["name"] as
string) = 'xpressengine/xe-core' order by 1 limit 10"""
diff --git a/regression-test/suites/variant_p0/complexjson.groovy
b/regression-test/suites/variant_p0/complexjson.groovy
index 0ebb038bb34..012089a3f84 100644
--- a/regression-test/suites/variant_p0/complexjson.groovy
+++ b/regression-test/suites/variant_p0/complexjson.groovy
@@ -28,7 +28,7 @@ suite("regression_test_variant_complexjson",
"variant_type_complex_json") {
properties("replication_num" = "1", "disable_auto_compaction" =
"true");
"""
}
- table_name = "complexjson"
+ def table_name = "complexjson"
create_table table_name
sql """insert into ${table_name} values (1, '{
"id": 1,
diff --git a/regression-test/suites/variant_p0/concurrent_insert.groovy
b/regression-test/suites/variant_p0/concurrent_insert.groovy
new file mode 100644
index 00000000000..d8a96c92417
--- /dev/null
+++ b/regression-test/suites/variant_p0/concurrent_insert.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("regression_test_variant_concurrent_schema_update", ""){
+ def table_name = "var_concurrent"
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v variant,
+ v1 variant
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 3
+ properties("replication_num" = "1");
+ """
+ t1 = Thread.startDaemon {
+ for (int k = 1; k <= 60; k++) {
+ int x = k % 10;
+ sql """insert into ${table_name} values(${x}, '{"k${x}" : ${x},
"x${k}" : 123}', '{"k${x}" : ${x}, "x${k}" : 123}')"""
+ }
+ }
+ t2 = Thread.startDaemon {
+ for (int k = 61; k <= 120; k++) {
+ int x = k % 10;
+ sql """insert into ${table_name} values(${x}, '{"k${x}" : ${x},
"x${k}" : 123}', '{"k${x}" : ${x}, "x${k}" : 123}')"""
+ }
+ }
+ t3 = Thread.startDaemon {
+ for (int k = 121; k <= 180; k++) {
+ int x = k % 10;
+ sql """insert into ${table_name} values(${x}, '{"k${x}" : ${x},
"x${k}" : 123}', '{"k${x}" : ${x}, "x${k}" : 123}')"""
+ }
+ }
+ t1.join()
+ t2.join()
+ t3.join()
+ qt_sql_1 "select * from ${table_name} order by k, cast(v as string),
cast(v1 as string) limit 100"
+ // qt_sql_3 """desc ${table_name}"""
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]