This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e0898dfa8 [refactor](cloud) Add versioned read checking (#54278)
16e0898dfa8 is described below

commit 16e0898dfa876b65979504944f676b11f1d6b83d
Author: walter <[email protected]>
AuthorDate: Thu Aug 7 22:03:30 2025 +0800

    [refactor](cloud) Add versioned read checking (#54278)
---
 cloud/src/meta-service/meta_service.cpp           | 349 +++++++++++++---------
 cloud/src/meta-service/meta_service.h             |   7 +
 cloud/src/meta-service/meta_service_job.cpp       | 193 +++++++-----
 cloud/src/meta-service/meta_service_partition.cpp |  66 ++--
 cloud/src/meta-service/meta_service_txn.cpp       |   6 +
 cloud/src/meta-service/txn_lazy_committer.cpp     | 171 ++++++-----
 cloud/test/txn_lazy_commit_test.cpp               |   5 +-
 gensrc/proto/cloud.proto                          |   1 +
 8 files changed, 474 insertions(+), 324 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index a473d14c31b..9bc8805f9db 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -42,6 +42,7 @@
 #include <ostream>
 #include <sstream>
 #include <string>
+#include <string_view>
 #include <tuple>
 #include <type_traits>
 #include <unordered_map>
@@ -814,12 +815,17 @@ void 
MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
     }
     UpdateTabletLogPB update_tablet_log;
     bool is_versioned_write = is_version_write_enabled(instance_id);
+    bool is_versioned_read = is_version_read_enabled(instance_id);
     for (const TabletMetaInfoPB& tablet_meta_info : 
request->tablet_meta_infos()) {
         doris::TabletMetaCloudPB tablet_meta;
-        internal_get_tablet(code, msg, instance_id, txn.get(), 
tablet_meta_info.tablet_id(),
-                            &tablet_meta, true);
-        if (code != MetaServiceCode::OK) {
-            return;
+        if (!is_versioned_read) {
+            internal_get_tablet(code, msg, instance_id, txn.get(), 
tablet_meta_info.tablet_id(),
+                                &tablet_meta, true);
+            if (code != MetaServiceCode::OK) {
+                return;
+            }
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
         }
         if (tablet_meta_info.has_is_in_memory()) { // deprecate after 3.0.0
             tablet_meta.set_is_in_memory(tablet_meta_info.is_in_memory());
@@ -1711,11 +1717,16 @@ static void fill_schema_from_dict(MetaServiceCode& 
code, std::string& msg,
 
 bool check_job_existed(Transaction* txn, MetaServiceCode& code, std::string& 
msg,
                        const std::string& instance_id, int64_t tablet_id,
-                       const std::string& rowset_id, const std::string& 
job_id) {
+                       const std::string& rowset_id, const std::string& job_id,
+                       bool is_versioned_read) {
     TabletIndexPB tablet_idx;
-    get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx);
-    if (code != MetaServiceCode::OK) {
-        return false;
+    if (!is_versioned_read) {
+        get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx);
+        if (code != MetaServiceCode::OK) {
+            return false;
+        }
+    } else {
+        CHECK(false) << "versioned read is not supported yet";
     }
 
     std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
@@ -1875,8 +1886,9 @@ void 
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
     // Check if the compaction/sc tablet job has finished
     if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
         !request->tablet_job_id().empty()) {
+        bool is_versioned_read = is_version_read_enabled(instance_id);
         if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, 
rowset_id,
-                               request->tablet_job_id())) {
+                               request->tablet_job_id(), is_versioned_read)) {
             return;
         }
     }
@@ -2020,8 +2032,9 @@ void 
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
     // Check if the compaction/sc tablet job has finished
     if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
         !request->tablet_job_id().empty()) {
+        bool is_versioned_read = is_version_read_enabled(instance_id);
         if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, 
rowset_id,
-                               request->tablet_job_id())) {
+                               request->tablet_job_id(), is_versioned_read)) {
             return;
         }
     }
@@ -2409,6 +2422,45 @@ static bool try_fetch_and_parse_schema(Transaction* txn, 
RowsetMetaCloudPB& rows
     return true;
 }
 
