This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 16e0898dfa8 [refactor](cloud) Add versioned read checking (#54278)
16e0898dfa8 is described below
commit 16e0898dfa876b65979504944f676b11f1d6b83d
Author: walter <[email protected]>
AuthorDate: Thu Aug 7 22:03:30 2025 +0800
[refactor](cloud) Add versioned read checking (#54278)
---
cloud/src/meta-service/meta_service.cpp | 349 +++++++++++++---------
cloud/src/meta-service/meta_service.h | 7 +
cloud/src/meta-service/meta_service_job.cpp | 193 +++++++-----
cloud/src/meta-service/meta_service_partition.cpp | 66 ++--
cloud/src/meta-service/meta_service_txn.cpp | 6 +
cloud/src/meta-service/txn_lazy_committer.cpp | 171 ++++++-----
cloud/test/txn_lazy_commit_test.cpp | 5 +-
gensrc/proto/cloud.proto | 1 +
8 files changed, 474 insertions(+), 324 deletions(-)
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index a473d14c31b..9bc8805f9db 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -42,6 +42,7 @@
#include <ostream>
#include <sstream>
#include <string>
+#include <string_view>
#include <tuple>
#include <type_traits>
#include <unordered_map>
@@ -814,12 +815,17 @@ void
MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
}
UpdateTabletLogPB update_tablet_log;
bool is_versioned_write = is_version_write_enabled(instance_id);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
for (const TabletMetaInfoPB& tablet_meta_info :
request->tablet_meta_infos()) {
doris::TabletMetaCloudPB tablet_meta;
- internal_get_tablet(code, msg, instance_id, txn.get(),
tablet_meta_info.tablet_id(),
- &tablet_meta, true);
- if (code != MetaServiceCode::OK) {
- return;
+ if (!is_versioned_read) {
+ internal_get_tablet(code, msg, instance_id, txn.get(),
tablet_meta_info.tablet_id(),
+ &tablet_meta, true);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
if (tablet_meta_info.has_is_in_memory()) { // deprecate after 3.0.0
tablet_meta.set_is_in_memory(tablet_meta_info.is_in_memory());
@@ -1711,11 +1717,16 @@ static void fill_schema_from_dict(MetaServiceCode&
code, std::string& msg,
bool check_job_existed(Transaction* txn, MetaServiceCode& code, std::string&
msg,
const std::string& instance_id, int64_t tablet_id,
- const std::string& rowset_id, const std::string&
job_id) {
+ const std::string& rowset_id, const std::string& job_id,
+ bool is_versioned_read) {
TabletIndexPB tablet_idx;
- get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx);
- if (code != MetaServiceCode::OK) {
- return false;
+ if (!is_versioned_read) {
+ get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return false;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
@@ -1875,8 +1886,9 @@ void
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
// Check if the compaction/sc tablet job has finished
if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
!request->tablet_job_id().empty()) {
+ bool is_versioned_read = is_version_read_enabled(instance_id);
if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id,
rowset_id,
- request->tablet_job_id())) {
+ request->tablet_job_id(), is_versioned_read)) {
return;
}
}
@@ -2020,8 +2032,9 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
// Check if the compaction/sc tablet job has finished
if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
!request->tablet_job_id().empty()) {
+ bool is_versioned_read = is_version_read_enabled(instance_id);
if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id,
rowset_id,
- request->tablet_job_id())) {
+ request->tablet_job_id(), is_versioned_read)) {
return;
}
}
@@ -2409,6 +2422,45 @@ static bool try_fetch_and_parse_schema(Transaction* txn,
RowsetMetaCloudPB& rows
return true;
}
+void MetaServiceImpl::get_partition_pending_txn_id(std::string_view
instance_id, int64_t db_id,
+ int64_t table_id, int64_t
partition_id,
+ int64_t tablet_id,
std::stringstream& ss,
+ MetaServiceCode& code,
std::string& msg,
+ int64_t& first_txn_id,
Transaction* txn) {
+ std::string ver_val;
+ std::string ver_key = partition_version_key({instance_id, db_id, table_id,
partition_id});
+ TxnErrorCode err = txn->get(ver_key, &ver_val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ // No version record for this partition, so there is no pending txn
+ first_txn_id = -1;
+ return;
+ } else if (TxnErrorCode::TXN_OK != err) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get partition version, tablet_id=" << tablet_id << "
key=" << hex(ver_key)
+ << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ VersionPB version_pb;
+ if (!version_pb.ParseFromString(ver_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse version pb db_id=" << db_id << " table_id=" <<
table_id
+ << " partition_id=" << partition_id << " key=" << hex(ver_key);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ if (version_pb.pending_txn_ids_size() > 0) {
+ DCHECK(version_pb.pending_txn_ids_size() == 1);
+ first_txn_id = version_pb.pending_txn_ids(0);
+ } else {
+ first_txn_id = -1;
+ }
+}
+
void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
const GetRowsetRequest* request,
GetRowsetResponse* response,
::google::protobuf::Closure* done) {
@@ -2440,6 +2492,7 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
int64_t req_cc_cnt = request->cumulative_compaction_cnt();
int64_t req_cp = request->cumulative_point();
+ bool is_versioned_read = is_version_read_enabled(instance_id);
do {
TEST_SYNC_POINT_CALLBACK("get_rowset:begin", &tablet_id);
std::unique_ptr<Transaction> txn;
@@ -2457,67 +2510,57 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
};
TabletIndexPB idx;
// Get tablet id index from kv
- get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx);
- if (code != MetaServiceCode::OK) {
- return;
+ if (!is_versioned_read) {
+ get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
DCHECK(request->has_idx());
if (idx.has_db_id()) {
// there is maybe a lazy commit txn when call get_rowset
// we need advance lazy commit txn here
- std::string ver_val;
- std::string ver_key = partition_version_key(
- {instance_id, idx.db_id(), idx.table_id(),
idx.partition_id()});
- err = txn->get(ver_key, &ver_val);
- if (TxnErrorCode::TXN_OK != err && TxnErrorCode::TXN_KEY_NOT_FOUND
!= err) {
- code = cast_as<ErrCategory::READ>(err);
- ss << "failed to get partiton version, tablet_id=" << tablet_id
- << " key=" << hex(ver_key) << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
-
- if (TxnErrorCode::TXN_OK == err) {
- VersionPB version_pb;
- if (!version_pb.ParseFromString(ver_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse version pb db_id=" << idx.db_id()
- << " table_id=" << idx.table_id() << " partition_id" <<
idx.partition_id()
- << " key=" << hex(ver_key);
- msg = ss.str();
- LOG(WARNING) << msg;
+ int64_t first_txn_id = -1;
+ if (!is_versioned_read) {
+ get_partition_pending_txn_id(instance_id, idx.db_id(),
idx.table_id(),
+ idx.partition_id(), tablet_id,
ss, code, msg,
+ first_txn_id, txn.get());
+ if (code != MetaServiceCode::OK) {
return;
}
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
+ }
+ if (first_txn_id >= 0) {
+ stats.get_bytes += txn->get_bytes();
+ stats.get_counter += txn->num_get_keys();
+ txn.reset();
+
TEST_SYNC_POINT_CALLBACK("get_rowset::advance_last_pending_txn_id",
&first_txn_id);
+ std::shared_ptr<TxnLazyCommitTask> task =
+ txn_lazy_committer_->submit(instance_id, first_txn_id);
- if (version_pb.pending_txn_ids_size() > 0) {
- DCHECK(version_pb.pending_txn_ids_size() == 1);
- stats.get_bytes += txn->get_bytes();
- stats.get_counter += txn->num_get_keys();
- txn.reset();
-
TEST_SYNC_POINT_CALLBACK("get_rowset::advance_last_pending_txn_id",
- &version_pb);
- std::shared_ptr<TxnLazyCommitTask> task =
- txn_lazy_committer_->submit(instance_id,
version_pb.pending_txn_ids(0));
-
- std::tie(code, msg) = task->wait();
- if (code != MetaServiceCode::OK) {
- LOG(WARNING) << "advance_last_txn failed last_txn="
- << version_pb.pending_txn_ids(0) << "
code=" << code
- << " msg=" << msg;
- return;
- }
- continue;
+ std::tie(code, msg) = task->wait();
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "advance_last_txn failed last_txn=" <<
first_txn_id
+ << " code=" << code << " msg=" << msg;
+ return;
}
+ continue;
}
}
// TODO(plat1ko): Judge if tablet has been dropped (in dropped
index/partition)
TabletStatsPB tablet_stat;
- internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx,
tablet_stat, true);
- if (code != MetaServiceCode::OK) return;
+ if (!is_versioned_read) {
+ internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx,
tablet_stat, true /*snapshot_read*/);
+ if (code != MetaServiceCode::OK) return;
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
+ }
VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" <<
proto_to_json(tablet_stat);
int64_t bc_cnt = tablet_stat.base_compaction_cnt();
@@ -2544,11 +2587,16 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
}
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
req_start, req_end);
- for (auto [start, end] : versions) {
- internal_get_rowset(txn.get(), start, end, instance_id, tablet_id,
code, msg, response);
- if (code != MetaServiceCode::OK) {
- return;
+ if (!is_versioned_read) {
+ for (auto [start, end] : versions) {
+ internal_get_rowset(txn.get(), start, end, instance_id,
tablet_id, code, msg,
+ response);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
}
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
// get referenced schema
@@ -2586,8 +2634,12 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
} else {
auto key = meta_schema_key(
{instance_id, idx.index_id(),
rowset_meta.schema_version()});
- if (!try_fetch_and_parse_schema(txn.get(), rowset_meta, key,
code, msg)) {
- return;
+ if (!is_versioned_read) {
+ if (!try_fetch_and_parse_schema(txn.get(), rowset_meta,
key, code, msg)) {
+ return;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
version_to_schema.emplace(rowset_meta.schema_version(),
rowset_meta.mutable_tablet_schema());
@@ -2780,7 +2832,7 @@ static bool remove_pending_delete_bitmap(MetaServiceCode&
code, std::string& msg
static bool check_partition_version_when_update_delete_bitmap(
MetaServiceCode& code, std::string& msg, std::unique_ptr<Transaction>&
txn,
std::string& instance_id, int64_t table_id, int64_t partition_id,
int64_t tablet_id,
- int64_t txn_id, int64_t next_visible_version) {
+ int64_t txn_id, int64_t next_visible_version, bool is_versioned_read) {
if (partition_id <= 0) {
LOG(WARNING) << fmt::format(
"invalid partition_id, skip to check partition version.
txn={}, "
@@ -2820,38 +2872,41 @@ static bool
check_partition_version_when_update_delete_bitmap(
txn_id, table_id, partition_id, tablet_id,
proto_to_json(index_pb));
return true;
}
- int64_t db_id = index_pb.tablet_index().db_id();
-
- std::string ver_key = partition_version_key({instance_id, db_id, table_id,
partition_id});
- std::string ver_val;
- err = txn->get(ver_key, &ver_val);
- if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
- code = cast_as<ErrCategory::READ>(err);
- msg = fmt::format("failed to get partition version, txn_id={},
tablet={}, err={}", txn_id,
- tablet_id, err);
- LOG(WARNING) << msg;
- return false;
- }
-
int64_t cur_max_version {-1};
- if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- cur_max_version = 1;
- } else {
- VersionPB version_pb;
- if (!version_pb.ParseFromString(ver_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- msg = fmt::format("failed to parse version_pb, txn_id={},
tablet={}, key={}", txn_id,
- tablet_id, hex(ver_key));
+ if (!is_versioned_read) {
+ int64_t db_id = index_pb.tablet_index().db_id();
+ std::string ver_key = partition_version_key({instance_id, db_id,
table_id, partition_id});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ if (err != TxnErrorCode::TXN_OK && err !=
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get partition version, txn_id={},
tablet={}, err={}",
+ txn_id, tablet_id, err);
LOG(WARNING) << msg;
return false;
}
- DCHECK(version_pb.has_version());
- cur_max_version = version_pb.version();
- if (version_pb.pending_txn_ids_size() > 0) {
- DCHECK(version_pb.pending_txn_ids_size() == 1);
- cur_max_version += version_pb.pending_txn_ids_size();
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ cur_max_version = 1;
+ } else {
+ VersionPB version_pb;
+ if (!version_pb.ParseFromString(ver_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("failed to parse version_pb, txn_id={},
tablet={}, key={}",
+ txn_id, tablet_id, hex(ver_key));
+ LOG(WARNING) << msg;
+ return false;
+ }
+ DCHECK(version_pb.has_version());
+ cur_max_version = version_pb.version();
+
+ if (version_pb.pending_txn_ids_size() > 0) {
+ DCHECK(version_pb.pending_txn_ids_size() == 1);
+ cur_max_version += version_pb.pending_txn_ids_size();
+ }
}
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
if (cur_max_version + 1 != next_visible_version) {
@@ -2980,9 +3035,10 @@ void
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
// 3. check if partition's version matches
if (request->lock_id() > 0 && request->has_txn_id() &&
request->partition_id() &&
request->has_next_visible_version()) {
+ bool is_versioned_read = is_version_read_enabled(instance_id);
if (!check_partition_version_when_update_delete_bitmap(
code, msg, txn, instance_id, table_id,
request->partition_id(), tablet_id,
- request->txn_id(), request->next_visible_version())) {
+ request->txn_id(), request->next_visible_version(),
is_versioned_read)) {
return;
}
}
@@ -3438,10 +3494,15 @@ void
MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control
};
TabletIndexPB idx(request->idx());
TabletStatsPB tablet_stat;
- internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx,
tablet_stat,
- true /*snapshot_read*/);
- if (code != MetaServiceCode::OK) {
- return;
+ bool is_versioned_read = is_version_read_enabled(instance_id);
+ if (!is_versioned_read) {
+ internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx,
tablet_stat,
+ true /*snapshot_read*/);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
// The requested compaction state and the actual compaction state are
different, which indicates that
// the requested rowsets are expired and their delete bitmap may have
been deleted.
@@ -3531,69 +3592,79 @@ bool
MetaServiceImpl::get_mow_tablet_stats_and_meta(MetaServiceCode& code, std::
};
auto table_id = request->table_id();
std::stringstream ss;
+ bool is_versioned_read = is_version_read_enabled(instance_id);
if (!config::enable_batch_get_mow_tablet_stats_and_meta) {
for (const auto& tablet_idx : request->tablet_indexes()) {
// 1. get compaction cnts
TabletStatsPB tablet_stat;
- std::string stats_key =
- stats_tablet_key({instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
- tablet_idx.partition_id(),
tablet_idx.tablet_id()});
- std::string stats_val;
- TxnErrorCode err = txn->get(stats_key, &stats_val);
- TEST_SYNC_POINT_CALLBACK(
-
"get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", &err);
- if (err == TxnErrorCode::TXN_TOO_OLD) {
- code = MetaServiceCode::OK;
- err = txn_kv_->create_txn(&txn);
+ if (!is_versioned_read) {
+ std::string stats_key =
+ stats_tablet_key({instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
+ tablet_idx.partition_id(),
tablet_idx.tablet_id()});
+ std::string stats_val;
+ TxnErrorCode err = txn->get(stats_key, &stats_val);
+ TEST_SYNC_POINT_CALLBACK(
+
"get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", &err);
+ if (err == TxnErrorCode::TXN_TOO_OLD) {
+ code = MetaServiceCode::OK;
+ err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "failed to init txn when get tablet stats";
+ msg = ss.str();
+ return false;
+ }
+ err = txn->get(stats_key, &stats_val);
+ }
if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::CREATE>(err);
- ss << "failed to init txn when get tablet stats";
- msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get tablet stats, err={}
tablet_id={}", err,
+ tablet_idx.tablet_id());
return false;
}
- err = txn->get(stats_key, &stats_val);
- }
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::READ>(err);
- msg = fmt::format("failed to get tablet stats, err={}
tablet_id={}", err,
- tablet_idx.tablet_id());
- return false;
- }
- if (!tablet_stat.ParseFromArray(stats_val.data(),
stats_val.size())) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- msg = fmt::format("marformed tablet stats value, key={}",
hex(stats_key));
- return false;
+ if (!tablet_stat.ParseFromArray(stats_val.data(),
stats_val.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("marformed tablet stats value, key={}",
hex(stats_key));
+ return false;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());
// 2. get tablet states
- std::string tablet_meta_key =
- meta_tablet_key({instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
- tablet_idx.partition_id(),
tablet_idx.tablet_id()});
- std::string tablet_meta_val;
- err = txn->get(tablet_meta_key, &tablet_meta_val);
- if (err != TxnErrorCode::TXN_OK) {
- ss << "failed to get tablet meta"
- << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)"
: "")
- << " instance_id=" << instance_id << " tablet_id=" <<
tablet_idx.tablet_id()
- << " key=" << hex(tablet_meta_key) << " err=" << err;
- msg = ss.str();
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- return false;
- }
doris::TabletMetaCloudPB tablet_meta;
- if (!tablet_meta.ParseFromString(tablet_meta_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- msg = "malformed tablet meta";
- return false;
+ if (!is_versioned_read) {
+ std::string tablet_meta_key =
+ meta_tablet_key({instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
+ tablet_idx.partition_id(),
tablet_idx.tablet_id()});
+ std::string tablet_meta_val;
+ err = txn->get(tablet_meta_key, &tablet_meta_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ ss << "failed to get tablet meta"
+ << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not
found)" : "")
+ << " instance_id=" << instance_id << " tablet_id=" <<
tablet_idx.tablet_id()
+ << " key=" << hex(tablet_meta_key) << " err=" << err;
+ msg = ss.str();
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TABLET_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ return false;
+ }
+ if (!tablet_meta.ParseFromString(tablet_meta_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = "malformed tablet meta";
+ return false;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
response->add_tablet_states(
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
}
- } else {
+ } else if (!is_versioned_read) {
// 1. get compaction cnts
std::vector<std::string> stats_tablet_keys;
for (const auto& tablet_idx : request->tablet_indexes()) {
@@ -3677,6 +3748,8 @@ bool
MetaServiceImpl::get_mow_tablet_stats_and_meta(MetaServiceCode& code, std::
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
}
DCHECK(request->tablet_indexes_size() ==
response->tablet_states_size());
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
read_stats_sw.pause();
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index e719b368d8a..b5133bf4a6d 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -429,6 +429,13 @@ private:
MetaServiceCode& code, std::string& msg,
const std::string& instance_id, KVStats&
stats);
+ // Get the first pending transaction ID for a partition. If there is no
pending transaction,
+ // `first_txn_id` will be set to -1. On a read or parse failure, `code` and `msg` are set and a warning is logged.
+ void get_partition_pending_txn_id(std::string_view instance_id, int64_t
db_id, int64_t table_id,
+ int64_t partition_id, int64_t tablet_id,
+ std::stringstream& ss, MetaServiceCode&
code,
+ std::string& msg, int64_t& first_txn_id,
Transaction* txn);
+
+
std::shared_ptr<TxnKv> txn_kv_;
std::shared_ptr<ResourceManager> resource_mgr_;
std::shared_ptr<RateLimiter> rate_limiter_;
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 0fdcce32a26..a64e26e5295 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -90,7 +90,7 @@ bool check_compaction_input_verions(const
TabletCompactionJobPB& compaction,
void start_compaction_job(MetaServiceCode& code, std::string& msg,
std::stringstream& ss,
std::unique_ptr<Transaction>& txn, const
StartTabletJobRequest* request,
StartTabletJobResponse* response, std::string&
instance_id,
- bool& need_commit) {
+ bool& need_commit, bool is_versioned_read) {
auto& compaction = request->job().compaction(0);
if (!compaction.has_id() || compaction.id().empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
@@ -122,37 +122,42 @@ void start_compaction_job(MetaServiceCode& code,
std::string& msg, std::stringst
int64_t index_id = request->job().idx().index_id();
int64_t partition_id = request->job().idx().partition_id();
int64_t tablet_id = request->job().idx().tablet_id();
- std::string stats_key =
- stats_tablet_key({instance_id, table_id, index_id, partition_id,
tablet_id});
- std::string stats_val;
- TxnErrorCode err = txn->get(stats_key, &stats_val);
- if (err != TxnErrorCode::TXN_OK) {
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "not found" : "get kv
error")
- << " when get tablet stats, tablet_id=" << tablet_id << " key=" <<
hex(stats_key)
- << " err=" << err;
- msg = ss.str();
- return;
- }
+
TabletStatsPB stats;
- CHECK(stats.ParseFromString(stats_val));
- if (compaction.base_compaction_cnt() < stats.base_compaction_cnt() ||
- compaction.cumulative_compaction_cnt() <
stats.cumulative_compaction_cnt()) {
- code = MetaServiceCode::STALE_TABLET_CACHE;
- SS << "could not perform compaction on expired tablet cache."
- << " req_base_compaction_cnt=" << compaction.base_compaction_cnt()
- << ", base_compaction_cnt=" << stats.base_compaction_cnt()
- << ", req_cumulative_compaction_cnt=" <<
compaction.cumulative_compaction_cnt()
- << ", cumulative_compaction_cnt=" <<
stats.cumulative_compaction_cnt();
- msg = ss.str();
- return;
+ if (!is_versioned_read) {
+ std::string stats_key =
+ stats_tablet_key({instance_id, table_id, index_id,
partition_id, tablet_id});
+ std::string stats_val;
+ TxnErrorCode err = txn->get(stats_key, &stats_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "not found" : "get
kv error")
+ << " when get tablet stats, tablet_id=" << tablet_id << " key="
<< hex(stats_key)
+ << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+ CHECK(stats.ParseFromString(stats_val));
+ if (compaction.base_compaction_cnt() < stats.base_compaction_cnt() ||
+ compaction.cumulative_compaction_cnt() <
stats.cumulative_compaction_cnt()) {
+ code = MetaServiceCode::STALE_TABLET_CACHE;
+ SS << "could not perform compaction on expired tablet cache."
+ << " req_base_compaction_cnt=" <<
compaction.base_compaction_cnt()
+ << ", base_compaction_cnt=" << stats.base_compaction_cnt()
+ << ", req_cumulative_compaction_cnt=" <<
compaction.cumulative_compaction_cnt()
+ << ", cumulative_compaction_cnt=" <<
stats.cumulative_compaction_cnt();
+ msg = ss.str();
+ return;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet"; // must abort: falling through would skip the stale-tablet-cache check
}
auto job_key = job_tablet_key({instance_id, table_id, index_id,
partition_id, tablet_id});
std::string job_val;
TabletJobInfoPB job_pb;
- err = txn->get(job_key, &job_val);
+ TxnErrorCode err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
SS << "failed to get tablet job, instance_id=" << instance_id << "
tablet_id=" << tablet_id
<< " key=" << hex(job_key) << " err=" << err;
@@ -279,7 +284,7 @@ void start_compaction_job(MetaServiceCode& code,
std::string& msg, std::stringst
void start_schema_change_job(MetaServiceCode& code, std::string& msg,
std::stringstream& ss,
std::unique_ptr<Transaction>& txn,
const StartTabletJobRequest* request,
StartTabletJobResponse* response,
- std::string& instance_id, bool& need_commit) {
+ std::string& instance_id, bool& need_commit, bool
is_versioned_read) {
auto& schema_change = request->job().schema_change();
if (!schema_change.has_id() || schema_change.id().empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
@@ -311,33 +316,38 @@ void start_schema_change_job(MetaServiceCode& code,
std::string& msg, std::strin
auto& new_tablet_idx =
const_cast<TabletIndexPB&>(schema_change.new_tablet_idx());
if (!new_tablet_idx.has_table_id() || !new_tablet_idx.has_index_id() ||
!new_tablet_idx.has_partition_id()) {
- get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id,
new_tablet_idx);
- if (code != MetaServiceCode::OK) return;
+ if (!is_versioned_read) {
+ get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id,
new_tablet_idx);
+ if (code != MetaServiceCode::OK) return;
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
+ }
}
- MetaTabletKeyInfo new_tablet_key_info {instance_id,
new_tablet_idx.table_id(),
- new_tablet_idx.index_id(),
new_tablet_idx.partition_id(),
- new_tablet_id};
- std::string new_tablet_key;
- std::string new_tablet_val;
doris::TabletMetaCloudPB new_tablet_meta;
- meta_tablet_key(new_tablet_key_info, &new_tablet_key);
- TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
- if (err != TxnErrorCode::TXN_OK) {
- SS << "failed to get new tablet meta"
- << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
- << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
- << " key=" << hex(new_tablet_key) << " err=" << err;
- msg = ss.str();
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- return;
- }
- if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- msg = "malformed tablet meta";
- return;
+ if (!is_versioned_read) {
+ std::string new_tablet_key =
+ meta_tablet_key({instance_id, new_tablet_idx.table_id(),
new_tablet_idx.index_id(),
+ new_tablet_idx.partition_id(),
new_tablet_id});
+ std::string new_tablet_val;
+ TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ SS << "failed to get new tablet meta"
+ << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" :
"")
+ << " instance_id=" << instance_id << " tablet_id=" <<
new_tablet_id
+ << " key=" << hex(new_tablet_key) << " err=" << err;
+ msg = ss.str();
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ return;
+ }
+ if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = "malformed tablet meta";
+ return;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
-
if (new_tablet_meta.tablet_state() == doris::TabletStatePB::PB_RUNNING) {
code = MetaServiceCode::JOB_ALREADY_SUCCESS;
msg = "schema_change job already success";
@@ -353,7 +363,7 @@ void start_schema_change_job(MetaServiceCode& code,
std::string& msg, std::strin
auto job_key = job_tablet_key({instance_id, table_id, index_id,
partition_id, tablet_id});
std::string job_val;
TabletJobInfoPB job_pb;
- err = txn->get(job_key, &job_val);
+ TxnErrorCode err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
SS << "failed to get tablet job, instance_id=" << instance_id << "
tablet_id=" << tablet_id
<< " key=" << hex(job_key) << " err=" << err;
@@ -439,10 +449,15 @@ void
MetaServiceImpl::start_tablet_job(::google::protobuf::RpcController* contro
return;
}
auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
+ bool is_versioned_read = is_version_read_enabled(instance_id);
if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
!tablet_idx.has_partition_id()) {
- get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id,
tablet_idx);
- if (code != MetaServiceCode::OK) return;
+ if (!is_versioned_read) {
+ get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id,
tablet_idx);
+ if (code != MetaServiceCode::OK) return;
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
+ }
}
// Check if tablet has been dropped
if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
@@ -465,12 +480,14 @@ void
MetaServiceImpl::start_tablet_job(::google::protobuf::RpcController* contro
};
if (!request->job().compaction().empty()) {
- start_compaction_job(code, msg, ss, txn, request, response,
instance_id, need_commit);
+ start_compaction_job(code, msg, ss, txn, request, response,
instance_id, need_commit,
+ is_versioned_read);
return;
}
if (request->job().has_schema_change()) {
- start_schema_change_job(code, msg, ss, txn, request, response,
instance_id, need_commit);
+ start_schema_change_job(code, msg, ss, txn, request, response,
instance_id, need_commit,
+ is_versioned_read);
return;
}
}
@@ -671,7 +688,7 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
const FinishTabletJobRequest* request,
FinishTabletJobResponse* response,
TabletJobInfoPB& recorded_job,
std::string& instance_id, std::string& job_key,
bool& need_commit,
- std::string& use_version) {
+ std::string& use_version, bool is_versioned_read) {
//==========================================================================
// check
//==========================================================================
@@ -1078,7 +1095,7 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
const FinishTabletJobRequest* request,
FinishTabletJobResponse* response,
TabletJobInfoPB& recorded_job,
std::string& instance_id, std::string& job_key,
bool& need_commit,
- std::string& use_version) {
+ std::string& use_version, bool
is_versioned_read) {
//==========================================================================
// check
//==========================================================================
@@ -1098,32 +1115,40 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
auto& new_tablet_idx =
const_cast<TabletIndexPB&>(schema_change.new_tablet_idx());
if (!new_tablet_idx.has_table_id() || !new_tablet_idx.has_index_id() ||
!new_tablet_idx.has_partition_id()) {
- get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id,
new_tablet_idx);
- if (code != MetaServiceCode::OK) return;
+ if (!is_versioned_read) {
+ get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id,
new_tablet_idx);
+ if (code != MetaServiceCode::OK) return;
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
+ }
}
int64_t new_table_id = new_tablet_idx.table_id();
int64_t new_index_id = new_tablet_idx.index_id();
int64_t new_partition_id = new_tablet_idx.partition_id();
+ doris::TabletMetaCloudPB new_tablet_meta;
auto new_tablet_key = meta_tablet_key(
{instance_id, new_table_id, new_index_id, new_partition_id,
new_tablet_id});
std::string new_tablet_val;
- doris::TabletMetaCloudPB new_tablet_meta;
- TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
- if (err != TxnErrorCode::TXN_OK) {
- SS << "failed to get new tablet meta"
- << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
- << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
- << " key=" << hex(new_tablet_key) << " err=" << err;
- msg = ss.str();
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- return;
- }
- if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- msg = "malformed tablet meta";
- return;
+ if (!is_versioned_read) {
+ TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ SS << "failed to get new tablet meta"
+ << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" :
"")
+ << " instance_id=" << instance_id << " tablet_id=" <<
new_tablet_id
+ << " key=" << hex(new_tablet_key) << " err=" << err;
+ msg = ss.str();
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ return;
+ }
+ if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = "malformed tablet meta";
+ return;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet"; // new_tablet_meta is still empty here
}
if (new_tablet_meta.tablet_state() == doris::TabletStatePB::PB_RUNNING) {
@@ -1184,7 +1209,7 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
std::string new_tablet_job_val;
TabletJobInfoPB new_recorded_job;
- err = txn->get(new_tablet_job_key, &new_tablet_job_val);
+ TxnErrorCode err = txn->get(new_tablet_job_key, &new_tablet_job_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
SS << "internal error,"
<< " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
@@ -1492,6 +1517,7 @@ void
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
return;
}
+ bool is_versioned_read = is_version_read_enabled(instance_id);
for (int retry = 0; retry <= 1; retry++) {
bool need_commit = false;
TxnErrorCode err = txn_kv_->create_txn(&txn);
@@ -1510,8 +1536,12 @@ void
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
!tablet_idx.has_partition_id()) {
- get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id,
tablet_idx);
- if (code != MetaServiceCode::OK) return;
+ if (!is_versioned_read) {
+ get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id,
tablet_idx);
+ if (code != MetaServiceCode::OK) return;
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
+ }
}
// Check if tablet has been dropped
if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
@@ -1550,11 +1580,12 @@ void
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
if (!request->job().compaction().empty()) {
// Process compaction commit
process_compaction_job(code, msg, ss, txn, request, response,
recorded_job, instance_id,
- job_key, need_commit, use_version);
+ job_key, need_commit, use_version,
is_versioned_read);
} else if (request->job().has_schema_change()) {
// Process schema change commit
process_schema_change_job(code, msg, ss, txn, request, response,
recorded_job,
- instance_id, job_key, need_commit,
use_version);
+ instance_id, job_key, need_commit,
use_version,
+ is_versioned_read);
}
if (!need_commit) return;
diff --git a/cloud/src/meta-service/meta_service_partition.cpp
b/cloud/src/meta-service/meta_service_partition.cpp
index 6985152ca4f..b02ff532c4b 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -61,18 +61,22 @@ using check_create_table_type = std::function<const
std::tuple<
// Return TXN_OK if exists, TXN_KEY_NOT_FOUND if not exists, otherwise error
static TxnErrorCode index_exists(Transaction* txn, const std::string&
instance_id,
- const IndexRequest* req) {
- auto tablet_key = meta_tablet_key({instance_id, req->table_id(),
req->index_ids(0), 0, 0});
- auto tablet_key_end =
- meta_tablet_key({instance_id, req->table_id(), req->index_ids(0),
INT64_MAX, 0});
- std::unique_ptr<RangeGetIterator> it;
-
- TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
- if (err != TxnErrorCode::TXN_OK) {
- LOG_WARNING("failed to get kv").tag("err", err);
- return err;
+ bool is_versioned_read, const IndexRequest*
req) {
+ if (!is_versioned_read) {
+ auto tablet_key = meta_tablet_key({instance_id, req->table_id(),
req->index_ids(0), 0, 0});
+ auto tablet_key_end =
+ meta_tablet_key({instance_id, req->table_id(),
req->index_ids(0), INT64_MAX, 0});
+ std::unique_ptr<RangeGetIterator> it;
+
+ TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get kv").tag("err", err);
+ return err;
+ }
+ return it->has_next() ? TxnErrorCode::TXN_OK :
TxnErrorCode::TXN_KEY_NOT_FOUND;
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
- return it->has_next() ? TxnErrorCode::TXN_OK :
TxnErrorCode::TXN_KEY_NOT_FOUND;
}
static TxnErrorCode check_recycle_key_exist(Transaction* txn, const
std::string& key) {
@@ -106,7 +110,8 @@ void
MetaServiceImpl::prepare_index(::google::protobuf::RpcController* controlle
msg = "failed to create txn";
return;
}
- err = index_exists(txn.get(), instance_id, request);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
+ err = index_exists(txn.get(), instance_id, is_versioned_read, request);
// If index has existed, this might be a stale request
if (err == TxnErrorCode::TXN_OK) {
code = MetaServiceCode::ALREADY_EXISTED;
@@ -195,12 +200,13 @@ void
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
commit_index_log.set_db_id(request->db_id());
commit_index_log.set_table_id(request->table_id());
+ bool is_versioned_read = is_version_read_enabled(instance_id);
for (auto index_id : request->index_ids()) {
auto key = recycle_index_key({instance_id, index_id});
std::string val;
err = txn->get(key, &val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
- err = index_exists(txn.get(), instance_id, request);
+ err = index_exists(txn.get(), instance_id, is_versioned_read,
request);
// If index has existed, this might be a duplicate request
if (err == TxnErrorCode::TXN_OK) {
return; // Index committed, OK
@@ -407,19 +413,23 @@ void
MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
// Return TXN_OK if exists, TXN_KEY_NOT_FOUND if not exists, otherwise error
static TxnErrorCode partition_exists(Transaction* txn, const std::string&
instance_id,
- const PartitionRequest* req) {
- auto tablet_key = meta_tablet_key(
- {instance_id, req->table_id(), req->index_ids(0),
req->partition_ids(0), 0});
- auto tablet_key_end = meta_tablet_key(
- {instance_id, req->table_id(), req->index_ids(0),
req->partition_ids(0), INT64_MAX});
- std::unique_ptr<RangeGetIterator> it;
-
- TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
- if (err != TxnErrorCode::TXN_OK) {
- LOG_WARNING("failed to get kv").tag("err", err);
- return err;
+ bool is_versioned_read, const
PartitionRequest* req) {
+ if (!is_versioned_read) {
+ auto tablet_key = meta_tablet_key(
+ {instance_id, req->table_id(), req->index_ids(0),
req->partition_ids(0), 0});
+ auto tablet_key_end = meta_tablet_key({instance_id, req->table_id(),
req->index_ids(0),
+ req->partition_ids(0),
INT64_MAX});
+ std::unique_ptr<RangeGetIterator> it;
+
+ TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get kv").tag("err", err);
+ return err;
+ }
+ return it->has_next() ? TxnErrorCode::TXN_OK :
TxnErrorCode::TXN_KEY_NOT_FOUND;
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
- return it->has_next() ? TxnErrorCode::TXN_OK :
TxnErrorCode::TXN_KEY_NOT_FOUND;
}
void MetaServiceImpl::prepare_partition(::google::protobuf::RpcController*
controller,
@@ -459,7 +469,8 @@ void
MetaServiceImpl::prepare_partition(::google::protobuf::RpcController* contr
msg = "failed to create txn";
return;
}
- err = partition_exists(txn.get(), instance_id, request);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
+ err = partition_exists(txn.get(), instance_id, is_versioned_read, request);
// If index has existed, this might be a stale request
if (err == TxnErrorCode::TXN_OK) {
code = MetaServiceCode::ALREADY_EXISTED;
@@ -577,6 +588,7 @@ void
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
commit_partition_log.set_table_id(request->table_id());
commit_partition_log.mutable_index_ids()->CopyFrom(request->index_ids());
+ bool is_versioned_read = is_version_read_enabled(instance_id);
for (auto part_id : request->partition_ids()) {
auto key = recycle_partition_key({instance_id, part_id});
std::string val;
@@ -584,7 +596,7 @@ void
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
// Compatible with requests without `index_ids`
if (!request->index_ids().empty()) {
- err = partition_exists(txn.get(), instance_id, request);
+ err = partition_exists(txn.get(), instance_id,
is_versioned_read, request);
// If partition has existed, this might be a duplicate request
if (err == TxnErrorCode::TXN_OK) {
return; // Partition committed, OK
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index eeba1f5d4ae..5e3bce80c6e 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1293,6 +1293,7 @@ void MetaServiceImpl::commit_txn_immediately(
}
bool is_versioned_write = is_version_write_enabled(instance_id);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
// Save rowset meta
for (auto& i : rowsets) {
@@ -1406,6 +1407,7 @@ void MetaServiceImpl::commit_txn_immediately(
}
txn_info.set_versioned_write(is_versioned_write);
+ txn_info.set_versioned_read(is_versioned_read);
LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
info_val.clear();
@@ -1907,7 +1909,9 @@ void MetaServiceImpl::commit_txn_eventually(
txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED);
bool is_versioned_write = is_version_write_enabled(instance_id);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
txn_info.set_versioned_write(is_versioned_write);
+ txn_info.set_versioned_read(is_versioned_read);
LOG(INFO) << "after update txn_id= " << txn_id
<< " txn_info=" << txn_info.ShortDebugString();
@@ -2447,6 +2451,7 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
}
bool is_versioned_write = is_version_write_enabled(instance_id);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
// Save rowset meta
for (auto& i : rowsets) {
@@ -2555,6 +2560,7 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
}
txn_info.set_versioned_write(is_versioned_write);
+ txn_info.set_versioned_read(is_versioned_read);
LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
info_val.clear();
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 5ea2851a014..ae5671bfb4a 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -125,7 +125,7 @@ void convert_tmp_rowsets(
MetaServiceCode& code, std::string& msg, int64_t db_id,
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>&
tmp_rowsets_meta,
std::map<int64_t, TabletIndexPB>& tablet_ids, bool is_versioned_write,
- Versionstamp versionstamp) {
+ bool is_versioned_read, Versionstamp versionstamp) {
std::stringstream ss;
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
@@ -161,55 +161,66 @@ void convert_tmp_rowsets(
}
if (!tablet_ids.contains(tmp_rowset_pb.tablet_id())) {
- std::string tablet_idx_key =
- meta_tablet_idx_key({instance_id,
tmp_rowset_pb.tablet_id()});
- std::string tablet_idx_val;
- err = txn->get(tablet_idx_key, &tablet_idx_val, true);
- if (TxnErrorCode::TXN_OK != err) {
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- ss << "failed to get tablet idx, txn_id=" << txn_id
- << " key=" << hex(tablet_idx_key) << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
-
TabletIndexPB tablet_idx_pb;
- if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse tablet idx pb txn_id=" << txn_id
- << " key=" << hex(tablet_idx_key);
- msg = ss.str();
- return;
+ if (!is_versioned_read) {
+ std::string tablet_idx_key =
+ meta_tablet_idx_key({instance_id,
tmp_rowset_pb.tablet_id()});
+ std::string tablet_idx_val;
+ err = txn->get(tablet_idx_key, &tablet_idx_val, true);
+ if (TxnErrorCode::TXN_OK != err) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TXN_ID_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ ss << "failed to get tablet idx, txn_id=" << txn_id
+ << " key=" << hex(tablet_idx_key) << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse tablet idx pb txn_id=" << txn_id
+ << " key=" << hex(tablet_idx_key);
+ msg = ss.str();
+ return;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
tablet_ids.emplace(tmp_rowset_pb.tablet_id(), tablet_idx_pb);
}
const TabletIndexPB& tablet_idx_pb =
tablet_ids[tmp_rowset_pb.tablet_id()];
if (!partition_versions.contains(tmp_rowset_pb.partition_id())) {
- std::string ver_val;
- std::string ver_key = partition_version_key(
- {instance_id, db_id, tablet_idx_pb.table_id(),
tmp_rowset_pb.partition_id()});
- err = txn->get(ver_key, &ver_val);
- if (TxnErrorCode::TXN_OK != err) {
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- ss << "failed to get partiton version, txn_id=" << txn_id << "
key=" << hex(ver_key)
- << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
VersionPB version_pb;
- if (!version_pb.ParseFromString(ver_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse version pb txn_id=" << txn_id << "
key=" << hex(ver_key);
- msg = ss.str();
- return;
+ if (!is_versioned_read) {
+ std::string ver_val;
+ std::string ver_key =
+ partition_version_key({instance_id, db_id,
tablet_idx_pb.table_id(),
+ tmp_rowset_pb.partition_id()});
+ err = txn->get(ver_key, &ver_val);
+ if (TxnErrorCode::TXN_OK != err) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TXN_ID_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ ss << "failed to get partiton version, txn_id=" << txn_id
+ << " key=" << hex(ver_key) << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ if (!version_pb.ParseFromString(ver_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse version pb txn_id=" << txn_id << "
key=" << hex(ver_key);
+ msg = ss.str();
+ return;
+ }
+ LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
+ << " version_pb:" << version_pb.ShortDebugString();
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
- LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
- << " version_pb:" << version_pb.ShortDebugString();
partition_versions.emplace(tmp_rowset_pb.partition_id(),
version_pb);
DCHECK_EQ(partition_versions.size(), 1) <<
partition_versions.size();
}
@@ -222,21 +233,24 @@ void convert_tmp_rowsets(
std::string rowset_key = meta_rowset_key({instance_id,
tmp_rowset_pb.tablet_id(), version});
std::string rowset_val;
- err = txn->get(rowset_key, &rowset_val);
- if (TxnErrorCode::TXN_OK == err) {
- // tmp rowset key has been converted
- continue;
- }
-
- if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
- code = cast_as<ErrCategory::READ>(err);
- ss << "failed to get rowset_key, txn_id=" << txn_id << " key=" <<
hex(rowset_key)
- << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
+ if (!is_versioned_read) {
+ err = txn->get(rowset_key, &rowset_val);
+ if (TxnErrorCode::TXN_OK == err) {
+ // tmp rowset key has been converted
+ continue;
+ }
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get rowset_key, txn_id=" << txn_id << " key="
<< hex(rowset_key)
+ << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
+ } else {
+ CHECK(false) << "versioned read is not supported yet"; // converted-rowset check not done here
}
- DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
tmp_rowset_pb.set_start_version(version);
tmp_rowset_pb.set_end_version(version);
@@ -456,6 +470,7 @@ void TxnLazyCommitTask::commit() {
}
bool is_versioned_write = txn_info.versioned_write();
+ bool is_versioned_read = txn_info.versioned_read();
std::stringstream ss;
int retry_times = 0;
@@ -525,7 +540,7 @@ void TxnLazyCommitTask::commit() {
tmp_rowset_metas.begin() + end);
convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code_,
msg_, db_id,
sub_partition_tmp_rowset_metas,
tablet_ids,
- is_versioned_write, versionstamp);
+ is_versioned_write, is_versioned_read,
versionstamp);
if (code_ != MetaServiceCode::OK) break;
}
if (code_ != MetaServiceCode::OK) break;
@@ -546,7 +561,7 @@ void TxnLazyCommitTask::commit() {
if (tablet_ids.size() > 0) {
// get table_id from memory cache
table_id = tablet_ids.begin()->second.table_id();
- } else {
+ } else if (!is_versioned_read) {
// get table_id from storage
int64_t first_tablet_id =
tmp_rowset_metas.begin()->second.tablet_id();
std::string tablet_idx_key =
@@ -573,33 +588,39 @@ void TxnLazyCommitTask::commit() {
break;
}
table_id = tablet_idx_pb.table_id();
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
}
DCHECK(table_id > 0);
DCHECK(partition_id > 0);
+ VersionPB version_pb;
std::string ver_val;
std::string ver_key =
partition_version_key({instance_id_, db_id, table_id,
partition_id});
- err = txn->get(ver_key, &ver_val);
- if (TxnErrorCode::TXN_OK != err) {
- code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
- ? MetaServiceCode::TXN_ID_NOT_FOUND
- : cast_as<ErrCategory::READ>(err);
- ss << "failed to get partiton version, txn_id=" << txn_id_
- << " key=" << hex(ver_key) << " err=" << err;
- msg_ = ss.str();
- LOG(WARNING) << msg_;
- break;
- }
- VersionPB version_pb;
- if (!version_pb.ParseFromString(ver_val)) {
- code_ = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse version pb txn_id=" << txn_id_
- << " key=" << hex(ver_key);
- msg_ = ss.str();
- break;
+ if (!is_versioned_read) {
+ err = txn->get(ver_key, &ver_val);
+ if (TxnErrorCode::TXN_OK != err) {
+ code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TXN_ID_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ ss << "failed to get partiton version, txn_id=" <<
txn_id_
+ << " key=" << hex(ver_key) << " err=" << err;
+ msg_ = ss.str();
+ LOG(WARNING) << msg_;
+ break;
+ }
+ if (!version_pb.ParseFromString(ver_val)) {
+ code_ = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse version pb txn_id=" << txn_id_
+ << " key=" << hex(ver_key);
+ msg_ = ss.str();
+ break;
+ }
+ } else {
+ CHECK(false) << "versioned read is not supported yet";
}
if (version_pb.pending_txn_ids_size() > 0 &&
diff --git a/cloud/test/txn_lazy_commit_test.cpp
b/cloud/test/txn_lazy_commit_test.cpp
index 8f2d6ff06c2..328bedae7dc 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -1575,9 +1575,8 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase3Test) {
std::unique_lock<std::mutex> _lock(go_mutex);
last_pending_txn_id_count++;
if (last_pending_txn_id_count == 1) {
- auto version_pb = *try_any_cast<VersionPB*>(args[0]);
- ASSERT_EQ(version_pb.pending_txn_ids(0), eventually_txn_id);
- ASSERT_GT(version_pb.pending_txn_ids(0), 0);
+ auto txn_id = *try_any_cast<int64_t*>(args[0]);
+ ASSERT_EQ(txn_id, eventually_txn_id);
}
});
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 8d627f67cde..1cb4018e70a 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -442,6 +442,7 @@ message TxnInfoPB {
repeated int64 sub_txn_ids = 18;
// TODO: There are more fields TBD
optional bool versioned_write = 19; // versioned write, don't need to
write RecycleTxnPB again
+ optional bool versioned_read = 20; // whether to read versioned keys
}
// For check txn conflict and txn timeout
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]