xiaokang commented on code in PR #33298:
URL: https://github.com/apache/doris/pull/33298#discussion_r1558634769
##########
be/src/olap/rowset/segment_v2/segment.cpp:
##########
@@ -413,10 +413,12 @@ Status Segment::_create_column_readers(const
SegmentFooterPB& footer) {
// init by column path
for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns();
++ordinal) {
auto& column = _tablet_schema->column(ordinal);
- if (!column.has_path_info()) {
+ if (!column.has_path_info() && !column.is_variant_type()) {
Review Comment:
!column.has_path_info() is not enough? What's the semantics of
column.has_path_info()? Should it return true for a normal column?
##########
cloud/src/meta-service/meta_service_schema.h:
##########
@@ -30,4 +30,17 @@ void put_schema_kv(MetaServiceCode& code, std::string& msg,
Transaction* txn,
// Return true if parse success
[[nodiscard]] bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema);
+// Writes schema dictionary metadata to RowsetMetaCloudPB
+[[nodiscard]] std::pair<MetaServiceCode, std::string> write_schema_dict(
Review Comment:
Can you keep the interface consistent with put_schema_kv?
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
+ const google::protobuf::RepeatedPtrField<ItemPB>& items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn) {
+ if (items.empty()) {
+ return;
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Filter none extended items, mainly extended columns and
extended indexes
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it != reversed_dict.end()) {
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ } else {
+ // Add new dictionary key-value pair and update
current_xxx_dict_id.
+ int64_t current_dict_id = 0;
+ if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+ current_dict_id = dict.current_column_dict_id() + 1;
+ dict.set_current_column_dict_id(current_dict_id);
+ dict.mutable_column_dict()->emplace(current_dict_id, item);
+ }
+ if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+ current_dict_id = dict.current_index_dict_id() + 1;
+ dict.set_current_index_dict_id(current_dict_id);
+ dict.mutable_index_dict()->emplace(current_dict_id, item);
+ }
+ add_dict_key_fn(current_dict_id);
+ reversed_dict[serialized_key] = current_dict_id;
+ // LOG(INFO) << "Add dict key = " << current_dict_id << " dict
value = " << item.ShortDebugString();
+ }
+ }
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
+ }
+}
+
+// Writes schema dictionary metadata to RowsetMetaCloudPB.
+// Schema was extended in BE side, we need to reset schema to original
frontend schema and store
+// such restored schema in fdb. And also add extra dict key info to
RowsetMetaCloudPB.
+std::pair<MetaServiceCode, std::string> write_schema_dict(
+ const std::string& instance_id,
+ Transaction* txn,
+ RowsetMetaCloudPB* rowset_meta) {
+ std::string msg;
+ std::stringstream ss;
+ MetaServiceCode code = MetaServiceCode::OK;
+ // wrtie dict to rowset meta and update dict
+ SchemaCloudDictionary dict;
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, dict_key, &dict_val);
+ LOG(INFO) << "Retrieved column pb dictionary, index_id=" <<
rowset_meta->index_id()
+ << " key=" << hex(dict_key) << " error=" << err;
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK)
{
+ // Handle retrieval error.
+ ss << "Failed to retrieve column pb dictionary, instance_id=" <<
instance_id
+ << " table_id=" << rowset_meta->index_id() << " key=" <<
hex(dict_key) << " error=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ return {code, msg};
+ }
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && !dict_val.to_pb(&dict)) {
+ // Handle parse error.
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("Malformed tablet dictionary value, key={}",
hex(dict_key));
+ return {code, msg};
+ }
+
+ // collect sparse columns and clear in parent column
+ google::protobuf::RepeatedPtrField<ColumnPB> sparse_columns;
+ for (auto& column_pb :
*rowset_meta->mutable_tablet_schema()->mutable_column()) {
+ if (column_pb.type() == "VARIANT" &&
!column_pb.sparse_columns().empty()) {
Review Comment:
define a const string for "VARIANT"
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
Review Comment:
suggestion for better readability: rename to mutable_columns_or_indexes or add
a comment
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
+ const google::protobuf::RepeatedPtrField<ItemPB>& items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn) {
+ if (items.empty()) {
+ return;
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Filter none extended items, mainly extended columns and
extended indexes
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it != reversed_dict.end()) {
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ } else {
+ // Add new dictionary key-value pair and update
current_xxx_dict_id.
+ int64_t current_dict_id = 0;
+ if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+ current_dict_id = dict.current_column_dict_id() + 1;
+ dict.set_current_column_dict_id(current_dict_id);
+ dict.mutable_column_dict()->emplace(current_dict_id, item);
+ }
+ if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+ current_dict_id = dict.current_index_dict_id() + 1;
+ dict.set_current_index_dict_id(current_dict_id);
+ dict.mutable_index_dict()->emplace(current_dict_id, item);
+ }
+ add_dict_key_fn(current_dict_id);
+ reversed_dict[serialized_key] = current_dict_id;
+ // LOG(INFO) << "Add dict key = " << current_dict_id << " dict
value = " << item.ShortDebugString();
+ }
+ }
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
Review Comment:
What's the purpose of replacing the columns/indexes with new ones that are not in the dict?
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
+ const google::protobuf::RepeatedPtrField<ItemPB>& items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn) {
+ if (items.empty()) {
+ return;
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Filter none extended items, mainly extended columns and
extended indexes
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it != reversed_dict.end()) {
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ } else {
+ // Add new dictionary key-value pair and update
current_xxx_dict_id.
+ int64_t current_dict_id = 0;
+ if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+ current_dict_id = dict.current_column_dict_id() + 1;
+ dict.set_current_column_dict_id(current_dict_id);
+ dict.mutable_column_dict()->emplace(current_dict_id, item);
+ }
+ if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+ current_dict_id = dict.current_index_dict_id() + 1;
+ dict.set_current_index_dict_id(current_dict_id);
+ dict.mutable_index_dict()->emplace(current_dict_id, item);
+ }
+ add_dict_key_fn(current_dict_id);
+ reversed_dict[serialized_key] = current_dict_id;
+ // LOG(INFO) << "Add dict key = " << current_dict_id << " dict
value = " << item.ShortDebugString();
+ }
+ }
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
+ }
+}
+
+// Writes schema dictionary metadata to RowsetMetaCloudPB.
+// Schema was extended in BE side, we need to reset schema to original
frontend schema and store
+// such restored schema in fdb. And also add extra dict key info to
RowsetMetaCloudPB.
+std::pair<MetaServiceCode, std::string> write_schema_dict(
+ const std::string& instance_id,
+ Transaction* txn,
+ RowsetMetaCloudPB* rowset_meta) {
+ std::string msg;
+ std::stringstream ss;
+ MetaServiceCode code = MetaServiceCode::OK;
+ // wrtie dict to rowset meta and update dict
+ SchemaCloudDictionary dict;
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, dict_key, &dict_val);
+ LOG(INFO) << "Retrieved column pb dictionary, index_id=" <<
rowset_meta->index_id()
+ << " key=" << hex(dict_key) << " error=" << err;
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK)
{
+ // Handle retrieval error.
+ ss << "Failed to retrieve column pb dictionary, instance_id=" <<
instance_id
+ << " table_id=" << rowset_meta->index_id() << " key=" <<
hex(dict_key) << " error=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ return {code, msg};
+ }
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && !dict_val.to_pb(&dict)) {
Review Comment:
check TXN_OK directly
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
Review Comment:
rowset_meta is not used in this function.
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -1080,14 +1081,11 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
DCHECK(rowset_meta.tablet_schema().has_schema_version());
DCHECK_GE(rowset_meta.tablet_schema().schema_version(), 0);
rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version());
- std::string schema_key;
- if (rowset_meta.has_variant_type_in_schema()) {
- // encodes schema in a seperate kv, since variant schema is
volatile
- schema_key = meta_rowset_schema_key(
- {instance_id, rowset_meta.tablet_id(),
rowset_meta.rowset_id_v2()});
- } else {
- schema_key = meta_schema_key(
+ std::string schema_key = meta_schema_key(
{instance_id, rowset_meta.index_id(),
rowset_meta.schema_version()});
+ if (rowset_meta.has_variant_type_in_schema()) {
+ std::tie(code, msg) = write_schema_dict(instance_id, txn.get(),
&rowset_meta);
Review Comment:
`write_schema_dict` and `put_schema_kv` are not in one transaction. Is there
any problem if the first succeeds but the second fails?
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
+ const google::protobuf::RepeatedPtrField<ItemPB>& items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn) {
+ if (items.empty()) {
+ return;
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Filter none extended items, mainly extended columns and
extended indexes
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it != reversed_dict.end()) {
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ } else {
+ // Add new dictionary key-value pair and update
current_xxx_dict_id.
+ int64_t current_dict_id = 0;
+ if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+ current_dict_id = dict.current_column_dict_id() + 1;
+ dict.set_current_column_dict_id(current_dict_id);
+ dict.mutable_column_dict()->emplace(current_dict_id, item);
+ }
+ if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+ current_dict_id = dict.current_index_dict_id() + 1;
+ dict.set_current_index_dict_id(current_dict_id);
+ dict.mutable_index_dict()->emplace(current_dict_id, item);
+ }
+ add_dict_key_fn(current_dict_id);
+ reversed_dict[serialized_key] = current_dict_id;
+ // LOG(INFO) << "Add dict key = " << current_dict_id << " dict
value = " << item.ShortDebugString();
+ }
+ }
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
+ }
+}
+
+// Writes schema dictionary metadata to RowsetMetaCloudPB.
+// Schema was extended in BE side, we need to reset schema to original
frontend schema and store
+// such restored schema in fdb. And also add extra dict key info to
RowsetMetaCloudPB.
+std::pair<MetaServiceCode, std::string> write_schema_dict(
+ const std::string& instance_id,
+ Transaction* txn,
+ RowsetMetaCloudPB* rowset_meta) {
+ std::string msg;
+ std::stringstream ss;
+ MetaServiceCode code = MetaServiceCode::OK;
+ // wrtie dict to rowset meta and update dict
+ SchemaCloudDictionary dict;
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, dict_key, &dict_val);
+ LOG(INFO) << "Retrieved column pb dictionary, index_id=" <<
rowset_meta->index_id()
+ << " key=" << hex(dict_key) << " error=" << err;
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK)
{
+ // Handle retrieval error.
+ ss << "Failed to retrieve column pb dictionary, instance_id=" <<
instance_id
+ << " table_id=" << rowset_meta->index_id() << " key=" <<
hex(dict_key) << " error=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ return {code, msg};
+ }
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && !dict_val.to_pb(&dict)) {
+ // Handle parse error.
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("Malformed tablet dictionary value, key={}",
hex(dict_key));
+ return {code, msg};
+ }
+
+ // collect sparse columns and clear in parent column
+ google::protobuf::RepeatedPtrField<ColumnPB> sparse_columns;
+ for (auto& column_pb :
*rowset_meta->mutable_tablet_schema()->mutable_column()) {
+ if (column_pb.type() == "VARIANT" &&
!column_pb.sparse_columns().empty()) {
+ // set parent_id for restore info
+ for (auto& sparse_col: *column_pb.mutable_sparse_columns()) {
+ sparse_col.set_parent_unique_id(column_pb.unique_id());
+ }
+ sparse_columns.Add(column_pb.sparse_columns().begin(),
column_pb.sparse_columns().end());
+ }
+ column_pb.clear_sparse_columns();
Review Comment:
Why change original column_pb?
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
+ const google::protobuf::RepeatedPtrField<ItemPB>& items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn) {
+ if (items.empty()) {
+ return;
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Filter none extended items, mainly extended columns and
extended indexes
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it != reversed_dict.end()) {
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ } else {
+ // Add new dictionary key-value pair and update
current_xxx_dict_id.
+ int64_t current_dict_id = 0;
+ if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+ current_dict_id = dict.current_column_dict_id() + 1;
+ dict.set_current_column_dict_id(current_dict_id);
+ dict.mutable_column_dict()->emplace(current_dict_id, item);
+ }
+ if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+ current_dict_id = dict.current_index_dict_id() + 1;
+ dict.set_current_index_dict_id(current_dict_id);
+ dict.mutable_index_dict()->emplace(current_dict_id, item);
+ }
+ add_dict_key_fn(current_dict_id);
+ reversed_dict[serialized_key] = current_dict_id;
+ // LOG(INFO) << "Add dict key = " << current_dict_id << " dict
value = " << item.ShortDebugString();
+ }
+ }
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
+ }
+}
+
+// Writes schema dictionary metadata to RowsetMetaCloudPB.
+// Schema was extended in BE side, we need to reset schema to original
frontend schema and store
+// such restored schema in fdb. And also add extra dict key info to
RowsetMetaCloudPB.
+std::pair<MetaServiceCode, std::string> write_schema_dict(
+ const std::string& instance_id,
+ Transaction* txn,
+ RowsetMetaCloudPB* rowset_meta) {
+ std::string msg;
+ std::stringstream ss;
+ MetaServiceCode code = MetaServiceCode::OK;
+ // wrtie dict to rowset meta and update dict
+ SchemaCloudDictionary dict;
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, dict_key, &dict_val);
+ LOG(INFO) << "Retrieved column pb dictionary, index_id=" <<
rowset_meta->index_id()
+ << " key=" << hex(dict_key) << " error=" << err;
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK)
{
+ // Handle retrieval error.
+ ss << "Failed to retrieve column pb dictionary, instance_id=" <<
instance_id
+ << " table_id=" << rowset_meta->index_id() << " key=" <<
hex(dict_key) << " error=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ return {code, msg};
+ }
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && !dict_val.to_pb(&dict)) {
+ // Handle parse error.
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("Malformed tablet dictionary value, key={}",
hex(dict_key));
+ return {code, msg};
+ }
+
+ // collect sparse columns and clear in parent column
+ google::protobuf::RepeatedPtrField<ColumnPB> sparse_columns;
+ for (auto& column_pb :
*rowset_meta->mutable_tablet_schema()->mutable_column()) {
+ if (column_pb.type() == "VARIANT" &&
!column_pb.sparse_columns().empty()) {
+ // set parent_id for restore info
+ for (auto& sparse_col: *column_pb.mutable_sparse_columns()) {
+ sparse_col.set_parent_unique_id(column_pb.unique_id());
+ }
+ sparse_columns.Add(column_pb.sparse_columns().begin(),
column_pb.sparse_columns().end());
+ }
+ column_pb.clear_sparse_columns();
+ }
+ auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
+ // handle column dict
+ auto original_column_dict_id = dict.current_column_dict_id();
+ auto column_filter = [&](const doris::ColumnPB& col) -> bool {return
col.unique_id() >= 0;};
+ auto column_dict_adder = [&](int32_t key) {
dict_list->add_column_dict_key_list(key); };
+ process_dictionary<doris::ColumnPB>(dict, dict.column_dict(),
+ rowset_meta->mutable_tablet_schema()->mutable_column(),
+ rowset_meta, rowset_meta->tablet_schema().column(),
+ column_filter, column_dict_adder);
+
+ // handle sparse column dict
+ auto sparse_column_dict_adder = [&](int32_t key) {
dict_list->add_sparse_column_dict_key_list(key); };
+ // not filter any
+ auto sparse_column_filter = [&](const doris::ColumnPB& col) -> bool
{return false;};
+ process_dictionary<doris::ColumnPB>(dict, dict.column_dict(),
+ nullptr, rowset_meta, sparse_columns,
+ sparse_column_filter, sparse_column_dict_adder);
+
+ // handle index info dict
+ auto original_index_dict_id = dict.current_index_dict_id();
+ auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool
{return index_pb.index_suffix_name().empty();};
+ auto index_dict_adder = [&](int32_t key) {
dict_list->add_index_info_dict_key_list(key); };
+ process_dictionary<doris::TabletIndexPB>(dict, dict.index_dict(),
+ rowset_meta->mutable_tablet_schema()->mutable_index(),
+ rowset_meta, rowset_meta->tablet_schema().index(),
+ index_filter, index_dict_adder);
+
+ // Write back modified dictionaries.
+ if (original_index_dict_id != dict.current_index_dict_id()
+ || original_column_dict_id != dict.current_column_dict_id()) {
+ // If dictionary was modified, serialize and save it.
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ std::string dict_val;
+ if (!dict.SerializeToString(&dict_val)) {
+ // Handle serialization error.
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "Failed to serialize dictionary for saving, txn_id=" <<
rowset_meta->txn_id();
+ msg = ss.str();
+ return {code, msg};
+ }
+ // Limit the size of dict value
+ if (dict_val.size() > config::schema_dict_kv_size_limit) {
+ code = MetaServiceCode::KV_TXN_COMMIT_ERR;
+ ss << "Failed to write dictionary for saving, txn_id=" <<
rowset_meta->txn_id()
+ << ", reached the limited size threshold of SchemaDictKeyList
" << config::schema_dict_kv_size_limit;
+ msg = ss.str();
+ }
+ // splitting large values (>90*1000) into multiple KVs
+ cloud::put(txn, dict_key, dict_val, 0);
+ LOG(INFO) << "Dictionary saved, key=" << hex(dict_key) << " txn_id="
<< rowset_meta->txn_id()
+ << " Dict size=" << dict.column_dict_size()
+ << ", Current column ID=" << dict.current_column_dict_id()
+ << ", Current index ID=" << dict.current_index_dict_id();
+ }
+ return {code, msg};
+}
+
+std::pair<MetaServiceCode, std::string> read_schema_from_dict(
+ const std::string& instance_id,
+ int64_t index_id,
+ Transaction* txn,
+ google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>*
rowset_metas) {
+ std::string msg;
+ std::stringstream ss;
+ MetaServiceCode code = MetaServiceCode::OK;
+
+ // read dict if any rowset has dict key list
+ SchemaCloudDictionary dict;
+ std::string column_dict_key = meta_schema_pb_dictionary_key({instance_id,
index_id});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, column_dict_key, &dict_val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "internal error, failed to get dict ret=" << err;
+ msg = ss.str();
+ return {code, msg};
+ }
+ if (err == TxnErrorCode::TXN_OK && !dict_val.to_pb(&dict)) [[unlikely]] {
Review Comment:
what about TXN_KEY_NOT_FOUND?
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
+ const google::protobuf::RepeatedPtrField<ItemPB>& items,
Review Comment:
suggestion for better readability: rename to columns_or_indexes or add a comment
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
+ const google::protobuf::RepeatedPtrField<ItemPB>& items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn) {
+ if (items.empty()) {
+ return;
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Filter none extended items, mainly extended columns and
extended indexes
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it != reversed_dict.end()) {
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ } else {
+ // Add new dictionary key-value pair and update
current_xxx_dict_id.
+ int64_t current_dict_id = 0;
+ if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+ current_dict_id = dict.current_column_dict_id() + 1;
+ dict.set_current_column_dict_id(current_dict_id);
+ dict.mutable_column_dict()->emplace(current_dict_id, item);
+ }
+ if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+ current_dict_id = dict.current_index_dict_id() + 1;
+ dict.set_current_index_dict_id(current_dict_id);
+ dict.mutable_index_dict()->emplace(current_dict_id, item);
+ }
+ add_dict_key_fn(current_dict_id);
+ reversed_dict[serialized_key] = current_dict_id;
+ // LOG(INFO) << "Add dict key = " << current_dict_id << " dict
value = " << item.ShortDebugString();
+ }
+ }
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
+ }
+}
+
+// Writes schema dictionary metadata to RowsetMetaCloudPB.
+// Schema was extended in BE side, we need to reset schema to original
frontend schema and store
+// such restored schema in fdb. And also add extra dict key info to
RowsetMetaCloudPB.
+std::pair<MetaServiceCode, std::string> write_schema_dict(
+ const std::string& instance_id,
+ Transaction* txn,
+ RowsetMetaCloudPB* rowset_meta) {
+ std::string msg;
+ std::stringstream ss;
+ MetaServiceCode code = MetaServiceCode::OK;
+ // wrtie dict to rowset meta and update dict
+ SchemaCloudDictionary dict;
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, dict_key, &dict_val);
+ LOG(INFO) << "Retrieved column pb dictionary, index_id=" <<
rowset_meta->index_id()
+ << " key=" << hex(dict_key) << " error=" << err;
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK)
{
+ // Handle retrieval error.
+ ss << "Failed to retrieve column pb dictionary, instance_id=" <<
instance_id
+ << " table_id=" << rowset_meta->index_id() << " key=" <<
hex(dict_key) << " error=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ return {code, msg};
+ }
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && !dict_val.to_pb(&dict)) {
+ // Handle parse error.
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("Malformed tablet dictionary value, key={}",
hex(dict_key));
+ return {code, msg};
+ }
+
+ // collect sparse columns and clear in parent column
+ google::protobuf::RepeatedPtrField<ColumnPB> sparse_columns;
+ for (auto& column_pb :
*rowset_meta->mutable_tablet_schema()->mutable_column()) {
+ if (column_pb.type() == "VARIANT" &&
!column_pb.sparse_columns().empty()) {
+ // set parent_id for restore info
+ for (auto& sparse_col: *column_pb.mutable_sparse_columns()) {
+ sparse_col.set_parent_unique_id(column_pb.unique_id());
+ }
+ sparse_columns.Add(column_pb.sparse_columns().begin(),
column_pb.sparse_columns().end());
+ }
+ column_pb.clear_sparse_columns();
+ }
+ auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
+ // handle column dict
+ auto original_column_dict_id = dict.current_column_dict_id();
+ auto column_filter = [&](const doris::ColumnPB& col) -> bool {return
col.unique_id() >= 0;};
+ auto column_dict_adder = [&](int32_t key) {
dict_list->add_column_dict_key_list(key); };
+ process_dictionary<doris::ColumnPB>(dict, dict.column_dict(),
+ rowset_meta->mutable_tablet_schema()->mutable_column(),
+ rowset_meta, rowset_meta->tablet_schema().column(),
+ column_filter, column_dict_adder);
+
+ // handle sparse column dict
+ auto sparse_column_dict_adder = [&](int32_t key) {
dict_list->add_sparse_column_dict_key_list(key); };
+ // not filter any
+ auto sparse_column_filter = [&](const doris::ColumnPB& col) -> bool
{return false;};
+ process_dictionary<doris::ColumnPB>(dict, dict.column_dict(),
+ nullptr, rowset_meta, sparse_columns,
+ sparse_column_filter, sparse_column_dict_adder);
+
+ // handle index info dict
+ auto original_index_dict_id = dict.current_index_dict_id();
+ auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool
{return index_pb.index_suffix_name().empty();};
+ auto index_dict_adder = [&](int32_t key) {
dict_list->add_index_info_dict_key_list(key); };
+ process_dictionary<doris::TabletIndexPB>(dict, dict.index_dict(),
+ rowset_meta->mutable_tablet_schema()->mutable_index(),
+ rowset_meta, rowset_meta->tablet_schema().index(),
+ index_filter, index_dict_adder);
+
+ // Write back modified dictionaries.
+ if (original_index_dict_id != dict.current_index_dict_id()
+ || original_column_dict_id != dict.current_column_dict_id()) {
+ // If dictionary was modified, serialize and save it.
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
Review Comment:
Is it the same as line 208?
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
Review Comment:
Suggestion for readability: rename this parameter to `column_or_index_dict`, or add a comment explaining it.
##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf,
doris::TabletSchemaCloudPB* schema)
return buf.to_pb(schema);
}
+// Map item to dictionary key, and add key to rowset meta, if it is a new one,
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+ SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ RowsetMetaCloudPB* rowset_meta,
+ const google::protobuf::RepeatedPtrField<ItemPB>& items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn) {
+ if (items.empty()) {
+ return;
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Filter none extended items, mainly extended columns and
extended indexes
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it != reversed_dict.end()) {
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ } else {
+ // Add new dictionary key-value pair and update
current_xxx_dict_id.
+ int64_t current_dict_id = 0;
+ if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+ current_dict_id = dict.current_column_dict_id() + 1;
+ dict.set_current_column_dict_id(current_dict_id);
+ dict.mutable_column_dict()->emplace(current_dict_id, item);
+ }
+ if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+ current_dict_id = dict.current_index_dict_id() + 1;
+ dict.set_current_index_dict_id(current_dict_id);
+ dict.mutable_index_dict()->emplace(current_dict_id, item);
+ }
+ add_dict_key_fn(current_dict_id);
+ reversed_dict[serialized_key] = current_dict_id;
+ // LOG(INFO) << "Add dict key = " << current_dict_id << " dict
value = " << item.ShortDebugString();
+ }
+ }
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
+ }
+}
+
+// Writes schema dictionary metadata to RowsetMetaCloudPB.
+// Schema was extended in BE side, we need to reset schema to original
frontend schema and store
+// such restored schema in fdb. And also add extra dict key info to
RowsetMetaCloudPB.
+std::pair<MetaServiceCode, std::string> write_schema_dict(
+ const std::string& instance_id,
+ Transaction* txn,
+ RowsetMetaCloudPB* rowset_meta) {
+ std::string msg;
+ std::stringstream ss;
+ MetaServiceCode code = MetaServiceCode::OK;
+ // wrtie dict to rowset meta and update dict
+ SchemaCloudDictionary dict;
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, dict_key, &dict_val);
+ LOG(INFO) << "Retrieved column pb dictionary, index_id=" <<
rowset_meta->index_id()
+ << " key=" << hex(dict_key) << " error=" << err;
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK)
{
+ // Handle retrieval error.
+ ss << "Failed to retrieve column pb dictionary, instance_id=" <<
instance_id
+ << " table_id=" << rowset_meta->index_id() << " key=" <<
hex(dict_key) << " error=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ return {code, msg};
+ }
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && !dict_val.to_pb(&dict)) {
+ // Handle parse error.
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("Malformed tablet dictionary value, key={}",
hex(dict_key));
+ return {code, msg};
+ }
+
+ // collect sparse columns and clear in parent column
+ google::protobuf::RepeatedPtrField<ColumnPB> sparse_columns;
+ for (auto& column_pb :
*rowset_meta->mutable_tablet_schema()->mutable_column()) {
+ if (column_pb.type() == "VARIANT" &&
!column_pb.sparse_columns().empty()) {
+ // set parent_id for restore info
+ for (auto& sparse_col: *column_pb.mutable_sparse_columns()) {
+ sparse_col.set_parent_unique_id(column_pb.unique_id());
+ }
+ sparse_columns.Add(column_pb.sparse_columns().begin(),
column_pb.sparse_columns().end());
+ }
+ column_pb.clear_sparse_columns();
+ }
+ auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
+ // handle column dict
+ auto original_column_dict_id = dict.current_column_dict_id();
+ auto column_filter = [&](const doris::ColumnPB& col) -> bool {return
col.unique_id() >= 0;};
+ auto column_dict_adder = [&](int32_t key) {
dict_list->add_column_dict_key_list(key); };
+ process_dictionary<doris::ColumnPB>(dict, dict.column_dict(),
+ rowset_meta->mutable_tablet_schema()->mutable_column(),
+ rowset_meta, rowset_meta->tablet_schema().column(),
+ column_filter, column_dict_adder);
+
+ // handle sparse column dict
+ auto sparse_column_dict_adder = [&](int32_t key) {
dict_list->add_sparse_column_dict_key_list(key); };
+ // not filter any
+ auto sparse_column_filter = [&](const doris::ColumnPB& col) -> bool
{return false;};
+ process_dictionary<doris::ColumnPB>(dict, dict.column_dict(),
+ nullptr, rowset_meta, sparse_columns,
+ sparse_column_filter, sparse_column_dict_adder);
+
+ // handle index info dict
+ auto original_index_dict_id = dict.current_index_dict_id();
+ auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool
{return index_pb.index_suffix_name().empty();};
+ auto index_dict_adder = [&](int32_t key) {
dict_list->add_index_info_dict_key_list(key); };
+ process_dictionary<doris::TabletIndexPB>(dict, dict.index_dict(),
+ rowset_meta->mutable_tablet_schema()->mutable_index(),
+ rowset_meta, rowset_meta->tablet_schema().index(),
+ index_filter, index_dict_adder);
+
+ // Write back modified dictionaries.
+ if (original_index_dict_id != dict.current_index_dict_id()
+ || original_column_dict_id != dict.current_column_dict_id()) {
+ // If dictionary was modified, serialize and save it.
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
rowset_meta->index_id()});
+ std::string dict_val;
+ if (!dict.SerializeToString(&dict_val)) {
+ // Handle serialization error.
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "Failed to serialize dictionary for saving, txn_id=" <<
rowset_meta->txn_id();
+ msg = ss.str();
+ return {code, msg};
+ }
+ // Limit the size of dict value
+ if (dict_val.size() > config::schema_dict_kv_size_limit) {
+ code = MetaServiceCode::KV_TXN_COMMIT_ERR;
+ ss << "Failed to write dictionary for saving, txn_id=" <<
rowset_meta->txn_id()
+ << ", reached the limited size threshold of SchemaDictKeyList
" << config::schema_dict_kv_size_limit;
+ msg = ss.str();
+ }
+ // splitting large values (>90*1000) into multiple KVs
+ cloud::put(txn, dict_key, dict_val, 0);
+ LOG(INFO) << "Dictionary saved, key=" << hex(dict_key) << " txn_id="
<< rowset_meta->txn_id()
+ << " Dict size=" << dict.column_dict_size()
+ << ", Current column ID=" << dict.current_column_dict_id()
+ << ", Current index ID=" << dict.current_index_dict_id();
+ }
+ return {code, msg};
+}
+
+std::pair<MetaServiceCode, std::string> read_schema_from_dict(
+ const std::string& instance_id,
+ int64_t index_id,
+ Transaction* txn,
+ google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>*
rowset_metas) {
+ std::string msg;
+ std::stringstream ss;
+ MetaServiceCode code = MetaServiceCode::OK;
+
+ // read dict if any rowset has dict key list
+ SchemaCloudDictionary dict;
+ std::string column_dict_key = meta_schema_pb_dictionary_key({instance_id,
index_id});
+ ValueBuf dict_val;
+ auto err = cloud::get(txn, column_dict_key, &dict_val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "internal error, failed to get dict ret=" << err;
+ msg = ss.str();
+ return {code, msg};
+ }
+ if (err == TxnErrorCode::TXN_OK && !dict_val.to_pb(&dict)) [[unlikely]] {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = "failed to parse SchemaCloudDictionary";
+ return {code, msg};
+ }
+ LOG(INFO) << "Get schema_dict, column size=" << dict.column_dict_size()
+ << ", index size=" << dict.index_dict_size();
+
+ auto fill_schema_with_dict = [&](RowsetMetaCloudPB* out) {
+ std::unordered_map<int32_t, ColumnPB*> unique_id_map;
+ //init map
+ for (ColumnPB& column :
*out->mutable_tablet_schema()->mutable_column()) {
+ unique_id_map[column.unique_id()] = &column;
+ }
+ // column info
+ for (size_t i = 0; i <
out->schema_dict_key_list().column_dict_key_list_size(); ++i) {
+ int dict_key = out->schema_dict_key_list().column_dict_key_list(i);
+ const ColumnPB& dict_val = dict.column_dict().at(dict_key);
+ ColumnPB& to_add = *out->mutable_tablet_schema()->add_column();
+ to_add = dict_val;
+ VLOG_DEBUG << "fill dict column " << dict_val.ShortDebugString();
+ }
+
+ // index info
+ for (size_t i = 0; i <
out->schema_dict_key_list().index_info_dict_key_list_size(); ++i) {
+ int dict_key =
out->schema_dict_key_list().index_info_dict_key_list(i);
+ const doris::TabletIndexPB& dict_val =
dict.index_dict().at(dict_key);
+ *out->mutable_tablet_schema()->add_index() = dict_val;
Review Comment:
Suggestion: use a consistent, easy-to-read style:
```
auto* to_add = out->mutable_tablet_schema()->add_index();
*to_add = dict.index_dict().at(dict_key);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]