+// Looks up the partition version record and reports the first pending txn id through
+// `first_txn_id` (-1 when there is none). On a read or parse failure, sets `code`/`msg`,
+// logs a warning, and leaves `first_txn_id` at -1.
+void MetaServiceImpl::get_partition_pending_txn_id(std::string_view instance_id, int64_t db_id,
+                                                   int64_t table_id, int64_t partition_id,
+                                                   int64_t tablet_id, std::stringstream& ss,
+                                                   MetaServiceCode& code, std::string& msg,
+                                                   int64_t& first_txn_id, Transaction* txn) {
+    // Default to "no pending txn" so every early return leaves the out-param well defined.
+    first_txn_id = -1;
+    std::string ver_val;
+    std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id});
+    TxnErrorCode err = txn->get(ver_key, &ver_val);
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) return; // no version record: nothing pending
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get partition version, tablet_id=" << tablet_id << " key=" << hex(ver_key)
+           << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    VersionPB version_pb;
+    if (!version_pb.ParseFromString(ver_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse version pb db_id=" << db_id << " table_id=" << table_id
+           << " partition_id=" << partition_id << " key=" << hex(ver_key);
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    if (version_pb.pending_txn_ids_size() > 0) {
+        DCHECK(version_pb.pending_txn_ids_size() == 1);
+        first_txn_id = version_pb.pending_txn_ids(0);
+    }
+}
+
 void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
                                  const GetRowsetRequest* request, 
GetRowsetResponse* response,
                                  ::google::protobuf::Closure* done) {
@@ -2440,6 +2492,7 @@ void 
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
     int64_t req_cc_cnt = request->cumulative_compaction_cnt();
     int64_t req_cp = request->cumulative_point();
 
+    bool is_versioned_read = is_version_read_enabled(instance_id);
     do {
         TEST_SYNC_POINT_CALLBACK("get_rowset:begin", &tablet_id);
         std::unique_ptr<Transaction> txn;
@@ -2457,67 +2510,57 @@ void 
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
         };
         TabletIndexPB idx;
         // Get tablet id index from kv
-        get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx);
-        if (code != MetaServiceCode::OK) {
-            return;
+        if (!is_versioned_read) {
+            get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx);
+            if (code != MetaServiceCode::OK) {
+                return;
+            }
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
         }
         DCHECK(request->has_idx());
 
         if (idx.has_db_id()) {
             // there is maybe a lazy commit txn when call get_rowset
             // we need advance lazy commit txn here
-            std::string ver_val;
-            std::string ver_key = partition_version_key(
-                    {instance_id, idx.db_id(), idx.table_id(), 
idx.partition_id()});
-            err = txn->get(ver_key, &ver_val);
-            if (TxnErrorCode::TXN_OK != err && TxnErrorCode::TXN_KEY_NOT_FOUND 
!= err) {
-                code = cast_as<ErrCategory::READ>(err);
-                ss << "failed to get partiton version, tablet_id=" << tablet_id
-                   << " key=" << hex(ver_key) << " err=" << err;
-                msg = ss.str();
-                LOG(WARNING) << msg;
-                return;
-            }
-
-            if (TxnErrorCode::TXN_OK == err) {
-                VersionPB version_pb;
-                if (!version_pb.ParseFromString(ver_val)) {
-                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                    ss << "failed to parse version pb db_id=" << idx.db_id()
-                       << " table_id=" << idx.table_id() << " partition_id" << 
idx.partition_id()
-                       << " key=" << hex(ver_key);
-                    msg = ss.str();
-                    LOG(WARNING) << msg;
+            int64_t first_txn_id = -1;
+            if (!is_versioned_read) {
+                get_partition_pending_txn_id(instance_id, idx.db_id(), 
idx.table_id(),
+                                             idx.partition_id(), tablet_id, 
ss, code, msg,
+                                             first_txn_id, txn.get());
+                if (code != MetaServiceCode::OK) {
                     return;
                 }
+            } else {
+                CHECK(false) << "versioned read is not supported yet";
+            }
+            if (first_txn_id >= 0) {
+                stats.get_bytes += txn->get_bytes();
+                stats.get_counter += txn->num_get_keys();
+                txn.reset();
+                
TEST_SYNC_POINT_CALLBACK("get_rowset::advance_last_pending_txn_id", 
&first_txn_id);
+                std::shared_ptr<TxnLazyCommitTask> task =
+                        txn_lazy_committer_->submit(instance_id, first_txn_id);
 
-                if (version_pb.pending_txn_ids_size() > 0) {
-                    DCHECK(version_pb.pending_txn_ids_size() == 1);
-                    stats.get_bytes += txn->get_bytes();
-                    stats.get_counter += txn->num_get_keys();
-                    txn.reset();
-                    
TEST_SYNC_POINT_CALLBACK("get_rowset::advance_last_pending_txn_id",
-                                             &version_pb);
-                    std::shared_ptr<TxnLazyCommitTask> task =
-                            txn_lazy_committer_->submit(instance_id, 
version_pb.pending_txn_ids(0));
-
-                    std::tie(code, msg) = task->wait();
-                    if (code != MetaServiceCode::OK) {
-                        LOG(WARNING) << "advance_last_txn failed last_txn="
-                                     << version_pb.pending_txn_ids(0) << " 
code=" << code
-                                     << " msg=" << msg;
-                        return;
-                    }
-                    continue;
+                std::tie(code, msg) = task->wait();
+                if (code != MetaServiceCode::OK) {
+                    LOG(WARNING) << "advance_last_txn failed last_txn=" << 
first_txn_id
+                                 << " code=" << code << " msg=" << msg;
+                    return;
                 }
+                continue;
             }
         }
 
         // TODO(plat1ko): Judge if tablet has been dropped (in dropped 
index/partition)
 
         TabletStatsPB tablet_stat;
-        internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, 
tablet_stat, true);
-        if (code != MetaServiceCode::OK) return;
+        if (!is_versioned_read) { // snapshot read, as the call did before this refactor
+            internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat, true);
+            if (code != MetaServiceCode::OK) return;
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
+        }
         VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" << 
proto_to_json(tablet_stat);
 
         int64_t bc_cnt = tablet_stat.base_compaction_cnt();
@@ -2544,11 +2587,16 @@ void 
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
         }
         auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
                                            req_start, req_end);
-        for (auto [start, end] : versions) {
-            internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, 
code, msg, response);
-            if (code != MetaServiceCode::OK) {
-                return;
+        if (!is_versioned_read) {
+            for (auto [start, end] : versions) {
+                internal_get_rowset(txn.get(), start, end, instance_id, 
tablet_id, code, msg,
+                                    response);
+                if (code != MetaServiceCode::OK) {
+                    return;
+                }
             }
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
         }
 
         // get referenced schema
@@ -2586,8 +2634,12 @@ void 
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
             } else {
                 auto key = meta_schema_key(
                         {instance_id, idx.index_id(), 
rowset_meta.schema_version()});
-                if (!try_fetch_and_parse_schema(txn.get(), rowset_meta, key, 
code, msg)) {
-                    return;
+                if (!is_versioned_read) {
+                    if (!try_fetch_and_parse_schema(txn.get(), rowset_meta, 
key, code, msg)) {
+                        return;
+                    }
+                } else {
+                    CHECK(false) << "versioned read is not supported yet";
                 }
                 version_to_schema.emplace(rowset_meta.schema_version(),
                                           rowset_meta.mutable_tablet_schema());
@@ -2780,7 +2832,7 @@ static bool remove_pending_delete_bitmap(MetaServiceCode& 
code, std::string& msg
 static bool check_partition_version_when_update_delete_bitmap(
         MetaServiceCode& code, std::string& msg, std::unique_ptr<Transaction>& 
txn,
         std::string& instance_id, int64_t table_id, int64_t partition_id, 
int64_t tablet_id,
-        int64_t txn_id, int64_t next_visible_version) {
+        int64_t txn_id, int64_t next_visible_version, bool is_versioned_read) {
     if (partition_id <= 0) {
         LOG(WARNING) << fmt::format(
                 "invalid partition_id, skip to check partition version. 
txn={}, "
@@ -2820,38 +2872,41 @@ static bool 
check_partition_version_when_update_delete_bitmap(
                 txn_id, table_id, partition_id, tablet_id, 
proto_to_json(index_pb));
         return true;
     }
-    int64_t db_id = index_pb.tablet_index().db_id();
-
-    std::string ver_key = partition_version_key({instance_id, db_id, table_id, 
partition_id});
-    std::string ver_val;
-    err = txn->get(ver_key, &ver_val);
-    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
-        code = cast_as<ErrCategory::READ>(err);
-        msg = fmt::format("failed to get partition version, txn_id={}, 
tablet={}, err={}", txn_id,
-                          tablet_id, err);
-        LOG(WARNING) << msg;
-        return false;
-    }
-
     int64_t cur_max_version {-1};
-    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-        cur_max_version = 1;
-    } else {
-        VersionPB version_pb;
-        if (!version_pb.ParseFromString(ver_val)) {
-            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-            msg = fmt::format("failed to parse version_pb, txn_id={}, 
tablet={}, key={}", txn_id,
-                              tablet_id, hex(ver_key));
+    if (!is_versioned_read) {
+        int64_t db_id = index_pb.tablet_index().db_id();
+        std::string ver_key = partition_version_key({instance_id, db_id, 
table_id, partition_id});
+        std::string ver_val;
+        err = txn->get(ver_key, &ver_val);
+        if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get partition version, txn_id={}, 
tablet={}, err={}",
+                              txn_id, tablet_id, err);
             LOG(WARNING) << msg;
             return false;
         }
-        DCHECK(version_pb.has_version());
-        cur_max_version = version_pb.version();
 
-        if (version_pb.pending_txn_ids_size() > 0) {
-            DCHECK(version_pb.pending_txn_ids_size() == 1);
-            cur_max_version += version_pb.pending_txn_ids_size();
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            cur_max_version = 1;
+        } else {
+            VersionPB version_pb;
+            if (!version_pb.ParseFromString(ver_val)) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                msg = fmt::format("failed to parse version_pb, txn_id={}, 
tablet={}, key={}",
+                                  txn_id, tablet_id, hex(ver_key));
+                LOG(WARNING) << msg;
+                return false;
+            }
+            DCHECK(version_pb.has_version());
+            cur_max_version = version_pb.version();
+
+            if (version_pb.pending_txn_ids_size() > 0) {
+                DCHECK(version_pb.pending_txn_ids_size() == 1);
+                cur_max_version += version_pb.pending_txn_ids_size();
+            }
         }
+    } else {
+        CHECK(false) << "versioned read is not supported yet";
     }
 
     if (cur_max_version + 1 != next_visible_version) {
@@ -2980,9 +3035,10 @@ void 
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
     // 3. check if partition's version matches
     if (request->lock_id() > 0 && request->has_txn_id() && 
request->partition_id() &&
         request->has_next_visible_version()) {
+        bool is_versioned_read = is_version_read_enabled(instance_id);
         if (!check_partition_version_when_update_delete_bitmap(
                     code, msg, txn, instance_id, table_id, 
request->partition_id(), tablet_id,
-                    request->txn_id(), request->next_visible_version())) {
+                    request->txn_id(), request->next_visible_version(), 
is_versioned_read)) {
             return;
         }
     }
@@ -3438,10 +3494,15 @@ void 
MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control
         };
         TabletIndexPB idx(request->idx());
         TabletStatsPB tablet_stat;
-        internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, 
tablet_stat,
-                                  true /*snapshot_read*/);
-        if (code != MetaServiceCode::OK) {
-            return;
+        bool is_versioned_read = is_version_read_enabled(instance_id);
+        if (!is_versioned_read) {
+            internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, 
tablet_stat,
+                                      true /*snapshot_read*/);
+            if (code != MetaServiceCode::OK) {
+                return;
+            }
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
         }
         // The requested compaction state and the actual compaction state are 
different, which indicates that
         // the requested rowsets are expired and their delete bitmap may have 
been deleted.
@@ -3531,69 +3592,79 @@ bool 
MetaServiceImpl::get_mow_tablet_stats_and_meta(MetaServiceCode& code, std::
     };
     auto table_id = request->table_id();
     std::stringstream ss;
+    bool is_versioned_read = is_version_read_enabled(instance_id);
     if (!config::enable_batch_get_mow_tablet_stats_and_meta) {
         for (const auto& tablet_idx : request->tablet_indexes()) {
             // 1. get compaction cnts
             TabletStatsPB tablet_stat;
-            std::string stats_key =
-                    stats_tablet_key({instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
-                                      tablet_idx.partition_id(), 
tablet_idx.tablet_id()});
-            std::string stats_val;
-            TxnErrorCode err = txn->get(stats_key, &stats_val);
-            TEST_SYNC_POINT_CALLBACK(
-                    
"get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", &err);
-            if (err == TxnErrorCode::TXN_TOO_OLD) {
-                code = MetaServiceCode::OK;
-                err = txn_kv_->create_txn(&txn);
+            if (!is_versioned_read) {
+                std::string stats_key =
+                        stats_tablet_key({instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
+                                          tablet_idx.partition_id(), 
tablet_idx.tablet_id()});
+                std::string stats_val;
+                TxnErrorCode err = txn->get(stats_key, &stats_val);
+                TEST_SYNC_POINT_CALLBACK(
+                        
"get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", &err);
+                if (err == TxnErrorCode::TXN_TOO_OLD) {
+                    code = MetaServiceCode::OK;
+                    err = txn_kv_->create_txn(&txn);
+                    if (err != TxnErrorCode::TXN_OK) {
+                        code = cast_as<ErrCategory::CREATE>(err);
+                        ss << "failed to init txn when get tablet stats";
+                        msg = ss.str();
+                        return false;
+                    }
+                    err = txn->get(stats_key, &stats_val);
+                }
                 if (err != TxnErrorCode::TXN_OK) {
-                    code = cast_as<ErrCategory::CREATE>(err);
-                    ss << "failed to init txn when get tablet stats";
-                    msg = ss.str();
+                    code = cast_as<ErrCategory::READ>(err);
+                    msg = fmt::format("failed to get tablet stats, err={} 
tablet_id={}", err,
+                                      tablet_idx.tablet_id());
                     return false;
                 }
-                err = txn->get(stats_key, &stats_val);
-            }
-            if (err != TxnErrorCode::TXN_OK) {
-                code = cast_as<ErrCategory::READ>(err);
-                msg = fmt::format("failed to get tablet stats, err={} 
tablet_id={}", err,
-                                  tablet_idx.tablet_id());
-                return false;
-            }
-            if (!tablet_stat.ParseFromArray(stats_val.data(), 
stats_val.size())) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                msg = fmt::format("marformed tablet stats value, key={}", 
hex(stats_key));
-                return false;
+                if (!tablet_stat.ParseFromArray(stats_val.data(), 
stats_val.size())) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    msg = fmt::format("marformed tablet stats value, key={}", 
hex(stats_key));
+                    return false;
+                }
+            } else {
+                CHECK(false) << "versioned read is not supported yet";
             }
             
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
             
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
             response->add_cumulative_points(tablet_stat.cumulative_point());
 
             // 2. get tablet states
-            std::string tablet_meta_key =
-                    meta_tablet_key({instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
-                                     tablet_idx.partition_id(), 
tablet_idx.tablet_id()});
-            std::string tablet_meta_val;
-            err = txn->get(tablet_meta_key, &tablet_meta_val);
-            if (err != TxnErrorCode::TXN_OK) {
-                ss << "failed to get tablet meta"
-                   << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" 
: "")
-                   << " instance_id=" << instance_id << " tablet_id=" << 
tablet_idx.tablet_id()
-                   << " key=" << hex(tablet_meta_key) << " err=" << err;
-                msg = ss.str();
-                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
-                                                              : 
cast_as<ErrCategory::READ>(err);
-                return false;
-            }
             doris::TabletMetaCloudPB tablet_meta;
-            if (!tablet_meta.ParseFromString(tablet_meta_val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                msg = "malformed tablet meta";
-                return false;
+            if (!is_versioned_read) {
+                std::string tablet_meta_key =
+                        meta_tablet_key({instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
+                                         tablet_idx.partition_id(), 
tablet_idx.tablet_id()});
+                std::string tablet_meta_val;
+                err = txn->get(tablet_meta_key, &tablet_meta_val);
+                if (err != TxnErrorCode::TXN_OK) {
+                    ss << "failed to get tablet meta"
+                       << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not 
found)" : "")
+                       << " instance_id=" << instance_id << " tablet_id=" << 
tablet_idx.tablet_id()
+                       << " key=" << hex(tablet_meta_key) << " err=" << err;
+                    msg = ss.str();
+                    code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                   ? MetaServiceCode::TABLET_NOT_FOUND
+                                   : cast_as<ErrCategory::READ>(err);
+                    return false;
+                }
+                if (!tablet_meta.ParseFromString(tablet_meta_val)) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    msg = "malformed tablet meta";
+                    return false;
+                }
+            } else {
+                CHECK(false) << "versioned read is not supported yet";
             }
             response->add_tablet_states(
                     
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
         }
-    } else {
+    } else if (!is_versioned_read) {
         // 1. get compaction cnts
         std::vector<std::string> stats_tablet_keys;
         for (const auto& tablet_idx : request->tablet_indexes()) {
@@ -3677,6 +3748,8 @@ bool 
MetaServiceImpl::get_mow_tablet_stats_and_meta(MetaServiceCode& code, std::
                     
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
         }
         DCHECK(request->tablet_indexes_size() == 
response->tablet_states_size());
+    } else {
+        CHECK(false) << "versioned read is not supported yet";
     }
 
     read_stats_sw.pause();
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index e719b368d8a..b5133bf4a6d 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -429,6 +429,13 @@ private:
                                  MetaServiceCode& code, std::string& msg,
                                  const std::string& instance_id, KVStats& 
stats);
 
+    // Get the first pending transaction ID for a partition. If there is no pending transaction,
+    // `first_txn_id` will be set to -1.
+    void get_partition_pending_txn_id(std::string_view instance_id, int64_t 
db_id, int64_t table_id,
+                                      int64_t partition_id, int64_t tablet_id,
+                                      std::stringstream& ss, MetaServiceCode& 
code,
+                                      std::string& msg, int64_t& first_txn_id, 
Transaction* txn);
+
     std::shared_ptr<TxnKv> txn_kv_;
     std::shared_ptr<ResourceManager> resource_mgr_;
     std::shared_ptr<RateLimiter> rate_limiter_;
diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 0fdcce32a26..a64e26e5295 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -90,7 +90,7 @@ bool check_compaction_input_verions(const 
TabletCompactionJobPB& compaction,
 void start_compaction_job(MetaServiceCode& code, std::string& msg, 
std::stringstream& ss,
                           std::unique_ptr<Transaction>& txn, const 
StartTabletJobRequest* request,
                           StartTabletJobResponse* response, std::string& 
instance_id,
-                          bool& need_commit) {
+                          bool& need_commit, bool is_versioned_read) {
     auto& compaction = request->job().compaction(0);
     if (!compaction.has_id() || compaction.id().empty()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
@@ -122,37 +122,42 @@ void start_compaction_job(MetaServiceCode& code, 
std::string& msg, std::stringst
     int64_t index_id = request->job().idx().index_id();
     int64_t partition_id = request->job().idx().partition_id();
     int64_t tablet_id = request->job().idx().tablet_id();
-    std::string stats_key =
-            stats_tablet_key({instance_id, table_id, index_id, partition_id, 
tablet_id});
-    std::string stats_val;
-    TxnErrorCode err = txn->get(stats_key, &stats_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
-                                                      : 
cast_as<ErrCategory::READ>(err);
-        SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "not found" : "get kv 
error")
-           << " when get tablet stats, tablet_id=" << tablet_id << " key=" << 
hex(stats_key)
-           << " err=" << err;
-        msg = ss.str();
-        return;
-    }
+
     TabletStatsPB stats;
-    CHECK(stats.ParseFromString(stats_val));
-    if (compaction.base_compaction_cnt() < stats.base_compaction_cnt() ||
-        compaction.cumulative_compaction_cnt() < 
stats.cumulative_compaction_cnt()) {
-        code = MetaServiceCode::STALE_TABLET_CACHE;
-        SS << "could not perform compaction on expired tablet cache."
-           << " req_base_compaction_cnt=" << compaction.base_compaction_cnt()
-           << ", base_compaction_cnt=" << stats.base_compaction_cnt()
-           << ", req_cumulative_compaction_cnt=" << 
compaction.cumulative_compaction_cnt()
-           << ", cumulative_compaction_cnt=" << 
stats.cumulative_compaction_cnt();
-        msg = ss.str();
-        return;
+    if (!is_versioned_read) {
+        std::string stats_key =
+                stats_tablet_key({instance_id, table_id, index_id, 
partition_id, tablet_id});
+        std::string stats_val;
+        TxnErrorCode err = txn->get(stats_key, &stats_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
+                                                          : 
cast_as<ErrCategory::READ>(err);
+            SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "not found" : "get 
kv error")
+               << " when get tablet stats, tablet_id=" << tablet_id << " key=" 
<< hex(stats_key)
+               << " err=" << err;
+            msg = ss.str();
+            return;
+        }
+        CHECK(stats.ParseFromString(stats_val));
+        if (compaction.base_compaction_cnt() < stats.base_compaction_cnt() ||
+            compaction.cumulative_compaction_cnt() < 
stats.cumulative_compaction_cnt()) {
+            code = MetaServiceCode::STALE_TABLET_CACHE;
+            SS << "could not perform compaction on expired tablet cache."
+               << " req_base_compaction_cnt=" << 
compaction.base_compaction_cnt()
+               << ", base_compaction_cnt=" << stats.base_compaction_cnt()
+               << ", req_cumulative_compaction_cnt=" << 
compaction.cumulative_compaction_cnt()
+               << ", cumulative_compaction_cnt=" << 
stats.cumulative_compaction_cnt();
+            msg = ss.str();
+            return;
+        }
+    } else { // No versioned-read path yet: abort rather than continue with empty stats.
+        CHECK(false) << "versioned read is not supported yet";
+    }
 
     auto job_key = job_tablet_key({instance_id, table_id, index_id, 
partition_id, tablet_id});
     std::string job_val;
     TabletJobInfoPB job_pb;
-    err = txn->get(job_key, &job_val);
+    TxnErrorCode err = txn->get(job_key, &job_val);
     if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
         SS << "failed to get tablet job, instance_id=" << instance_id << " 
tablet_id=" << tablet_id
            << " key=" << hex(job_key) << " err=" << err;
@@ -279,7 +284,7 @@ void start_compaction_job(MetaServiceCode& code, 
std::string& msg, std::stringst
 void start_schema_change_job(MetaServiceCode& code, std::string& msg, 
std::stringstream& ss,
                              std::unique_ptr<Transaction>& txn,
                              const StartTabletJobRequest* request, 
StartTabletJobResponse* response,
-                             std::string& instance_id, bool& need_commit) {
+                             std::string& instance_id, bool& need_commit, bool 
is_versioned_read) {
     auto& schema_change = request->job().schema_change();
     if (!schema_change.has_id() || schema_change.id().empty()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
@@ -311,33 +316,38 @@ void start_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::strin
     auto& new_tablet_idx = 
const_cast<TabletIndexPB&>(schema_change.new_tablet_idx());
     if (!new_tablet_idx.has_table_id() || !new_tablet_idx.has_index_id() ||
         !new_tablet_idx.has_partition_id()) {
-        get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id, 
new_tablet_idx);
-        if (code != MetaServiceCode::OK) return;
+        if (!is_versioned_read) {
+            get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id, 
new_tablet_idx);
+            if (code != MetaServiceCode::OK) return;
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
+        }
     }
-    MetaTabletKeyInfo new_tablet_key_info {instance_id, 
new_tablet_idx.table_id(),
-                                           new_tablet_idx.index_id(), 
new_tablet_idx.partition_id(),
-                                           new_tablet_id};
-    std::string new_tablet_key;
-    std::string new_tablet_val;
     doris::TabletMetaCloudPB new_tablet_meta;
-    meta_tablet_key(new_tablet_key_info, &new_tablet_key);
-    TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        SS << "failed to get new tablet meta"
-           << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
-           << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
-           << " key=" << hex(new_tablet_key) << " err=" << err;
-        msg = ss.str();
-        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
-                                                      : 
cast_as<ErrCategory::READ>(err);
-        return;
-    }
-    if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
-        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-        msg = "malformed tablet meta";
-        return;
+    if (!is_versioned_read) {
+        std::string new_tablet_key =
+                meta_tablet_key({instance_id, new_tablet_idx.table_id(), 
new_tablet_idx.index_id(),
+                                 new_tablet_idx.partition_id(), 
new_tablet_id});
+        std::string new_tablet_val;
+        TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            SS << "failed to get new tablet meta"
+               << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : 
"")
+               << " instance_id=" << instance_id << " tablet_id=" << 
new_tablet_id
+               << " key=" << hex(new_tablet_key) << " err=" << err;
+            msg = ss.str();
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
+                                                          : 
cast_as<ErrCategory::READ>(err);
+            return;
+        }
+        if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = "malformed tablet meta";
+            return;
+        }
+    } else {
+        CHECK(false) << "versioned read is not supported yet";
     }
-
     if (new_tablet_meta.tablet_state() == doris::TabletStatePB::PB_RUNNING) {
         code = MetaServiceCode::JOB_ALREADY_SUCCESS;
         msg = "schema_change job already success";
@@ -353,7 +363,7 @@ void start_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::strin
     auto job_key = job_tablet_key({instance_id, table_id, index_id, 
partition_id, tablet_id});
     std::string job_val;
     TabletJobInfoPB job_pb;
-    err = txn->get(job_key, &job_val);
+    TxnErrorCode err = txn->get(job_key, &job_val);
     if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
         SS << "failed to get tablet job, instance_id=" << instance_id << " 
tablet_id=" << tablet_id
            << " key=" << hex(job_key) << " err=" << err;
@@ -439,10 +449,15 @@ void 
MetaServiceImpl::start_tablet_job(::google::protobuf::RpcController* contro
         return;
     }
     auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
+    bool is_versioned_read = is_version_read_enabled(instance_id);
     if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
         !tablet_idx.has_partition_id()) {
-        get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, 
tablet_idx);
-        if (code != MetaServiceCode::OK) return;
+        if (!is_versioned_read) {
+            get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, 
tablet_idx);
+            if (code != MetaServiceCode::OK) return;
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
+        }
     }
     // Check if tablet has been dropped
     if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
@@ -465,12 +480,14 @@ void 
MetaServiceImpl::start_tablet_job(::google::protobuf::RpcController* contro
     };
 
     if (!request->job().compaction().empty()) {
-        start_compaction_job(code, msg, ss, txn, request, response, 
instance_id, need_commit);
+        start_compaction_job(code, msg, ss, txn, request, response, 
instance_id, need_commit,
+                             is_versioned_read);
         return;
     }
 
     if (request->job().has_schema_change()) {
-        start_schema_change_job(code, msg, ss, txn, request, response, 
instance_id, need_commit);
+        start_schema_change_job(code, msg, ss, txn, request, response, 
instance_id, need_commit,
+                                is_versioned_read);
         return;
     }
 }
@@ -671,7 +688,7 @@ void process_compaction_job(MetaServiceCode& code, 
std::string& msg, std::string
                             const FinishTabletJobRequest* request,
                             FinishTabletJobResponse* response, 
TabletJobInfoPB& recorded_job,
                             std::string& instance_id, std::string& job_key, 
bool& need_commit,
-                            std::string& use_version) {
+                            std::string& use_version, bool is_versioned_read) {
     
//==========================================================================
     //                                check
     
//==========================================================================
@@ -1078,7 +1095,7 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
                                const FinishTabletJobRequest* request,
                                FinishTabletJobResponse* response, 
TabletJobInfoPB& recorded_job,
                                std::string& instance_id, std::string& job_key, 
bool& need_commit,
-                               std::string& use_version) {
+                               std::string& use_version, bool 
is_versioned_read) {
     
//==========================================================================
     //                                check
     
//==========================================================================
@@ -1098,32 +1115,40 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
     auto& new_tablet_idx = 
const_cast<TabletIndexPB&>(schema_change.new_tablet_idx());
     if (!new_tablet_idx.has_table_id() || !new_tablet_idx.has_index_id() ||
         !new_tablet_idx.has_partition_id()) {
-        get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id, 
new_tablet_idx);
-        if (code != MetaServiceCode::OK) return;
+        if (!is_versioned_read) {
+            get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id, 
new_tablet_idx);
+            if (code != MetaServiceCode::OK) return;
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
+        }
     }
     int64_t new_table_id = new_tablet_idx.table_id();
     int64_t new_index_id = new_tablet_idx.index_id();
     int64_t new_partition_id = new_tablet_idx.partition_id();
 
+    doris::TabletMetaCloudPB new_tablet_meta;
     auto new_tablet_key = meta_tablet_key(
             {instance_id, new_table_id, new_index_id, new_partition_id, 
new_tablet_id});
     std::string new_tablet_val;
-    doris::TabletMetaCloudPB new_tablet_meta;
-    TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        SS << "failed to get new tablet meta"
-           << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
-           << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
-           << " key=" << hex(new_tablet_key) << " err=" << err;
-        msg = ss.str();
-        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
-                                                      : 
cast_as<ErrCategory::READ>(err);
-        return;
-    }
-    if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
-        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-        msg = "malformed tablet meta";
-        return;
+    if (!is_versioned_read) {
+        TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            SS << "failed to get new tablet meta"
+               << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : 
"")
+               << " instance_id=" << instance_id << " tablet_id=" << 
new_tablet_id
+               << " key=" << hex(new_tablet_key) << " err=" << err;
+            msg = ss.str();
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
+                                                          : 
cast_as<ErrCategory::READ>(err);
+            return;
+        }
+        if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = "malformed tablet meta";
+            return;
+        }
+    } else {
+        CHECK(false) << "versioned read is not supported yet";
     }
 
     if (new_tablet_meta.tablet_state() == doris::TabletStatePB::PB_RUNNING) {
@@ -1184,7 +1209,7 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
 
     std::string new_tablet_job_val;
     TabletJobInfoPB new_recorded_job;
-    err = txn->get(new_tablet_job_key, &new_tablet_job_val);
+    TxnErrorCode err = txn->get(new_tablet_job_key, &new_tablet_job_val);
     if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
         SS << "internal error,"
            << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
@@ -1492,6 +1517,7 @@ void 
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
         return;
     }
 
+    bool is_versioned_read = is_version_read_enabled(instance_id);
     for (int retry = 0; retry <= 1; retry++) {
         bool need_commit = false;
         TxnErrorCode err = txn_kv_->create_txn(&txn);
@@ -1510,8 +1536,12 @@ void 
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
         auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
         if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
             !tablet_idx.has_partition_id()) {
-            get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, 
tablet_idx);
-            if (code != MetaServiceCode::OK) return;
+            if (!is_versioned_read) {
+                get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, 
tablet_idx);
+                if (code != MetaServiceCode::OK) return;
+            } else {
+                CHECK(false) << "versioned read is not supported yet";
+            }
         }
         // Check if tablet has been dropped
         if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
@@ -1550,11 +1580,12 @@ void 
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
         if (!request->job().compaction().empty()) {
             // Process compaction commit
             process_compaction_job(code, msg, ss, txn, request, response, 
recorded_job, instance_id,
-                                   job_key, need_commit, use_version);
+                                   job_key, need_commit, use_version, 
is_versioned_read);
         } else if (request->job().has_schema_change()) {
             // Process schema change commit
             process_schema_change_job(code, msg, ss, txn, request, response, 
recorded_job,
-                                      instance_id, job_key, need_commit, 
use_version);
+                                      instance_id, job_key, need_commit, 
use_version,
+                                      is_versioned_read);
         }
 
         if (!need_commit) return;
diff --git a/cloud/src/meta-service/meta_service_partition.cpp 
b/cloud/src/meta-service/meta_service_partition.cpp
index 6985152ca4f..b02ff532c4b 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -61,18 +61,22 @@ using check_create_table_type = std::function<const 
std::tuple<
 
 // Return TXN_OK if exists, TXN_KEY_NOT_FOUND if not exists, otherwise error
 static TxnErrorCode index_exists(Transaction* txn, const std::string& 
instance_id,
-                                 const IndexRequest* req) {
-    auto tablet_key = meta_tablet_key({instance_id, req->table_id(), 
req->index_ids(0), 0, 0});
-    auto tablet_key_end =
-            meta_tablet_key({instance_id, req->table_id(), req->index_ids(0), 
INT64_MAX, 0});
-    std::unique_ptr<RangeGetIterator> it;
-
-    TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
-    if (err != TxnErrorCode::TXN_OK) {
-        LOG_WARNING("failed to get kv").tag("err", err);
-        return err;
+                                 bool is_versioned_read, const IndexRequest* 
req) {
+    if (!is_versioned_read) {
+        auto tablet_key = meta_tablet_key({instance_id, req->table_id(), 
req->index_ids(0), 0, 0});
+        auto tablet_key_end =
+                meta_tablet_key({instance_id, req->table_id(), 
req->index_ids(0), INT64_MAX, 0});
+        std::unique_ptr<RangeGetIterator> it;
+
+        TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to get kv").tag("err", err);
+            return err;
+        }
+        return it->has_next() ? TxnErrorCode::TXN_OK : 
TxnErrorCode::TXN_KEY_NOT_FOUND;
+    } else {
+        CHECK(false) << "versioned read is not supported yet";
     }
-    return it->has_next() ? TxnErrorCode::TXN_OK : 
TxnErrorCode::TXN_KEY_NOT_FOUND;
 }
 
 static TxnErrorCode check_recycle_key_exist(Transaction* txn, const 
std::string& key) {
@@ -106,7 +110,8 @@ void 
MetaServiceImpl::prepare_index(::google::protobuf::RpcController* controlle
         msg = "failed to create txn";
         return;
     }
-    err = index_exists(txn.get(), instance_id, request);
+    bool is_versioned_read = is_version_read_enabled(instance_id);
+    err = index_exists(txn.get(), instance_id, is_versioned_read, request);
     // If index has existed, this might be a stale request
     if (err == TxnErrorCode::TXN_OK) {
         code = MetaServiceCode::ALREADY_EXISTED;
@@ -195,12 +200,13 @@ void 
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
     commit_index_log.set_db_id(request->db_id());
     commit_index_log.set_table_id(request->table_id());
 
+    bool is_versioned_read = is_version_read_enabled(instance_id);
     for (auto index_id : request->index_ids()) {
         auto key = recycle_index_key({instance_id, index_id});
         std::string val;
         err = txn->get(key, &val);
         if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
-            err = index_exists(txn.get(), instance_id, request);
+            err = index_exists(txn.get(), instance_id, is_versioned_read, 
request);
             // If index has existed, this might be a duplicate request
             if (err == TxnErrorCode::TXN_OK) {
                 return; // Index committed, OK
@@ -407,19 +413,23 @@ void 
MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
 
 // Return TXN_OK if exists, TXN_KEY_NOT_FOUND if not exists, otherwise error
 static TxnErrorCode partition_exists(Transaction* txn, const std::string& 
instance_id,
-                                     const PartitionRequest* req) {
-    auto tablet_key = meta_tablet_key(
-            {instance_id, req->table_id(), req->index_ids(0), 
req->partition_ids(0), 0});
-    auto tablet_key_end = meta_tablet_key(
-            {instance_id, req->table_id(), req->index_ids(0), 
req->partition_ids(0), INT64_MAX});
-    std::unique_ptr<RangeGetIterator> it;
-
-    TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
-    if (err != TxnErrorCode::TXN_OK) {
-        LOG_WARNING("failed to get kv").tag("err", err);
-        return err;
+                                     bool is_versioned_read, const 
PartitionRequest* req) {
+    if (!is_versioned_read) {
+        auto tablet_key = meta_tablet_key(
+                {instance_id, req->table_id(), req->index_ids(0), 
req->partition_ids(0), 0});
+        auto tablet_key_end = meta_tablet_key({instance_id, req->table_id(), 
req->index_ids(0),
+                                               req->partition_ids(0), 
INT64_MAX});
+        std::unique_ptr<RangeGetIterator> it;
+
+        TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to get kv").tag("err", err);
+            return err;
+        }
+        return it->has_next() ? TxnErrorCode::TXN_OK : 
TxnErrorCode::TXN_KEY_NOT_FOUND;
+    } else {
+        CHECK(false) << "versioned read is not supported yet";
     }
-    return it->has_next() ? TxnErrorCode::TXN_OK : 
TxnErrorCode::TXN_KEY_NOT_FOUND;
 }
 
 void MetaServiceImpl::prepare_partition(::google::protobuf::RpcController* 
controller,
@@ -459,7 +469,8 @@ void 
MetaServiceImpl::prepare_partition(::google::protobuf::RpcController* contr
         msg = "failed to create txn";
         return;
     }
-    err = partition_exists(txn.get(), instance_id, request);
+    bool is_versioned_read = is_version_read_enabled(instance_id);
+    err = partition_exists(txn.get(), instance_id, is_versioned_read, request);
     // If index has existed, this might be a stale request
     if (err == TxnErrorCode::TXN_OK) {
         code = MetaServiceCode::ALREADY_EXISTED;
@@ -577,6 +588,7 @@ void 
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
     commit_partition_log.set_table_id(request->table_id());
     commit_partition_log.mutable_index_ids()->CopyFrom(request->index_ids());
 
+    bool is_versioned_read = is_version_read_enabled(instance_id);
     for (auto part_id : request->partition_ids()) {
         auto key = recycle_partition_key({instance_id, part_id});
         std::string val;
@@ -584,7 +596,7 @@ void 
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
         if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
             // Compatible with requests without `index_ids`
             if (!request->index_ids().empty()) {
-                err = partition_exists(txn.get(), instance_id, request);
+                err = partition_exists(txn.get(), instance_id, 
is_versioned_read, request);
                 // If partition has existed, this might be a duplicate request
                 if (err == TxnErrorCode::TXN_OK) {
                     return; // Partition committed, OK
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index eeba1f5d4ae..5e3bce80c6e 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1293,6 +1293,7 @@ void MetaServiceImpl::commit_txn_immediately(
         }
 
         bool is_versioned_write = is_version_write_enabled(instance_id);
+        bool is_versioned_read = is_version_read_enabled(instance_id);
 
         // Save rowset meta
         for (auto& i : rowsets) {
@@ -1406,6 +1407,7 @@ void MetaServiceImpl::commit_txn_immediately(
         }
 
         txn_info.set_versioned_write(is_versioned_write);
+        txn_info.set_versioned_read(is_versioned_read);
 
         LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
         info_val.clear();
@@ -1907,7 +1909,9 @@ void MetaServiceImpl::commit_txn_eventually(
         txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED);
 
         bool is_versioned_write = is_version_write_enabled(instance_id);
+        bool is_versioned_read = is_version_read_enabled(instance_id);
         txn_info.set_versioned_write(is_versioned_write);
+        txn_info.set_versioned_read(is_versioned_read);
 
         LOG(INFO) << "after update txn_id= " << txn_id
                   << " txn_info=" << txn_info.ShortDebugString();
@@ -2447,6 +2451,7 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
     }
 
     bool is_versioned_write = is_version_write_enabled(instance_id);
+    bool is_versioned_read = is_version_read_enabled(instance_id);
 
     // Save rowset meta
     for (auto& i : rowsets) {
@@ -2555,6 +2560,7 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
         
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
     }
     txn_info.set_versioned_write(is_versioned_write);
+    txn_info.set_versioned_read(is_versioned_read);
 
     LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
     info_val.clear();
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp 
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 5ea2851a014..ae5671bfb4a 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -125,7 +125,7 @@ void convert_tmp_rowsets(
         MetaServiceCode& code, std::string& msg, int64_t db_id,
         std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& 
tmp_rowsets_meta,
         std::map<int64_t, TabletIndexPB>& tablet_ids, bool is_versioned_write,
-        Versionstamp versionstamp) {
+        bool is_versioned_read, Versionstamp versionstamp) {
     std::stringstream ss;
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv->create_txn(&txn);
@@ -161,55 +161,66 @@ void convert_tmp_rowsets(
         }
 
         if (!tablet_ids.contains(tmp_rowset_pb.tablet_id())) {
-            std::string tablet_idx_key =
-                    meta_tablet_idx_key({instance_id, 
tmp_rowset_pb.tablet_id()});
-            std::string tablet_idx_val;
-            err = txn->get(tablet_idx_key, &tablet_idx_val, true);
-            if (TxnErrorCode::TXN_OK != err) {
-                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                              : 
cast_as<ErrCategory::READ>(err);
-                ss << "failed to get tablet idx, txn_id=" << txn_id
-                   << " key=" << hex(tablet_idx_key) << " err=" << err;
-                msg = ss.str();
-                LOG(WARNING) << msg;
-                return;
-            }
-
             TabletIndexPB tablet_idx_pb;
-            if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "failed to parse tablet idx pb txn_id=" << txn_id
-                   << " key=" << hex(tablet_idx_key);
-                msg = ss.str();
-                return;
+            if (!is_versioned_read) {
+                std::string tablet_idx_key =
+                        meta_tablet_idx_key({instance_id, 
tmp_rowset_pb.tablet_id()});
+                std::string tablet_idx_val;
+                err = txn->get(tablet_idx_key, &tablet_idx_val, true);
+                if (TxnErrorCode::TXN_OK != err) {
+                    code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                   ? MetaServiceCode::TXN_ID_NOT_FOUND
+                                   : cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get tablet idx, txn_id=" << txn_id
+                       << " key=" << hex(tablet_idx_key) << " err=" << err;
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+
+                if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    ss << "failed to parse tablet idx pb txn_id=" << txn_id
+                       << " key=" << hex(tablet_idx_key);
+                    msg = ss.str();
+                    return;
+                }
+            } else {
+                CHECK(false) << "versioned read is not supported yet";
             }
             tablet_ids.emplace(tmp_rowset_pb.tablet_id(), tablet_idx_pb);
         }
         const TabletIndexPB& tablet_idx_pb = 
tablet_ids[tmp_rowset_pb.tablet_id()];
 
         if (!partition_versions.contains(tmp_rowset_pb.partition_id())) {
-            std::string ver_val;
-            std::string ver_key = partition_version_key(
-                    {instance_id, db_id, tablet_idx_pb.table_id(), 
tmp_rowset_pb.partition_id()});
-            err = txn->get(ver_key, &ver_val);
-            if (TxnErrorCode::TXN_OK != err) {
-                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                              : 
cast_as<ErrCategory::READ>(err);
-                ss << "failed to get partiton version, txn_id=" << txn_id << " 
key=" << hex(ver_key)
-                   << " err=" << err;
-                msg = ss.str();
-                LOG(WARNING) << msg;
-                return;
-            }
             VersionPB version_pb;
-            if (!version_pb.ParseFromString(ver_val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "failed to parse version pb txn_id=" << txn_id << " 
key=" << hex(ver_key);
-                msg = ss.str();
-                return;
+            if (!is_versioned_read) {
+                std::string ver_val;
+                std::string ver_key =
+                        partition_version_key({instance_id, db_id, 
tablet_idx_pb.table_id(),
+                                               tmp_rowset_pb.partition_id()});
+                err = txn->get(ver_key, &ver_val);
+                if (TxnErrorCode::TXN_OK != err) {
+                    code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                   ? MetaServiceCode::TXN_ID_NOT_FOUND
+                                   : cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get partiton version, txn_id=" << txn_id
+                       << " key=" << hex(ver_key) << " err=" << err;
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+                if (!version_pb.ParseFromString(ver_val)) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    ss << "failed to parse version pb txn_id=" << txn_id << " 
key=" << hex(ver_key);
+                    msg = ss.str();
+                    return;
+                }
+                LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
+                          << " version_pb:" << version_pb.ShortDebugString();
+            } else {
+                CHECK(false) << "versioned read is not supported yet";
             }
-            LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
-                      << " version_pb:" << version_pb.ShortDebugString();
             partition_versions.emplace(tmp_rowset_pb.partition_id(), 
version_pb);
             DCHECK_EQ(partition_versions.size(), 1) << 
partition_versions.size();
         }
@@ -222,21 +233,24 @@ void convert_tmp_rowsets(
 
         std::string rowset_key = meta_rowset_key({instance_id, 
tmp_rowset_pb.tablet_id(), version});
         std::string rowset_val;
-        err = txn->get(rowset_key, &rowset_val);
-        if (TxnErrorCode::TXN_OK == err) {
-            // tmp rowset key has been converted
-            continue;
-        }
-
-        if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            code = cast_as<ErrCategory::READ>(err);
-            ss << "failed to get rowset_key, txn_id=" << txn_id << " key=" << 
hex(rowset_key)
-               << " err=" << err;
-            msg = ss.str();
-            LOG(WARNING) << msg;
-            return;
+        if (!is_versioned_read) {
+            err = txn->get(rowset_key, &rowset_val);
+            if (TxnErrorCode::TXN_OK == err) {
+                // tmp rowset key has been converted
+                continue;
+            }
+            if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get rowset_key, txn_id=" << txn_id << " key=" 
<< hex(rowset_key)
+                   << " err=" << err;
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+            DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
+        } else {
+            CHECK(false) << "versioned read is not supported yet";
         }
-        DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
 
         tmp_rowset_pb.set_start_version(version);
         tmp_rowset_pb.set_end_version(version);
@@ -456,6 +470,7 @@ void TxnLazyCommitTask::commit() {
     }
 
     bool is_versioned_write = txn_info.versioned_write();
+    bool is_versioned_read = txn_info.versioned_read();
 
     std::stringstream ss;
     int retry_times = 0;
@@ -525,7 +540,7 @@ void TxnLazyCommitTask::commit() {
                                                            
tmp_rowset_metas.begin() + end);
                     convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code_, 
msg_, db_id,
                                         sub_partition_tmp_rowset_metas, 
tablet_ids,
-                                        is_versioned_write, versionstamp);
+                                        is_versioned_write, is_versioned_read, 
versionstamp);
                     if (code_ != MetaServiceCode::OK) break;
                 }
                 if (code_ != MetaServiceCode::OK) break;
@@ -546,7 +561,7 @@ void TxnLazyCommitTask::commit() {
                     if (tablet_ids.size() > 0) {
                         // get table_id from memory cache
                         table_id = tablet_ids.begin()->second.table_id();
-                    } else {
+                    } else if (!is_versioned_read) {
                         // get table_id from storage
                         int64_t first_tablet_id = 
tmp_rowset_metas.begin()->second.tablet_id();
                         std::string tablet_idx_key =
@@ -573,33 +588,39 @@ void TxnLazyCommitTask::commit() {
                             break;
                         }
                         table_id = tablet_idx_pb.table_id();
+                    } else {
+                        CHECK(false) << "versioned read is not supported yet";
                     }
                 }
 
                 DCHECK(table_id > 0);
                 DCHECK(partition_id > 0);
 
+                VersionPB version_pb;
                 std::string ver_val;
                 std::string ver_key =
                         partition_version_key({instance_id_, db_id, table_id, 
partition_id});
-                err = txn->get(ver_key, &ver_val);
-                if (TxnErrorCode::TXN_OK != err) {
-                    code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
-                                    ? MetaServiceCode::TXN_ID_NOT_FOUND
-                                    : cast_as<ErrCategory::READ>(err);
-                    ss << "failed to get partiton version, txn_id=" << txn_id_
-                       << " key=" << hex(ver_key) << " err=" << err;
-                    msg_ = ss.str();
-                    LOG(WARNING) << msg_;
-                    break;
-                }
-                VersionPB version_pb;
-                if (!version_pb.ParseFromString(ver_val)) {
-                    code_ = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                    ss << "failed to parse version pb txn_id=" << txn_id_
-                       << " key=" << hex(ver_key);
-                    msg_ = ss.str();
-                    break;
+                if (!is_versioned_read) {
+                    err = txn->get(ver_key, &ver_val);
+                    if (TxnErrorCode::TXN_OK != err) {
+                        code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                        ? MetaServiceCode::TXN_ID_NOT_FOUND
+                                        : cast_as<ErrCategory::READ>(err);
+                        ss << "failed to get partiton version, txn_id=" << 
txn_id_
+                           << " key=" << hex(ver_key) << " err=" << err;
+                        msg_ = ss.str();
+                        LOG(WARNING) << msg_;
+                        break;
+                    }
+                    if (!version_pb.ParseFromString(ver_val)) {
+                        code_ = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                        ss << "failed to parse version pb txn_id=" << txn_id_
+                           << " key=" << hex(ver_key);
+                        msg_ = ss.str();
+                        break;
+                    }
+                } else {
+                    CHECK(false) << "versioned read is not supported yet";
                 }
 
                 if (version_pb.pending_txn_ids_size() > 0 &&
diff --git a/cloud/test/txn_lazy_commit_test.cpp 
b/cloud/test/txn_lazy_commit_test.cpp
index 8f2d6ff06c2..328bedae7dc 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -1575,9 +1575,8 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase3Test) {
         std::unique_lock<std::mutex> _lock(go_mutex);
         last_pending_txn_id_count++;
         if (last_pending_txn_id_count == 1) {
-            auto version_pb = *try_any_cast<VersionPB*>(args[0]);
-            ASSERT_EQ(version_pb.pending_txn_ids(0), eventually_txn_id);
-            ASSERT_GT(version_pb.pending_txn_ids(0), 0);
+            auto txn_id = *try_any_cast<int64_t*>(args[0]);
+            ASSERT_EQ(txn_id, eventually_txn_id);
         }
     });
 
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 8d627f67cde..1cb4018e70a 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -442,6 +442,7 @@ message TxnInfoPB {
     repeated int64 sub_txn_ids = 18;
     // TODO: There are more fields TBD
     optional bool versioned_write = 19;  // versioned write, don't need to 
write RecycleTxnPB again
+    optional bool versioned_read = 20;  // whether to read versioned keys
 }
 
 // For check txn conflict and txn timeout


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to